diff --git a/test/s3/retention/s3_bucket_delete_with_lock_test.go b/test/s3/retention/s3_bucket_delete_with_lock_test.go index 1515e2528..7c417c24c 100644 --- a/test/s3/retention/s3_bucket_delete_with_lock_test.go +++ b/test/s3/retention/s3_bucket_delete_with_lock_test.go @@ -2,6 +2,7 @@ package retention import ( "context" + "fmt" "strings" "testing" "time" @@ -29,7 +30,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { retainUntilDate := time.Now().Add(10 * time.Second) // 10 seconds in future // Upload object with compliance retention - _, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + _, err := client.PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), Body: strings.NewReader(content), @@ -39,7 +40,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { require.NoError(t, err, "PutObject with compliance retention should succeed") // Try to delete bucket - should fail because object has active retention - _, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + _, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ Bucket: aws.String(bucketName), }) require.Error(t, err, "DeleteBucket should fail when objects have active retention") @@ -51,7 +52,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { time.Sleep(time.Until(retainUntilDate) + time.Second) // Delete the object - _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + _, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), }) @@ -68,7 +69,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { retainUntilDate := time.Now().Add(10 * time.Second) // 10 seconds in future // Upload object with governance retention - _, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + _, err := client.PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), Body: strings.NewReader(content), @@ -78,7 +79,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { require.NoError(t, err, "PutObject with governance retention should succeed") // Try to delete bucket - should fail because object has active retention - _, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + _, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ Bucket: aws.String(bucketName), }) require.Error(t, err, "DeleteBucket should fail when objects have active retention") @@ -90,7 +91,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { time.Sleep(time.Until(retainUntilDate) + time.Second) // Delete the object - _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + _, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), }) @@ -106,7 +107,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { content := "test content for legal hold" // Upload object first - _, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + _, err := client.PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), Body: strings.NewReader(content), @@ -114,7 +115,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { require.NoError(t, err, "PutObject should succeed") // Set legal hold on the object - _, err = client.PutObjectLegalHold(context.TODO(), &s3.PutObjectLegalHoldInput{ + _, err = client.PutObjectLegalHold(context.Background(), &s3.PutObjectLegalHoldInput{ Bucket: aws.String(bucketName), Key: aws.String(key), LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOn}, @@ -122,7 +123,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { require.NoError(t, err, "PutObjectLegalHold should succeed") // Try to delete bucket - should fail because object has active legal hold - _, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + _, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ Bucket: aws.String(bucketName), }) require.Error(t, err, "DeleteBucket should fail when objects have active legal hold") @@ -130,7 +131,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { t.Logf("Expected error: %v", err) // Remove legal hold - _, err = client.PutObjectLegalHold(context.TODO(), &s3.PutObjectLegalHoldInput{ + _, err = client.PutObjectLegalHold(context.Background(), &s3.PutObjectLegalHoldInput{ Bucket: aws.String(bucketName), Key: aws.String(key), LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOff}, @@ -138,7 +139,7 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { require.NoError(t, err, "Removing legal hold should succeed") // Delete the object - _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + _, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), }) @@ -153,14 +154,18 @@ func TestBucketDeletionWithObjectLock(t *testing.T) { // Make sure all objects are deleted deleteAllObjectVersions(t, client, bucketName) - // Wait for eventual consistency - time.Sleep(500 * time.Millisecond) - - // Now delete bucket should succeed - _, err := client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ - Bucket: aws.String(bucketName), - }) - require.NoError(t, err, "DeleteBucket should succeed when no objects have active locks") + // Use retry mechanism for eventual consistency instead of fixed sleep + require.Eventually(t, func() bool { + _, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Logf("Retrying DeleteBucket due to: %v", err) + return false + } + return true + }, 5*time.Second, 500*time.Millisecond, "DeleteBucket should succeed when no objects have active locks") + t.Logf("Successfully deleted bucket without active locks") }) } @@ -180,7 +185,7 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) { retainUntilDate := time.Now().Add(10 * time.Second) // Upload first version with retention - putResp1, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + putResp1, err := client.PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), Body: strings.NewReader(content1), @@ -191,7 +196,7 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) { version1 := *putResp1.VersionId // Upload second version with retention - putResp2, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + putResp2, err := client.PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), Body: strings.NewReader(content2), @@ -204,7 +209,7 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) { t.Logf("Created two versions: %s, %s", version1, version2) // Try to delete bucket - should fail because versions have active retention - _, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + _, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ Bucket: aws.String(bucketName), }) require.Error(t, err, "DeleteBucket should fail when object versions have active retention") @@ -222,7 +227,7 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) { time.Sleep(500 * time.Millisecond) // Now delete bucket should succeed - _, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + _, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ Bucket: aws.String(bucketName), }) require.NoError(t, err, "DeleteBucket should succeed after all locks expire") @@ -239,9 +244,9 @@ func TestBucketDeletionWithoutObjectLock(t *testing.T) { // Upload some objects for i := 0; i < 3; i++ { - _, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + _, err := client.PutObject(context.Background(), &s3.PutObjectInput{ Bucket: aws.String(bucketName), - Key: aws.String(strings.ReplaceAll("test-object-{}", "{}", string(rune('0'+i)))), + Key: aws.String(fmt.Sprintf("test-object-%d", i)), Body: strings.NewReader("test content"), }) require.NoError(t, err) @@ -251,7 +256,7 @@ func TestBucketDeletionWithoutObjectLock(t *testing.T) { deleteAllObjectVersions(t, client, bucketName) // Delete bucket should succeed - _, err := client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + _, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ Bucket: aws.String(bucketName), }) require.NoError(t, err, "DeleteBucket should succeed for regular bucket") diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index ed1e3dc02..533c24341 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -339,6 +339,36 @@ func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) { return hasLocks, nil } +// errStopPagination is a sentinel error to signal early termination of pagination +var errStopPagination = errors.New("stop pagination") + +// paginateEntries iterates through directory entries with pagination +// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully. +func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error { + startFrom := "" + for { + entries, isLast, err := s3a.list(dir, "", startFrom, false, 10000) + if err != nil { + // Fail-safe: propagate error to prevent incorrect bucket deletion + return fmt.Errorf("failed to list directory %s: %w", dir, err) + } + + if err := fn(entries); err != nil { + if errors.Is(err, errStopPagination) { + return nil + } + return err + } + + if isLast || len(entries) == 0 { + break + } + // Use the last entry name as the start point for next page + startFrom = entries[len(entries)-1].Name + } + return nil +} + // recursivelyCheckLocks recursively checks all objects and versions for active locks // Uses pagination to handle directories with more than 10,000 entries func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error { @@ -347,53 +377,12 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h return nil } - // Helper function to check entries in a directory with pagination - checkEntriesWithPagination := func(directory string, isVersionDir bool) error { - startFrom := "" - for { - if *hasLocks { - return nil - } - - entries, isLast, err := s3a.list(directory, "", startFrom, false, 10000) - if err != nil { - // Fail-safe: propagate error to prevent incorrect bucket deletion - return fmt.Errorf("failed to list directory %s: %w", directory, err) - } - - for _, entry := range entries { - if s3a.entryHasActiveLock(entry, currentTime) { - *hasLocks = true - if isVersionDir { - glog.V(2).Infof("Found object with active lock in versions: %s/%s", directory, entry.Name) - } else { - glog.V(2).Infof("Found object with active lock: %s/%s", directory, entry.Name) - } - return nil - } - } - - if isLast || len(entries) == 0 { - break - } - // Use the last entry name as the start point for next page - startFrom = entries[len(entries)-1].Name - } - return nil - } - - // Paginate through directory listing - startFrom := "" - for { - entries, isLast, err := s3a.list(dir, "", startFrom, false, 10000) - if err != nil { - return err - } - + // Process entries in the current directory with pagination + err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error { for _, entry := range entries { if *hasLocks { // Early exit if we've already found a locked object - return nil + return errStopPagination } // Skip multipart uploads folder @@ -404,7 +393,17 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h // If it's a .versions directory, check all version files with pagination if strings.HasSuffix(entry.Name, ".versions") && entry.IsDirectory { versionDir := path.Join(dir, entry.Name) - if err := checkEntriesWithPagination(versionDir, true); err != nil { + err := s3a.paginateEntries(versionDir, func(versionEntries []*filer_pb.Entry) error { + for _, versionEntry := range versionEntries { + if s3a.entryHasActiveLock(versionEntry, currentTime) { + *hasLocks = true + glog.V(2).Infof("Found object with active lock in versions: %s/%s", versionDir, versionEntry.Name) + return errStopPagination + } + } + return nil + }) + if err != nil { return err } continue @@ -416,7 +415,7 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h *hasLocks = true objectPath := path.Join(relativePath, entry.Name) glog.V(2).Infof("Found object with active lock: %s", objectPath) - return nil + return errStopPagination } } @@ -430,15 +429,10 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h } } } - - if isLast || len(entries) == 0 { - break - } - // Use the last entry name as the start point for next page - startFrom = entries[len(entries)-1].Name - } + return nil + }) - return nil + return err } // entryHasActiveLock checks if an entry has an active retention or legal hold @@ -460,11 +454,16 @@ func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime ti if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance { // Check if retention is still active if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists { - if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil { - retainUntil := time.Unix(timestamp, 0) - if retainUntil.After(currentTime) { - return true - } + timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64) + if err != nil { + // Fail-safe: if we can't parse the retention date, assume the object is locked + // to prevent accidental data loss + glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err) + return true + } + retainUntil := time.Unix(timestamp, 0) + if retainUntil.After(currentTime) { + return true } } }