Browse Source

Update s3api_bucket_handlers.go

pull/7434/head
chrislu 1 month ago
parent
commit
3491610bed
  1. 78
      weed/s3api/s3api_bucket_handlers.go

78
weed/s3api/s3api_bucket_handlers.go

@ -9,6 +9,7 @@ import (
"fmt"
"math"
"net/http"
"path"
"sort"
"strconv"
"strings"
@ -346,6 +347,41 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
return nil
}
// Helper function to check entries in a directory with pagination
checkEntriesWithPagination := func(directory string, isVersionDir bool) error {
startFrom := ""
for {
if *hasLocks {
return nil
}
entries, isLast, err := s3a.list(directory, "", startFrom, false, 10000)
if err != nil {
// Fail-safe: propagate error to prevent incorrect bucket deletion
return fmt.Errorf("failed to list directory %s: %w", directory, err)
}
for _, entry := range entries {
if s3a.entryHasActiveLock(entry, currentTime) {
*hasLocks = true
if isVersionDir {
glog.V(2).Infof("Found object with active lock in versions: %s/%s", directory, entry.Name)
} else {
glog.V(2).Infof("Found object with active lock: %s/%s", directory, entry.Name)
}
return nil
}
}
if isLast || len(entries) == 0 {
break
}
// Use the last entry name as the start point for next page
startFrom = entries[len(entries)-1].Name
}
return nil
}
// Paginate through directory listing
startFrom := ""
for {
@ -367,31 +403,9 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
// If it's a .versions directory, check all version files with pagination
if strings.HasSuffix(entry.Name, ".versions") && entry.IsDirectory {
versionDir := dir + "/" + entry.Name
versionStartFrom := ""
for {
if *hasLocks {
return nil
}
versionEntries, isVersionLast, vErr := s3a.list(versionDir, "", versionStartFrom, false, 10000)
if vErr != nil {
glog.Warningf("Failed to list version directory %s: %v", versionDir, vErr)
break
}
for _, versionEntry := range versionEntries {
if s3a.entryHasActiveLock(versionEntry, currentTime) {
*hasLocks = true
glog.V(2).Infof("Found object with active lock in versions: %s/%s", versionDir, versionEntry.Name)
return nil
}
}
if isVersionLast || len(versionEntries) == 0 {
break
}
// Use the last entry name as the start point for next page
versionStartFrom = versionEntries[len(versionEntries)-1].Name
versionDir := path.Join(dir, entry.Name)
if err := checkEntriesWithPagination(versionDir, true); err != nil {
return err
}
continue
}
@ -400,11 +414,7 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
if !entry.IsDirectory {
if s3a.entryHasActiveLock(entry, currentTime) {
*hasLocks = true
objectPath := relativePath
if objectPath != "" {
objectPath += "/"
}
objectPath += entry.Name
objectPath := path.Join(relativePath, entry.Name)
glog.V(2).Infof("Found object with active lock: %s", objectPath)
return nil
}
@ -412,12 +422,8 @@ func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, h
// Recursively check subdirectories
if entry.IsDirectory && !strings.HasSuffix(entry.Name, ".versions") {
subDir := dir + "/" + entry.Name
subRelativePath := relativePath
if subRelativePath != "" {
subRelativePath += "/"
}
subRelativePath += entry.Name
subDir := path.Join(dir, entry.Name)
subRelativePath := path.Join(relativePath, entry.Name)
if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil {
return err

Loading…
Cancel
Save