Browse Source
implement S3 remote storage backend and wire into FetchAndWriteNeedle
implement S3 remote storage backend and wire into FetchAndWriteNeedle
- Add remote_storage module with RemoteStorageClient trait - Implement S3RemoteStorageClient using aws-sdk-s3 (covers all S3-compatible providers: AWS, Wasabi, Backblaze, Aliyun, etc.) - FetchAndWriteNeedle now fetches data from S3, writes locally as needle, and replicates to peers - Add 3 integration tests using weed mini as S3 backend: - Full round-trip fetch from S3 - Byte-range (partial) read from S3 - Error handling for non-existent S3 objectsrust-volume-server
7 changed files with 1620 additions and 95 deletions
-
1002seaweed-volume/Cargo.lock
-
8seaweed-volume/Cargo.toml
-
1seaweed-volume/src/lib.rs
-
155seaweed-volume/src/remote_storage/mod.rs
-
178seaweed-volume/src/remote_storage/s3.rs
-
83seaweed-volume/src/server/grpc_server.rs
-
288test/volume_server/grpc/fetch_remote_s3_test.go
1002
seaweed-volume/Cargo.lock
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,155 @@ |
|||
//! Remote storage backends for tiered storage support.
|
|||
//!
|
|||
//! Provides a trait-based abstraction over cloud storage providers (S3, GCS, Azure, etc.)
|
|||
//! and a registry to create clients from protobuf RemoteConf messages.
|
|||
|
|||
pub mod s3;
|
|||
|
|||
use crate::pb::remote_pb::{RemoteConf, RemoteStorageLocation};
|
|||
|
|||
/// Error type for remote storage operations.
|
|||
#[derive(Debug, thiserror::Error)]
|
|||
pub enum RemoteStorageError {
|
|||
#[error("remote storage type {0} not found")]
|
|||
TypeNotFound(String),
|
|||
#[error("remote object not found: {0}")]
|
|||
ObjectNotFound(String),
|
|||
#[error("remote storage error: {0}")]
|
|||
Other(String),
|
|||
#[error("io error: {0}")]
|
|||
Io(#[from] std::io::Error),
|
|||
}
|
|||
|
|||
/// Metadata about a remote file entry.
|
|||
#[derive(Debug, Clone)]
|
|||
pub struct RemoteEntry {
|
|||
pub size: i64,
|
|||
pub last_modified_at: i64, // Unix seconds
|
|||
pub e_tag: String,
|
|||
pub storage_name: String,
|
|||
}
|
|||
|
|||
/// Trait for remote storage clients. Matches Go's RemoteStorageClient interface.
|
|||
#[async_trait::async_trait]
|
|||
pub trait RemoteStorageClient: Send + Sync {
|
|||
/// Read (part of) a file from remote storage.
|
|||
async fn read_file(
|
|||
&self,
|
|||
loc: &RemoteStorageLocation,
|
|||
offset: i64,
|
|||
size: i64,
|
|||
) -> Result<Vec<u8>, RemoteStorageError>;
|
|||
|
|||
/// Write a file to remote storage.
|
|||
async fn write_file(
|
|||
&self,
|
|||
loc: &RemoteStorageLocation,
|
|||
data: &[u8],
|
|||
) -> Result<RemoteEntry, RemoteStorageError>;
|
|||
|
|||
/// Get metadata for a file in remote storage.
|
|||
async fn stat_file(
|
|||
&self,
|
|||
loc: &RemoteStorageLocation,
|
|||
) -> Result<RemoteEntry, RemoteStorageError>;
|
|||
|
|||
/// Delete a file from remote storage.
|
|||
async fn delete_file(
|
|||
&self,
|
|||
loc: &RemoteStorageLocation,
|
|||
) -> Result<(), RemoteStorageError>;
|
|||
|
|||
/// List all buckets.
|
|||
async fn list_buckets(&self) -> Result<Vec<String>, RemoteStorageError>;
|
|||
|
|||
/// The RemoteConf used to create this client.
|
|||
fn remote_conf(&self) -> &RemoteConf;
|
|||
}
|
|||
|
|||
/// Create a new remote storage client from a RemoteConf.
|
|||
pub fn make_remote_storage_client(
|
|||
conf: &RemoteConf,
|
|||
) -> Result<Box<dyn RemoteStorageClient>, RemoteStorageError> {
|
|||
match conf.r#type.as_str() {
|
|||
// All S3-compatible backends use the same client with different credentials
|
|||
"s3" | "wasabi" | "backblaze" | "aliyun" | "tencent" | "baidu"
|
|||
| "filebase" | "storj" | "contabo" => {
|
|||
let (access_key, secret_key, endpoint, region) = extract_s3_credentials(conf);
|
|||
Ok(Box::new(s3::S3RemoteStorageClient::new(
|
|||
conf.clone(),
|
|||
&access_key,
|
|||
&secret_key,
|
|||
®ion,
|
|||
&endpoint,
|
|||
conf.s3_force_path_style,
|
|||
)))
|
|||
}
|
|||
other => Err(RemoteStorageError::TypeNotFound(other.to_string())),
|
|||
}
|
|||
}
|
|||
|
|||
/// Extract S3-compatible credentials from a RemoteConf based on its type.
|
|||
fn extract_s3_credentials(conf: &RemoteConf) -> (String, String, String, String) {
|
|||
match conf.r#type.as_str() {
|
|||
"s3" => (
|
|||
conf.s3_access_key.clone(),
|
|||
conf.s3_secret_key.clone(),
|
|||
conf.s3_endpoint.clone(),
|
|||
if conf.s3_region.is_empty() { "us-east-1".to_string() } else { conf.s3_region.clone() },
|
|||
),
|
|||
"wasabi" => (
|
|||
conf.wasabi_access_key.clone(),
|
|||
conf.wasabi_secret_key.clone(),
|
|||
conf.wasabi_endpoint.clone(),
|
|||
conf.wasabi_region.clone(),
|
|||
),
|
|||
"backblaze" => (
|
|||
conf.backblaze_key_id.clone(),
|
|||
conf.backblaze_application_key.clone(),
|
|||
conf.backblaze_endpoint.clone(),
|
|||
conf.backblaze_region.clone(),
|
|||
),
|
|||
"aliyun" => (
|
|||
conf.aliyun_access_key.clone(),
|
|||
conf.aliyun_secret_key.clone(),
|
|||
conf.aliyun_endpoint.clone(),
|
|||
conf.aliyun_region.clone(),
|
|||
),
|
|||
"tencent" => (
|
|||
conf.tencent_secret_id.clone(),
|
|||
conf.tencent_secret_key.clone(),
|
|||
conf.tencent_endpoint.clone(),
|
|||
String::new(),
|
|||
),
|
|||
"baidu" => (
|
|||
conf.baidu_access_key.clone(),
|
|||
conf.baidu_secret_key.clone(),
|
|||
conf.baidu_endpoint.clone(),
|
|||
conf.baidu_region.clone(),
|
|||
),
|
|||
"filebase" => (
|
|||
conf.filebase_access_key.clone(),
|
|||
conf.filebase_secret_key.clone(),
|
|||
conf.filebase_endpoint.clone(),
|
|||
String::new(),
|
|||
),
|
|||
"storj" => (
|
|||
conf.storj_access_key.clone(),
|
|||
conf.storj_secret_key.clone(),
|
|||
conf.storj_endpoint.clone(),
|
|||
String::new(),
|
|||
),
|
|||
"contabo" => (
|
|||
conf.contabo_access_key.clone(),
|
|||
conf.contabo_secret_key.clone(),
|
|||
conf.contabo_endpoint.clone(),
|
|||
conf.contabo_region.clone(),
|
|||
),
|
|||
_ => (
|
|||
conf.s3_access_key.clone(),
|
|||
conf.s3_secret_key.clone(),
|
|||
conf.s3_endpoint.clone(),
|
|||
conf.s3_region.clone(),
|
|||
),
|
|||
}
|
|||
}
|
|||
@ -0,0 +1,178 @@ |
|||
//! S3-compatible remote storage client.
|
|||
//!
|
|||
//! Works with AWS S3, MinIO, SeaweedFS S3, and all S3-compatible providers.
|
|||
|
|||
use aws_sdk_s3::Client;
|
|||
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
|
|||
use aws_sdk_s3::primitives::ByteStream;
|
|||
|
|||
use crate::pb::remote_pb::{RemoteConf, RemoteStorageLocation};
|
|||
use super::{RemoteEntry, RemoteStorageClient, RemoteStorageError};
|
|||
|
|||
/// S3-compatible remote storage client.
|
|||
pub struct S3RemoteStorageClient {
|
|||
client: Client,
|
|||
conf: RemoteConf,
|
|||
}
|
|||
|
|||
impl S3RemoteStorageClient {
|
|||
/// Create a new S3 client from credentials and endpoint configuration.
|
|||
pub fn new(
|
|||
conf: RemoteConf,
|
|||
access_key: &str,
|
|||
secret_key: &str,
|
|||
region: &str,
|
|||
endpoint: &str,
|
|||
force_path_style: bool,
|
|||
) -> Self {
|
|||
let region = if region.is_empty() { "us-east-1" } else { region };
|
|||
|
|||
let credentials = Credentials::new(
|
|||
access_key,
|
|||
secret_key,
|
|||
None, // session token
|
|||
None, // expiry
|
|||
"seaweedfs-volume",
|
|||
);
|
|||
|
|||
let mut s3_config = aws_sdk_s3::Config::builder()
|
|||
.behavior_version(BehaviorVersion::latest())
|
|||
.region(Region::new(region.to_string()))
|
|||
.credentials_provider(credentials)
|
|||
.force_path_style(force_path_style);
|
|||
|
|||
if !endpoint.is_empty() {
|
|||
s3_config = s3_config.endpoint_url(endpoint);
|
|||
}
|
|||
|
|||
let client = Client::from_conf(s3_config.build());
|
|||
|
|||
S3RemoteStorageClient { client, conf }
|
|||
}
|
|||
}
|
|||
|
|||
#[async_trait::async_trait]
|
|||
impl RemoteStorageClient for S3RemoteStorageClient {
|
|||
async fn read_file(
|
|||
&self,
|
|||
loc: &RemoteStorageLocation,
|
|||
offset: i64,
|
|||
size: i64,
|
|||
) -> Result<Vec<u8>, RemoteStorageError> {
|
|||
let key = loc.path.trim_start_matches('/');
|
|||
|
|||
let mut req = self.client.get_object()
|
|||
.bucket(&loc.bucket)
|
|||
.key(key);
|
|||
|
|||
// Set byte range if specified
|
|||
if size > 0 {
|
|||
let end = offset + size - 1;
|
|||
req = req.range(format!("bytes={}-{}", offset, end));
|
|||
} else if offset > 0 {
|
|||
req = req.range(format!("bytes={}-", offset));
|
|||
}
|
|||
|
|||
let resp = req.send().await.map_err(|e| {
|
|||
let msg = format!("{}", e);
|
|||
if msg.contains("NoSuchKey") || msg.contains("404") {
|
|||
RemoteStorageError::ObjectNotFound(format!("{}/{}", loc.bucket, key))
|
|||
} else {
|
|||
RemoteStorageError::Other(format!("s3 get object: {}", e))
|
|||
}
|
|||
})?;
|
|||
|
|||
let data = resp.body.collect().await
|
|||
.map_err(|e| RemoteStorageError::Other(format!("s3 read body: {}", e)))?;
|
|||
|
|||
Ok(data.into_bytes().to_vec())
|
|||
}
|
|||
|
|||
async fn write_file(
|
|||
&self,
|
|||
loc: &RemoteStorageLocation,
|
|||
data: &[u8],
|
|||
) -> Result<RemoteEntry, RemoteStorageError> {
|
|||
let key = loc.path.trim_start_matches('/');
|
|||
|
|||
let resp = self.client.put_object()
|
|||
.bucket(&loc.bucket)
|
|||
.key(key)
|
|||
.body(ByteStream::from(data.to_vec()))
|
|||
.send()
|
|||
.await
|
|||
.map_err(|e| RemoteStorageError::Other(format!("s3 put object: {}", e)))?;
|
|||
|
|||
Ok(RemoteEntry {
|
|||
size: data.len() as i64,
|
|||
last_modified_at: std::time::SystemTime::now()
|
|||
.duration_since(std::time::UNIX_EPOCH)
|
|||
.unwrap_or_default()
|
|||
.as_secs() as i64,
|
|||
e_tag: resp.e_tag().unwrap_or_default().to_string(),
|
|||
storage_name: loc.name.clone(),
|
|||
})
|
|||
}
|
|||
|
|||
async fn stat_file(
|
|||
&self,
|
|||
loc: &RemoteStorageLocation,
|
|||
) -> Result<RemoteEntry, RemoteStorageError> {
|
|||
let key = loc.path.trim_start_matches('/');
|
|||
|
|||
let resp = self.client.head_object()
|
|||
.bucket(&loc.bucket)
|
|||
.key(key)
|
|||
.send()
|
|||
.await
|
|||
.map_err(|e| {
|
|||
let msg = format!("{}", e);
|
|||
if msg.contains("404") || msg.contains("NotFound") {
|
|||
RemoteStorageError::ObjectNotFound(format!("{}/{}", loc.bucket, key))
|
|||
} else {
|
|||
RemoteStorageError::Other(format!("s3 head object: {}", e))
|
|||
}
|
|||
})?;
|
|||
|
|||
Ok(RemoteEntry {
|
|||
size: resp.content_length().unwrap_or(0),
|
|||
last_modified_at: resp.last_modified()
|
|||
.map(|t| t.secs())
|
|||
.unwrap_or(0),
|
|||
e_tag: resp.e_tag().unwrap_or_default().to_string(),
|
|||
storage_name: loc.name.clone(),
|
|||
})
|
|||
}
|
|||
|
|||
async fn delete_file(
|
|||
&self,
|
|||
loc: &RemoteStorageLocation,
|
|||
) -> Result<(), RemoteStorageError> {
|
|||
let key = loc.path.trim_start_matches('/');
|
|||
|
|||
self.client.delete_object()
|
|||
.bucket(&loc.bucket)
|
|||
.key(key)
|
|||
.send()
|
|||
.await
|
|||
.map_err(|e| RemoteStorageError::Other(format!("s3 delete object: {}", e)))?;
|
|||
|
|||
Ok(())
|
|||
}
|
|||
|
|||
async fn list_buckets(&self) -> Result<Vec<String>, RemoteStorageError> {
|
|||
let resp = self.client.list_buckets()
|
|||
.send()
|
|||
.await
|
|||
.map_err(|e| RemoteStorageError::Other(format!("s3 list buckets: {}", e)))?;
|
|||
|
|||
Ok(resp.buckets()
|
|||
.iter()
|
|||
.filter_map(|b| b.name().map(String::from))
|
|||
.collect())
|
|||
}
|
|||
|
|||
fn remote_conf(&self) -> &RemoteConf {
|
|||
&self.conf
|
|||
}
|
|||
}
|
|||
@ -0,0 +1,288 @@ |
|||
package volume_server_grpc_test |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"fmt" |
|||
"net" |
|||
"os" |
|||
"os/exec" |
|||
"path/filepath" |
|||
"strings" |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/aws/aws-sdk-go/aws" |
|||
"github.com/aws/aws-sdk-go/aws/credentials" |
|||
"github.com/aws/aws-sdk-go/aws/session" |
|||
"github.com/aws/aws-sdk-go/service/s3" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/test/volume_server/framework" |
|||
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" |
|||
) |
|||
|
|||
// findAvailablePort finds a free TCP port on localhost.
|
|||
func findAvailablePort() (int, error) { |
|||
l, err := net.Listen("tcp", "127.0.0.1:0") |
|||
if err != nil { |
|||
return 0, err |
|||
} |
|||
port := l.Addr().(*net.TCPAddr).Port |
|||
l.Close() |
|||
return port, nil |
|||
} |
|||
|
|||
// waitForPort waits until a TCP port is listening, up to timeout.
|
|||
func waitForPort(port int, timeout time.Duration) error { |
|||
deadline := time.Now().Add(timeout) |
|||
for time.Now().Before(deadline) { |
|||
conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 500*time.Millisecond) |
|||
if err == nil { |
|||
conn.Close() |
|||
return nil |
|||
} |
|||
time.Sleep(200 * time.Millisecond) |
|||
} |
|||
return fmt.Errorf("port %d not listening after %v", port, timeout) |
|||
} |
|||
|
|||
// startWeedMini starts a weed mini subprocess and returns the S3 endpoint and cleanup func.
|
|||
func startWeedMini(t *testing.T) (s3Endpoint string, cleanup func()) { |
|||
t.Helper() |
|||
|
|||
weedBin, err := exec.LookPath("weed") |
|||
if err != nil { |
|||
weedBin = filepath.Join("..", "..", "..", "weed", "weed_binary") |
|||
if _, err := os.Stat(weedBin); os.IsNotExist(err) { |
|||
t.Skip("weed binary not found, skipping S3 remote storage test") |
|||
} |
|||
} |
|||
|
|||
miniMasterPort, _ := findAvailablePort() |
|||
miniVolumePort, _ := findAvailablePort() |
|||
miniFilerPort, _ := findAvailablePort() |
|||
miniS3Port, _ := findAvailablePort() |
|||
miniDir := t.TempDir() |
|||
os.WriteFile(filepath.Join(miniDir, "security.toml"), []byte("# empty\n"), 0644) |
|||
|
|||
ctx, cancel := context.WithCancel(context.Background()) |
|||
|
|||
miniCmd := exec.CommandContext(ctx, weedBin, "mini", |
|||
fmt.Sprintf("-dir=%s", miniDir), |
|||
fmt.Sprintf("-master.port=%d", miniMasterPort), |
|||
fmt.Sprintf("-volume.port=%d", miniVolumePort), |
|||
fmt.Sprintf("-filer.port=%d", miniFilerPort), |
|||
fmt.Sprintf("-s3.port=%d", miniS3Port), |
|||
) |
|||
miniCmd.Env = append(os.Environ(), "AWS_ACCESS_KEY_ID=admin", "AWS_SECRET_ACCESS_KEY=admin") |
|||
miniCmd.Dir = miniDir |
|||
logFile, _ := os.CreateTemp("", "weed-mini-*.log") |
|||
miniCmd.Stdout = logFile |
|||
miniCmd.Stderr = logFile |
|||
t.Logf("weed mini logs at %s", logFile.Name()) |
|||
|
|||
if err := miniCmd.Start(); err != nil { |
|||
cancel() |
|||
logFile.Close() |
|||
t.Fatalf("start weed mini: %v", err) |
|||
} |
|||
|
|||
if err := waitForPort(miniS3Port, 30*time.Second); err != nil { |
|||
cancel() |
|||
miniCmd.Wait() |
|||
logFile.Close() |
|||
t.Fatalf("weed mini S3 not ready: %v", err) |
|||
} |
|||
t.Logf("weed mini S3 ready on port %d", miniS3Port) |
|||
|
|||
return fmt.Sprintf("http://127.0.0.1:%d", miniS3Port), func() { |
|||
cancel() |
|||
miniCmd.Wait() |
|||
logFile.Close() |
|||
} |
|||
} |
|||
|
|||
func newS3Client(endpoint string) *s3.S3 { |
|||
sess, _ := session.NewSession(&aws.Config{ |
|||
Region: aws.String("us-east-1"), |
|||
Endpoint: aws.String(endpoint), |
|||
Credentials: credentials.NewStaticCredentials("admin", "admin", ""), |
|||
DisableSSL: aws.Bool(true), |
|||
S3ForcePathStyle: aws.Bool(true), |
|||
}) |
|||
return s3.New(sess) |
|||
} |
|||
|
|||
// TestFetchAndWriteNeedleFromS3 tests the full FetchAndWriteNeedle flow:
|
|||
// 1. Start a weed mini instance as S3 backend
|
|||
// 2. Upload a test object to it via S3 API
|
|||
// 3. Call FetchAndWriteNeedle on the volume server to fetch from S3
|
|||
// 4. Verify the response contains a valid e_tag
|
|||
func TestFetchAndWriteNeedleFromS3(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("skipping integration test in short mode") |
|||
} |
|||
|
|||
s3Endpoint, cleanupMini := startWeedMini(t) |
|||
defer cleanupMini() |
|||
|
|||
s3Client := newS3Client(s3Endpoint) |
|||
|
|||
// Create bucket and upload test data
|
|||
bucket := "test-remote-fetch" |
|||
s3Client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucket)}) |
|||
|
|||
testData := []byte("Hello from S3 remote storage! This is test data for FetchAndWriteNeedle.") |
|||
testKey := "test-object.dat" |
|||
_, err := s3Client.PutObject(&s3.PutObjectInput{ |
|||
Bucket: aws.String(bucket), |
|||
Key: aws.String(testKey), |
|||
Body: bytes.NewReader(testData), |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("put object: %v", err) |
|||
} |
|||
t.Logf("uploaded %d bytes to s3://%s/%s", len(testData), bucket, testKey) |
|||
|
|||
// Start volume server
|
|||
clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) |
|||
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) |
|||
defer conn.Close() |
|||
|
|||
const volumeID = uint32(99) |
|||
framework.AllocateVolume(t, grpcClient, volumeID, "") |
|||
|
|||
grpcCtx, grpcCancel := context.WithTimeout(context.Background(), 30*time.Second) |
|||
defer grpcCancel() |
|||
|
|||
// FetchAndWriteNeedle from S3
|
|||
resp, err := grpcClient.FetchAndWriteNeedle(grpcCtx, &volume_server_pb.FetchAndWriteNeedleRequest{ |
|||
VolumeId: volumeID, |
|||
NeedleId: 42, |
|||
Cookie: 12345, |
|||
Offset: 0, |
|||
Size: int64(len(testData)), |
|||
RemoteConf: &remote_pb.RemoteConf{ |
|||
Name: "test-s3", |
|||
Type: "s3", |
|||
S3AccessKey: "admin", |
|||
S3SecretKey: "admin", |
|||
S3Region: "us-east-1", |
|||
S3Endpoint: s3Endpoint, |
|||
S3ForcePathStyle: true, |
|||
}, |
|||
RemoteLocation: &remote_pb.RemoteStorageLocation{ |
|||
Name: "test-s3", |
|||
Bucket: bucket, |
|||
Path: "/" + testKey, |
|||
}, |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("FetchAndWriteNeedle failed: %v", err) |
|||
} |
|||
if resp.GetETag() == "" { |
|||
t.Fatal("FetchAndWriteNeedle returned empty e_tag") |
|||
} |
|||
t.Logf("FetchAndWriteNeedle success: e_tag=%s", resp.GetETag()) |
|||
} |
|||
|
|||
// TestFetchAndWriteNeedleFromS3WithPartialRead tests reading a byte range from S3.
|
|||
func TestFetchAndWriteNeedleFromS3WithPartialRead(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("skipping integration test in short mode") |
|||
} |
|||
|
|||
s3Endpoint, cleanupMini := startWeedMini(t) |
|||
defer cleanupMini() |
|||
|
|||
s3Client := newS3Client(s3Endpoint) |
|||
|
|||
bucket := "partial-read-test" |
|||
s3Client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucket)}) |
|||
|
|||
// Upload 1KB of data
|
|||
fullData := make([]byte, 1024) |
|||
for i := range fullData { |
|||
fullData[i] = byte(i % 256) |
|||
} |
|||
s3Client.PutObject(&s3.PutObjectInput{ |
|||
Bucket: aws.String(bucket), Key: aws.String("big.dat"), |
|||
Body: bytes.NewReader(fullData), |
|||
}) |
|||
|
|||
clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) |
|||
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) |
|||
defer conn.Close() |
|||
|
|||
framework.AllocateVolume(t, grpcClient, 98, "") |
|||
|
|||
grpcCtx, grpcCancel := context.WithTimeout(context.Background(), 30*time.Second) |
|||
defer grpcCancel() |
|||
|
|||
// Fetch only bytes 100-199 (100 bytes) from the 1KB object
|
|||
resp, err := grpcClient.FetchAndWriteNeedle(grpcCtx, &volume_server_pb.FetchAndWriteNeedleRequest{ |
|||
VolumeId: 98, NeedleId: 7, Cookie: 999, |
|||
Offset: 100, Size: 100, |
|||
RemoteConf: &remote_pb.RemoteConf{ |
|||
Name: "test-s3-partial", Type: "s3", |
|||
S3AccessKey: "admin", S3SecretKey: "admin", |
|||
S3Region: "us-east-1", S3Endpoint: s3Endpoint, S3ForcePathStyle: true, |
|||
}, |
|||
RemoteLocation: &remote_pb.RemoteStorageLocation{ |
|||
Name: "test-s3-partial", Bucket: bucket, Path: "/big.dat", |
|||
}, |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("FetchAndWriteNeedle partial read failed: %v", err) |
|||
} |
|||
if resp.GetETag() == "" { |
|||
t.Fatal("empty e_tag for partial read") |
|||
} |
|||
t.Logf("FetchAndWriteNeedle partial read success: e_tag=%s", resp.GetETag()) |
|||
} |
|||
|
|||
// TestFetchAndWriteNeedleS3NotFound tests that fetching a non-existent S3 object returns an error.
|
|||
func TestFetchAndWriteNeedleS3NotFound(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("skipping integration test in short mode") |
|||
} |
|||
|
|||
s3Endpoint, cleanupMini := startWeedMini(t) |
|||
defer cleanupMini() |
|||
|
|||
s3Client := newS3Client(s3Endpoint) |
|||
|
|||
bucket := "notfound-test" |
|||
s3Client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucket)}) |
|||
|
|||
clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) |
|||
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) |
|||
defer conn.Close() |
|||
|
|||
framework.AllocateVolume(t, grpcClient, 97, "") |
|||
|
|||
grpcCtx, grpcCancel := context.WithTimeout(context.Background(), 30*time.Second) |
|||
defer grpcCancel() |
|||
|
|||
_, err := grpcClient.FetchAndWriteNeedle(grpcCtx, &volume_server_pb.FetchAndWriteNeedleRequest{ |
|||
VolumeId: 97, NeedleId: 1, Cookie: 1, |
|||
Offset: 0, Size: 100, |
|||
RemoteConf: &remote_pb.RemoteConf{ |
|||
Name: "test-s3-nf", Type: "s3", |
|||
S3AccessKey: "admin", S3SecretKey: "admin", |
|||
S3Region: "us-east-1", S3Endpoint: s3Endpoint, S3ForcePathStyle: true, |
|||
}, |
|||
RemoteLocation: &remote_pb.RemoteStorageLocation{ |
|||
Name: "test-s3-nf", Bucket: bucket, Path: "/does-not-exist.dat", |
|||
}, |
|||
}) |
|||
if err == nil { |
|||
t.Fatal("FetchAndWriteNeedle should fail for non-existent object") |
|||
} |
|||
if !strings.Contains(err.Error(), "read from remote") { |
|||
t.Fatalf("expected 'read from remote' error, got: %v", err) |
|||
} |
|||
t.Logf("correctly got error for non-existent object: %v", err) |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue