diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 74536098d..75c50871e 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -419,19 +419,14 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta // Only check if we didn't find any valid entries and we're not at root if !hasValidEntries && p != "/" && startFileName == "" { // Do a quick check to see if directory is truly empty now - isEmpty := true - _, checkErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, "", true, 1, prefix, func(entry *Entry) bool { - isEmpty = false - return false // Stop after first entry - }) - if checkErr == nil && isEmpty { + if isEmpty, checkErr := f.IsDirectoryEmpty(ctx, p); checkErr == nil && isEmpty { glog.V(2).InfofCtx(ctx, "doListDirectoryEntries: deleting empty directory %s after expiring all entries", p) parentDir, _ := p.DirAndName() if dirEntry, findErr := f.FindEntry(ctx, p); findErr == nil { // Delete the now-empty directory if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil { // Recursively try to delete parent directories if they become empty - f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(parentDir)) + f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), "") } } } @@ -441,45 +436,77 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta return } -// maybeDeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty -func (f *Filer) maybeDeleteEmptyParentDirectories(ctx context.Context, parentDir util.FullPath) { - if parentDir == "/" { +// DeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty. +// It stops at root "/" or at stopAtPath (if provided). +// This is useful for cleaning up directories after deleting files or expired entries. +// +// IMPORTANT: For safety, dirPath must be under stopAtPath (when stopAtPath is provided). +// This prevents accidental deletion of directories outside the intended scope (e.g., outside bucket paths). +// +// Example usage: +// +// // After deleting /bucket/dir/subdir/file.txt, clean up empty parent directories +// // but stop at the bucket path +// parentPath := util.FullPath("/bucket/dir/subdir") +// filer.DeleteEmptyParentDirectories(ctx, parentPath, util.FullPath("/bucket")) +// +// Example with gRPC client: +// +// if err := pb_filer_client.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { +// return filer_pb.Traverse(ctx, filer, parentPath, "", func(entry *filer_pb.Entry) error { +// // Process entries... +// }) +// }); err == nil { +// filer.DeleteEmptyParentDirectories(ctx, parentPath, stopPath) +// } +func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.FullPath, stopAtPath util.FullPath) { + if dirPath == "/" || dirPath == stopAtPath { return } - // Check if parent directory is empty - isEmpty := true - _, err := f.Store.ListDirectoryPrefixedEntries(ctx, parentDir, "", true, 1, "", func(entry *Entry) bool { - isEmpty = false - return false // Stop after first entry - }) + // Safety check: if stopAtPath is provided, dirPath must be under it + if stopAtPath != "" && !strings.HasPrefix(string(dirPath)+"/", string(stopAtPath)+"/") { + glog.V(1).InfofCtx(ctx, "DeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath) + return + } + // Check if directory is empty + isEmpty, err := f.IsDirectoryEmpty(ctx, dirPath) if err != nil { - // Error checking directory, stop cleanup - glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: error checking %s: %v", parentDir, err) + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: error checking %s: %v", dirPath, err) return } if !isEmpty { // Directory is not empty, stop checking upward - glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", parentDir) + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath) return } // Directory is empty, try to delete it - glog.V(2).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: deleting empty directory %s", parentDir) - grandParentDir, _ := parentDir.DirAndName() - if parentEntry, findErr := f.FindEntry(ctx, parentDir); findErr == nil { - if delErr := f.doDeleteEntryMetaAndData(ctx, parentEntry, false, false, nil); delErr == nil { + glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: deleting empty directory %s", dirPath) + parentDir, _ := dirPath.DirAndName() + if dirEntry, findErr := f.FindEntry(ctx, dirPath); findErr == nil { + if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil { // Successfully deleted, continue checking upwards - f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(grandParentDir)) + f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), stopAtPath) } else { // Failed to delete, stop cleanup - glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: failed to delete %s: %v", parentDir, delErr) + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, delErr) } } } +// IsDirectoryEmpty checks if a directory contains any entries +func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) { + isEmpty := true + _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool { + isEmpty = false + return false // Stop after first entry + }) + return isEmpty, err +} + func (f *Filer) Shutdown() { close(f.deletionQuit) f.LocalMetaLogBuffer.ShutdownLogBuffer() diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 3a2544710..fcaac5da0 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -1,21 +1,18 @@ package s3api import ( + "context" "encoding/xml" "fmt" "io" "net/http" - "slices" "strings" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" - "github.com/seaweedfs/seaweedfs/weed/filer" - - "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -134,17 +131,11 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque return err } - if s3a.option.AllowEmptyFolder { - return nil - } - - directoriesWithDeletion := make(map[string]int) - if strings.LastIndex(object, "/") > 0 { - directoriesWithDeletion[dir]++ - // purge empty folders, only checking folders with deletions - for len(directoriesWithDeletion) > 0 { - directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion) - } + // Cleanup empty directories + if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 { + bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket) + // Recursively delete empty parent directories, stop at bucket path + deleteEmptyParentDirectories(client, util.FullPath(dir), util.FullPath(bucketPath)) } return nil @@ -227,7 +218,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h var deleteErrors []DeleteError var auditLog *s3err.AccessLog - directoriesWithDeletion := make(map[string]int) + directoriesWithDeletion := make(map[string]bool) if s3err.Logger != nil { auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) @@ -359,12 +350,14 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err == nil { - directoriesWithDeletion[parentDirectoryPath]++ + // Track directory for empty directory cleanup + if !s3a.option.AllowEmptyFolder { + directoriesWithDeletion[parentDirectoryPath] = true + } deletedObjects = append(deletedObjects, object) } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { deletedObjects = append(deletedObjects, object) } else { - delete(directoriesWithDeletion, parentDirectoryPath) deleteErrors = append(deleteErrors, DeleteError{ Code: "", Message: err.Error(), @@ -380,13 +373,13 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } } - if s3a.option.AllowEmptyFolder { - return nil - } - - // purge empty folders, only checking folders with deletions - for len(directoriesWithDeletion) > 0 { - directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion) + // Cleanup empty directories + if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 { + bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket) + for dirPath := range directoriesWithDeletion { + // Recursively delete empty parent directories, stop at bucket path + deleteEmptyParentDirectories(client, util.FullPath(dirPath), util.FullPath(bucketPath)) + } } return nil @@ -404,25 +397,49 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } -func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) { - var allDirs []string - for dir := range directoriesWithDeletion { - allDirs = append(allDirs, dir) +// deleteEmptyParentDirectories recursively deletes empty parent directories. +// It stops at root "/" or at stopAtPath. +// This implements the same logic as filer.DeleteEmptyParentDirectories but uses gRPC client. +// For safety, dirPath must be under stopAtPath (when stopAtPath is provided). +func deleteEmptyParentDirectories(client filer_pb.SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath) { + if dirPath == "/" || dirPath == stopAtPath { + return } - slices.SortFunc(allDirs, func(a, b string) int { - return len(b) - len(a) - }) - newDirectoriesWithDeletion = make(map[string]int) - for _, dir := range allDirs { - parentDir, dirName := util.FullPath(dir).DirAndName() - if parentDir == s3a.option.BucketsPath { - continue - } - if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil { - glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err) - } else { - newDirectoriesWithDeletion[parentDir]++ - } + + // Safety check: if stopAtPath is provided, dirPath must be under it + if stopAtPath != "" && !strings.HasPrefix(string(dirPath)+"/", string(stopAtPath)+"/") { + glog.V(1).Infof("deleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath) + return + } + + // Check if directory is empty by listing with limit 1 + ctx := context.Background() + isEmpty := true + err := filer_pb.SeaweedList(ctx, client, string(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error { + isEmpty = false + return nil + }, "", false, 1) + + if err != nil { + glog.V(3).Infof("deleteEmptyParentDirectories: error checking %s: %v", dirPath, err) + return + } + + if !isEmpty { + // Directory is not empty, stop checking upward + glog.V(3).Infof("deleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath) + return + } + + // Directory is empty, try to delete it + glog.V(2).Infof("deleteEmptyParentDirectories: deleting empty directory %s", dirPath) + parentDir, dirName := dirPath.DirAndName() + + if err := doDeleteEntry(client, parentDir, dirName, false, false); err == nil { + // Successfully deleted, continue checking upwards + deleteEmptyParentDirectories(client, util.FullPath(parentDir), stopAtPath) + } else { + // Failed to delete, stop cleanup + glog.V(3).Infof("deleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err) } - return }