You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

947 lines
31 KiB

package s3api
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/service/s3"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/kms"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/cors"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// BucketConfig represents cached bucket configuration
type BucketConfig struct {
Name string
Versioning string // "Enabled", "Suspended", or ""
Ownership string
ACL []byte
Owner string
IsPublicRead bool // Cached flag to avoid JSON parsing on every request
CORS *cors.CORSConfiguration
ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
KMSKeyCache *BucketKMSCache // Per-bucket KMS key cache for SSE-KMS operations
LastModified time.Time
Entry *filer_pb.Entry
}
// BucketKMSCache represents per-bucket KMS key caching for SSE-KMS operations
// This provides better isolation and automatic cleanup compared to global caching
type BucketKMSCache struct {
cache map[string]*BucketKMSCacheEntry // Key: contextHash, Value: cached data key
mutex sync.RWMutex
bucket string // Bucket name for logging/debugging
lastTTL time.Duration // TTL used for cache entries (typically 1 hour)
}
// BucketKMSCacheEntry represents a single cached KMS data key
type BucketKMSCacheEntry struct {
DataKey interface{} // Could be *kms.GenerateDataKeyResponse or similar
ExpiresAt time.Time
KeyID string
ContextHash string // Hash of encryption context for cache validation
}
// NewBucketKMSCache creates a new per-bucket KMS key cache
func NewBucketKMSCache(bucketName string, ttl time.Duration) *BucketKMSCache {
return &BucketKMSCache{
cache: make(map[string]*BucketKMSCacheEntry),
bucket: bucketName,
lastTTL: ttl,
}
}
// Get retrieves a cached KMS data key if it exists and hasn't expired
func (bkc *BucketKMSCache) Get(contextHash string) (*BucketKMSCacheEntry, bool) {
if bkc == nil {
return nil, false
}
bkc.mutex.RLock()
defer bkc.mutex.RUnlock()
entry, exists := bkc.cache[contextHash]
if !exists {
return nil, false
}
// Check if entry has expired
if time.Now().After(entry.ExpiresAt) {
return nil, false
}
return entry, true
}
// Set stores a KMS data key in the cache
func (bkc *BucketKMSCache) Set(contextHash, keyID string, dataKey interface{}, ttl time.Duration) {
if bkc == nil {
return
}
bkc.mutex.Lock()
defer bkc.mutex.Unlock()
bkc.cache[contextHash] = &BucketKMSCacheEntry{
DataKey: dataKey,
ExpiresAt: time.Now().Add(ttl),
KeyID: keyID,
ContextHash: contextHash,
}
bkc.lastTTL = ttl
}
// CleanupExpired removes expired entries from the cache
func (bkc *BucketKMSCache) CleanupExpired() int {
if bkc == nil {
return 0
}
bkc.mutex.Lock()
defer bkc.mutex.Unlock()
now := time.Now()
expiredCount := 0
for key, entry := range bkc.cache {
if now.After(entry.ExpiresAt) {
// Clear sensitive data before removing from cache
bkc.clearSensitiveData(entry)
delete(bkc.cache, key)
expiredCount++
}
}
return expiredCount
}
// Size returns the current number of cached entries
func (bkc *BucketKMSCache) Size() int {
if bkc == nil {
return 0
}
bkc.mutex.RLock()
defer bkc.mutex.RUnlock()
return len(bkc.cache)
}
// clearSensitiveData securely clears sensitive data from a cache entry
func (bkc *BucketKMSCache) clearSensitiveData(entry *BucketKMSCacheEntry) {
if dataKeyResp, ok := entry.DataKey.(*kms.GenerateDataKeyResponse); ok {
// Zero out the plaintext data key to prevent it from lingering in memory
if dataKeyResp.Plaintext != nil {
for i := range dataKeyResp.Plaintext {
dataKeyResp.Plaintext[i] = 0
}
dataKeyResp.Plaintext = nil
}
}
}
// Clear clears all cached KMS entries, securely zeroing sensitive data first
func (bkc *BucketKMSCache) Clear() {
if bkc == nil {
return
}
bkc.mutex.Lock()
defer bkc.mutex.Unlock()
// Clear sensitive data from all entries before deletion
for _, entry := range bkc.cache {
bkc.clearSensitiveData(entry)
}
// Clear the cache map
bkc.cache = make(map[string]*BucketKMSCacheEntry)
}
// BucketConfigCache provides caching for bucket configurations
// Cache entries are automatically updated/invalidated through metadata subscription events,
// so TTL serves as a safety fallback rather than the primary consistency mechanism
type BucketConfigCache struct {
cache map[string]*BucketConfig
negativeCache map[string]time.Time // Cache for non-existent buckets
mutex sync.RWMutex
ttl time.Duration // Safety fallback TTL; real-time consistency maintained via events
negativeTTL time.Duration // TTL for negative cache entries
}
// BucketMetadata represents the complete metadata for a bucket
type BucketMetadata struct {
Tags map[string]string `json:"tags,omitempty"`
CORS *cors.CORSConfiguration `json:"cors,omitempty"`
Encryption *s3_pb.EncryptionConfiguration `json:"encryption,omitempty"`
// Future extensions can be added here:
// Versioning *s3_pb.VersioningConfiguration `json:"versioning,omitempty"`
// Lifecycle *s3_pb.LifecycleConfiguration `json:"lifecycle,omitempty"`
// Notification *s3_pb.NotificationConfiguration `json:"notification,omitempty"`
// Replication *s3_pb.ReplicationConfiguration `json:"replication,omitempty"`
// Analytics *s3_pb.AnalyticsConfiguration `json:"analytics,omitempty"`
// Logging *s3_pb.LoggingConfiguration `json:"logging,omitempty"`
// Website *s3_pb.WebsiteConfiguration `json:"website,omitempty"`
// RequestPayer *s3_pb.RequestPayerConfiguration `json:"requestPayer,omitempty"`
// PublicAccess *s3_pb.PublicAccessConfiguration `json:"publicAccess,omitempty"`
}
// NewBucketMetadata creates a new BucketMetadata with default values
func NewBucketMetadata() *BucketMetadata {
return &BucketMetadata{
Tags: make(map[string]string),
}
}
// IsEmpty returns true if the metadata has no configuration set
func (bm *BucketMetadata) IsEmpty() bool {
return len(bm.Tags) == 0 && bm.CORS == nil && bm.Encryption == nil
}
// HasEncryption returns true if bucket has encryption configuration
func (bm *BucketMetadata) HasEncryption() bool {
return bm.Encryption != nil
}
// HasCORS returns true if bucket has CORS configuration
func (bm *BucketMetadata) HasCORS() bool {
return bm.CORS != nil
}
// HasTags returns true if bucket has tags
func (bm *BucketMetadata) HasTags() bool {
return len(bm.Tags) > 0
}
// NewBucketConfigCache creates a new bucket configuration cache
// TTL can be set to a longer duration since cache consistency is maintained
// through real-time metadata subscription events rather than TTL expiration
func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
negativeTTL := ttl / 4 // Negative cache TTL is shorter than positive cache
if negativeTTL < 30*time.Second {
negativeTTL = 30 * time.Second // Minimum 30 seconds for negative cache
}
return &BucketConfigCache{
cache: make(map[string]*BucketConfig),
negativeCache: make(map[string]time.Time),
ttl: ttl,
negativeTTL: negativeTTL,
}
}
// Get retrieves bucket configuration from cache
func (bcc *BucketConfigCache) Get(bucket string) (*BucketConfig, bool) {
bcc.mutex.RLock()
defer bcc.mutex.RUnlock()
config, exists := bcc.cache[bucket]
if !exists {
return nil, false
}
// Check if cache entry is expired (safety fallback; entries are normally updated via events)
if time.Since(config.LastModified) > bcc.ttl {
return nil, false
}
return config, true
}
// Set stores bucket configuration in cache
func (bcc *BucketConfigCache) Set(bucket string, config *BucketConfig) {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
config.LastModified = time.Now()
bcc.cache[bucket] = config
}
// Remove removes bucket configuration from cache
func (bcc *BucketConfigCache) Remove(bucket string) {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
delete(bcc.cache, bucket)
}
// Clear clears all cached configurations
func (bcc *BucketConfigCache) Clear() {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
bcc.cache = make(map[string]*BucketConfig)
bcc.negativeCache = make(map[string]time.Time)
}
// IsNegativelyCached checks if a bucket is in the negative cache (doesn't exist)
func (bcc *BucketConfigCache) IsNegativelyCached(bucket string) bool {
bcc.mutex.RLock()
defer bcc.mutex.RUnlock()
if cachedTime, exists := bcc.negativeCache[bucket]; exists {
// Check if the negative cache entry is still valid
if time.Since(cachedTime) < bcc.negativeTTL {
return true
}
// Entry expired, remove it
delete(bcc.negativeCache, bucket)
}
return false
}
// SetNegativeCache marks a bucket as non-existent in the negative cache
func (bcc *BucketConfigCache) SetNegativeCache(bucket string) {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
bcc.negativeCache[bucket] = time.Now()
}
// RemoveNegativeCache removes a bucket from the negative cache
func (bcc *BucketConfigCache) RemoveNegativeCache(bucket string) {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
delete(bcc.negativeCache, bucket)
}
// getBucketConfig retrieves bucket configuration with caching
func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
// Check negative cache first
if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
return nil, s3err.ErrNoSuchBucket
}
// Try positive cache
if config, found := s3a.bucketConfigCache.Get(bucket); found {
return config, s3err.ErrNone
}
// Try to get from filer
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
// Bucket doesn't exist - set negative cache
s3a.bucketConfigCache.SetNegativeCache(bucket)
return nil, s3err.ErrNoSuchBucket
}
glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
return nil, s3err.ErrInternalError
}
config := &BucketConfig{
Name: bucket,
Entry: entry,
IsPublicRead: false, // Explicitly default to false for private buckets
}
// Extract configuration from extended attributes
if entry.Extended != nil {
if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
config.Versioning = string(versioning)
}
if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
config.Ownership = string(ownership)
}
if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
config.ACL = acl
// Parse ACL once and cache public-read status
config.IsPublicRead = parseAndCachePublicReadStatus(acl)
} else {
// No ACL means private bucket
config.IsPublicRead = false
}
if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
config.Owner = string(owner)
}
// Parse Object Lock configuration if present
if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
config.ObjectLockConfig = objectLockConfig
glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
}
}
// Load CORS configuration from bucket directory content
if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
// Missing metadata is not an error; fall back cleanly
glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
} else {
// Log parsing or validation errors
glog.Errorf("Failed to load CORS configuration for bucket %s: %v", bucket, err)
}
} else {
config.CORS = corsConfig
}
// Cache the result
s3a.bucketConfigCache.Set(bucket, config)
return config, s3err.ErrNone
}
// updateBucketConfig updates bucket configuration and invalidates cache
func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketConfig) error) s3err.ErrorCode {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return errCode
}
// Apply update function
if err := updateFn(config); err != nil {
glog.Errorf("updateBucketConfig: update function failed for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
// Prepare extended attributes
if config.Entry.Extended == nil {
config.Entry.Extended = make(map[string][]byte)
}
// Update extended attributes
if config.Versioning != "" {
config.Entry.Extended[s3_constants.ExtVersioningKey] = []byte(config.Versioning)
}
if config.Ownership != "" {
config.Entry.Extended[s3_constants.ExtOwnershipKey] = []byte(config.Ownership)
}
if config.ACL != nil {
config.Entry.Extended[s3_constants.ExtAmzAclKey] = config.ACL
}
if config.Owner != "" {
config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
}
// Update Object Lock configuration
if config.ObjectLockConfig != nil {
if err := StoreObjectLockConfigurationInExtended(config.Entry, config.ObjectLockConfig); err != nil {
glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
}
// Save to filer
err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)
if err != nil {
glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
return s3err.ErrInternalError
}
// Update cache
s3a.bucketConfigCache.Set(bucket, config)
return s3err.ErrNone
}
// isVersioningEnabled checks if versioning is enabled for a bucket (with caching)
func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return false, filer_pb.ErrNotFound
}
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
}
// Versioning is enabled if explicitly set to "Enabled" OR if object lock is enabled
// (since object lock requires versioning to be enabled)
return config.Versioning == s3_constants.VersioningEnabled || config.ObjectLockConfig != nil, nil
}
// isVersioningConfigured checks if versioning has been configured (either Enabled or Suspended)
func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return false, filer_pb.ErrNotFound
}
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
}
// Versioning is configured if explicitly set to either "Enabled" or "Suspended"
// OR if object lock is enabled (which forces versioning)
return config.Versioning != "" || config.ObjectLockConfig != nil, nil
}
// getVersioningState returns the detailed versioning state for a bucket
func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return "", nil
}
return "", fmt.Errorf("failed to get bucket config: %v", errCode)
}
// If object lock is enabled, versioning must be enabled regardless of explicit setting
if config.ObjectLockConfig != nil {
return s3_constants.VersioningEnabled, nil
}
// Return the explicit versioning status (empty string means never configured)
return config.Versioning, nil
}
// getBucketVersioningStatus returns the versioning status for a bucket
func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return "", errCode
}
// Return exactly what's stored - empty string means versioning was never configured
// This matches AWS S3 behavior where new buckets have no Status field in GetBucketVersioning response
return config.Versioning, s3err.ErrNone
}
// setBucketVersioningStatus sets the versioning status for a bucket
func (s3a *S3ApiServer) setBucketVersioningStatus(bucket, status string) s3err.ErrorCode {
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
config.Versioning = status
return nil
})
}
// getBucketOwnership returns the ownership setting for a bucket
func (s3a *S3ApiServer) getBucketOwnership(bucket string) (string, s3err.ErrorCode) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return "", errCode
}
return config.Ownership, s3err.ErrNone
}
// setBucketOwnership sets the ownership setting for a bucket
func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.ErrorCode {
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
config.Ownership = ownership
return nil
})
}
// loadCORSFromBucketContent loads CORS configuration from bucket directory content
func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
metadata, err := s3a.GetBucketMetadata(bucket)
if err != nil {
return nil, err
}
// Note: corsConfig can be nil if no CORS configuration is set, which is valid
return metadata.CORS, nil
}
// getCORSConfiguration retrieves CORS configuration with caching
func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfiguration, s3err.ErrorCode) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return nil, errCode
}
return config.CORS, s3err.ErrNone
}
// updateCORSConfiguration updates the CORS configuration for a bucket
func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode {
// Update using structured API
err := s3a.UpdateBucketCORS(bucket, corsConfig)
if err != nil {
glog.Errorf("updateCORSConfiguration: failed to update CORS config for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
// Cache will be updated automatically via metadata subscription
return s3err.ErrNone
}
// removeCORSConfiguration removes the CORS configuration for a bucket
func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode {
// Update using structured API
err := s3a.ClearBucketCORS(bucket)
if err != nil {
glog.Errorf("removeCORSConfiguration: failed to remove CORS config for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
// Cache will be updated automatically via metadata subscription
return s3err.ErrNone
}
// Conversion functions between CORS types and protobuf types
// corsRuleToProto converts a CORS rule to protobuf format
func corsRuleToProto(rule cors.CORSRule) *s3_pb.CORSRule {
return &s3_pb.CORSRule{
AllowedHeaders: rule.AllowedHeaders,
AllowedMethods: rule.AllowedMethods,
AllowedOrigins: rule.AllowedOrigins,
ExposeHeaders: rule.ExposeHeaders,
MaxAgeSeconds: int32(getMaxAgeSecondsValue(rule.MaxAgeSeconds)),
Id: rule.ID,
}
}
// corsRuleFromProto converts a protobuf CORS rule to standard format
func corsRuleFromProto(protoRule *s3_pb.CORSRule) cors.CORSRule {
var maxAge *int
// Always create the pointer if MaxAgeSeconds is >= 0
// This prevents nil pointer dereferences in tests and matches AWS behavior
if protoRule.MaxAgeSeconds >= 0 {
age := int(protoRule.MaxAgeSeconds)
maxAge = &age
}
// Only leave maxAge as nil if MaxAgeSeconds was explicitly set to a negative value
return cors.CORSRule{
AllowedHeaders: protoRule.AllowedHeaders,
AllowedMethods: protoRule.AllowedMethods,
AllowedOrigins: protoRule.AllowedOrigins,
ExposeHeaders: protoRule.ExposeHeaders,
MaxAgeSeconds: maxAge,
ID: protoRule.Id,
}
}
// corsConfigToProto converts CORS configuration to protobuf format
func corsConfigToProto(config *cors.CORSConfiguration) *s3_pb.CORSConfiguration {
if config == nil {
return nil
}
protoRules := make([]*s3_pb.CORSRule, len(config.CORSRules))
for i, rule := range config.CORSRules {
protoRules[i] = corsRuleToProto(rule)
}
return &s3_pb.CORSConfiguration{
CorsRules: protoRules,
}
}
// corsConfigFromProto converts protobuf CORS configuration to standard format
func corsConfigFromProto(protoConfig *s3_pb.CORSConfiguration) *cors.CORSConfiguration {
if protoConfig == nil {
return nil
}
rules := make([]cors.CORSRule, len(protoConfig.CorsRules))
for i, protoRule := range protoConfig.CorsRules {
rules[i] = corsRuleFromProto(protoRule)
}
return &cors.CORSConfiguration{
CORSRules: rules,
}
}
// getMaxAgeSecondsValue safely extracts max age seconds value
func getMaxAgeSecondsValue(maxAge *int) int {
if maxAge == nil {
return 0
}
return *maxAge
}
// parseAndCachePublicReadStatus parses the ACL and caches the public-read status
func parseAndCachePublicReadStatus(acl []byte) bool {
var grants []*s3.Grant
if err := json.Unmarshal(acl, &grants); err != nil {
return false
}
// Check if any grant gives read permission to "AllUsers" group
for _, grant := range grants {
if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
// Check for AllUsers group with Read permission
if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
(*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
return true
}
}
}
return false
}
// getBucketMetadata retrieves bucket metadata as a structured object with caching
func (s3a *S3ApiServer) getBucketMetadata(bucket string) (*BucketMetadata, error) {
if s3a.bucketConfigCache != nil {
// Check negative cache first
if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
return nil, fmt.Errorf("bucket directory not found %s", bucket)
}
// Try to get from positive cache
if config, found := s3a.bucketConfigCache.Get(bucket); found {
// Extract metadata from cached config
if metadata, err := s3a.extractMetadataFromConfig(config); err == nil {
return metadata, nil
}
// If extraction fails, fall through to direct load
}
}
// Load directly from filer
return s3a.loadBucketMetadataFromFiler(bucket)
}
// extractMetadataFromConfig extracts BucketMetadata from cached BucketConfig
func (s3a *S3ApiServer) extractMetadataFromConfig(config *BucketConfig) (*BucketMetadata, error) {
if config == nil || config.Entry == nil {
return NewBucketMetadata(), nil
}
// Parse metadata from entry content if available
if len(config.Entry.Content) > 0 {
var protoMetadata s3_pb.BucketMetadata
if err := proto.Unmarshal(config.Entry.Content, &protoMetadata); err != nil {
glog.Errorf("extractMetadataFromConfig: failed to unmarshal protobuf metadata for bucket %s: %v", config.Name, err)
return nil, err
}
// Convert protobuf to structured metadata
metadata := &BucketMetadata{
Tags: protoMetadata.Tags,
CORS: corsConfigFromProto(protoMetadata.Cors),
Encryption: protoMetadata.Encryption,
}
return metadata, nil
}
// Fallback: create metadata from cached CORS config
metadata := NewBucketMetadata()
if config.CORS != nil {
metadata.CORS = config.CORS
}
return metadata, nil
}
// loadBucketMetadataFromFiler loads bucket metadata directly from the filer
func (s3a *S3ApiServer) loadBucketMetadataFromFiler(bucket string) (*BucketMetadata, error) {
// Validate bucket name to prevent path traversal attacks
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
return nil, fmt.Errorf("invalid bucket name: %s", bucket)
}
// Clean the bucket name further to prevent any potential path traversal
bucket = filepath.Clean(bucket)
if bucket == "." || bucket == ".." {
return nil, fmt.Errorf("invalid bucket name: %s", bucket)
}
// Get bucket directory entry to access its content
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil {
// Check if this is a "not found" error
if errors.Is(err, filer_pb.ErrNotFound) {
// Set negative cache for non-existent bucket
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.SetNegativeCache(bucket)
}
}
return nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
}
if entry == nil {
// Set negative cache for non-existent bucket
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.SetNegativeCache(bucket)
}
return nil, fmt.Errorf("bucket directory not found %s", bucket)
}
// If no content, return empty metadata
if len(entry.Content) == 0 {
return NewBucketMetadata(), nil
}
// Unmarshal metadata from protobuf
var protoMetadata s3_pb.BucketMetadata
if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil {
glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err)
return nil, fmt.Errorf("failed to unmarshal bucket metadata for %s: %w", bucket, err)
}
// Convert protobuf CORS to standard CORS
corsConfig := corsConfigFromProto(protoMetadata.Cors)
// Create and return structured metadata
metadata := &BucketMetadata{
Tags: protoMetadata.Tags,
CORS: corsConfig,
Encryption: protoMetadata.Encryption,
}
return metadata, nil
}
// setBucketMetadata stores bucket metadata from a structured object
func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadata) error {
// Validate bucket name to prevent path traversal attacks
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
return fmt.Errorf("invalid bucket name: %s", bucket)
}
// Clean the bucket name further to prevent any potential path traversal
bucket = filepath.Clean(bucket)
if bucket == "." || bucket == ".." {
return fmt.Errorf("invalid bucket name: %s", bucket)
}
// Default to empty metadata if nil
if metadata == nil {
metadata = NewBucketMetadata()
}
// Create protobuf metadata
protoMetadata := &s3_pb.BucketMetadata{
Tags: metadata.Tags,
Cors: corsConfigToProto(metadata.CORS),
Encryption: metadata.Encryption,
}
// Marshal metadata to protobuf
metadataBytes, err := proto.Marshal(protoMetadata)
if err != nil {
return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err)
}
// Update the bucket entry with new content
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Get current bucket entry
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil {
return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
}
if entry == nil {
return fmt.Errorf("bucket directory not found %s", bucket)
}
// Update content with metadata
entry.Content = metadataBytes
request := &filer_pb.UpdateEntryRequest{
Directory: s3a.option.BucketsPath,
Entry: entry,
}
_, err = client.UpdateEntry(context.Background(), request)
return err
})
// Invalidate cache after successful update
if err == nil && s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.Remove(bucket)
s3a.bucketConfigCache.RemoveNegativeCache(bucket) // Remove from negative cache too
}
return err
}
// New structured API functions using BucketMetadata
// GetBucketMetadata retrieves complete bucket metadata as a structured object
func (s3a *S3ApiServer) GetBucketMetadata(bucket string) (*BucketMetadata, error) {
return s3a.getBucketMetadata(bucket)
}
// SetBucketMetadata stores complete bucket metadata from a structured object
func (s3a *S3ApiServer) SetBucketMetadata(bucket string, metadata *BucketMetadata) error {
return s3a.setBucketMetadata(bucket, metadata)
}
// UpdateBucketMetadata updates specific parts of bucket metadata while preserving others
//
// DISTRIBUTED SYSTEM DESIGN NOTE:
// This function implements a read-modify-write pattern with "last write wins" semantics.
// In the rare case of concurrent updates to different parts of bucket metadata
// (e.g., simultaneous tag and CORS updates), the last write may overwrite previous changes.
//
// This is an acceptable trade-off because:
// 1. Bucket metadata updates are infrequent in typical S3 usage
// 2. Traditional locking doesn't work in distributed systems across multiple nodes
// 3. The complexity of distributed consensus (e.g., Raft) for metadata updates would
// be disproportionate to the low frequency of bucket configuration changes
// 4. Most bucket operations (tags, CORS, encryption) are typically configured once
// during setup rather than being frequently modified
//
// If stronger consistency is required, consider implementing optimistic concurrency
// control with version numbers or ETags at the storage layer.
func (s3a *S3ApiServer) UpdateBucketMetadata(bucket string, update func(*BucketMetadata) error) error {
// Get current metadata
metadata, err := s3a.GetBucketMetadata(bucket)
if err != nil {
return fmt.Errorf("failed to get current bucket metadata: %w", err)
}
// Apply update function
if err := update(metadata); err != nil {
return fmt.Errorf("failed to apply metadata update: %w", err)
}
// Store updated metadata (last write wins)
return s3a.SetBucketMetadata(bucket, metadata)
}
// Helper functions for specific metadata operations using structured API
// UpdateBucketTags sets bucket tags using the structured API
func (s3a *S3ApiServer) UpdateBucketTags(bucket string, tags map[string]string) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.Tags = tags
return nil
})
}
// UpdateBucketCORS sets bucket CORS configuration using the structured API
func (s3a *S3ApiServer) UpdateBucketCORS(bucket string, corsConfig *cors.CORSConfiguration) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.CORS = corsConfig
return nil
})
}
// UpdateBucketEncryption sets bucket encryption configuration using the structured API
func (s3a *S3ApiServer) UpdateBucketEncryption(bucket string, encryptionConfig *s3_pb.EncryptionConfiguration) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.Encryption = encryptionConfig
return nil
})
}
// ClearBucketTags removes all bucket tags using the structured API
func (s3a *S3ApiServer) ClearBucketTags(bucket string) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.Tags = make(map[string]string)
return nil
})
}
// ClearBucketCORS removes bucket CORS configuration using the structured API
func (s3a *S3ApiServer) ClearBucketCORS(bucket string) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.CORS = nil
return nil
})
}
// ClearBucketEncryption removes bucket encryption configuration using the structured API
func (s3a *S3ApiServer) ClearBucketEncryption(bucket string) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.Encryption = nil
return nil
})
}