Browse Source

add context, sort directories by depth (deepest first) to avoid redundant checks

pull/7426/head
chrislu 2 months ago
parent
commit
ec8ca216a5
  1. 60
      weed/s3api/s3api_object_handlers_delete.go

60
weed/s3api/s3api_object_handlers_delete.go

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"slices"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -126,6 +127,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
dir, name := target.DirAndName()
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Use operation context that won't be cancelled if request terminates
// This ensures deletion completes atomically to avoid inconsistent state
opCtx := context.WithoutCancel(r.Context())
if err := doDeleteEntry(client, dir, name, true, false); err != nil {
return err
@ -135,7 +139,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
// Recursively delete empty parent directories, stop at bucket path
deleteEmptyParentDirectories(client, util.FullPath(dir), util.FullPath(bucketPath))
deleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil)
}
return nil
@ -241,6 +245,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
versioningConfigured := (versioningState != "")
s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Use operation context that won't be cancelled if request terminates
// This ensures batch deletion completes atomically to avoid inconsistent state
opCtx := context.WithoutCancel(r.Context())
// delete file entries
for _, object := range deleteObjects.Objects {
@ -373,12 +380,28 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
}
// Cleanup empty directories
// Cleanup empty directories - optimize by processing deepest first
if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
// Collect and sort directories by depth (deepest first) to avoid redundant checks
var allDirs []string
for dirPath := range directoriesWithDeletion {
// Recursively delete empty parent directories, stop at bucket path
deleteEmptyParentDirectories(client, util.FullPath(dirPath), util.FullPath(bucketPath))
allDirs = append(allDirs, dirPath)
}
// Sort by depth (deeper directories first)
slices.SortFunc(allDirs, func(a, b string) int {
return strings.Count(b, "/") - strings.Count(a, "/")
})
// Track already-checked directories to avoid redundant work
checked := make(map[string]bool)
for _, dirPath := range allDirs {
if !checked[dirPath] {
// Recursively delete empty parent directories, stop at bucket path
// Mark this directory and all its parents as checked during recursion
deleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked)
}
}
}
@ -401,45 +424,54 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
// It stops at root "/" or at stopAtPath.
// This implements the same logic as filer.DeleteEmptyParentDirectories but uses gRPC client.
// For safety, dirPath must be under stopAtPath (when stopAtPath is provided).
func deleteEmptyParentDirectories(client filer_pb.SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath) {
// The checked map tracks already-processed directories to avoid redundant work in batch operations.
func deleteEmptyParentDirectories(ctx context.Context, client filer_pb.SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath, checked map[string]bool) {
if dirPath == "/" || dirPath == stopAtPath {
return
}
// Skip if already checked (for batch delete optimization)
dirPathStr := string(dirPath)
if checked != nil {
if checked[dirPathStr] {
return
}
checked[dirPathStr] = true
}
// Safety check: if stopAtPath is provided, dirPath must be under it
if stopAtPath != "" && !strings.HasPrefix(string(dirPath)+"/", string(stopAtPath)+"/") {
glog.V(1).Infof("deleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
if stopAtPath != "" && !strings.HasPrefix(dirPathStr+"/", string(stopAtPath)+"/") {
glog.V(1).InfofCtx(ctx, "deleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
return
}
// Check if directory is empty by listing with limit 1
ctx := context.Background()
isEmpty := true
err := filer_pb.SeaweedList(ctx, client, string(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error {
err := filer_pb.SeaweedList(ctx, client, dirPathStr, "", func(entry *filer_pb.Entry, isLast bool) error {
isEmpty = false
return io.EOF // Use sentinel error to explicitly stop iteration
}, "", false, 1)
if err != nil && err != io.EOF {
glog.V(3).Infof("deleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
glog.V(3).InfofCtx(ctx, "deleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
return
}
if !isEmpty {
// Directory is not empty, stop checking upward
glog.V(3).Infof("deleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
glog.V(3).InfofCtx(ctx, "deleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
return
}
// Directory is empty, try to delete it
glog.V(2).Infof("deleteEmptyParentDirectories: deleting empty directory %s", dirPath)
glog.V(2).InfofCtx(ctx, "deleteEmptyParentDirectories: deleting empty directory %s", dirPath)
parentDir, dirName := dirPath.DirAndName()
if err := doDeleteEntry(client, parentDir, dirName, false, false); err == nil {
// Successfully deleted, continue checking upwards
deleteEmptyParentDirectories(client, util.FullPath(parentDir), stopAtPath)
deleteEmptyParentDirectories(ctx, client, util.FullPath(parentDir), stopAtPath, checked)
} else {
// Failed to delete, stop cleanup
glog.V(3).Infof("deleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err)
glog.V(3).InfofCtx(ctx, "deleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err)
}
}
Loading…
Cancel
Save