Browse Source

reuse code to delete empty folders

pull/7426/head
chrislu 4 weeks ago
parent
commit
0cad84ee36
  1. 77
      weed/filer/filer.go
  2. 103
      weed/s3api/s3api_object_handlers_delete.go

77
weed/filer/filer.go

@ -419,19 +419,14 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
// Only check if we didn't find any valid entries and we're not at root // Only check if we didn't find any valid entries and we're not at root
if !hasValidEntries && p != "/" && startFileName == "" { if !hasValidEntries && p != "/" && startFileName == "" {
// Do a quick check to see if directory is truly empty now // Do a quick check to see if directory is truly empty now
isEmpty := true
_, checkErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, "", true, 1, prefix, func(entry *Entry) bool {
isEmpty = false
return false // Stop after first entry
})
if checkErr == nil && isEmpty {
if isEmpty, checkErr := f.IsDirectoryEmpty(ctx, p); checkErr == nil && isEmpty {
glog.V(2).InfofCtx(ctx, "doListDirectoryEntries: deleting empty directory %s after expiring all entries", p) glog.V(2).InfofCtx(ctx, "doListDirectoryEntries: deleting empty directory %s after expiring all entries", p)
parentDir, _ := p.DirAndName() parentDir, _ := p.DirAndName()
if dirEntry, findErr := f.FindEntry(ctx, p); findErr == nil { if dirEntry, findErr := f.FindEntry(ctx, p); findErr == nil {
// Delete the now-empty directory // Delete the now-empty directory
if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil { if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil {
// Recursively try to delete parent directories if they become empty // Recursively try to delete parent directories if they become empty
f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(parentDir))
f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), "")
} }
} }
} }
@ -441,45 +436,77 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
return return
} }
// maybeDeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty
func (f *Filer) maybeDeleteEmptyParentDirectories(ctx context.Context, parentDir util.FullPath) {
if parentDir == "/" {
// DeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty.
// It stops at root "/" or at stopAtPath (if provided).
// This is useful for cleaning up directories after deleting files or expired entries.
//
// IMPORTANT: For safety, dirPath must be under stopAtPath (when stopAtPath is provided).
// This prevents accidental deletion of directories outside the intended scope (e.g., outside bucket paths).
//
// Example usage:
//
// // After deleting /bucket/dir/subdir/file.txt, clean up empty parent directories
// // but stop at the bucket path
// parentPath := util.FullPath("/bucket/dir/subdir")
// filer.DeleteEmptyParentDirectories(ctx, parentPath, util.FullPath("/bucket"))
//
// Example with gRPC client:
//
// if err := pb_filer_client.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
// return filer_pb.Traverse(ctx, filer, parentPath, "", func(entry *filer_pb.Entry) error {
// // Process entries...
// })
// }); err == nil {
// filer.DeleteEmptyParentDirectories(ctx, parentPath, stopPath)
// }
func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.FullPath, stopAtPath util.FullPath) {
if dirPath == "/" || dirPath == stopAtPath {
return return
} }
// Check if parent directory is empty
isEmpty := true
_, err := f.Store.ListDirectoryPrefixedEntries(ctx, parentDir, "", true, 1, "", func(entry *Entry) bool {
isEmpty = false
return false // Stop after first entry
})
// Safety check: if stopAtPath is provided, dirPath must be under it
if stopAtPath != "" && !strings.HasPrefix(string(dirPath)+"/", string(stopAtPath)+"/") {
glog.V(1).InfofCtx(ctx, "DeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
return
}
// Check if directory is empty
isEmpty, err := f.IsDirectoryEmpty(ctx, dirPath)
if err != nil { if err != nil {
// Error checking directory, stop cleanup
glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: error checking %s: %v", parentDir, err)
glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
return return
} }
if !isEmpty { if !isEmpty {
// Directory is not empty, stop checking upward // Directory is not empty, stop checking upward
glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", parentDir)
glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
return return
} }
// Directory is empty, try to delete it // Directory is empty, try to delete it
glog.V(2).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: deleting empty directory %s", parentDir)
grandParentDir, _ := parentDir.DirAndName()
if parentEntry, findErr := f.FindEntry(ctx, parentDir); findErr == nil {
if delErr := f.doDeleteEntryMetaAndData(ctx, parentEntry, false, false, nil); delErr == nil {
glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: deleting empty directory %s", dirPath)
parentDir, _ := dirPath.DirAndName()
if dirEntry, findErr := f.FindEntry(ctx, dirPath); findErr == nil {
if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil {
// Successfully deleted, continue checking upwards // Successfully deleted, continue checking upwards
f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(grandParentDir))
f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), stopAtPath)
} else { } else {
// Failed to delete, stop cleanup // Failed to delete, stop cleanup
glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: failed to delete %s: %v", parentDir, delErr)
glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, delErr)
} }
} }
} }
// IsDirectoryEmpty checks if a directory contains any entries
func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) {
isEmpty := true
_, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool {
isEmpty = false
return false // Stop after first entry
})
return isEmpty, err
}
func (f *Filer) Shutdown() { func (f *Filer) Shutdown() {
close(f.deletionQuit) close(f.deletionQuit)
f.LocalMetaLogBuffer.ShutdownLogBuffer() f.LocalMetaLogBuffer.ShutdownLogBuffer()

103
weed/s3api/s3api_object_handlers_delete.go

@ -1,21 +1,18 @@
package s3api package s3api
import ( import (
"context"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"slices"
"strings" "strings"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
@ -134,17 +131,11 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
return err return err
} }
if s3a.option.AllowEmptyFolder {
return nil
}
directoriesWithDeletion := make(map[string]int)
if strings.LastIndex(object, "/") > 0 {
directoriesWithDeletion[dir]++
// purge empty folders, only checking folders with deletions
for len(directoriesWithDeletion) > 0 {
directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
}
// Cleanup empty directories
if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
// Recursively delete empty parent directories, stop at bucket path
deleteEmptyParentDirectories(client, util.FullPath(dir), util.FullPath(bucketPath))
} }
return nil return nil
@ -227,7 +218,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deleteErrors []DeleteError var deleteErrors []DeleteError
var auditLog *s3err.AccessLog var auditLog *s3err.AccessLog
directoriesWithDeletion := make(map[string]int)
directoriesWithDeletion := make(map[string]bool)
if s3err.Logger != nil { if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
@ -359,12 +350,14 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil { if err == nil {
directoriesWithDeletion[parentDirectoryPath]++
// Track directory for empty directory cleanup
if !s3a.option.AllowEmptyFolder {
directoriesWithDeletion[parentDirectoryPath] = true
}
deletedObjects = append(deletedObjects, object) deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object) deletedObjects = append(deletedObjects, object)
} else { } else {
delete(directoriesWithDeletion, parentDirectoryPath)
deleteErrors = append(deleteErrors, DeleteError{ deleteErrors = append(deleteErrors, DeleteError{
Code: "", Code: "",
Message: err.Error(), Message: err.Error(),
@ -380,13 +373,13 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
} }
} }
if s3a.option.AllowEmptyFolder {
return nil
// Cleanup empty directories
if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
for dirPath := range directoriesWithDeletion {
// Recursively delete empty parent directories, stop at bucket path
deleteEmptyParentDirectories(client, util.FullPath(dirPath), util.FullPath(bucketPath))
} }
// purge empty folders, only checking folders with deletions
for len(directoriesWithDeletion) > 0 {
directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
} }
return nil return nil
@ -404,25 +397,49 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
} }
func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
var allDirs []string
for dir := range directoriesWithDeletion {
allDirs = append(allDirs, dir)
}
slices.SortFunc(allDirs, func(a, b string) int {
return len(b) - len(a)
})
newDirectoriesWithDeletion = make(map[string]int)
for _, dir := range allDirs {
parentDir, dirName := util.FullPath(dir).DirAndName()
if parentDir == s3a.option.BucketsPath {
continue
// deleteEmptyParentDirectories recursively deletes empty parent directories.
// It stops at root "/" or at stopAtPath.
// This implements the same logic as filer.DeleteEmptyParentDirectories but uses gRPC client.
// For safety, dirPath must be under stopAtPath (when stopAtPath is provided).
func deleteEmptyParentDirectories(client filer_pb.SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath) {
if dirPath == "/" || dirPath == stopAtPath {
return
} }
if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
} else {
newDirectoriesWithDeletion[parentDir]++
// Safety check: if stopAtPath is provided, dirPath must be under it
if stopAtPath != "" && !strings.HasPrefix(string(dirPath)+"/", string(stopAtPath)+"/") {
glog.V(1).Infof("deleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
return
} }
// Check if directory is empty by listing with limit 1
ctx := context.Background()
isEmpty := true
err := filer_pb.SeaweedList(ctx, client, string(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error {
isEmpty = false
return nil
}, "", false, 1)
if err != nil {
glog.V(3).Infof("deleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
return
} }
if !isEmpty {
// Directory is not empty, stop checking upward
glog.V(3).Infof("deleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
return return
} }
// Directory is empty, try to delete it
glog.V(2).Infof("deleteEmptyParentDirectories: deleting empty directory %s", dirPath)
parentDir, dirName := dirPath.DirAndName()
if err := doDeleteEntry(client, parentDir, dirName, false, false); err == nil {
// Successfully deleted, continue checking upwards
deleteEmptyParentDirectories(client, util.FullPath(parentDir), stopAtPath)
} else {
// Failed to delete, stop cleanup
glog.V(3).Infof("deleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err)
}
}
Loading…
Cancel
Save