
prevent deleting buckets with object locking
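For context, the client-observable effect of this change: DeleteBucket on an Object Lock-enabled bucket now fails with BucketNotEmpty while any object still carries an active retention period or legal hold. A minimal illustrative sketch (not part of this commit), assuming an aws-sdk-go-v2 *s3.Client already configured against the SeaweedFS S3 endpoint; the bucket and key names are placeholders:

package example

import (
	"context"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// deleteWhileHeld uploads an object under legal hold into an Object Lock
// bucket and then attempts to delete the bucket; with this change the
// DeleteBucket call is expected to return a BucketNotEmpty error until the
// hold is lifted and the object is removed. Bucket and key are placeholders.
func deleteWhileHeld(ctx context.Context, client *s3.Client) error {
	if _, err := client.PutObject(ctx, &s3.PutObjectInput{
		Bucket:                    aws.String("locked-bucket"),
		Key:                       aws.String("held-object"),
		Body:                      strings.NewReader("data"),
		ObjectLockLegalHoldStatus: types.ObjectLockLegalHoldStatusOn,
	}); err != nil {
		return err
	}

	_, err := client.DeleteBucket(ctx, &s3.DeleteBucketInput{
		Bucket: aws.String("locked-bucket"),
	})
	return err // expected: BucketNotEmpty while the legal hold is active
}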

pull/7434/head
chrislu · 1 month ago · commit d6cf7f4686

Changed files:
  1. test/s3/retention/s3_bucket_delete_with_lock_test.go (+260)
  2. weed/s3api/s3api_bucket_handlers.go (+145)

test/s3/retention/s3_bucket_delete_with_lock_test.go (+260)

@@ -0,0 +1,260 @@
package retention

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
// TestBucketDeletionWithObjectLock tests that buckets with object lock enabled
// cannot be deleted if they contain objects with active retention or legal hold
func TestBucketDeletionWithObjectLock(t *testing.T) {
	client := getS3Client(t)
	bucketName := getNewBucketName()

	// Create bucket with object lock enabled
	createBucketWithObjectLock(t, client, bucketName)

	// Test 1: Bucket deletion with active compliance retention should fail
	t.Run("CannotDeleteBucketWithComplianceRetention", func(t *testing.T) {
		key := "test-compliance-retention"
		content := "test content for compliance retention"
		retainUntilDate := time.Now().Add(10 * time.Second) // 10 seconds in future

		// Upload object with compliance retention
		_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
			Bucket:                    aws.String(bucketName),
			Key:                       aws.String(key),
			Body:                      strings.NewReader(content),
			ObjectLockMode:            types.ObjectLockModeCompliance,
			ObjectLockRetainUntilDate: aws.Time(retainUntilDate),
		})
		require.NoError(t, err, "PutObject with compliance retention should succeed")

		// Try to delete bucket - should fail because object has active retention
		_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
			Bucket: aws.String(bucketName),
		})
		require.Error(t, err, "DeleteBucket should fail when objects have active retention")
		assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
		t.Logf("Expected error: %v", err)

		// Wait for retention to expire
		t.Logf("Waiting for compliance retention to expire...")
		time.Sleep(11 * time.Second)

		// Delete the object
		_, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
			Bucket: aws.String(bucketName),
			Key:    aws.String(key),
		})
		require.NoError(t, err, "DeleteObject should succeed after retention expires")

		// Clean up versions
		deleteAllObjectVersions(t, client, bucketName)
	})
	// Test 2: Bucket deletion with active governance retention should fail
	t.Run("CannotDeleteBucketWithGovernanceRetention", func(t *testing.T) {
		key := "test-governance-retention"
		content := "test content for governance retention"
		retainUntilDate := time.Now().Add(10 * time.Second) // 10 seconds in future

		// Upload object with governance retention
		_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
			Bucket:                    aws.String(bucketName),
			Key:                       aws.String(key),
			Body:                      strings.NewReader(content),
			ObjectLockMode:            types.ObjectLockModeGovernance,
			ObjectLockRetainUntilDate: aws.Time(retainUntilDate),
		})
		require.NoError(t, err, "PutObject with governance retention should succeed")

		// Try to delete bucket - should fail because object has active retention
		_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
			Bucket: aws.String(bucketName),
		})
		require.Error(t, err, "DeleteBucket should fail when objects have active retention")
		assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
		t.Logf("Expected error: %v", err)

		// Wait for retention to expire
		t.Logf("Waiting for governance retention to expire...")
		time.Sleep(11 * time.Second)

		// Delete the object
		_, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
			Bucket: aws.String(bucketName),
			Key:    aws.String(key),
		})
		require.NoError(t, err, "DeleteObject should succeed after retention expires")

		// Clean up versions
		deleteAllObjectVersions(t, client, bucketName)
	})
	// Test 3: Bucket deletion with legal hold should fail
	t.Run("CannotDeleteBucketWithLegalHold", func(t *testing.T) {
		key := "test-legal-hold"
		content := "test content for legal hold"

		// Upload object first
		_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
			Bucket: aws.String(bucketName),
			Key:    aws.String(key),
			Body:   strings.NewReader(content),
		})
		require.NoError(t, err, "PutObject should succeed")

		// Set legal hold on the object
		_, err = client.PutObjectLegalHold(context.TODO(), &s3.PutObjectLegalHoldInput{
			Bucket:    aws.String(bucketName),
			Key:       aws.String(key),
			LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOn},
		})
		require.NoError(t, err, "PutObjectLegalHold should succeed")

		// Try to delete bucket - should fail because object has active legal hold
		_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
			Bucket: aws.String(bucketName),
		})
		require.Error(t, err, "DeleteBucket should fail when objects have active legal hold")
		assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
		t.Logf("Expected error: %v", err)

		// Remove legal hold
		_, err = client.PutObjectLegalHold(context.TODO(), &s3.PutObjectLegalHoldInput{
			Bucket:    aws.String(bucketName),
			Key:       aws.String(key),
			LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOff},
		})
		require.NoError(t, err, "Removing legal hold should succeed")

		// Delete the object
		_, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
			Bucket: aws.String(bucketName),
			Key:    aws.String(key),
		})
		require.NoError(t, err, "DeleteObject should succeed after legal hold is removed")

		// Clean up versions
		deleteAllObjectVersions(t, client, bucketName)
	})
	// Test 4: Bucket deletion should succeed when no objects have active locks
	t.Run("CanDeleteBucketWithoutActiveLocks", func(t *testing.T) {
		// Make sure all objects are deleted
		deleteAllObjectVersions(t, client, bucketName)

		// Wait for eventual consistency
		time.Sleep(500 * time.Millisecond)

		// Now delete bucket should succeed
		_, err := client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
			Bucket: aws.String(bucketName),
		})
		require.NoError(t, err, "DeleteBucket should succeed when no objects have active locks")
		t.Logf("Successfully deleted bucket without active locks")
	})
}
// TestBucketDeletionWithVersionedLocks tests deletion with versioned objects under lock
func TestBucketDeletionWithVersionedLocks(t *testing.T) {
	client := getS3Client(t)
	bucketName := getNewBucketName()

	// Create bucket with object lock enabled
	createBucketWithObjectLock(t, client, bucketName)
	defer deleteBucket(t, client, bucketName) // Best effort cleanup

	key := "test-versioned-locks"
	content1 := "version 1 content"
	content2 := "version 2 content"
	retainUntilDate := time.Now().Add(10 * time.Second)

	// Upload first version with retention
	putResp1, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
		Bucket:                    aws.String(bucketName),
		Key:                       aws.String(key),
		Body:                      strings.NewReader(content1),
		ObjectLockMode:            types.ObjectLockModeGovernance,
		ObjectLockRetainUntilDate: aws.Time(retainUntilDate),
	})
	require.NoError(t, err)
	version1 := *putResp1.VersionId

	// Upload second version with retention
	putResp2, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
		Bucket:                    aws.String(bucketName),
		Key:                       aws.String(key),
		Body:                      strings.NewReader(content2),
		ObjectLockMode:            types.ObjectLockModeGovernance,
		ObjectLockRetainUntilDate: aws.Time(retainUntilDate),
	})
	require.NoError(t, err)
	version2 := *putResp2.VersionId
	t.Logf("Created two versions: %s, %s", version1, version2)

	// Try to delete bucket - should fail because versions have active retention
	_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
		Bucket: aws.String(bucketName),
	})
	require.Error(t, err, "DeleteBucket should fail when object versions have active retention")
	assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
	t.Logf("Expected error: %v", err)

	// Wait for retention to expire
	t.Logf("Waiting for retention to expire on all versions...")
	time.Sleep(11 * time.Second)

	// Clean up all versions
	deleteAllObjectVersions(t, client, bucketName)

	// Wait for eventual consistency
	time.Sleep(500 * time.Millisecond)

	// Now delete bucket should succeed
	_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
		Bucket: aws.String(bucketName),
	})
	require.NoError(t, err, "DeleteBucket should succeed after all locks expire")
	t.Logf("Successfully deleted bucket after locks expired")
}
// TestBucketDeletionWithoutObjectLock tests that buckets without object lock can be deleted normally
func TestBucketDeletionWithoutObjectLock(t *testing.T) {
	client := getS3Client(t)
	bucketName := getNewBucketName()

	// Create regular bucket without object lock
	createBucket(t, client, bucketName)

	// Upload some objects
	for i := 0; i < 3; i++ {
		_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
			Bucket: aws.String(bucketName),
			Key:    aws.String(fmt.Sprintf("test-object-%d", i)),
			Body:   strings.NewReader("test content"),
		})
		require.NoError(t, err)
	}

	// Delete all objects
	deleteAllObjectVersions(t, client, bucketName)

	// Delete bucket should succeed
	_, err := client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
		Bucket: aws.String(bucketName),
	})
	require.NoError(t, err, "DeleteBucket should succeed for regular bucket")
	t.Logf("Successfully deleted regular bucket without object lock")
}

weed/s3api/s3api_bucket_handlers.go (+145)

@@ -10,6 +10,7 @@ import (
"math"
"net/http"
"sort"
"strconv"
"strings"
"time"
@@ -251,6 +252,28 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
		return
	}

	// Check if bucket has object lock enabled
	bucketConfig, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	// If object lock is enabled, check for objects with active locks
	if bucketConfig.ObjectLockConfig != nil {
		hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(bucket)
		if checkErr != nil {
			glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
		if hasLockedObjects {
			glog.V(3).Infof("DeleteBucketHandler: bucket %s has objects with active object locks, cannot delete", bucket)
			s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty)
			return
		}
	}
	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		if !s3a.option.AllowDeleteBucketNotEmpty {
			entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 2)

@@ -299,6 +322,128 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
	s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}

// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
	bucketPath := s3a.option.BucketsPath + "/" + bucket

	// Check all objects including versions for active locks
	hasLocks := false
	err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks)
	if err != nil {
		return false, fmt.Errorf("error checking for locked objects: %w", err)
	}

	return hasLocks, nil
}

// recursivelyCheckLocks recursively checks all objects and versions for active locks
func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool) error {
	if *hasLocks {
		// Early exit if we've already found a locked object
		return nil
	}

	entries, _, err := s3a.list(dir, "", "", false, 10000)
	if err != nil {
		return err
	}

	currentTime := time.Now()
	for _, entry := range entries {
		if *hasLocks {
			// Early exit if we've already found a locked object
			return nil
		}

		// Skip multipart uploads folder
		if entry.Name == s3_constants.MultipartUploadsFolder {
			continue
		}

		// If it's a .versions directory, check all version files
		if strings.HasSuffix(entry.Name, ".versions") && entry.IsDirectory {
			versionDir := dir + "/" + entry.Name
			versionEntries, _, vErr := s3a.list(versionDir, "", "", false, 10000)
			if vErr != nil {
				glog.Warningf("Failed to list version directory %s: %v", versionDir, vErr)
				continue
			}
			for _, versionEntry := range versionEntries {
				if s3a.entryHasActiveLock(versionEntry, currentTime) {
					*hasLocks = true
					glog.V(2).Infof("Found object with active lock in versions: %s/%s", versionDir, versionEntry.Name)
					return nil
				}
			}
			continue
		}

		// Check regular files for locks
		if !entry.IsDirectory {
			if s3a.entryHasActiveLock(entry, currentTime) {
				*hasLocks = true
				objectPath := relativePath
				if objectPath != "" {
					objectPath += "/"
				}
				objectPath += entry.Name
				glog.V(2).Infof("Found object with active lock: %s", objectPath)
				return nil
			}
		}

		// Recursively check subdirectories
		if entry.IsDirectory && !strings.HasSuffix(entry.Name, ".versions") {
			subDir := dir + "/" + entry.Name
			subRelativePath := relativePath
			if subRelativePath != "" {
				subRelativePath += "/"
			}
			subRelativePath += entry.Name
			if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks); err != nil {
				return err
			}
		}
	}

	return nil
}

// entryHasActiveLock checks if an entry has an active retention or legal hold
func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
	if entry.Extended == nil {
		return false
	}

	// Check for active legal hold
	if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
		if string(legalHoldBytes) == s3_constants.LegalHoldOn {
			return true
		}
	}

	// Check for active retention
	if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
		mode := string(modeBytes)
		if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
			// Check if retention is still active
			if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
				if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil {
					retainUntil := time.Unix(timestamp, 0)
					if retainUntil.After(currentTime) {
						return true
					}
				}
			}
		}
	}

	return false
}

func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
	bucket, _ := s3_constants.GetBucketAndObject(r)
