Browse Source

early return

pull/7434/head
chrislu 1 month ago
parent
commit
8084fa9234
  1. 20
      test/s3/retention/s3_bucket_delete_with_lock_test.go
  2. 39
      weed/s3api/s3api_bucket_handlers.go

20
test/s3/retention/s3_bucket_delete_with_lock_test.go

@@ -223,14 +223,18 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) {
// Clean up all versions
deleteAllObjectVersions(t, client, bucketName)
// Wait for eventual consistency
time.Sleep(500 * time.Millisecond)
// Now delete bucket should succeed
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err, "DeleteBucket should succeed after all locks expire")
// Wait for eventual consistency and attempt to delete the bucket with retry
require.Eventually(t, func() bool {
_, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
t.Logf("Retrying DeleteBucket due to: %v", err)
return false
}
return true
}, 5*time.Second, 500*time.Millisecond, "DeleteBucket should succeed after all locks expire")
t.Logf("Successfully deleted bucket after locks expired")
}

39
weed/s3api/s3api_bucket_handlers.go

@@ -326,7 +326,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
bucketPath := s3a.option.BucketsPath + "/" + bucket
// Check all objects including versions for active locks
// Establish current time once at the start for consistency across the entire scan
hasLocks := false
@@ -335,10 +335,15 @@ func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
if err != nil {
return false, fmt.Errorf("error checking for locked objects: %w", err)
}
return hasLocks, nil
}
const (
// lockCheckPaginationSize is the page size for listing directories during lock checks
lockCheckPaginationSize = 10000
)
// errStopPagination is a sentinel error to signal early termination of pagination
var errStopPagination = errors.New("stop pagination")
@@ -347,19 +352,19 @@ var errStopPagination = errors.New("stop pagination")
func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error {
startFrom := ""
for {
entries, isLast, err := s3a.list(dir, "", startFrom, false, 10000)
entries, isLast, err := s3a.list(dir, "", startFrom, false, lockCheckPaginationSize)
if err != nil {
// Fail-safe: propagate error to prevent incorrect bucket deletion
return fmt.Errorf("failed to list directory %s: %w", dir, err)
}
if err := fn(entries); err != nil {
if errors.Is(err, errStopPagination) {
return nil
}
return err
}
if isLast || len(entries) == 0 {
break
}
@@ -376,7 +381,7 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
// Early exit if we've already found a locked object
return nil
}
// Process entries in the current directory with pagination
err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error {
for _, entry := range entries {
@@ -384,12 +389,12 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
// Early exit if we've already found a locked object
return errStopPagination
}
// Skip multipart uploads folder
if entry.Name == s3_constants.MultipartUploadsFolder {
continue
}
// If it's a .versions directory, check all version files with pagination
if strings.HasSuffix(entry.Name, ".versions") && entry.IsDirectory {
versionDir := path.Join(dir, entry.Name)
@@ -408,7 +413,7 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
}
continue
}
// Check regular files for locks
if !entry.IsDirectory {
if s3a.entryHasActiveLock(entry, currentTime) {
@@ -418,20 +423,24 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
return errStopPagination
}
}
// Recursively check subdirectories
if entry.IsDirectory && !strings.HasSuffix(entry.Name, ".versions") {
subDir := path.Join(dir, entry.Name)
subRelativePath := path.Join(relativePath, entry.Name)
if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil {
return err
}
// Early exit if a locked object was found in the subdirectory
if *hasLocks {
return errStopPagination
}
}
}
return nil
})
return err
}
@@ -440,14 +449,14 @@ func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime ti
if entry.Extended == nil {
return false
}
// Check for active legal hold
if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
if string(legalHoldBytes) == s3_constants.LegalHoldOn {
return true
}
}
// Check for active retention
if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
mode := string(modeBytes)
@@ -468,7 +477,7 @@ func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime ti
}
}
}
return false
}

Loading…
Cancel
Save