diff --git a/weed/remote_storage/azure/azure_storage_client.go b/weed/remote_storage/azure/azure_storage_client.go
index 6e6db3277..5785a4a0f 100644
--- a/weed/remote_storage/azure/azure_storage_client.go
+++ b/weed/remote_storage/azure/azure_storage_client.go
@@ -41,9 +41,6 @@ const (
 // with consistent retry configuration across the application.
 // This centralizes the retry policy to ensure uniform behavior between
 // remote storage and replication sink implementations.
-//
-// Related: Use DefaultAzureOpTimeout for context.WithTimeout when calling Azure operations
-// to ensure the timeout accommodates all retry attempts configured here.
 func DefaultAzBlobClientOptions() *azblob.ClientOptions {
 	return &azblob.ClientOptions{
 		ClientOptions: azcore.ClientOptions{
@@ -130,6 +127,32 @@ type azureRemoteStorageClient struct {
 
 var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
 
+func (az *azureRemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
+	key := loc.Path[1:]
+	ctx, cancel := context.WithTimeout(context.Background(), DefaultAzureOpTimeout)
+	defer cancel()
+	resp, err := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key).GetProperties(ctx, nil)
+	if err != nil {
+		if bloberror.HasCode(err, bloberror.BlobNotFound) {
+			return nil, remote_storage.ErrRemoteObjectNotFound
+		}
+		return nil, fmt.Errorf("stat azure %s%s: %w", loc.Bucket, loc.Path, err)
+	}
+	remoteEntry = &filer_pb.RemoteEntry{
+		StorageName: az.conf.Name,
+	}
+	if resp.ContentLength != nil {
+		remoteEntry.RemoteSize = *resp.ContentLength
+	}
+	if resp.LastModified != nil {
+		remoteEntry.RemoteMtime = resp.LastModified.Unix()
+	}
+	if resp.ETag != nil {
+		remoteEntry.RemoteETag = string(*resp.ETag)
+	}
+	return remoteEntry, nil
+}
+
 func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
 
 	pathKey := loc.Path[1:]
@@ -241,29 +264,7 @@ func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocati
 }
 
 func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
-	key := loc.Path[1:]
-	blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
-
-	props, err := blobClient.GetProperties(context.Background(), nil)
-	if err != nil {
-		return nil, err
-	}
-
-	remoteEntry := &filer_pb.RemoteEntry{
-		StorageName: az.conf.Name,
-	}
-
-	if props.LastModified != nil {
-		remoteEntry.RemoteMtime = props.LastModified.Unix()
-	}
-	if props.ContentLength != nil {
-		remoteEntry.RemoteSize = *props.ContentLength
-	}
-	if props.ETag != nil {
-		remoteEntry.RemoteETag = string(*props.ETag)
-	}
-
-	return remoteEntry, nil
+	return az.StatFile(loc)
 }
 
 func toMetadata(attributes map[string][]byte) map[string]*string {
diff --git a/weed/remote_storage/azure/azure_storage_client_test.go b/weed/remote_storage/azure/azure_storage_client_test.go
index 9e0e552e3..51ed9c3e1 100644
--- a/weed/remote_storage/azure/azure_storage_client_test.go
+++ b/weed/remote_storage/azure/azure_storage_client_test.go
@@ -10,7 +10,9 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
+	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/stretchr/testify/require"
 )
 
 // TestAzureStorageClientBasic tests basic Azure storage client operations
@@ -378,3 +380,13 @@ func TestAzureStorageClientErrors(t *testing.T) {
 		t.Log("Expected error with invalid credentials on ReadFile, but got none (might be cached)")
 	}
 }
+
+func TestAzureRemoteStorageClientImplementsInterface(t *testing.T) {
+	var _ remote_storage.RemoteStorageClient = (*azureRemoteStorageClient)(nil)
+}
+
+func TestAzureErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
+	require.Error(t, remote_storage.ErrRemoteObjectNotFound)
+	require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
+}
+
diff --git a/weed/remote_storage/gcs/gcs_storage_client.go b/weed/remote_storage/gcs/gcs_storage_client.go
index 2d090ae6e..7a9bc1e31 100644
--- a/weed/remote_storage/gcs/gcs_storage_client.go
+++ b/weed/remote_storage/gcs/gcs_storage_client.go
@@ -2,11 +2,13 @@ package gcs
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"os"
 	"reflect"
 	"strings"
+	"time"
 
 	"cloud.google.com/go/storage"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -126,6 +128,28 @@ func (gcs *gcsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation
 	}
 	return
 }
+
+const defaultGCSOpTimeout = 30 * time.Second
+
+func (gcs *gcsRemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
+	key := loc.Path[1:]
+	ctx, cancel := context.WithTimeout(context.Background(), defaultGCSOpTimeout)
+	defer cancel()
+	attr, err := gcs.client.Bucket(loc.Bucket).Object(key).Attrs(ctx)
+	if err != nil {
+		if errors.Is(err, storage.ErrObjectNotExist) {
+			return nil, remote_storage.ErrRemoteObjectNotFound
+		}
+		return nil, fmt.Errorf("stat gcs %s%s: %w", loc.Bucket, loc.Path, err)
+	}
+	return &filer_pb.RemoteEntry{
+		StorageName: gcs.conf.Name,
+		RemoteMtime: attr.Updated.Unix(),
+		RemoteSize:  attr.Size,
+		RemoteETag:  attr.Etag,
+	}, nil
+}
+
 func (gcs *gcsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
 
 	key := loc.Path[1:]
@@ -170,20 +194,7 @@ func (gcs *gcsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocatio
 }
 
 func (gcs *gcsRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
-	key := loc.Path[1:]
-	attr, err := gcs.client.Bucket(loc.Bucket).Object(key).Attrs(context.Background())
-
-	if err != nil {
-		return nil, err
-	}
-
-	return &filer_pb.RemoteEntry{
-		RemoteMtime: attr.Updated.Unix(),
-		RemoteSize:  attr.Size,
-		RemoteETag:  attr.Etag,
-		StorageName: gcs.conf.Name,
-	}, nil
-
+	return gcs.StatFile(loc)
 }
 
 func toMetadata(attributes map[string][]byte) map[string]string {
diff --git a/weed/remote_storage/gcs/gcs_storage_client_test.go b/weed/remote_storage/gcs/gcs_storage_client_test.go
new file mode 100644
index 000000000..8e74a25a8
--- /dev/null
+++ b/weed/remote_storage/gcs/gcs_storage_client_test.go
@@ -0,0 +1,17 @@
+package gcs
+
+import (
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
+	"github.com/stretchr/testify/require"
+)
+
+func TestGCSRemoteStorageClientImplementsInterface(t *testing.T) {
+	var _ remote_storage.RemoteStorageClient = (*gcsRemoteStorageClient)(nil)
+}
+
+func TestGCSErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
+	require.Error(t, remote_storage.ErrRemoteObjectNotFound)
+	require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
+}
diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go
index 04d9520e6..e8f54d944 100644
--- a/weed/remote_storage/remote_storage.go
+++ b/weed/remote_storage/remote_storage.go
@@ -1,6 +1,7 @@
 package remote_storage
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"sort"
@@ -69,8 +70,12 @@ type Bucket struct {
 	CreatedAt time.Time
 }
 
+// ErrRemoteObjectNotFound is returned by StatFile when the object does not exist in the remote storage backend.
+var ErrRemoteObjectNotFound = errors.New("remote object not found")
+
 type RemoteStorageClient interface {
 	Traverse(loc *remote_pb.RemoteStorageLocation, visitFn VisitFunc) error
+	StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error)
 	ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
 	WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
 	RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error)
diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go
index 46c44048c..6700fbdd3 100644
--- a/weed/remote_storage/s3/s3_storage_client.go
+++ b/weed/remote_storage/s3/s3_storage_client.go
@@ -3,9 +3,11 @@ package s3
 import (
 	"fmt"
 	"io"
+	"net/http"
 	"reflect"
 
 	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/aws/request"
 	"github.com/aws/aws-sdk-go/aws/session"
@@ -119,6 +121,33 @@ func (s *s3RemoteStorageClient) Traverse(remote *remote_pb.RemoteStorageLocation
 	}
 	return
 }
+
+func (s *s3RemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
+	resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
+		Bucket: aws.String(loc.Bucket),
+		Key:    aws.String(loc.Path[1:]),
+	})
+	if err != nil {
+		if reqErr, ok := err.(awserr.RequestFailure); ok && reqErr.StatusCode() == http.StatusNotFound {
+			return nil, remote_storage.ErrRemoteObjectNotFound
+		}
+		return nil, fmt.Errorf("stat s3 %s%s: %w", loc.Bucket, loc.Path, err)
+	}
+	remoteEntry = &filer_pb.RemoteEntry{
+		StorageName: s.conf.Name,
+	}
+	if resp.ContentLength != nil {
+		remoteEntry.RemoteSize = *resp.ContentLength
+	}
+	if resp.LastModified != nil {
+		remoteEntry.RemoteMtime = resp.LastModified.Unix()
+	}
+	if resp.ETag != nil {
+		remoteEntry.RemoteETag = *resp.ETag
+	}
+	return remoteEntry, nil
+}
+
 func (s *s3RemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
 	downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
 		u.PartSize = int64(4 * 1024 * 1024)
@@ -208,21 +237,7 @@ func toTagging(attributes map[string][]byte) *s3.Tagging {
 }
 
 func (s *s3RemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
-	resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
-		Bucket: aws.String(loc.Bucket),
-		Key:    aws.String(loc.Path[1:]),
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	return &filer_pb.RemoteEntry{
-		RemoteMtime: resp.LastModified.Unix(),
-		RemoteSize:  *resp.ContentLength,
-		RemoteETag:  *resp.ETag,
-		StorageName: s.conf.Name,
-	}, nil
-
+	return s.StatFile(loc)
 }
 
 func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
diff --git a/weed/remote_storage/s3/s3_storage_client_test.go b/weed/remote_storage/s3/s3_storage_client_test.go
index 2965c77cd..4976cad77 100644
--- a/weed/remote_storage/s3/s3_storage_client_test.go
+++ b/weed/remote_storage/s3/s3_storage_client_test.go
@@ -6,6 +6,7 @@ import (
 	"github.com/aws/aws-sdk-go/aws/credentials"
 	awss3 "github.com/aws/aws-sdk-go/service/s3"
 	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
+	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
 	"github.com/stretchr/testify/require"
 )
 
@@ -55,3 +56,12 @@ func TestS3MakeUsesStaticCredentialsWhenKeysAreProvided(t *testing.T) {
 	require.Equal(t, conf.S3AccessKey, credValue.AccessKeyID)
 	require.Equal(t, conf.S3SecretKey, credValue.SecretAccessKey)
 }
+
+func TestS3RemoteStorageClientImplementsInterface(t *testing.T) {
+	var _ remote_storage.RemoteStorageClient = (*s3RemoteStorageClient)(nil)
+}
+
+func TestS3ErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
+	require.Error(t, remote_storage.ErrRemoteObjectNotFound)
+	require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
+}