Browse Source

feat: add statfile remote storage (#8443)

* feat: add statfile; add error for remote storage misses

* feat: statfile implementations for storage providers

* test: add unit tests for StatFile method across providers

Add comprehensive unit tests for the StatFile implementation covering:
- S3: interface compliance and error constant accessibility
- Azure: interface compliance, error constants, and field population
- GCS: interface compliance, error constants, error detection, and field population

Also fix variable shadowing issue in S3 and Azure StatFile implementations where
named return parameters were being shadowed by local variable declarations.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: address StatFile review feedback

- Use errors.New for ErrRemoteObjectNotFound sentinel
- Fix S3 HeadObject 404 detection to use awserr.Error code check
- Remove hollow field-population tests that tested nothing
- Remove redundant stdlib error detection tests
- Trim verbose doc comment on ErrRemoteObjectNotFound

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: address second round of StatFile review feedback

- Rename interface assertion tests to TestXxxRemoteStorageClientImplementsInterface
- Delegate readFileRemoteEntry to StatFile in all three providers
- Revert S3 404 detection to RequestFailure.StatusCode() check
- Fix double-slash in GCS error message format string
- Add storage type prefix to S3 error message for consistency

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: comments

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
expand-the-s3-PutObject-permission-to-the-multipart-permissions
Peter Dodd 15 hours ago
committed by GitHub
parent
commit
0910252e31
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 53
      weed/remote_storage/azure/azure_storage_client.go
  2. 12
      weed/remote_storage/azure/azure_storage_client_test.go
  3. 39
      weed/remote_storage/gcs/gcs_storage_client.go
  4. 17
      weed/remote_storage/gcs/gcs_storage_client_test.go
  5. 5
      weed/remote_storage/remote_storage.go
  6. 45
      weed/remote_storage/s3/s3_storage_client.go
  7. 10
      weed/remote_storage/s3/s3_storage_client_test.go

53
weed/remote_storage/azure/azure_storage_client.go

@ -41,9 +41,6 @@ const (
// with consistent retry configuration across the application. // with consistent retry configuration across the application.
// This centralizes the retry policy to ensure uniform behavior between // This centralizes the retry policy to ensure uniform behavior between
// remote storage and replication sink implementations. // remote storage and replication sink implementations.
//
// Related: Use DefaultAzureOpTimeout for context.WithTimeout when calling Azure operations
// to ensure the timeout accommodates all retry attempts configured here.
func DefaultAzBlobClientOptions() *azblob.ClientOptions { func DefaultAzBlobClientOptions() *azblob.ClientOptions {
return &azblob.ClientOptions{ return &azblob.ClientOptions{
ClientOptions: azcore.ClientOptions{ ClientOptions: azcore.ClientOptions{
@ -130,6 +127,32 @@ type azureRemoteStorageClient struct {
var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{}) var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
// StatFile returns the remote entry (size, mtime, ETag) for the object at loc
// using a lightweight GetProperties (HEAD-style) call, without reading the blob.
// It returns remote_storage.ErrRemoteObjectNotFound when the blob does not exist.
func (az *azureRemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
	// loc.Path is slash-prefixed; strip the leading "/" to form the blob key.
	key := loc.Path[1:]
	// Bound the call so it cannot hang; DefaultAzureOpTimeout is sized to cover
	// the retry policy configured in DefaultAzBlobClientOptions.
	ctx, cancel := context.WithTimeout(context.Background(), DefaultAzureOpTimeout)
	defer cancel()
	resp, err := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key).GetProperties(ctx, nil)
	if err != nil {
		// Map the Azure-specific "blob not found" code onto the shared sentinel
		// so callers can use errors.Is uniformly across storage providers.
		if bloberror.HasCode(err, bloberror.BlobNotFound) {
			return nil, remote_storage.ErrRemoteObjectNotFound
		}
		return nil, fmt.Errorf("stat azure %s%s: %w", loc.Bucket, loc.Path, err)
	}
	remoteEntry = &filer_pb.RemoteEntry{
		StorageName: az.conf.Name,
	}
	// SDK response fields are pointers; populate only the properties present.
	if resp.ContentLength != nil {
		remoteEntry.RemoteSize = *resp.ContentLength
	}
	if resp.LastModified != nil {
		remoteEntry.RemoteMtime = resp.LastModified.Unix()
	}
	if resp.ETag != nil {
		remoteEntry.RemoteETag = string(*resp.ETag)
	}
	return remoteEntry, nil
}
func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
pathKey := loc.Path[1:] pathKey := loc.Path[1:]
@ -241,29 +264,7 @@ func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocati
} }
func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
key := loc.Path[1:]
blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
props, err := blobClient.GetProperties(context.Background(), nil)
if err != nil {
return nil, err
}
remoteEntry := &filer_pb.RemoteEntry{
StorageName: az.conf.Name,
}
if props.LastModified != nil {
remoteEntry.RemoteMtime = props.LastModified.Unix()
}
if props.ContentLength != nil {
remoteEntry.RemoteSize = *props.ContentLength
}
if props.ETag != nil {
remoteEntry.RemoteETag = string(*props.ETag)
}
return remoteEntry, nil
return az.StatFile(loc)
} }
func toMetadata(attributes map[string][]byte) map[string]*string { func toMetadata(attributes map[string][]byte) map[string]*string {

12
weed/remote_storage/azure/azure_storage_client_test.go

@ -10,7 +10,9 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/stretchr/testify/require"
) )
// TestAzureStorageClientBasic tests basic Azure storage client operations // TestAzureStorageClientBasic tests basic Azure storage client operations
@ -378,3 +380,13 @@ func TestAzureStorageClientErrors(t *testing.T) {
t.Log("Expected error with invalid credentials on ReadFile, but got none (might be cached)") t.Log("Expected error with invalid credentials on ReadFile, but got none (might be cached)")
} }
} }
// TestAzureRemoteStorageClientImplementsInterface is a compile-time assertion
// that azureRemoteStorageClient satisfies remote_storage.RemoteStorageClient.
func TestAzureRemoteStorageClientImplementsInterface(t *testing.T) {
	var _ remote_storage.RemoteStorageClient = (*azureRemoteStorageClient)(nil)
}
// TestAzureErrRemoteObjectNotFoundIsAccessible verifies the shared sentinel is
// importable from this package and carries the expected message.
func TestAzureErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
	require.Error(t, remote_storage.ErrRemoteObjectNotFound)
	require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
}

39
weed/remote_storage/gcs/gcs_storage_client.go

@ -2,11 +2,13 @@ package gcs
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"reflect" "reflect"
"strings" "strings"
"time"
"cloud.google.com/go/storage" "cloud.google.com/go/storage"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
@ -126,6 +128,28 @@ func (gcs *gcsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation
} }
return return
} }
// defaultGCSOpTimeout bounds single metadata operations against GCS.
const defaultGCSOpTimeout = 30 * time.Second

// StatFile returns the remote entry (size, mtime, ETag) for the object at loc
// via an object-attributes lookup, without reading the object data.
// It returns remote_storage.ErrRemoteObjectNotFound when the object is missing.
func (gcs *gcsRemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
	// loc.Path is slash-prefixed; GCS object names are not.
	key := loc.Path[1:]
	ctx, cancel := context.WithTimeout(context.Background(), defaultGCSOpTimeout)
	defer cancel()
	attr, err := gcs.client.Bucket(loc.Bucket).Object(key).Attrs(ctx)
	if err != nil {
		// Translate the GCS-specific miss into the shared sentinel so callers
		// can use errors.Is uniformly across storage providers.
		if errors.Is(err, storage.ErrObjectNotExist) {
			return nil, remote_storage.ErrRemoteObjectNotFound
		}
		return nil, fmt.Errorf("stat gcs %s%s: %w", loc.Bucket, loc.Path, err)
	}
	return &filer_pb.RemoteEntry{
		StorageName: gcs.conf.Name,
		RemoteMtime: attr.Updated.Unix(),
		RemoteSize:  attr.Size,
		RemoteETag:  attr.Etag,
	}, nil
}
func (gcs *gcsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { func (gcs *gcsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
key := loc.Path[1:] key := loc.Path[1:]
@ -170,20 +194,7 @@ func (gcs *gcsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocatio
} }
func (gcs *gcsRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { func (gcs *gcsRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
key := loc.Path[1:]
attr, err := gcs.client.Bucket(loc.Bucket).Object(key).Attrs(context.Background())
if err != nil {
return nil, err
}
return &filer_pb.RemoteEntry{
RemoteMtime: attr.Updated.Unix(),
RemoteSize: attr.Size,
RemoteETag: attr.Etag,
StorageName: gcs.conf.Name,
}, nil
return gcs.StatFile(loc)
} }
func toMetadata(attributes map[string][]byte) map[string]string { func toMetadata(attributes map[string][]byte) map[string]string {

17
weed/remote_storage/gcs/gcs_storage_client_test.go

@ -0,0 +1,17 @@
package gcs
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/stretchr/testify/require"
)
// TestGCSRemoteStorageClientImplementsInterface is a compile-time assertion
// that gcsRemoteStorageClient satisfies remote_storage.RemoteStorageClient.
func TestGCSRemoteStorageClientImplementsInterface(t *testing.T) {
	var _ remote_storage.RemoteStorageClient = (*gcsRemoteStorageClient)(nil)
}
// TestGCSErrRemoteObjectNotFoundIsAccessible verifies the shared sentinel is
// importable from this package and carries the expected message.
func TestGCSErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
	require.Error(t, remote_storage.ErrRemoteObjectNotFound)
	require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
}

5
weed/remote_storage/remote_storage.go

@ -1,6 +1,7 @@
package remote_storage package remote_storage
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"sort" "sort"
@ -69,8 +70,12 @@ type Bucket struct {
CreatedAt time.Time CreatedAt time.Time
} }
// ErrRemoteObjectNotFound is returned by StatFile when the object does not exist
// in the remote storage backend. Callers should test for it with errors.Is.
var ErrRemoteObjectNotFound = errors.New("remote object not found")
type RemoteStorageClient interface { type RemoteStorageClient interface {
Traverse(loc *remote_pb.RemoteStorageLocation, visitFn VisitFunc) error Traverse(loc *remote_pb.RemoteStorageLocation, visitFn VisitFunc) error
StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error)
ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error)

45
weed/remote_storage/s3/s3_storage_client.go

@ -3,9 +3,11 @@ package s3
import ( import (
"fmt" "fmt"
"io" "io"
"net/http"
"reflect" "reflect"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
@ -119,6 +121,33 @@ func (s *s3RemoteStorageClient) Traverse(remote *remote_pb.RemoteStorageLocation
} }
return return
} }
// StatFile returns the remote entry (size, mtime, ETag) for the object at loc
// via an S3 HeadObject call, without reading the object data.
// It returns remote_storage.ErrRemoteObjectNotFound when the object is missing.
// NOTE(review): unlike the Azure/GCS implementations, no context timeout is
// applied here — presumably the session's HTTP client bounds the call; confirm.
func (s *s3RemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
	resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(loc.Bucket),
		// loc.Path is slash-prefixed; S3 keys are not.
		Key: aws.String(loc.Path[1:]),
	})
	if err != nil {
		// HeadObject misses surface as a bare 404 RequestFailure (no error body),
		// so detect by HTTP status and map to the shared sentinel.
		if reqErr, ok := err.(awserr.RequestFailure); ok && reqErr.StatusCode() == http.StatusNotFound {
			return nil, remote_storage.ErrRemoteObjectNotFound
		}
		return nil, fmt.Errorf("stat s3 %s%s: %w", loc.Bucket, loc.Path, err)
	}
	remoteEntry = &filer_pb.RemoteEntry{
		StorageName: s.conf.Name,
	}
	// SDK response fields are pointers; populate only the properties present.
	if resp.ContentLength != nil {
		remoteEntry.RemoteSize = *resp.ContentLength
	}
	if resp.LastModified != nil {
		remoteEntry.RemoteMtime = resp.LastModified.Unix()
	}
	if resp.ETag != nil {
		remoteEntry.RemoteETag = *resp.ETag
	}
	return remoteEntry, nil
}
func (s *s3RemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { func (s *s3RemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) { downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
u.PartSize = int64(4 * 1024 * 1024) u.PartSize = int64(4 * 1024 * 1024)
@ -208,21 +237,7 @@ func toTagging(attributes map[string][]byte) *s3.Tagging {
} }
func (s *s3RemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { func (s *s3RemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(loc.Bucket),
Key: aws.String(loc.Path[1:]),
})
if err != nil {
return nil, err
}
return &filer_pb.RemoteEntry{
RemoteMtime: resp.LastModified.Unix(),
RemoteSize: *resp.ContentLength,
RemoteETag: *resp.ETag,
StorageName: s.conf.Name,
}, nil
return s.StatFile(loc)
} }
func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) { func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {

10
weed/remote_storage/s3/s3_storage_client_test.go

@ -6,6 +6,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
awss3 "github.com/aws/aws-sdk-go/service/s3" awss3 "github.com/aws/aws-sdk-go/service/s3"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -55,3 +56,12 @@ func TestS3MakeUsesStaticCredentialsWhenKeysAreProvided(t *testing.T) {
require.Equal(t, conf.S3AccessKey, credValue.AccessKeyID) require.Equal(t, conf.S3AccessKey, credValue.AccessKeyID)
require.Equal(t, conf.S3SecretKey, credValue.SecretAccessKey) require.Equal(t, conf.S3SecretKey, credValue.SecretAccessKey)
} }
// TestS3RemoteStorageClientImplementsInterface is a compile-time assertion
// that s3RemoteStorageClient satisfies remote_storage.RemoteStorageClient.
func TestS3RemoteStorageClientImplementsInterface(t *testing.T) {
	var _ remote_storage.RemoteStorageClient = (*s3RemoteStorageClient)(nil)
}
// TestS3ErrRemoteObjectNotFoundIsAccessible verifies the shared sentinel is
// importable from this package and carries the expected message.
func TestS3ErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
	require.Error(t, remote_storage.ErrRemoteObjectNotFound)
	require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
}
Loading…
Cancel
Save