package s3api

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/service/s3"
	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/kms"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/cors"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

// BucketConfig represents cached bucket configuration
type BucketConfig struct {
	Name             string
	Versioning       string // "Enabled", "Suspended", or ""
	Ownership        string
	ACL              []byte
	Owner            string
	IsPublicRead     bool // Cached flag to avoid JSON parsing on every request
	CORS             *cors.CORSConfiguration
	ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
	KMSKeyCache      *BucketKMSCache          // Per-bucket KMS key cache for SSE-KMS operations
	LastModified     time.Time
	Entry            *filer_pb.Entry
}

// BucketKMSCache represents per-bucket KMS key caching for SSE-KMS operations
// This provides better isolation and automatic cleanup compared to global caching
type BucketKMSCache struct {
	cache   map[string]*BucketKMSCacheEntry // Key: contextHash, Value: cached data key
	mutex   sync.RWMutex
	bucket  string        // Bucket name for logging/debugging
	lastTTL time.Duration // TTL used for cache entries (typically 1 hour)
}

// BucketKMSCacheEntry represents a single cached KMS data key
type BucketKMSCacheEntry struct {
	DataKey     interface{} // Could be *kms.GenerateDataKeyResponse or similar
	ExpiresAt   time.Time
	KeyID       string
	ContextHash string // Hash of encryption context for cache validation
}

// NewBucketKMSCache creates a new per-bucket KMS key cache
func NewBucketKMSCache(bucketName string, ttl time.Duration) *BucketKMSCache {
	return &BucketKMSCache{
		cache:   make(map[string]*BucketKMSCacheEntry),
		bucket:  bucketName,
		lastTTL: ttl,
	}
}

// Get retrieves a cached KMS data key if it exists and hasn't expired.
// Expired entries are left in place; CleanupExpired removes (and zeroes) them.
func (bkc *BucketKMSCache) Get(contextHash string) (*BucketKMSCacheEntry, bool) {
	if bkc == nil {
		return nil, false
	}
	bkc.mutex.RLock()
	defer bkc.mutex.RUnlock()

	entry, exists := bkc.cache[contextHash]
	if !exists {
		return nil, false
	}

	// Check if entry has expired
	if time.Now().After(entry.ExpiresAt) {
		return nil, false
	}

	return entry, true
}

// Set stores a KMS data key in the cache
func (bkc *BucketKMSCache) Set(contextHash, keyID string, dataKey interface{}, ttl time.Duration) {
	if bkc == nil {
		return
	}
	bkc.mutex.Lock()
	defer bkc.mutex.Unlock()

	bkc.cache[contextHash] = &BucketKMSCacheEntry{
		DataKey:     dataKey,
		ExpiresAt:   time.Now().Add(ttl),
		KeyID:       keyID,
		ContextHash: contextHash,
	}
	bkc.lastTTL = ttl
}

// CleanupExpired removes expired entries from the cache and returns how many were removed
func (bkc *BucketKMSCache) CleanupExpired() int {
	if bkc == nil {
		return 0
	}
	bkc.mutex.Lock()
	defer bkc.mutex.Unlock()

	now := time.Now()
	expiredCount := 0
	for key, entry := range bkc.cache {
		if now.After(entry.ExpiresAt) {
			// Clear sensitive data before removing from cache
			bkc.clearSensitiveData(entry)
			delete(bkc.cache, key)
			expiredCount++
		}
	}
	return expiredCount
}

// Size returns the current number of cached entries
func (bkc *BucketKMSCache) Size() int {
	if bkc == nil {
		return 0
	}
	bkc.mutex.RLock()
	defer bkc.mutex.RUnlock()
	return len(bkc.cache)
}

// clearSensitiveData securely clears sensitive data from a cache entry.
// Caller must hold bkc.mutex (write lock).
func (bkc *BucketKMSCache) clearSensitiveData(entry *BucketKMSCacheEntry) {
	if dataKeyResp, ok := entry.DataKey.(*kms.GenerateDataKeyResponse); ok {
		// Zero out the plaintext data key to prevent it from lingering in memory
		if dataKeyResp.Plaintext != nil {
			for i := range dataKeyResp.Plaintext {
				dataKeyResp.Plaintext[i] = 0
			}
			dataKeyResp.Plaintext = nil
		}
	}
}

// Clear clears all cached KMS entries, securely zeroing sensitive data first
func (bkc *BucketKMSCache) Clear() {
	if bkc == nil {
		return
	}
	bkc.mutex.Lock()
	defer bkc.mutex.Unlock()

	// Clear sensitive data from all entries before deletion
	for _, entry := range bkc.cache {
		bkc.clearSensitiveData(entry)
	}

	// Clear the cache map
	bkc.cache = make(map[string]*BucketKMSCacheEntry)
}

// BucketConfigCache provides caching for bucket configurations
// Cache entries are automatically updated/invalidated through metadata subscription events,
// so TTL serves as a safety fallback rather than the primary consistency mechanism
type BucketConfigCache struct {
	cache         map[string]*BucketConfig
	negativeCache map[string]time.Time // Cache for non-existent buckets
	mutex         sync.RWMutex
	ttl           time.Duration // Safety fallback TTL; real-time consistency maintained via events
	negativeTTL   time.Duration // TTL for negative cache entries
}

// BucketMetadata represents the complete metadata for a bucket
type BucketMetadata struct {
	Tags       map[string]string              `json:"tags,omitempty"`
	CORS       *cors.CORSConfiguration        `json:"cors,omitempty"`
	Encryption *s3_pb.EncryptionConfiguration `json:"encryption,omitempty"`

	// Future extensions can be added here:
	// Versioning *s3_pb.VersioningConfiguration `json:"versioning,omitempty"`
	// Lifecycle *s3_pb.LifecycleConfiguration `json:"lifecycle,omitempty"`
	// Notification *s3_pb.NotificationConfiguration `json:"notification,omitempty"`
	// Replication *s3_pb.ReplicationConfiguration `json:"replication,omitempty"`
	// Analytics *s3_pb.AnalyticsConfiguration `json:"analytics,omitempty"`
	// Logging *s3_pb.LoggingConfiguration `json:"logging,omitempty"`
	// Website *s3_pb.WebsiteConfiguration `json:"website,omitempty"`
	// RequestPayer *s3_pb.RequestPayerConfiguration `json:"requestPayer,omitempty"`
	// PublicAccess *s3_pb.PublicAccessConfiguration `json:"publicAccess,omitempty"`
}

// NewBucketMetadata creates a new BucketMetadata with default values
func NewBucketMetadata() *BucketMetadata {
	return &BucketMetadata{
		Tags: make(map[string]string),
	}
}

// IsEmpty returns true if the metadata has no configuration set
func (bm *BucketMetadata) IsEmpty() bool {
	return len(bm.Tags) == 0 && bm.CORS == nil && bm.Encryption == nil
}

// HasEncryption returns true if bucket has encryption configuration
func (bm *BucketMetadata) HasEncryption() bool {
	return bm.Encryption != nil
}

// HasCORS returns true if bucket has CORS configuration
func (bm *BucketMetadata) HasCORS() bool {
	return bm.CORS != nil
}

// HasTags returns true if bucket has tags
func (bm *BucketMetadata) HasTags() bool {
	return len(bm.Tags) > 0
}

// NewBucketConfigCache creates a new bucket configuration cache
// TTL can be set to a longer duration since cache consistency is maintained
// through real-time metadata subscription events rather than TTL expiration
func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
	negativeTTL := ttl / 4 // Negative cache TTL is shorter than positive cache
	if negativeTTL < 30*time.Second {
		negativeTTL = 30 * time.Second // Minimum 30 seconds for negative cache
	}
	return &BucketConfigCache{
		cache:         make(map[string]*BucketConfig),
		negativeCache: make(map[string]time.Time),
		ttl:           ttl,
		negativeTTL:   negativeTTL,
	}
}

// Get retrieves bucket configuration from cache
func (bcc *BucketConfigCache) Get(bucket string) (*BucketConfig, bool) {
	bcc.mutex.RLock()
	defer bcc.mutex.RUnlock()

	config, exists := bcc.cache[bucket]
	if !exists {
		return nil, false
	}

	// Check if cache entry is expired (safety fallback; entries are normally updated via events)
	if time.Since(config.LastModified) > bcc.ttl {
		return nil, false
	}

	return config, true
}

// Set stores bucket configuration in cache
func (bcc *BucketConfigCache) Set(bucket string, config *BucketConfig) {
	bcc.mutex.Lock()
	defer bcc.mutex.Unlock()

	config.LastModified = time.Now()
	bcc.cache[bucket] = config
}

// Remove removes bucket configuration from cache
func (bcc *BucketConfigCache) Remove(bucket string) {
	bcc.mutex.Lock()
	defer bcc.mutex.Unlock()

	delete(bcc.cache, bucket)
}

// Clear clears all cached configurations
func (bcc *BucketConfigCache) Clear() {
	bcc.mutex.Lock()
	defer bcc.mutex.Unlock()

	bcc.cache = make(map[string]*BucketConfig)
	bcc.negativeCache = make(map[string]time.Time)
}

// IsNegativelyCached checks if a bucket is in the negative cache (doesn't exist).
//
// NOTE: this takes the write lock, not the read lock, because an expired entry
// is deleted in place. The previous implementation deleted under RLock, which
// is a data race (concurrent map write) when several readers observe the same
// expired entry at once.
func (bcc *BucketConfigCache) IsNegativelyCached(bucket string) bool {
	bcc.mutex.Lock()
	defer bcc.mutex.Unlock()

	if cachedTime, exists := bcc.negativeCache[bucket]; exists {
		// Check if the negative cache entry is still valid
		if time.Since(cachedTime) < bcc.negativeTTL {
			return true
		}
		// Entry expired, remove it (safe: we hold the write lock)
		delete(bcc.negativeCache, bucket)
	}
	return false
}

// SetNegativeCache marks a bucket as non-existent in the negative cache
func (bcc *BucketConfigCache) SetNegativeCache(bucket string) {
	bcc.mutex.Lock()
	defer bcc.mutex.Unlock()

	bcc.negativeCache[bucket] = time.Now()
}

// RemoveNegativeCache removes a bucket from the negative cache
func (bcc *BucketConfigCache) RemoveNegativeCache(bucket string) {
	bcc.mutex.Lock()
	defer bcc.mutex.Unlock()

	delete(bcc.negativeCache, bucket)
}

// getBucketConfig retrieves bucket configuration with caching
func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
	// Check negative cache first
	if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
		return nil, s3err.ErrNoSuchBucket
	}

	// Try positive cache
	if config, found := s3a.bucketConfigCache.Get(bucket); found {
		return config, s3err.ErrNone
	}

	// Try to get from filer
	entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
	if err != nil {
		if errors.Is(err, filer_pb.ErrNotFound) {
			// Bucket doesn't exist - set negative cache
			s3a.bucketConfigCache.SetNegativeCache(bucket)
			return nil, s3err.ErrNoSuchBucket
		}
		glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
		return nil, s3err.ErrInternalError
	}

	config := &BucketConfig{
		Name:         bucket,
		Entry:        entry,
		IsPublicRead: false, // Explicitly default to false for private buckets
	}

	// Extract configuration from extended attributes
	if entry.Extended != nil {
		if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
			config.Versioning = string(versioning)
		}
		if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
			config.Ownership = string(ownership)
		}
		if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
			config.ACL = acl
			// Parse ACL once and cache public-read status
			config.IsPublicRead = parseAndCachePublicReadStatus(acl)
		} else {
			// No ACL means private bucket
			config.IsPublicRead = false
		}
		if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
			config.Owner = string(owner)
		}
		// Parse Object Lock configuration if present
		if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
			config.ObjectLockConfig = objectLockConfig
			glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
		}
	}

	// Load CORS configuration from bucket directory content
	if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
		if errors.Is(err, filer_pb.ErrNotFound) {
			// Missing metadata is not an error; fall back cleanly
			glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
		} else {
			// Log parsing or validation errors
			glog.Errorf("Failed to load CORS configuration for bucket %s: %v", bucket, err)
		}
	} else {
		config.CORS = corsConfig
	}

	// Cache the result
	s3a.bucketConfigCache.Set(bucket, config)

	return config, s3err.ErrNone
}

// updateBucketConfig updates bucket configuration and invalidates cache
func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketConfig) error) s3err.ErrorCode {
	config, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone {
		return errCode
	}

	// Apply update function
	if err := updateFn(config); err != nil {
		glog.Errorf("updateBucketConfig: update function failed for bucket %s: %v", bucket, err)
		return s3err.ErrInternalError
	}

	// Prepare extended attributes
	if config.Entry.Extended == nil {
		config.Entry.Extended = make(map[string][]byte)
	}

	// Update extended attributes
	if config.Versioning != "" {
		config.Entry.Extended[s3_constants.ExtVersioningKey] = []byte(config.Versioning)
	}
	if config.Ownership != "" {
		config.Entry.Extended[s3_constants.ExtOwnershipKey] = []byte(config.Ownership)
	}
	if config.ACL != nil {
		config.Entry.Extended[s3_constants.ExtAmzAclKey] = config.ACL
	}
	if config.Owner != "" {
		config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
	}
	// Update Object Lock configuration
	if config.ObjectLockConfig != nil {
		if err := StoreObjectLockConfigurationInExtended(config.Entry, config.ObjectLockConfig); err != nil {
			glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
			return s3err.ErrInternalError
		}
	}

	// Save to filer
	err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)
	if err != nil {
		glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
		return s3err.ErrInternalError
	}

	// Update cache
	s3a.bucketConfigCache.Set(bucket, config)

	return s3err.ErrNone
}

// isVersioningEnabled checks if versioning is enabled for a bucket (with caching)
func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
	config, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone {
		if errCode == s3err.ErrNoSuchBucket {
			return false, filer_pb.ErrNotFound
		}
		return false, fmt.Errorf("failed to get bucket config: %v", errCode)
	}

	// Versioning is enabled if explicitly set to "Enabled" OR if object lock is enabled
	// (since object lock requires versioning to be enabled)
	return config.Versioning == s3_constants.VersioningEnabled || config.ObjectLockConfig != nil, nil
}

// isVersioningConfigured checks if versioning has been configured (either Enabled or Suspended)
func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
	config, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone {
		if errCode == s3err.ErrNoSuchBucket {
			return false, filer_pb.ErrNotFound
		}
		return false, fmt.Errorf("failed to get bucket config: %v", errCode)
	}

	// Versioning is configured if explicitly set to either "Enabled" or "Suspended"
	// OR if object lock is enabled (which forces versioning)
	return config.Versioning != "" || config.ObjectLockConfig != nil, nil
}

// getVersioningState returns the detailed versioning state for a bucket
func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
	config, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone {
		if errCode == s3err.ErrNoSuchBucket {
			return "", nil
		}
		return "", fmt.Errorf("failed to get bucket config: %v", errCode)
	}

	// If object lock is enabled, versioning must be enabled regardless of explicit setting
	if config.ObjectLockConfig != nil {
		return s3_constants.VersioningEnabled, nil
	}

	// Return the explicit versioning status (empty string means never configured)
	return config.Versioning, nil
}

// getBucketVersioningStatus returns the versioning status for a bucket
func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
	config, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone {
		return "", errCode
	}

	// Return exactly what's stored - empty string means versioning was never configured
	// This matches AWS S3 behavior where new buckets have no Status field in GetBucketVersioning response
	return config.Versioning, s3err.ErrNone
}

// setBucketVersioningStatus sets the versioning status for a bucket
func (s3a *S3ApiServer) setBucketVersioningStatus(bucket, status string) s3err.ErrorCode {
	return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
		config.Versioning = status
		return nil
	})
}

// getBucketOwnership returns the ownership setting for a bucket
func (s3a *S3ApiServer) getBucketOwnership(bucket string) (string, s3err.ErrorCode) {
	config, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone {
		return "", errCode
	}
	return config.Ownership, s3err.ErrNone
}
// setBucketOwnership sets the ownership setting for a bucket func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.ErrorCode { return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error { config.Ownership = ownership return nil }) } // loadCORSFromBucketContent loads CORS configuration from bucket directory content func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) { metadata, err := s3a.GetBucketMetadata(bucket) if err != nil { return nil, err } // Note: corsConfig can be nil if no CORS configuration is set, which is valid return metadata.CORS, nil } // getCORSConfiguration retrieves CORS configuration with caching func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfiguration, s3err.ErrorCode) { config, errCode := s3a.getBucketConfig(bucket) if errCode != s3err.ErrNone { return nil, errCode } return config.CORS, s3err.ErrNone } // updateCORSConfiguration updates the CORS configuration for a bucket func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode { // Update using structured API err := s3a.UpdateBucketCORS(bucket, corsConfig) if err != nil { glog.Errorf("updateCORSConfiguration: failed to update CORS config for bucket %s: %v", bucket, err) return s3err.ErrInternalError } // Cache will be updated automatically via metadata subscription return s3err.ErrNone } // removeCORSConfiguration removes the CORS configuration for a bucket func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode { // Update using structured API err := s3a.ClearBucketCORS(bucket) if err != nil { glog.Errorf("removeCORSConfiguration: failed to remove CORS config for bucket %s: %v", bucket, err) return s3err.ErrInternalError } // Cache will be updated automatically via metadata subscription return s3err.ErrNone } // Conversion functions between CORS types and protobuf types // corsRuleToProto converts a CORS 
rule to protobuf format func corsRuleToProto(rule cors.CORSRule) *s3_pb.CORSRule { return &s3_pb.CORSRule{ AllowedHeaders: rule.AllowedHeaders, AllowedMethods: rule.AllowedMethods, AllowedOrigins: rule.AllowedOrigins, ExposeHeaders: rule.ExposeHeaders, MaxAgeSeconds: int32(getMaxAgeSecondsValue(rule.MaxAgeSeconds)), Id: rule.ID, } } // corsRuleFromProto converts a protobuf CORS rule to standard format func corsRuleFromProto(protoRule *s3_pb.CORSRule) cors.CORSRule { var maxAge *int // Always create the pointer if MaxAgeSeconds is >= 0 // This prevents nil pointer dereferences in tests and matches AWS behavior if protoRule.MaxAgeSeconds >= 0 { age := int(protoRule.MaxAgeSeconds) maxAge = &age } // Only leave maxAge as nil if MaxAgeSeconds was explicitly set to a negative value return cors.CORSRule{ AllowedHeaders: protoRule.AllowedHeaders, AllowedMethods: protoRule.AllowedMethods, AllowedOrigins: protoRule.AllowedOrigins, ExposeHeaders: protoRule.ExposeHeaders, MaxAgeSeconds: maxAge, ID: protoRule.Id, } } // corsConfigToProto converts CORS configuration to protobuf format func corsConfigToProto(config *cors.CORSConfiguration) *s3_pb.CORSConfiguration { if config == nil { return nil } protoRules := make([]*s3_pb.CORSRule, len(config.CORSRules)) for i, rule := range config.CORSRules { protoRules[i] = corsRuleToProto(rule) } return &s3_pb.CORSConfiguration{ CorsRules: protoRules, } } // corsConfigFromProto converts protobuf CORS configuration to standard format func corsConfigFromProto(protoConfig *s3_pb.CORSConfiguration) *cors.CORSConfiguration { if protoConfig == nil { return nil } rules := make([]cors.CORSRule, len(protoConfig.CorsRules)) for i, protoRule := range protoConfig.CorsRules { rules[i] = corsRuleFromProto(protoRule) } return &cors.CORSConfiguration{ CORSRules: rules, } } // getMaxAgeSecondsValue safely extracts max age seconds value func getMaxAgeSecondsValue(maxAge *int) int { if maxAge == nil { return 0 } return *maxAge } // 
parseAndCachePublicReadStatus parses the ACL and caches the public-read status func parseAndCachePublicReadStatus(acl []byte) bool { var grants []*s3.Grant if err := json.Unmarshal(acl, &grants); err != nil { return false } // Check if any grant gives read permission to "AllUsers" group for _, grant := range grants { if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil { // Check for AllUsers group with Read permission if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers && (*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) { return true } } } return false } // getBucketMetadata retrieves bucket metadata as a structured object with caching func (s3a *S3ApiServer) getBucketMetadata(bucket string) (*BucketMetadata, error) { if s3a.bucketConfigCache != nil { // Check negative cache first if s3a.bucketConfigCache.IsNegativelyCached(bucket) { return nil, fmt.Errorf("bucket directory not found %s", bucket) } // Try to get from positive cache if config, found := s3a.bucketConfigCache.Get(bucket); found { // Extract metadata from cached config if metadata, err := s3a.extractMetadataFromConfig(config); err == nil { return metadata, nil } // If extraction fails, fall through to direct load } } // Load directly from filer return s3a.loadBucketMetadataFromFiler(bucket) } // extractMetadataFromConfig extracts BucketMetadata from cached BucketConfig func (s3a *S3ApiServer) extractMetadataFromConfig(config *BucketConfig) (*BucketMetadata, error) { if config == nil || config.Entry == nil { return NewBucketMetadata(), nil } // Parse metadata from entry content if available if len(config.Entry.Content) > 0 { var protoMetadata s3_pb.BucketMetadata if err := proto.Unmarshal(config.Entry.Content, &protoMetadata); err != nil { glog.Errorf("extractMetadataFromConfig: failed to unmarshal protobuf metadata for bucket %s: %v", config.Name, err) return nil, err } // Convert protobuf to structured 
metadata metadata := &BucketMetadata{ Tags: protoMetadata.Tags, CORS: corsConfigFromProto(protoMetadata.Cors), Encryption: protoMetadata.Encryption, } return metadata, nil } // Fallback: create metadata from cached CORS config metadata := NewBucketMetadata() if config.CORS != nil { metadata.CORS = config.CORS } return metadata, nil } // loadBucketMetadataFromFiler loads bucket metadata directly from the filer func (s3a *S3ApiServer) loadBucketMetadataFromFiler(bucket string) (*BucketMetadata, error) { // Validate bucket name to prevent path traversal attacks if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") || strings.Contains(bucket, "..") || strings.Contains(bucket, "~") { return nil, fmt.Errorf("invalid bucket name: %s", bucket) } // Clean the bucket name further to prevent any potential path traversal bucket = filepath.Clean(bucket) if bucket == "." || bucket == ".." { return nil, fmt.Errorf("invalid bucket name: %s", bucket) } // Get bucket directory entry to access its content entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) if err != nil { // Check if this is a "not found" error if errors.Is(err, filer_pb.ErrNotFound) { // Set negative cache for non-existent bucket if s3a.bucketConfigCache != nil { s3a.bucketConfigCache.SetNegativeCache(bucket) } } return nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err) } if entry == nil { // Set negative cache for non-existent bucket if s3a.bucketConfigCache != nil { s3a.bucketConfigCache.SetNegativeCache(bucket) } return nil, fmt.Errorf("bucket directory not found %s", bucket) } // If no content, return empty metadata if len(entry.Content) == 0 { return NewBucketMetadata(), nil } // Unmarshal metadata from protobuf var protoMetadata s3_pb.BucketMetadata if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil { glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err) return nil, fmt.Errorf("failed to 
unmarshal bucket metadata for %s: %w", bucket, err) } // Convert protobuf CORS to standard CORS corsConfig := corsConfigFromProto(protoMetadata.Cors) // Create and return structured metadata metadata := &BucketMetadata{ Tags: protoMetadata.Tags, CORS: corsConfig, Encryption: protoMetadata.Encryption, } return metadata, nil } // setBucketMetadata stores bucket metadata from a structured object func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadata) error { // Validate bucket name to prevent path traversal attacks if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") || strings.Contains(bucket, "..") || strings.Contains(bucket, "~") { return fmt.Errorf("invalid bucket name: %s", bucket) } // Clean the bucket name further to prevent any potential path traversal bucket = filepath.Clean(bucket) if bucket == "." || bucket == ".." { return fmt.Errorf("invalid bucket name: %s", bucket) } // Default to empty metadata if nil if metadata == nil { metadata = NewBucketMetadata() } // Create protobuf metadata protoMetadata := &s3_pb.BucketMetadata{ Tags: metadata.Tags, Cors: corsConfigToProto(metadata.CORS), Encryption: metadata.Encryption, } // Marshal metadata to protobuf metadataBytes, err := proto.Marshal(protoMetadata) if err != nil { return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err) } // Update the bucket entry with new content err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // Get current bucket entry entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) if err != nil { return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err) } if entry == nil { return fmt.Errorf("bucket directory not found %s", bucket) } // Update content with metadata entry.Content = metadataBytes request := &filer_pb.UpdateEntryRequest{ Directory: s3a.option.BucketsPath, Entry: entry, } _, err = client.UpdateEntry(context.Background(), request) return err }) // 
Invalidate cache after successful update if err == nil && s3a.bucketConfigCache != nil { s3a.bucketConfigCache.Remove(bucket) s3a.bucketConfigCache.RemoveNegativeCache(bucket) // Remove from negative cache too } return err } // New structured API functions using BucketMetadata // GetBucketMetadata retrieves complete bucket metadata as a structured object func (s3a *S3ApiServer) GetBucketMetadata(bucket string) (*BucketMetadata, error) { return s3a.getBucketMetadata(bucket) } // SetBucketMetadata stores complete bucket metadata from a structured object func (s3a *S3ApiServer) SetBucketMetadata(bucket string, metadata *BucketMetadata) error { return s3a.setBucketMetadata(bucket, metadata) } // UpdateBucketMetadata updates specific parts of bucket metadata while preserving others // // DISTRIBUTED SYSTEM DESIGN NOTE: // This function implements a read-modify-write pattern with "last write wins" semantics. // In the rare case of concurrent updates to different parts of bucket metadata // (e.g., simultaneous tag and CORS updates), the last write may overwrite previous changes. // // This is an acceptable trade-off because: // 1. Bucket metadata updates are infrequent in typical S3 usage // 2. Traditional locking doesn't work in distributed systems across multiple nodes // 3. The complexity of distributed consensus (e.g., Raft) for metadata updates would // be disproportionate to the low frequency of bucket configuration changes // 4. Most bucket operations (tags, CORS, encryption) are typically configured once // during setup rather than being frequently modified // // If stronger consistency is required, consider implementing optimistic concurrency // control with version numbers or ETags at the storage layer. 
func (s3a *S3ApiServer) UpdateBucketMetadata(bucket string, update func(*BucketMetadata) error) error { // Get current metadata metadata, err := s3a.GetBucketMetadata(bucket) if err != nil { return fmt.Errorf("failed to get current bucket metadata: %w", err) } // Apply update function if err := update(metadata); err != nil { return fmt.Errorf("failed to apply metadata update: %w", err) } // Store updated metadata (last write wins) return s3a.SetBucketMetadata(bucket, metadata) } // Helper functions for specific metadata operations using structured API // UpdateBucketTags sets bucket tags using the structured API func (s3a *S3ApiServer) UpdateBucketTags(bucket string, tags map[string]string) error { return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error { metadata.Tags = tags return nil }) } // UpdateBucketCORS sets bucket CORS configuration using the structured API func (s3a *S3ApiServer) UpdateBucketCORS(bucket string, corsConfig *cors.CORSConfiguration) error { return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error { metadata.CORS = corsConfig return nil }) } // UpdateBucketEncryption sets bucket encryption configuration using the structured API func (s3a *S3ApiServer) UpdateBucketEncryption(bucket string, encryptionConfig *s3_pb.EncryptionConfiguration) error { return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error { metadata.Encryption = encryptionConfig return nil }) } // ClearBucketTags removes all bucket tags using the structured API func (s3a *S3ApiServer) ClearBucketTags(bucket string) error { return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error { metadata.Tags = make(map[string]string) return nil }) } // ClearBucketCORS removes bucket CORS configuration using the structured API func (s3a *S3ApiServer) ClearBucketCORS(bucket string) error { return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error { metadata.CORS = nil return nil }) } // 
ClearBucketEncryption removes bucket encryption configuration using the structured API func (s3a *S3ApiServer) ClearBucketEncryption(bucket string) error { return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error { metadata.Encryption = nil return nil }) }