
Fix Spark temp marker cleanup in async folder cleaner

Branch: pull/8292/head
Author: Chris Lu, 20 hours ago
Commit: 74c80aa441
Changed files:
  1. weed/filer/empty_folder_cleanup/empty_folder_cleaner.go (12 lines changed)
  2. weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go (117 lines changed)
  3. weed/s3api/s3api_object_handlers_delete.go (15 lines changed)

weed/filer/empty_folder_cleanup/empty_folder_cleaner.go (12 lines changed)

@@ -18,7 +18,7 @@ const (
 	DefaultMaxCountCheck  = 1000
 	DefaultCacheExpiry    = 5 * time.Minute
 	DefaultQueueMaxSize   = 1000
-	DefaultQueueMaxAge    = 10 * time.Minute
+	DefaultQueueMaxAge    = 5 * time.Second
 	DefaultProcessorSleep = 10 * time.Second // How often to check queue
 )

@@ -239,11 +239,6 @@ func (efc *EmptyFolderCleaner) processCleanupQueue() {
 		// Execute cleanup for this folder
 		efc.executeCleanup(folder)
-
-		// If queue is no longer full and oldest item is not old enough, stop processing
-		if !efc.cleanupQueue.ShouldProcess() {
-			break
-		}
 	}
 }

@@ -324,8 +319,9 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) {
 	}
 	if !isImplicit {
-		glog.Infof("EmptyFolderCleaner: folder %s is not marked as implicit (source=%s attr=%s), skipping", folder, implicitSource, implicitAttr)
-		return
+		// Some S3 clients create directory markers without implicit-dir metadata.
+		// For delete-driven cleanup, still verify emptiness and remove if safe.
+		glog.Infof("EmptyFolderCleaner: folder %s is not marked as implicit (source=%s attr=%s), proceeding with delete-driven empty check", folder, implicitSource, implicitAttr)
 	}

 	// Check if folder is actually empty (count up to maxCountCheck)
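For context, here is a minimal sketch of the changed queue semantics. The types and the drainQueue function below are hypothetical stand-ins, not the actual CleanupQueue/processCleanupQueue code: once the queue trips its trigger (full, or its oldest entry older than the max age), the whole backlog is now processed in FIFO order instead of stopping as soon as the trigger condition clears.

// Illustrative sketch only; queueItem and drainQueue are stand-ins for the
// real CleanupQueue and processCleanupQueue pair in SeaweedFS.
package main

import (
	"fmt"
	"time"
)

type queueItem struct {
	folder   string
	enqueued time.Time
}

// drainQueue models the post-change behavior: if the queue is triggered,
// clean every queued folder rather than re-checking the trigger per item.
func drainQueue(items []queueItem, maxSize int, maxAge time.Duration, cleanup func(string)) []queueItem {
	if len(items) == 0 {
		return items
	}
	if len(items) < maxSize && time.Since(items[0].enqueued) < maxAge {
		return items // not triggered yet
	}
	for _, it := range items {
		cleanup(it.folder) // FIFO order, whole backlog
	}
	return items[:0]
}

func main() {
	now := time.Now()
	pending := []queueItem{
		{"/buckets/test/folder1", now.Add(-10 * time.Second)}, // older than the max age
		{"/buckets/test/folder2", now},
		{"/buckets/test/folder3", now},
	}
	pending = drainQueue(pending, 1000, 5*time.Second, func(f string) { fmt.Println("cleanup:", f) })
	fmt.Println("remaining:", len(pending))
}

With the per-item ShouldProcess() check removed, a single trigger is enough to flush every pending folder, which is what the new drain test below asserts.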

weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go (117 lines changed)

@ -1,13 +1,43 @@
package empty_folder_cleanup package empty_folder_cleanup
import ( import (
"context"
"testing" "testing"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
) )
type mockFilerOps struct {
countFn func(path util.FullPath) (int, error)
deleteFn func(path util.FullPath) error
attrsFn func(path util.FullPath) (map[string][]byte, error)
}
func (m *mockFilerOps) CountDirectoryEntries(_ context.Context, dirPath util.FullPath, _ int) (int, error) {
if m.countFn == nil {
return 0, nil
}
return m.countFn(dirPath)
}
func (m *mockFilerOps) DeleteEntryMetaAndData(_ context.Context, p util.FullPath, _, _, _, _ bool, _ []int32, _ int64) error {
if m.deleteFn == nil {
return nil
}
return m.deleteFn(p)
}
func (m *mockFilerOps) GetEntryAttributes(_ context.Context, p util.FullPath) (map[string][]byte, error) {
if m.attrsFn == nil {
return nil, nil
}
return m.attrsFn(p)
}
func Test_isUnderPath(t *testing.T) { func Test_isUnderPath(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@@ -566,3 +596,90 @@ func TestEmptyFolderCleaner_queueFIFOOrder(t *testing.T) {
 	cleaner.Stop()
 }
+
+func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testing.T) {
+	lockRing := lock_manager.NewLockRing(5 * time.Second)
+	lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	var deleted []string
+	mock := &mockFilerOps{
+		countFn: func(_ util.FullPath) (int, error) {
+			return 0, nil
+		},
+		deleteFn: func(path util.FullPath) error {
+			deleted = append(deleted, string(path))
+			return nil
+		},
+		attrsFn: func(_ util.FullPath) (map[string][]byte, error) {
+			return map[string][]byte{s3_constants.ExtS3ImplicitDir: []byte("true")}, nil
+		},
+	}
+
+	cleaner := &EmptyFolderCleaner{
+		filer:          mock,
+		lockRing:       lockRing,
+		host:           "filer1:8888",
+		bucketPath:     "/buckets",
+		enabled:        true,
+		folderCounts:   make(map[string]*folderState),
+		cleanupQueue:   NewCleanupQueue(2, time.Hour),
+		maxCountCheck:  1000,
+		cacheExpiry:    time.Minute,
+		processorSleep: time.Second,
+		stopCh:         make(chan struct{}),
+	}
+
+	now := time.Now()
+	cleaner.cleanupQueue.Add("/buckets/test/folder1", now)
+	cleaner.cleanupQueue.Add("/buckets/test/folder2", now.Add(time.Millisecond))
+	cleaner.cleanupQueue.Add("/buckets/test/folder3", now.Add(2*time.Millisecond))
+
+	cleaner.processCleanupQueue()
+
+	if got := cleaner.cleanupQueue.Len(); got != 0 {
+		t.Fatalf("expected queue to be drained, got len=%d", got)
+	}
+	if len(deleted) != 3 {
+		t.Fatalf("expected 3 deleted folders, got %d", len(deleted))
+	}
+}
+
+func TestEmptyFolderCleaner_executeCleanup_missingImplicitAttributeStillDeletes(t *testing.T) {
+	lockRing := lock_manager.NewLockRing(5 * time.Second)
+	lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	var deleted []string
+	mock := &mockFilerOps{
+		countFn: func(_ util.FullPath) (int, error) {
+			return 0, nil
+		},
+		deleteFn: func(path util.FullPath) error {
+			deleted = append(deleted, string(path))
+			return nil
+		},
+		attrsFn: func(_ util.FullPath) (map[string][]byte, error) {
+			return map[string][]byte{}, nil
+		},
+	}
+
+	cleaner := &EmptyFolderCleaner{
+		filer:          mock,
+		lockRing:       lockRing,
+		host:           "filer1:8888",
+		bucketPath:     "/buckets",
+		enabled:        true,
+		folderCounts:   make(map[string]*folderState),
+		cleanupQueue:   NewCleanupQueue(1000, time.Minute),
+		maxCountCheck:  1000,
+		cacheExpiry:    time.Minute,
+		processorSleep: time.Second,
+		stopCh:         make(chan struct{}),
+	}
+
+	folder := "/buckets/test/folder"
+	cleaner.executeCleanup(folder)
+
+	if len(deleted) != 1 || deleted[0] != folder {
+		t.Fatalf("expected folder %s to be deleted, got %v", folder, deleted)
+	}
+}
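The two new tests cover the drain-everything path and the missing implicit-dir attribute path in isolation; assuming the package layout shown in the diff, they should run with something like go test ./weed/filer/empty_folder_cleanup/ -run TestEmptyFolderCleaner -v.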

weed/s3api/s3api_object_handlers_delete.go (15 lines changed)

@@ -2,7 +2,6 @@ package s3api

 import (
 	"encoding/xml"
-	"fmt"
 	"io"
 	"net/http"
 	"strings"

@@ -125,7 +124,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}

-	target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object))
+	// Normalize trailing-slash object keys (e.g. "path/") to the
+	// underlying directory entry path so DeleteEntry gets a valid name.
+	target := util.NewFullPath(s3a.bucketDir(bucket), object)
 	dir, name := target.DirAndName()

 	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {

@@ -340,13 +341,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
 			}
 		} else {
 			// Handle non-versioned delete (original logic)
-			lastSeparator := strings.LastIndex(object.Key, "/")
-			parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
-			if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
-				entryName = object.Key[lastSeparator+1:]
-				parentDirectoryPath = object.Key[:lastSeparator]
-			}
-			parentDirectoryPath = fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), parentDirectoryPath)
+			target := util.NewFullPath(s3a.bucketDir(bucket), object.Key)
+			parentDirectoryPath, entryName := target.DirAndName()
+			isDeleteData, isRecursive := true, false

 			err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
 			if err == nil {
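To illustrate why the Sprintf-based join misbehaved for trailing-slash keys (such as Spark-style directory markers), here is a standard-library-only sketch. splitDirAndName is a hypothetical helper approximating a DirAndName-style split after trimming the trailing slash; it is not the actual util.NewFullPath code, just the normalization idea.

// Illustrative sketch: shows that splitting a "dir/" style key without
// normalization yields an empty entry name, while trimming the trailing
// slash first recovers the directory entry's own name.
package main

import (
	"fmt"
	"path"
	"strings"
)

// splitDirAndName trims a trailing slash, then splits into parent dir and name.
func splitDirAndName(p string) (string, string) {
	p = strings.TrimSuffix(p, "/")
	dir, name := path.Split(p)
	return strings.TrimSuffix(dir, "/"), name
}

func main() {
	bucketDir, key := "/buckets/test", "spark-job/_temporary/" // trailing-slash directory marker

	raw := fmt.Sprintf("%s/%s", bucketDir, key)
	dir, name := path.Split(raw)
	fmt.Printf("unnormalized: dir=%q name=%q (empty name)\n", dir, name)

	dir, name = splitDirAndName(raw)
	fmt.Printf("normalized:   dir=%q name=%q\n", dir, name)
}

With an empty entry name, the downstream DeleteEntry call has nothing valid to delete, which is why both delete handlers now build the target with util.NewFullPath before calling DirAndName().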
