From 74c80aa441ed87da33e433f9b550bfe7b38a87b4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Feb 2026 14:38:11 -0800 Subject: [PATCH] Fix Spark temp marker cleanup in async folder cleaner --- .../empty_folder_cleaner.go | 12 +- .../empty_folder_cleaner_test.go | 117 ++++++++++++++++++ weed/s3api/s3api_object_handlers_delete.go | 15 +-- 3 files changed, 127 insertions(+), 17 deletions(-) diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go index a3351f682..da4b35902 100644 --- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go @@ -18,7 +18,7 @@ const ( DefaultMaxCountCheck = 1000 DefaultCacheExpiry = 5 * time.Minute DefaultQueueMaxSize = 1000 - DefaultQueueMaxAge = 10 * time.Minute + DefaultQueueMaxAge = 5 * time.Second DefaultProcessorSleep = 10 * time.Second // How often to check queue ) @@ -239,11 +239,6 @@ func (efc *EmptyFolderCleaner) processCleanupQueue() { // Execute cleanup for this folder efc.executeCleanup(folder) - - // If queue is no longer full and oldest item is not old enough, stop processing - if !efc.cleanupQueue.ShouldProcess() { - break - } } } @@ -324,8 +319,9 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) { } if !isImplicit { - glog.Infof("EmptyFolderCleaner: folder %s is not marked as implicit (source=%s attr=%s), skipping", folder, implicitSource, implicitAttr) - return + // Some S3 clients create directory markers without implicit-dir metadata. + // For delete-driven cleanup, still verify emptiness and remove if safe. + glog.Infof("EmptyFolderCleaner: folder %s is not marked as implicit (source=%s attr=%s), proceeding with delete-driven empty check", folder, implicitSource, implicitAttr) } // Check if folder is actually empty (count up to maxCountCheck) diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go index 1c62a5dd1..dda13f085 100644 --- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go @@ -1,13 +1,43 @@ package empty_folder_cleanup import ( + "context" "testing" "time" "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util" ) +type mockFilerOps struct { + countFn func(path util.FullPath) (int, error) + deleteFn func(path util.FullPath) error + attrsFn func(path util.FullPath) (map[string][]byte, error) +} + +func (m *mockFilerOps) CountDirectoryEntries(_ context.Context, dirPath util.FullPath, _ int) (int, error) { + if m.countFn == nil { + return 0, nil + } + return m.countFn(dirPath) +} + +func (m *mockFilerOps) DeleteEntryMetaAndData(_ context.Context, p util.FullPath, _, _, _, _ bool, _ []int32, _ int64) error { + if m.deleteFn == nil { + return nil + } + return m.deleteFn(p) +} + +func (m *mockFilerOps) GetEntryAttributes(_ context.Context, p util.FullPath) (map[string][]byte, error) { + if m.attrsFn == nil { + return nil, nil + } + return m.attrsFn(p) +} + func Test_isUnderPath(t *testing.T) { tests := []struct { name string @@ -566,3 +596,90 @@ func TestEmptyFolderCleaner_queueFIFOOrder(t *testing.T) { cleaner.Stop() } + +func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + var deleted []string + mock := &mockFilerOps{ + countFn: func(_ util.FullPath) (int, error) { + return 0, nil + }, + deleteFn: func(path util.FullPath) error { + deleted = append(deleted, string(path)) + return nil + }, + attrsFn: func(_ util.FullPath) (map[string][]byte, error) { + return map[string][]byte{s3_constants.ExtS3ImplicitDir: []byte("true")}, nil + }, + } + + cleaner := &EmptyFolderCleaner{ + filer: mock, + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(2, time.Hour), + maxCountCheck: 1000, + cacheExpiry: time.Minute, + processorSleep: time.Second, + stopCh: make(chan struct{}), + } + + now := time.Now() + cleaner.cleanupQueue.Add("/buckets/test/folder1", now) + cleaner.cleanupQueue.Add("/buckets/test/folder2", now.Add(time.Millisecond)) + cleaner.cleanupQueue.Add("/buckets/test/folder3", now.Add(2*time.Millisecond)) + + cleaner.processCleanupQueue() + + if got := cleaner.cleanupQueue.Len(); got != 0 { + t.Fatalf("expected queue to be drained, got len=%d", got) + } + if len(deleted) != 3 { + t.Fatalf("expected 3 deleted folders, got %d", len(deleted)) + } +} + +func TestEmptyFolderCleaner_executeCleanup_missingImplicitAttributeStillDeletes(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + var deleted []string + mock := &mockFilerOps{ + countFn: func(_ util.FullPath) (int, error) { + return 0, nil + }, + deleteFn: func(path util.FullPath) error { + deleted = append(deleted, string(path)) + return nil + }, + attrsFn: func(_ util.FullPath) (map[string][]byte, error) { + return map[string][]byte{}, nil + }, + } + + cleaner := &EmptyFolderCleaner{ + filer: mock, + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, time.Minute), + maxCountCheck: 1000, + cacheExpiry: time.Minute, + processorSleep: time.Second, + stopCh: make(chan struct{}), + } + + folder := "/buckets/test/folder" + cleaner.executeCleanup(folder) + + if len(deleted) != 1 || deleted[0] != folder { + t.Fatalf("expected folder %s to be deleted, got %v", folder, deleted) + } +} diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 8bf10646b..883cf6947 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -2,7 +2,6 @@ package s3api import ( "encoding/xml" - "fmt" "io" "net/http" "strings" @@ -125,7 +124,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque return } - target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object)) + // Normalize trailing-slash object keys (e.g. "path/") to the + // underlying directory entry path so DeleteEntry gets a valid name. + target := util.NewFullPath(s3a.bucketDir(bucket), object) dir, name := target.DirAndName() err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { @@ -340,13 +341,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } } else { // Handle non-versioned delete (original logic) - lastSeparator := strings.LastIndex(object.Key, "/") - parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false - if lastSeparator > 0 && lastSeparator+1 < len(object.Key) { - entryName = object.Key[lastSeparator+1:] - parentDirectoryPath = object.Key[:lastSeparator] - } - parentDirectoryPath = fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), parentDirectoryPath) + target := util.NewFullPath(s3a.bucketDir(bucket), object.Key) + parentDirectoryPath, entryName := target.DirAndName() + isDeleteData, isRecursive := true, false err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err == nil {