Browse Source

s3: optimize DELETE by skipping lock check for buckets without Object Lock

This optimization avoids an expensive filer gRPC call for every DELETE
operation on buckets that don't have Object Lock enabled.

Before this change, enforceObjectLockProtections() would always call
getObjectEntry() to fetch object metadata to check for retention/legal
hold, even for buckets that never had Object Lock configured.

Changes:
1. Add early return in enforceObjectLockProtections() if bucket has no
   Object Lock config or bucket doesn't exist
2. Add isObjectLockEnabled() helper function to check if a bucket has
   Object Lock configured
3. Fix validateObjectLockHeaders() to check ObjectLockConfig instead of
   just versioningEnabled - this ensures object-lock headers are properly
   rejected on buckets without Object Lock enabled, which aligns with
   AWS S3 semantics
4. Make bucket creation with Object Lock atomic - set Object Lock config
   in the same CreateEntry call as bucket creation, preventing race
   conditions where bucket exists without Object Lock enabled
5. Properly handle Object Lock setup failures during bucket creation -
   if StoreObjectLockConfigurationInExtended fails, roll back the bucket
   creation and return an error instead of leaving a bucket without
   the requested Object Lock configuration

This significantly improves DELETE latency for non-Object-Lock buckets,
which is the common case (lockCheck time reduced from 1-10ms to ~1µs).
pull/7642/head
chrislu 4 days ago
parent
commit
6863a4e801
  1. 2
      test/s3/retention/s3_object_lock_headers_test.go
  2. 13
      test/s3/retention/s3_retention_test.go
  3. 13
      weed/s3api/s3api_bucket_config.go
  4. 78
      weed/s3api/s3api_bucket_handlers.go
  5. 1
      weed/s3api/s3api_object_handlers_delete.go
  6. 30
      weed/s3api/s3api_object_handlers_put.go
  7. 18
      weed/s3api/s3api_object_retention.go

2
test/s3/retention/s3_object_lock_headers_test.go

@ -236,7 +236,7 @@ func TestObjectLockHeadersNonVersionedBucket(t *testing.T) {
bucketName := getNewBucketName()
// Create regular bucket without object lock/versioning
createBucket(t, client, bucketName)
createBucketWithoutObjectLock(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
key := "test-non-versioned"

13
test/s3/retention/s3_retention_test.go

@ -69,8 +69,19 @@ func getNewBucketName() string {
return fmt.Sprintf("%s%d", defaultConfig.BucketPrefix, timestamp)
}
// createBucket creates a new bucket for testing
// createBucket creates a new bucket for testing with Object Lock enabled
// Object Lock is required for retention and legal hold functionality per AWS S3 specification
func createBucket(t *testing.T, client *s3.Client, bucketName string) {
_, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
ObjectLockEnabledForBucket: aws.Bool(true),
})
require.NoError(t, err)
}
// createBucketWithoutObjectLock creates a new bucket without Object Lock enabled
// Use this only for tests that specifically need to verify non-Object-Lock bucket behavior
func createBucketWithoutObjectLock(t *testing.T, client *s3.Client, bucketName string) {
_, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})

13
weed/s3api/s3api_bucket_config.go

@ -514,6 +514,19 @@ func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
return config.Versioning != "" || config.ObjectLockConfig != nil, nil
}
// isObjectLockEnabled checks if Object Lock is enabled for a bucket (with caching)
func (s3a *S3ApiServer) isObjectLockEnabled(bucket string) (bool, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return false, filer_pb.ErrNotFound
}
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
}
return config.ObjectLockConfig != nil, nil
}
// getVersioningState returns the detailed versioning state for a bucket
func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
config, errCode := s3a.getBucketConfig(bucket)

78
weed/s3api/s3api_bucket_handlers.go

@ -244,46 +244,64 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
return
}
// create the folder for bucket, but lazily create actual collection
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, setBucketOwner(r)); err != nil {
glog.Errorf("PutBucketHandler mkdir: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Check for x-amz-bucket-object-lock-enabled header BEFORE creating bucket
// This allows us to create the bucket with Object Lock configuration atomically
objectLockEnabled := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true")
// Remove bucket from negative cache after successful creation
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
}
// Capture any Object Lock configuration error from within the callback
// The mkdir callback doesn't support returning errors, so we capture it here
var objectLockSetupError error
// Check for x-amz-bucket-object-lock-enabled header (S3 standard compliance)
if objectLockHeaderValue := r.Header.Get(s3_constants.AmzBucketObjectLockEnabled); strings.EqualFold(objectLockHeaderValue, "true") {
glog.V(3).Infof("PutBucketHandler: enabling Object Lock and Versioning for bucket %s due to x-amz-bucket-object-lock-enabled header", bucket)
// Create the folder for bucket with all settings atomically
// This ensures Object Lock configuration is set in the same CreateEntry call,
// preventing race conditions where the bucket exists without Object Lock enabled
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, func(entry *filer_pb.Entry) {
// Set bucket owner
setBucketOwner(r)(entry)
// Set Object Lock configuration atomically during bucket creation
if objectLockEnabled {
glog.V(3).Infof("PutBucketHandler: enabling Object Lock and Versioning for bucket %s atomically", bucket)
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
// Atomically update the configuration of the specified bucket. See the updateBucketConfig
// function definition for detailed documentation on parameters and behavior.
errCode := s3a.updateBucketConfig(bucket, func(bucketConfig *BucketConfig) error {
// Enable versioning (required for Object Lock)
bucketConfig.Versioning = s3_constants.VersioningEnabled
entry.Extended[s3_constants.ExtVersioningKey] = []byte(s3_constants.VersioningEnabled)
// Create basic Object Lock configuration (enabled without default retention)
// Create and store Object Lock configuration
objectLockConfig := &ObjectLockConfiguration{
ObjectLockEnabled: s3_constants.ObjectLockEnabled,
}
if err := StoreObjectLockConfigurationInExtended(entry, objectLockConfig); err != nil {
glog.Errorf("PutBucketHandler: failed to store Object Lock config for bucket %s: %v", bucket, err)
objectLockSetupError = err
// Note: The entry will still be created, but we'll roll it back below
} else {
glog.V(3).Infof("PutBucketHandler: set ObjectLockConfig for bucket %s: %+v", bucket, objectLockConfig)
}
}
}); err != nil {
glog.Errorf("PutBucketHandler mkdir: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Set the cached Object Lock configuration
bucketConfig.ObjectLockConfig = objectLockConfig
glog.V(3).Infof("PutBucketHandler: set ObjectLockConfig for bucket %s: %+v", bucket, objectLockConfig)
return nil
})
if errCode != s3err.ErrNone {
glog.Errorf("PutBucketHandler: failed to enable Object Lock for bucket %s: %v", bucket, errCode)
s3err.WriteErrorResponse(w, r, errCode)
return
// If Object Lock setup failed, roll back the bucket creation
// This ensures we don't leave a bucket without the requested Object Lock configuration
if objectLockSetupError != nil {
glog.Errorf("PutBucketHandler: rolling back bucket %s creation due to Object Lock setup failure: %v", bucket, objectLockSetupError)
if deleteErr := s3a.rm(s3a.option.BucketsPath, bucket, true, true); deleteErr != nil {
glog.Errorf("PutBucketHandler: failed to rollback bucket %s after Object Lock setup failure: %v", bucket, deleteErr)
}
glog.V(3).Infof("PutBucketHandler: enabled Object Lock and Versioning for bucket %s", bucket)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Remove bucket from negative cache after successful creation
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
}
w.Header().Set("Location", "/"+bucket)

1
weed/s3api/s3api_object_handlers_delete.go

@ -129,6 +129,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
// which listens to metadata events and uses consistent hashing for coordination
})
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return

30
weed/s3api/s3api_object_handlers_put.go

@ -30,14 +30,14 @@ import (
// Object lock validation errors
var (
ErrObjectLockVersioningRequired = errors.New("object lock headers can only be used on versioned buckets")
ErrObjectLockVersioningRequired = errors.New("object lock headers can only be used on buckets with Object Lock enabled")
ErrInvalidObjectLockMode = errors.New("invalid object lock mode")
ErrInvalidLegalHoldStatus = errors.New("invalid legal hold status")
ErrInvalidRetentionDateFormat = errors.New("invalid retention until date format")
ErrRetentionDateMustBeFuture = errors.New("retain until date must be in the future")
ErrObjectLockModeRequiresDate = errors.New("object lock mode requires retention until date")
ErrRetentionDateRequiresMode = errors.New("retention until date requires object lock mode")
ErrGovernanceBypassVersioningRequired = errors.New("governance bypass header can only be used on versioned buckets")
ErrGovernanceBypassVersioningRequired = errors.New("governance bypass header can only be used on buckets with Object Lock enabled")
ErrInvalidObjectLockDuration = errors.New("object lock duration must be greater than 0 days")
ErrObjectLockDurationExceeded = errors.New("object lock duration exceeds maximum allowed days")
ErrObjectLockConfigurationMissingEnabled = errors.New("object lock configuration must specify ObjectLockEnabled")
@ -159,8 +159,16 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
// Check if Object Lock is enabled for this bucket
objectLockEnabled, err := s3a.isObjectLockEnabled(bucket)
if err != nil && !errors.Is(err, filer_pb.ErrNotFound) {
glog.Errorf("Error checking Object Lock status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Validate object lock headers before processing
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
if err := s3a.validateObjectLockHeaders(r, objectLockEnabled); err != nil {
glog.V(2).Infof("PutObjectHandler: object lock header validation failed for bucket %s, object %s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, mapValidationErrorToS3Error(err))
return
@ -1311,7 +1319,8 @@ func (s3a *S3ApiServer) applyBucketDefaultRetention(bucket string, entry *filer_
}
// validateObjectLockHeaders validates object lock headers in PUT requests
func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, versioningEnabled bool) error {
// objectLockEnabled should be true only if the bucket has Object Lock configured
func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, objectLockEnabled bool) error {
// Extract object lock headers from request
mode := r.Header.Get(s3_constants.AmzObjectLockMode)
retainUntilDateStr := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
@ -1320,8 +1329,11 @@ func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, versioningEna
// Check if any object lock headers are present
hasObjectLockHeaders := mode != "" || retainUntilDateStr != "" || legalHold != ""
// Object lock headers can only be used on versioned buckets
if hasObjectLockHeaders && !versioningEnabled {
// Object lock headers can only be used on buckets with Object Lock enabled
// Per AWS S3: Object Lock can only be enabled at bucket creation, and once enabled,
// objects can have retention/legal-hold metadata. Without Object Lock enabled,
// these headers must be rejected.
if hasObjectLockHeaders && !objectLockEnabled {
return ErrObjectLockVersioningRequired
}
@ -1362,11 +1374,11 @@ func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, versioningEna
}
}
// Check for governance bypass header - only valid for versioned buckets
// Check for governance bypass header - only valid for buckets with Object Lock enabled
bypassGovernance := r.Header.Get("x-amz-bypass-governance-retention") == "true"
// Governance bypass headers are only valid for versioned buckets (like object lock headers)
if bypassGovernance && !versioningEnabled {
// Governance bypass headers are only valid for buckets with Object Lock enabled
if bypassGovernance && !objectLockEnabled {
return ErrGovernanceBypassVersioningRequired
}

18
weed/s3api/s3api_object_retention.go

@ -586,10 +586,26 @@ func (s3a *S3ApiServer) evaluateGovernanceBypassRequest(r *http.Request, bucket,
// enforceObjectLockProtections enforces object lock protections for operations
func (s3a *S3ApiServer) enforceObjectLockProtections(request *http.Request, bucket, object, versionId string, governanceBypassAllowed bool) error {
// Quick check: if bucket doesn't have Object Lock enabled, skip the expensive entry lookup
// This optimization avoids a filer gRPC call for every DELETE operation on buckets without Object Lock
objectLockEnabled, err := s3a.isObjectLockEnabled(bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
// Bucket does not exist, so no protections to enforce
return nil
}
// For other errors, we can't determine lock status, so we should fail.
glog.Errorf("enforceObjectLockProtections: failed to check object lock for bucket %s: %v", bucket, err)
return err
}
if !objectLockEnabled {
// Object Lock is not enabled on this bucket, no protections to enforce
return nil
}
// Get the object entry to check both retention and legal hold
// For delete operations without versionId, we need to check the latest version
var entry *filer_pb.Entry
var err error
if versionId != "" {
// Check specific version

Loading…
Cancel
Save