Browse Source

bucket re-creation error handling

Different Owner: Always fails with 409 BucketAlreadyExists
Checks the s3-identity-id header against the stored bucket owner
Returns error immediately if owners don't match
Same Owner + Conflicting Settings: Fails with 409 BucketAlreadyExists
Compares requested Object Lock settings with existing bucket configuration
Returns error if settings are incompatible (e.g., trying to enable Object Lock on a bucket that doesn't have it)
Same Owner + Compatible Settings: Returns 200 OK (idempotent)
If the bucket already exists with the same owner and compatible settings
Returns success response without recreating the bucket
pull/7231/head
chrislu 2 months ago
parent
commit
b0392731b7
  1. 64
      weed/s3api/s3api_bucket_handlers.go

64
weed/s3api/s3api_bucket_handlers.go

@ -108,8 +108,11 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
return
}
// avoid duplicated buckets
errCode := s3err.ErrNone
// Check if bucket already exists and handle ownership/settings
currentIdentityId := r.Header.Get(s3_constants.AmzIdentityId)
// Check collection existence first
collectionExists := false
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{
IncludeEcVolumes: true,
@ -120,7 +123,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
} else {
for _, c := range resp.Collections {
if s3a.getCollectionName(bucket) == c.Name {
errCode = s3err.ErrBucketAlreadyExists
collectionExists = true
break
}
}
@ -130,11 +133,60 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Check bucket directory existence and get metadata
if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist {
errCode = s3err.ErrBucketAlreadyExists
// Bucket exists, check ownership and settings
if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); err == nil {
// Get existing bucket owner
var existingOwnerId string
if entry.Extended != nil {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
existingOwnerId = string(id)
}
}
// Check ownership
if existingOwnerId != "" && existingOwnerId != currentIdentityId {
// Different owner - always fail with BucketAlreadyExists
glog.V(3).Infof("PutBucketHandler: bucket %s owned by %s, requested by %s", bucket, existingOwnerId, currentIdentityId)
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
return
}
// Same owner or no owner set - check for conflicting settings
objectLockRequested := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true")
// Get current bucket configuration
bucketConfig, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
glog.Errorf("PutBucketHandler: failed to get bucket config for %s: %v", bucket, errCode)
// If we can't get config, assume no conflict and allow recreation
} else {
// Check for Object Lock conflict
currentObjectLockEnabled := bucketConfig.ObjectLockConfig != nil &&
bucketConfig.ObjectLockConfig.ObjectLockEnabled == s3_constants.ObjectLockEnabled
if objectLockRequested != currentObjectLockEnabled {
// Conflicting Object Lock settings - fail with BucketAlreadyExists
glog.V(3).Infof("PutBucketHandler: bucket %s has conflicting Object Lock settings (requested: %v, current: %v)",
bucket, objectLockRequested, currentObjectLockEnabled)
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
return
}
}
// Same owner, compatible settings - return success (idempotent)
glog.V(3).Infof("PutBucketHandler: bucket %s already exists with compatible settings, returning success", bucket)
writeSuccessResponseXML(w, r, CreateBucketConfiguration{})
return
}
}
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
// If collection exists but bucket directory doesn't, this is an inconsistent state
if collectionExists {
glog.Errorf("PutBucketHandler: collection exists but bucket directory missing for %s", bucket)
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
return
}

Loading…
Cancel
Save