Browse Source

address comments

pull/7434/head
chrislu 1 month ago
parent
commit
77865720ce
  1. 53
      test/s3/retention/s3_bucket_delete_with_lock_test.go
  2. 85
      weed/s3api/s3api_bucket_handlers.go

53
test/s3/retention/s3_bucket_delete_with_lock_test.go

@@ -2,6 +2,7 @@ package retention
import (
"context"
"fmt"
"strings"
"testing"
"time"
@@ -29,7 +30,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
retainUntilDate := time.Now().Add(10 * time.Second) // 10 seconds in future
// Upload object with compliance retention
_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: strings.NewReader(content),
@@ -39,7 +40,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
require.NoError(t, err, "PutObject with compliance retention should succeed")
// Try to delete bucket - should fail because object has active retention
_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
require.Error(t, err, "DeleteBucket should fail when objects have active retention")
@@ -51,7 +52,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
time.Sleep(time.Until(retainUntilDate) + time.Second)
// Delete the object
_, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
_, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
@@ -68,7 +69,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
retainUntilDate := time.Now().Add(10 * time.Second) // 10 seconds in future
// Upload object with governance retention
_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: strings.NewReader(content),
@@ -78,7 +79,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
require.NoError(t, err, "PutObject with governance retention should succeed")
// Try to delete bucket - should fail because object has active retention
_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
require.Error(t, err, "DeleteBucket should fail when objects have active retention")
@@ -90,7 +91,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
time.Sleep(time.Until(retainUntilDate) + time.Second)
// Delete the object
_, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
_, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
@@ -106,7 +107,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
content := "test content for legal hold"
// Upload object first
_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: strings.NewReader(content),
@@ -114,7 +115,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
require.NoError(t, err, "PutObject should succeed")
// Set legal hold on the object
_, err = client.PutObjectLegalHold(context.TODO(), &s3.PutObjectLegalHoldInput{
_, err = client.PutObjectLegalHold(context.Background(), &s3.PutObjectLegalHoldInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOn},
@@ -122,7 +123,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
require.NoError(t, err, "PutObjectLegalHold should succeed")
// Try to delete bucket - should fail because object has active legal hold
_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
require.Error(t, err, "DeleteBucket should fail when objects have active legal hold")
@@ -130,7 +131,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
t.Logf("Expected error: %v", err)
// Remove legal hold
_, err = client.PutObjectLegalHold(context.TODO(), &s3.PutObjectLegalHoldInput{
_, err = client.PutObjectLegalHold(context.Background(), &s3.PutObjectLegalHoldInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOff},
@@ -138,7 +139,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
require.NoError(t, err, "Removing legal hold should succeed")
// Delete the object
_, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
_, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
@@ -153,14 +154,18 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
// Make sure all objects are deleted
deleteAllObjectVersions(t, client, bucketName)
// Wait for eventual consistency
time.Sleep(500 * time.Millisecond)
// Now delete bucket should succeed
_, err := client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
// Use retry mechanism for eventual consistency instead of fixed sleep
require.Eventually(t, func() bool {
_, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err, "DeleteBucket should succeed when no objects have active locks")
if err != nil {
t.Logf("Retrying DeleteBucket due to: %v", err)
return false
}
return true
}, 5*time.Second, 500*time.Millisecond, "DeleteBucket should succeed when no objects have active locks")
t.Logf("Successfully deleted bucket without active locks")
})
}
@@ -180,7 +185,7 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) {
retainUntilDate := time.Now().Add(10 * time.Second)
// Upload first version with retention
putResp1, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
putResp1, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: strings.NewReader(content1),
@@ -191,7 +196,7 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) {
version1 := *putResp1.VersionId
// Upload second version with retention
putResp2, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
putResp2, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: strings.NewReader(content2),
@@ -204,7 +209,7 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) {
t.Logf("Created two versions: %s, %s", version1, version2)
// Try to delete bucket - should fail because versions have active retention
_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
require.Error(t, err, "DeleteBucket should fail when object versions have active retention")
@@ -222,7 +227,7 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) {
time.Sleep(500 * time.Millisecond)
// Now delete bucket should succeed
_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err, "DeleteBucket should succeed after all locks expire")
@@ -239,9 +244,9 @@ func TestBucketDeletionWithoutObjectLock(t *testing.T) {
// Upload some objects
for i := 0; i < 3; i++ {
_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(strings.ReplaceAll("test-object-{}", "{}", string(rune('0'+i)))),
Key: aws.String(fmt.Sprintf("test-object-%d", i)),
Body: strings.NewReader("test content"),
})
require.NoError(t, err)
@@ -251,7 +256,7 @@ func TestBucketDeletionWithoutObjectLock(t *testing.T) {
deleteAllObjectVersions(t, client, bucketName)
// Delete bucket should succeed
_, err := client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
_, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err, "DeleteBucket should succeed for regular bucket")

85
weed/s3api/s3api_bucket_handlers.go

@@ -339,38 +339,25 @@ func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
return hasLocks, nil
}
// recursivelyCheckLocks recursively checks all objects and versions for active locks
// Uses pagination to handle directories with more than 10,000 entries
func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error {
if *hasLocks {
// Early exit if we've already found a locked object
return nil
}
// errStopPagination is a sentinel error to signal early termination of pagination
var errStopPagination = errors.New("stop pagination")
// Helper function to check entries in a directory with pagination
checkEntriesWithPagination := func(directory string, isVersionDir bool) error {
// paginateEntries iterates through directory entries with pagination
// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully.
func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error {
startFrom := ""
for {
if *hasLocks {
return nil
}
entries, isLast, err := s3a.list(directory, "", startFrom, false, 10000)
entries, isLast, err := s3a.list(dir, "", startFrom, false, 10000)
if err != nil {
// Fail-safe: propagate error to prevent incorrect bucket deletion
return fmt.Errorf("failed to list directory %s: %w", directory, err)
return fmt.Errorf("failed to list directory %s: %w", dir, err)
}
for _, entry := range entries {
if s3a.entryHasActiveLock(entry, currentTime) {
*hasLocks = true
if isVersionDir {
glog.V(2).Infof("Found object with active lock in versions: %s/%s", directory, entry.Name)
} else {
glog.V(2).Infof("Found object with active lock: %s/%s", directory, entry.Name)
}
if err := fn(entries); err != nil {
if errors.Is(err, errStopPagination) {
return nil
}
return err
}
if isLast || len(entries) == 0 {
@@ -380,20 +367,22 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
startFrom = entries[len(entries)-1].Name
}
return nil
}
}
// Paginate through directory listing
startFrom := ""
for {
entries, isLast, err := s3a.list(dir, "", startFrom, false, 10000)
if err != nil {
return err
// recursivelyCheckLocks recursively checks all objects and versions for active locks
// Uses pagination to handle directories with more than 10,000 entries
func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error {
if *hasLocks {
// Early exit if we've already found a locked object
return nil
}
// Process entries in the current directory with pagination
err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error {
for _, entry := range entries {
if *hasLocks {
// Early exit if we've already found a locked object
return nil
return errStopPagination
}
// Skip multipart uploads folder
@@ -404,7 +393,17 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
// If it's a .versions directory, check all version files with pagination
if strings.HasSuffix(entry.Name, ".versions") && entry.IsDirectory {
versionDir := path.Join(dir, entry.Name)
if err := checkEntriesWithPagination(versionDir, true); err != nil {
err := s3a.paginateEntries(versionDir, func(versionEntries []*filer_pb.Entry) error {
for _, versionEntry := range versionEntries {
if s3a.entryHasActiveLock(versionEntry, currentTime) {
*hasLocks = true
glog.V(2).Infof("Found object with active lock in versions: %s/%s", versionDir, versionEntry.Name)
return errStopPagination
}
}
return nil
})
if err != nil {
return err
}
continue
@@ -416,7 +415,7 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
*hasLocks = true
objectPath := path.Join(relativePath, entry.Name)
glog.V(2).Infof("Found object with active lock: %s", objectPath)
return nil
return errStopPagination
}
}
@@ -430,15 +429,10 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
}
}
}
if isLast || len(entries) == 0 {
break
}
// Use the last entry name as the start point for next page
startFrom = entries[len(entries)-1].Name
}
return nil
})
return err
}
// entryHasActiveLock checks if an entry has an active retention or legal hold
@@ -460,7 +454,13 @@ func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime ti
if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
// Check if retention is still active
if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil {
timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64)
if err != nil {
// Fail-safe: if we can't parse the retention date, assume the object is locked
// to prevent accidental data loss
glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err)
return true
}
retainUntil := time.Unix(timestamp, 0)
if retainUntil.After(currentTime) {
return true
@@ -468,7 +468,6 @@ func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime ti
}
}
}
}
return false
}

Loading…
Cancel
Save