@@ -1,6 +1,9 @@
 package s3api

 import (
+	"encoding/xml"
+	"time"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -80,12 +83,79 @@ func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, conte
 func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
 	if dir == s3a.option.BucketsPath {
 		if newEntry != nil {
+			// Update bucket registry (existing functionality)
 			s3a.bucketRegistry.LoadBucketMetadata(newEntry)
-			glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry)
-		} else {
+			glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry.Name)
+
+			// Update bucket configuration cache with new entry
+			s3a.updateBucketConfigCacheFromEntry(newEntry)
+		} else if oldEntry != nil {
+			// Remove from bucket registry (existing functionality)
 			s3a.bucketRegistry.RemoveBucketMetadata(oldEntry)
-			glog.V(0).Infof("remove bucketMetadata %s/%s", dir, newEntry)
+			glog.V(0).Infof("remove bucketMetadata %s/%s", dir, oldEntry.Name)
+
+			// Remove from bucket configuration cache
+			s3a.invalidateBucketConfigCache(oldEntry.Name)
 		}
 	}
 	return nil
 }
+
+// updateBucketConfigCacheFromEntry updates the bucket config cache when a bucket entry changes
+func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry) {
+	if s3a.bucketConfigCache == nil {
+		return
+	}
+
+	bucket := entry.Name
+	glog.V(2).Infof("updateBucketConfigCacheFromEntry: updating cache for bucket %s", bucket)
+
+	// Create new bucket config from the entry
+	config := &BucketConfig{
+		Name:  bucket,
+		Entry: entry,
+	}
+
+	// Extract configuration from extended attributes
+	if entry.Extended != nil {
+		if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
+			config.Versioning = string(versioning)
+		}
+		if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
+			config.Ownership = string(ownership)
+		}
+		if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
+			config.ACL = acl
+		}
+		if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
+			config.Owner = string(owner)
+		}
+		// Parse Object Lock configuration if present
+		if objectLockConfigXML, exists := entry.Extended[s3_constants.ExtObjectLockConfigKey]; exists {
+			var objectLockConfig ObjectLockConfiguration
+			if err := xml.Unmarshal(objectLockConfigXML, &objectLockConfig); err != nil {
+				glog.Errorf("updateBucketConfigCacheFromEntry: failed to parse Object Lock configuration for bucket %s: %v", bucket, err)
+			} else {
+				config.ObjectLockConfig = &objectLockConfig
+				glog.V(2).Infof("updateBucketConfigCacheFromEntry: cached Object Lock configuration for bucket %s", bucket)
+			}
+		}
+	}
+
+	// Update timestamp
+	config.LastModified = time.Now()
+
+	// Update cache
+	s3a.bucketConfigCache.Set(bucket, config)
+	glog.V(2).Infof("updateBucketConfigCacheFromEntry: updated bucket config cache for %s", bucket)
+}
+
+// invalidateBucketConfigCache removes a bucket from the configuration cache
+func (s3a *S3ApiServer) invalidateBucketConfigCache(bucket string) {
+	if s3a.bucketConfigCache == nil {
+		return
+	}
+
+	s3a.bucketConfigCache.Remove(bucket)
+	glog.V(2).Infof("invalidateBucketConfigCache: removed bucket %s from cache", bucket)
+}
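
For reviewers unfamiliar with how the Object Lock settings travel through the entry's extended attributes, the sketch below shows the same decode pattern in isolation: a configuration is serialized to XML, stored as a `[]byte` attribute value, and later parsed back with `xml.Unmarshal`, which is what `updateBucketConfigCacheFromEntry` does. This is a minimal, self-contained illustration only; the `objectLockConfig` struct and the `"x-amz-object-lock"` key are simplified stand-ins, not the actual `ObjectLockConfiguration` type or `s3_constants.ExtObjectLockConfigKey` value from the codebase.

```go
package main

import (
	"encoding/xml"
	"fmt"
	"log"
)

// objectLockConfig is a simplified stand-in for the real ObjectLockConfiguration type.
type objectLockConfig struct {
	XMLName           xml.Name `xml:"ObjectLockConfiguration"`
	ObjectLockEnabled string   `xml:"ObjectLockEnabled,omitempty"`
}

func main() {
	// Serialize a configuration the way it would be written into an entry's Extended attributes.
	stored, err := xml.Marshal(objectLockConfig{ObjectLockEnabled: "Enabled"})
	if err != nil {
		log.Fatal(err)
	}

	// Stand-in for the entry's Extended map, keyed by a hypothetical attribute name.
	extended := map[string][]byte{"x-amz-object-lock": stored}

	// Decode path mirroring updateBucketConfigCacheFromEntry: look up the key,
	// unmarshal the XML payload, and keep the parsed struct for the cache entry.
	if raw, exists := extended["x-amz-object-lock"]; exists {
		var cfg objectLockConfig
		if err := xml.Unmarshal(raw, &cfg); err != nil {
			log.Printf("failed to parse Object Lock configuration: %v", err)
		} else {
			fmt.Printf("cached Object Lock configuration: %+v\n", cfg)
		}
	}
}
```

Keeping the parsed struct on the cached config means the XML is unmarshaled once in the subscription callback rather than on every request that consults the bucket configuration.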