
addressing comments

pull/7434/head
chrislu 1 month ago
commit 17cc09d3bc
2 changed files:
1. test/s3/retention/s3_bucket_delete_with_lock_test.go (12)
2. weed/s3api/s3api_bucket_handlers.go (127)

test/s3/retention/s3_bucket_delete_with_lock_test.go (12)

@@ -46,9 +46,9 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
     assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
     t.Logf("Expected error: %v", err)
 
-    // Wait for retention to expire
+    // Wait for retention to expire with dynamic sleep based on actual retention time
     t.Logf("Waiting for compliance retention to expire...")
-    time.Sleep(11 * time.Second)
+    time.Sleep(time.Until(retainUntilDate) + time.Second)
 
     // Delete the object
     _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{

@@ -85,9 +85,9 @@ func TestBucketDeletionWithObjectLock(t *testing.T) {
     assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
     t.Logf("Expected error: %v", err)
 
-    // Wait for retention to expire
+    // Wait for retention to expire with dynamic sleep based on actual retention time
     t.Logf("Waiting for governance retention to expire...")
-    time.Sleep(11 * time.Second)
+    time.Sleep(time.Until(retainUntilDate) + time.Second)
 
     // Delete the object
     _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{

@@ -211,9 +211,9 @@ func TestBucketDeletionWithVersionedLocks(t *testing.T) {
     assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
     t.Logf("Expected error: %v", err)
 
-    // Wait for retention to expire
+    // Wait for retention to expire with dynamic sleep based on actual retention time
     t.Logf("Waiting for retention to expire on all versions...")
-    time.Sleep(11 * time.Second)
+    time.Sleep(time.Until(retainUntilDate) + time.Second)
 
     // Clean up all versions
     deleteAllObjectVersions(t, client, bucketName)
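
The fixed 11-second sleep is replaced with a wait computed from the object's actual retain-until timestamp, so the tests stay correct if the retention duration configured elsewhere in the test changes. A minimal standalone sketch of the pattern; retainUntilDate here is a stand-in for the value the test derives from the object's retention settings:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Stand-in for the retain-until timestamp the test reads back from the
    // object's retention configuration.
    retainUntilDate := time.Now().Add(3 * time.Second)

    // Sleep until just past the retention deadline instead of a fixed duration.
    // time.Until returns a negative duration if the deadline already passed,
    // so the wait is skipped in that case.
    wait := time.Until(retainUntilDate) + time.Second
    if wait > 0 {
        fmt.Printf("waiting %v for retention to expire\n", wait.Round(time.Millisecond))
        time.Sleep(wait)
    }

    fmt.Println("retention expired; object can now be deleted")
}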

weed/s3api/s3api_bucket_handlers.go (127)

@@ -327,8 +327,10 @@ func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
     bucketPath := s3a.option.BucketsPath + "/" + bucket
 
     // Check all objects including versions for active locks
+    // Establish current time once at the start for consistency across the entire scan
     hasLocks := false
-    err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks)
+    currentTime := time.Now()
+    err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks, currentTime)
     if err != nil {
         return false, fmt.Errorf("error checking for locked objects: %w", err)
     }
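
Passing a single currentTime into the recursive scan means every entry in one pass is judged against the same instant, so a lock cannot look active for one entry and expired for another within the same bucket check. A simplified sketch of the idea, with a toy entry type and lock check standing in for the filer entries and entryHasActiveLock:

package main

import (
    "fmt"
    "time"
)

// entry is a simplified stand-in for a filer entry with a retain-until time.
type entry struct {
    name        string
    retainUntil time.Time
    children    []entry
}

// hasActiveLock mirrors the idea behind entryHasActiveLock: a lock is active
// while the retain-until time is still in the future relative to "now".
func hasActiveLock(e entry, now time.Time) bool {
    return e.retainUntil.After(now)
}

// walk threads one "now" through the whole recursion so all entries in a
// single scan are evaluated against the same instant.
func walk(e entry, now time.Time, found *bool) {
    if *found {
        return
    }
    if hasActiveLock(e, now) {
        *found = true
        return
    }
    for _, c := range e.children {
        walk(c, now, found)
    }
}

func main() {
    root := entry{
        name: "bucket",
        children: []entry{
            {name: "a.txt", retainUntil: time.Now().Add(-time.Minute)}, // expired lock
            {name: "b.txt", retainUntil: time.Now().Add(time.Hour)},    // still locked
        },
    }

    found := false
    walk(root, time.Now(), &found) // time established once, at the start of the scan
    fmt.Println("has active locks:", found)
}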
@@ -337,76 +339,97 @@ func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
 }
 
 // recursivelyCheckLocks recursively checks all objects and versions for active locks
-func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool) error {
+// Uses pagination to handle directories with more than 10,000 entries
+func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error {
     if *hasLocks {
         // Early exit if we've already found a locked object
         return nil
     }
 
-    entries, _, err := s3a.list(dir, "", "", false, 10000)
-    if err != nil {
-        return err
-    }
-
-    currentTime := time.Now()
-
-    for _, entry := range entries {
-        if *hasLocks {
-            // Early exit if we've already found a locked object
-            return nil
-        }
-
-        // Skip multipart uploads folder
-        if entry.Name == s3_constants.MultipartUploadsFolder {
-            continue
-        }
-
-        // If it's a .versions directory, check all version files
-        if strings.HasSuffix(entry.Name, ".versions") && entry.IsDirectory {
-            versionDir := dir + "/" + entry.Name
-            versionEntries, _, vErr := s3a.list(versionDir, "", "", false, 10000)
-            if vErr != nil {
-                glog.Warningf("Failed to list version directory %s: %v", versionDir, vErr)
-                continue
-            }
-            for _, versionEntry := range versionEntries {
-                if s3a.entryHasActiveLock(versionEntry, currentTime) {
-                    *hasLocks = true
-                    glog.V(2).Infof("Found object with active lock in versions: %s/%s", versionDir, versionEntry.Name)
-                    return nil
-                }
-            }
-            continue
-        }
-
-        // Check regular files for locks
-        if !entry.IsDirectory {
-            if s3a.entryHasActiveLock(entry, currentTime) {
-                *hasLocks = true
-                objectPath := relativePath
-                if objectPath != "" {
-                    objectPath += "/"
-                }
-                objectPath += entry.Name
-                glog.V(2).Infof("Found object with active lock: %s", objectPath)
-                return nil
-            }
-        }
-
-        // Recursively check subdirectories
-        if entry.IsDirectory && !strings.HasSuffix(entry.Name, ".versions") {
-            subDir := dir + "/" + entry.Name
-            subRelativePath := relativePath
-            if subRelativePath != "" {
-                subRelativePath += "/"
-            }
-            subRelativePath += entry.Name
-            if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks); err != nil {
-                return err
-            }
-        }
+    // Paginate through directory listing
+    startFrom := ""
+    for {
+        entries, isLast, err := s3a.list(dir, "", startFrom, false, 10000)
+        if err != nil {
+            return err
+        }
+
+        for _, entry := range entries {
+            if *hasLocks {
+                // Early exit if we've already found a locked object
+                return nil
+            }
+
+            // Skip multipart uploads folder
+            if entry.Name == s3_constants.MultipartUploadsFolder {
+                continue
+            }
+
+            // If it's a .versions directory, check all version files with pagination
+            if strings.HasSuffix(entry.Name, ".versions") && entry.IsDirectory {
+                versionDir := dir + "/" + entry.Name
+                versionStartFrom := ""
+                for {
+                    if *hasLocks {
+                        return nil
+                    }
+                    versionEntries, isVersionLast, vErr := s3a.list(versionDir, "", versionStartFrom, false, 10000)
+                    if vErr != nil {
+                        glog.Warningf("Failed to list version directory %s: %v", versionDir, vErr)
+                        break
+                    }
+                    for _, versionEntry := range versionEntries {
+                        if s3a.entryHasActiveLock(versionEntry, currentTime) {
+                            *hasLocks = true
+                            glog.V(2).Infof("Found object with active lock in versions: %s/%s", versionDir, versionEntry.Name)
+                            return nil
+                        }
+                    }
+                    if isVersionLast || len(versionEntries) == 0 {
+                        break
+                    }
+                    // Use the last entry name as the start point for next page
+                    versionStartFrom = versionEntries[len(versionEntries)-1].Name
+                }
+                continue
+            }
+
+            // Check regular files for locks
+            if !entry.IsDirectory {
+                if s3a.entryHasActiveLock(entry, currentTime) {
+                    *hasLocks = true
+                    objectPath := relativePath
+                    if objectPath != "" {
+                        objectPath += "/"
+                    }
+                    objectPath += entry.Name
+                    glog.V(2).Infof("Found object with active lock: %s", objectPath)
+                    return nil
+                }
+            }
+
+            // Recursively check subdirectories
+            if entry.IsDirectory && !strings.HasSuffix(entry.Name, ".versions") {
+                subDir := dir + "/" + entry.Name
+                subRelativePath := relativePath
+                if subRelativePath != "" {
+                    subRelativePath += "/"
+                }
+                subRelativePath += entry.Name
+                if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil {
+                    return err
+                }
+            }
+        }
+
+        if isLast || len(entries) == 0 {
+            break
+        }
+        // Use the last entry name as the start point for next page
+        startFrom = entries[len(entries)-1].Name
     }
 
     return nil
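
The new listing loop requests pages of up to 10,000 entries and restarts each subsequent request from the last name returned, stopping when the listing reports the last page or comes back empty. A self-contained sketch of that pagination shape, with listPage as a stand-in for s3a.list:

package main

import "fmt"

// listPage is a stand-in for a paged directory listing such as s3a.list:
// it returns up to "limit" names that sort after "startFrom", plus a flag
// telling the caller whether this was the last page.
func listPage(all []string, startFrom string, limit int) (page []string, isLast bool) {
    i := 0
    if startFrom != "" {
        for i < len(all) && all[i] <= startFrom {
            i++
        }
    }
    end := i + limit
    if end >= len(all) {
        return all[i:], true
    }
    return all[i:end], false
}

func main() {
    // A hypothetical directory with more entries than a single page holds.
    var names []string
    for i := 0; i < 25; i++ {
        names = append(names, fmt.Sprintf("obj-%03d", i))
    }

    // Same loop shape as the handler: list one page, process it, and use the
    // last entry name of the page as the start point for the next page.
    startFrom := ""
    total := 0
    for {
        entries, isLast := listPage(names, startFrom, 10)
        total += len(entries)
        if isLast || len(entries) == 0 {
            break
        }
        startFrom = entries[len(entries)-1]
    }
    fmt.Println("scanned", total, "entries")
}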
