diff --git a/test/s3/spark/issue_8234_repro_test.go b/test/s3/spark/issue_8234_repro_test.go index 7f78ae9f5..07d264aa1 100644 --- a/test/s3/spark/issue_8234_repro_test.go +++ b/test/s3/spark/issue_8234_repro_test.go @@ -14,7 +14,7 @@ func setupSparkIssue8234Env(t *testing.T) *TestEnvironment { } env.StartSeaweedFS(t) - t.Cleanup(func() { env.Cleanup() }) + t.Cleanup(func() { env.Cleanup(t) }) createObjectBucket(t, env, "test") env.startSparkContainer(t) diff --git a/test/s3/spark/issue_8285_repro_test.go b/test/s3/spark/issue_8285_repro_test.go new file mode 100644 index 000000000..a514356da --- /dev/null +++ b/test/s3/spark/issue_8285_repro_test.go @@ -0,0 +1,180 @@ +package spark + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +func TestSparkS3TemporaryDirectoryCleanupIssue8285Regression(t *testing.T) { + if testing.Short() { + t.Skip("Skipping Spark integration test in short mode") + } + + env := setupSparkIssue8234Env(t) + + script := ` +import pyspark.sql.functions as F + +target = "s3a://test/issue-8285/output" + +spark.conf.set("spark.hadoop.fs.s3a.committer.name", "directory") +spark.conf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "false") +spark.conf.set("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", "true") +spark.conf.set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append") +spark.conf.set("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/tmp") +spark.conf.set("spark.hadoop.fs.s3a.directory.marker.retention", "keep") + +df = spark.range(0, 200).repartition(12).withColumn("value", F.col("id") * 2) +df.write.format("parquet").mode("overwrite").save(target) +count = spark.read.parquet(target).count() +print("WRITE_COUNT=" + str(count)) +` + + code, output := runSparkPyScript(t, env.sparkContainer, script, env.s3Port) + if code != 0 { + t.Fatalf("Spark script exited with code %d; output:\n%s", code, output) + } + if !strings.Contains(output, "WRITE_COUNT=200") { + t.Fatalf("expected write/read success marker in output, got:\n%s", output) + } + + keys := listObjectKeysByPrefix(t, env, "test", "issue-8285/") + var temporaryKeys []string + for _, key := range keys { + if hasTemporaryPathSegment(key) { + temporaryKeys = append(temporaryKeys, key) + } + } + + if len(temporaryKeys) > 0 { + t.Fatalf("issue #8285 regression detected: found lingering _temporary artifacts: %v\nall keys: %v", temporaryKeys, keys) + } + + temporaryCandidates := []string{ + "issue-8285/output/_temporary/", + "issue-8285/output/_temporary/0/", + "issue-8285/output/_temporary/0/_temporary/", + } + lingering := waitForObjectsToDisappear(t, env, "test", temporaryCandidates, 35*time.Second) + if len(lingering) > 0 { + t.Fatalf("issue #8285 regression detected: lingering temporary directories: %v", lingering) + } +} + +func listObjectKeysByPrefix(t *testing.T, env *TestEnvironment, bucketName, prefix string) []string { + t.Helper() + client := newS3Client(env) + + pager := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{ + Bucket: aws.String(bucketName), + Prefix: aws.String(prefix), + }) + + var keys []string + for pager.HasMorePages() { + page, err := pager.NextPage(context.Background()) + if err != nil { + t.Fatalf("failed listing objects for prefix %q: %v", prefix, err) + } + for _, object := range page.Contents { + keys = append(keys, 
aws.ToString(object.Key))
+		}
+	}
+
+	return keys
+}
+
+func headObjectInfo(t *testing.T, env *TestEnvironment, bucketName, key string) (bool, string, error) {
+	t.Helper()
+
+	client := newS3Client(env)
+	output, err := client.HeadObject(context.Background(), &s3.HeadObjectInput{
+		Bucket: aws.String(bucketName),
+		Key:    aws.String(key),
+	})
+	if err == nil {
+		return true, aws.ToString(output.ContentType), nil
+	}
+
+	var notFound *s3types.NotFound
+	if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") || errors.As(err, &notFound) {
+		return false, "", nil
+	}
+	return false, "", err
+}
+
+func waitForObjectsToDisappear(t *testing.T, env *TestEnvironment, bucketName string, keys []string, timeout time.Duration) []string {
+	t.Helper()
+
+	deadline := time.Now().Add(timeout)
+	pending := make(map[string]struct{}, len(keys))
+	details := make(map[string]string, len(keys))
+	for _, key := range keys {
+		pending[key] = struct{}{}
+	}
+
+	for len(pending) > 0 && time.Now().Before(deadline) {
+		for key := range pending {
+			exists, contentType, err := headObjectInfo(t, env, bucketName, key)
+			if err != nil {
+				details[key] = fmt.Sprintf("%s (head_error=%v)", key, err)
+				continue
+			}
+			if !exists {
+				delete(pending, key)
+				delete(details, key)
+				continue
+			}
+			details[key] = fmt.Sprintf("%s (exists=true, contentType=%q)", key, contentType)
+		}
+		if len(pending) > 0 {
+			time.Sleep(2 * time.Second)
+		}
+	}
+
+	if len(pending) == 0 {
+		return nil
+	}
+
+	var lingering []string
+	for _, key := range keys {
+		if _, ok := pending[key]; !ok {
+			continue
+		}
+		if detail, hasDetail := details[key]; hasDetail {
+			lingering = append(lingering, detail)
+		} else {
+			lingering = append(lingering, key)
+		}
+	}
+	return lingering
+}
+
+func newS3Client(env *TestEnvironment) *s3.Client {
+	cfg := aws.Config{
+		Region:       "us-east-1",
+		Credentials:  aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")),
+		BaseEndpoint: aws.String(fmt.Sprintf("http://localhost:%d", env.s3Port)),
+	}
+	return s3.NewFromConfig(cfg, func(o *s3.Options) {
+		o.UsePathStyle = true
+	})
+}
+
+func hasTemporaryPathSegment(key string) bool {
+	for _, segment := range strings.Split(strings.TrimSuffix(key, "/"), "/") {
+		if segment == "_temporary" {
+			return true
+		}
+	}
+	return false
+}
diff --git a/test/s3/spark/setup_test.go b/test/s3/spark/setup_test.go
index eadd84d18..79053a0ab 100644
--- a/test/s3/spark/setup_test.go
+++ b/test/s3/spark/setup_test.go
@@ -8,6 +8,7 @@ import (
 	"net"
 	"os"
 	"os/exec"
+	"path/filepath"
 	"sync"
 	"testing"
 	"time"
@@ -53,6 +54,8 @@ type TestEnvironment struct {
 	dockerAvailable  bool
 	weedBinary       string
 	seaweedfsDataDir string
+	weedLogPath      string
+	weedLogFile      *os.File
 	masterPort       int
 	filerPort        int
 	s3Port           int
@@ -113,6 +116,15 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
 		"-s3.config", iamConfigPath,
 		"-dir", env.seaweedfsDataDir,
 	)
+	weedLogPath := filepath.Join(env.seaweedfsDataDir, "weed-mini.log")
+	weedLogFile, err := os.Create(weedLogPath)
+	if err != nil {
+		t.Fatalf("failed to create weed log file: %v", err)
+	}
+	env.weedLogPath = weedLogPath
+	env.weedLogFile = weedLogFile
+	env.masterProcess.Stdout = weedLogFile
+	env.masterProcess.Stderr = weedLogFile
 	env.masterProcess.Env = append(os.Environ(),
 		"AWS_ACCESS_KEY_ID="+env.accessKey,
 		"AWS_SECRET_ACCESS_KEY="+env.secretKey,
@@ -160,12 +172,30 @@ func (env *TestEnvironment) startSparkContainer(t *testing.T) {
 	env.sparkContainer = container
 }
 
-func (env 
*TestEnvironment) Cleanup() { +func (env *TestEnvironment) Cleanup(t *testing.T) { if env.masterProcess != nil && env.masterProcess.Process != nil { _ = env.masterProcess.Process.Kill() _ = env.masterProcess.Wait() } clearMiniProcess(env.masterProcess) + if env.weedLogFile != nil { + _ = env.weedLogFile.Close() + } + + if t.Failed() && os.Getenv("CI") != "" && env.weedLogPath != "" { + logData, err := os.ReadFile(env.weedLogPath) + if err != nil { + t.Logf("failed to read weed mini log file %s: %v", env.weedLogPath, err) + } else { + // Print the tail to keep CI output manageable while preserving failure context. + const maxTailBytes = 64 * 1024 + start := 0 + if len(logData) > maxTailBytes { + start = len(logData) - maxTailBytes + } + t.Logf("weed mini logs (tail, %d bytes):\n%s", len(logData)-start, string(logData[start:])) + } + } if env.sparkContainer != nil { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 53fb50da7..5238e7fc5 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -390,11 +390,10 @@ func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket u } func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) { - if !strings.HasPrefix(dir, bucketsDir+"/") { - return "", false + if bucketPath, ok := util.ExtractBucketPath(bucketsDir, dir, false); ok { + return util.FullPath(bucketPath), true } - parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2) - return util.FullPath(bucketsDir).Child(parts[0]), true + return "", false } func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) { diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 791e5b090..22e30a6cb 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "os/user" + "path" "runtime" "strconv" "strings" @@ -26,6 +27,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/storage/types" "google.golang.org/grpc/reflection" @@ -59,6 +61,63 @@ func runMount(cmd *Command, args []string) bool { return RunMount(&mountOptions, os.FileMode(umask)) } +func ensureBucketAllowEmptyFolders(ctx context.Context, filerClient filer_pb.FilerClient, mountRoot, bucketRootPath string) error { + bucketPath, isBucketRootMount := bucketPathForMountRoot(mountRoot, bucketRootPath) + if !isBucketRootMount { + return nil + } + + entry, err := filer_pb.GetEntry(ctx, filerClient, util.FullPath(bucketPath)) + if err != nil { + return err + } + if entry == nil { + return fmt.Errorf("bucket %s not found", bucketPath) + } + + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + if strings.EqualFold(strings.TrimSpace(string(entry.Extended[s3_constants.ExtAllowEmptyFolders])), "true") { + return nil + } + + entry.Extended[s3_constants.ExtAllowEmptyFolders] = []byte("true") + + bucketFullPath := util.FullPath(bucketPath) + parent, _ := bucketFullPath.DirAndName() + if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ + Directory: parent, + Entry: entry, + }) + }); err != 
nil { + return err + } + + glog.V(3).Infof("RunMount: set bucket %s %s=true", bucketPath, s3_constants.ExtAllowEmptyFolders) + return nil +} + +func bucketPathForMountRoot(mountRoot, bucketRootPath string) (string, bool) { + cleanPath := path.Clean("/" + strings.TrimPrefix(mountRoot, "/")) + cleanBucketRoot := path.Clean("/" + strings.TrimPrefix(bucketRootPath, "/")) + if cleanBucketRoot == "/" { + return "", false + } + prefix := cleanBucketRoot + "/" + if !strings.HasPrefix(cleanPath, prefix) { + return "", false + } + rest := strings.TrimPrefix(cleanPath, prefix) + + bucketParts := strings.Split(rest, "/") + if len(bucketParts) != 1 || bucketParts[0] == "" { + return "", false + } + return cleanBucketRoot + "/" + bucketParts[0], true +} + func RunMount(option *MountOptions, umask os.FileMode) bool { // basic checks @@ -73,6 +132,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool + var bucketRootPath string var err error for i := 0; i < 10; i++ { err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -81,6 +141,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { return fmt.Errorf("get filer grpc address %v configuration: %w", filerAddresses, err) } cipher = resp.Cipher + bucketRootPath = resp.DirBuckets return nil }) if err != nil { @@ -93,6 +154,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err) return true } + if bucketRootPath == "" { + bucketRootPath = "/buckets" + } filerMountRootPath := *option.filerMountRootPath @@ -287,6 +351,10 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { fmt.Printf("failed to create dir %s on filer %s: %v\n", mountRoot, filerAddresses, err) return false } + if err := ensureBucketAllowEmptyFolders(context.Background(), seaweedFileSystem, mountRoot, bucketRootPath); err != nil { + fmt.Printf("failed to set bucket auto-remove-empty-folders policy for %s: %v\n", mountRoot, err) + return false + } server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions) if err != nil { diff --git a/weed/command/mount_std_test.go b/weed/command/mount_std_test.go new file mode 100644 index 000000000..e4b8e7598 --- /dev/null +++ b/weed/command/mount_std_test.go @@ -0,0 +1,52 @@ +//go:build linux || darwin || freebsd +// +build linux darwin freebsd + +package command + +import "testing" + +func Test_bucketPathForMountRoot(t *testing.T) { + tests := []struct { + name string + mountRoot string + expected string + ok bool + }{ + { + name: "bucket root mount", + mountRoot: "/buckets/test", + expected: "/buckets/test", + ok: true, + }, + { + name: "bucket root with trailing slash", + mountRoot: "/buckets/test/", + expected: "/buckets/test", + ok: true, + }, + { + name: "subdirectory mount", + mountRoot: "/buckets/test/data", + expected: "", + ok: false, + }, + { + name: "non-bucket mount", + mountRoot: "/data/test", + expected: "", + ok: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotPath, gotOK := bucketPathForMountRoot(tt.mountRoot, "/buckets") + if gotOK != tt.ok { + t.Fatalf("expected ok=%v, got %v", tt.ok, gotOK) + } + if gotPath != tt.expected { + t.Fatalf("expected path %q, got %q", tt.expected, gotPath) + } + }) + } +} diff --git a/weed/filer/empty_folder_cleanup/cleanup_queue.go 
b/weed/filer/empty_folder_cleanup/cleanup_queue.go
index b2f66a835..5e2acd85c 100644
--- a/weed/filer/empty_folder_cleanup/cleanup_queue.go
+++ b/weed/filer/empty_folder_cleanup/cleanup_queue.go
@@ -21,8 +21,9 @@ type CleanupQueue struct {
 
 // queueItem represents an item in the cleanup queue
 type queueItem struct {
-	folder    string
-	queueTime time.Time
+	folder      string
+	triggeredBy string
+	queueTime   time.Time
 }
 
 // NewCleanupQueue creates a new CleanupQueue with the specified limits
@@ -39,7 +40,7 @@ func NewCleanupQueue(maxSize int, maxAge time.Duration) *CleanupQueue {
 // The item is inserted in time-sorted order (oldest at front) to handle out-of-order events.
 // If folder already exists with an older time, the time is updated and position adjusted.
 // Returns true if the folder was newly added, false if it was updated.
-func (q *CleanupQueue) Add(folder string, eventTime time.Time) bool {
+func (q *CleanupQueue) Add(folder string, triggeredBy string, eventTime time.Time) bool {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
@@ -51,23 +52,24 @@ func (q *CleanupQueue) Add(folder string, eventTime time.Time) bool {
 			// Remove from current position
 			q.items.Remove(elem)
 			// Re-insert with new time in sorted position
-			newElem := q.insertSorted(folder, eventTime)
+			newElem := q.insertSorted(folder, triggeredBy, eventTime)
 			q.itemsMap[folder] = newElem
 		}
 		return false
 	}
 
 	// Insert new folder in sorted position
-	elem := q.insertSorted(folder, eventTime)
+	elem := q.insertSorted(folder, triggeredBy, eventTime)
 	q.itemsMap[folder] = elem
 	return true
 }
 
 // insertSorted inserts an item in the correct position to maintain time ordering (oldest at front)
-func (q *CleanupQueue) insertSorted(folder string, eventTime time.Time) *list.Element {
+func (q *CleanupQueue) insertSorted(folder string, triggeredBy string, eventTime time.Time) *list.Element {
 	item := &queueItem{
-		folder:    folder,
-		queueTime: eventTime,
+		folder:      folder,
+		triggeredBy: triggeredBy,
+		queueTime:   eventTime,
 	}
 
 	// Find the correct position (insert before the first item with a later time)
@@ -135,35 +137,35 @@ func (q *CleanupQueue) shouldProcessLocked() bool {
 
 // Pop removes and returns the oldest folder from the queue.
-// Returns the folder and true if an item was available, or empty string and false if queue is empty.
-func (q *CleanupQueue) Pop() (string, bool) {
+// Returns the folder, its triggering entry, and true if an item was available, or empty strings and false if queue is empty.
+func (q *CleanupQueue) Pop() (string, string, bool) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
 	front := q.items.Front()
 	if front == nil {
-		return "", false
+		return "", "", false
 	}
 
 	item := front.Value.(*queueItem)
 	q.items.Remove(front)
 	delete(q.itemsMap, item.folder)
-	return item.folder, true
+	return item.folder, item.triggeredBy, true
 }
 
 // Peek returns the oldest folder without removing it.
-// Returns the folder and queue time if available, or empty values if queue is empty.
-func (q *CleanupQueue) Peek() (folder string, queueTime time.Time, ok bool) {
+// Returns the folder, its triggering entry, and queue time if available, or empty values if queue is empty.
+func (q *CleanupQueue) Peek() (folder string, triggeredBy string, queueTime time.Time, ok bool) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
 	front := q.items.Front()
 	if front == nil {
-		return "", "", time.Time{}, false
+		return "", "", time.Time{}, false
 	}
 
 	item := front.Value.(*queueItem)
-	return item.folder, item.queueTime, true
+	return item.folder, item.triggeredBy, item.queueTime, true
 }
 
 // Len returns the current queue size.
diff --git a/weed/filer/empty_folder_cleanup/cleanup_queue_test.go b/weed/filer/empty_folder_cleanup/cleanup_queue_test.go index 0e26495dd..b68f054d5 100644 --- a/weed/filer/empty_folder_cleanup/cleanup_queue_test.go +++ b/weed/filer/empty_folder_cleanup/cleanup_queue_test.go @@ -10,7 +10,7 @@ func TestCleanupQueue_Add(t *testing.T) { now := time.Now() // Add first item - if !q.Add("/buckets/b1/folder1", now) { + if !q.Add("/buckets/b1/folder1", "item1", now) { t.Error("expected Add to return true for new item") } if q.Len() != 1 { @@ -18,7 +18,7 @@ func TestCleanupQueue_Add(t *testing.T) { } // Add second item with later time - if !q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) { + if !q.Add("/buckets/b1/folder2", "item2", now.Add(1*time.Second)) { t.Error("expected Add to return true for new item") } if q.Len() != 2 { @@ -26,7 +26,7 @@ func TestCleanupQueue_Add(t *testing.T) { } // Add duplicate with newer time - should update and reposition - if q.Add("/buckets/b1/folder1", now.Add(2*time.Second)) { + if q.Add("/buckets/b1/folder1", "item1-updated", now.Add(2*time.Second)) { t.Error("expected Add to return false for existing item") } if q.Len() != 2 { @@ -34,11 +34,14 @@ func TestCleanupQueue_Add(t *testing.T) { } // folder1 should now be at the back (newer time) - verify by popping - folder1, _ := q.Pop() - folder2, _ := q.Pop() + folder1, triggered1, _ := q.Pop() + folder2, triggered2, _ := q.Pop() if folder1 != "/buckets/b1/folder2" || folder2 != "/buckets/b1/folder1" { t.Errorf("expected folder1 to be moved to back, got %s, %s", folder1, folder2) } + if triggered1 != "item2" || triggered2 != "item1-updated" { + t.Errorf("triggeredBy mismatch: got %s, %s", triggered1, triggered2) + } } func TestCleanupQueue_Add_OutOfOrder(t *testing.T) { @@ -46,16 +49,23 @@ func TestCleanupQueue_Add_OutOfOrder(t *testing.T) { baseTime := time.Now() // Add items out of order - q.Add("/buckets/b1/folder3", baseTime.Add(3*time.Second)) - q.Add("/buckets/b1/folder1", baseTime.Add(1*time.Second)) - q.Add("/buckets/b1/folder2", baseTime.Add(2*time.Second)) + q.Add("/buckets/b1/folder3", "f3", baseTime.Add(3*time.Second)) + q.Add("/buckets/b1/folder1", "f1", baseTime.Add(1*time.Second)) + q.Add("/buckets/b1/folder2", "f2", baseTime.Add(2*time.Second)) // Items should be in time order (oldest first) - verify by popping - expected := []string{"/buckets/b1/folder1", "/buckets/b1/folder2", "/buckets/b1/folder3"} + expected := []struct { + folder string + item string + }{ + {"/buckets/b1/folder1", "f1"}, + {"/buckets/b1/folder2", "f2"}, + {"/buckets/b1/folder3", "f3"}, + } for i, exp := range expected { - folder, ok := q.Pop() - if !ok || folder != exp { - t.Errorf("at index %d: expected %s, got %s", i, exp, folder) + folder, item, ok := q.Pop() + if !ok || folder != exp.folder || item != exp.item { + t.Errorf("at index %d: expected %s by %s, got %s by %s", i, exp.folder, exp.item, folder, item) } } } @@ -65,15 +75,15 @@ func TestCleanupQueue_Add_DuplicateWithOlderTime(t *testing.T) { baseTime := time.Now() // Add folder at t=5 - q.Add("/buckets/b1/folder1", baseTime.Add(5*time.Second)) + q.Add("/buckets/b1/folder1", "t5", baseTime.Add(5*time.Second)) // Try to add same folder with older time - should NOT update - q.Add("/buckets/b1/folder1", baseTime.Add(2*time.Second)) + q.Add("/buckets/b1/folder1", "t2", baseTime.Add(2*time.Second)) - // Time should remain at t=5 - _, queueTime, _ := q.Peek() - if queueTime != baseTime.Add(5*time.Second) { - t.Errorf("expected time to remain unchanged, got %v", 
queueTime) + // Time and triggeredBy should remain at t=5 + _, triggered, queueTime, _ := q.Peek() + if queueTime != baseTime.Add(5*time.Second) || triggered != "t5" { + t.Errorf("expected time to remain unchanged, got %v and %s", queueTime, triggered) } } @@ -81,9 +91,9 @@ func TestCleanupQueue_Remove(t *testing.T) { q := NewCleanupQueue(100, 10*time.Minute) now := time.Now() - q.Add("/buckets/b1/folder1", now) - q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) - q.Add("/buckets/b1/folder3", now.Add(2*time.Second)) + q.Add("/buckets/b1/folder1", "i1", now) + q.Add("/buckets/b1/folder2", "i2", now.Add(1*time.Second)) + q.Add("/buckets/b1/folder3", "i3", now.Add(2*time.Second)) // Remove middle item if !q.Remove("/buckets/b1/folder2") { @@ -102,8 +112,8 @@ func TestCleanupQueue_Remove(t *testing.T) { } // Verify order is preserved by popping - folder1, _ := q.Pop() - folder3, _ := q.Pop() + folder1, _, _ := q.Pop() + folder3, _, _ := q.Pop() if folder1 != "/buckets/b1/folder1" || folder3 != "/buckets/b1/folder3" { t.Errorf("unexpected order: %s, %s", folder1, folder3) } @@ -114,30 +124,30 @@ func TestCleanupQueue_Pop(t *testing.T) { now := time.Now() // Pop from empty queue - folder, ok := q.Pop() + folder, item, ok := q.Pop() if ok { t.Error("expected Pop to return false for empty queue") } - if folder != "" { - t.Errorf("expected empty folder, got %s", folder) + if folder != "" || item != "" { + t.Errorf("expected empty results, got %s by %s", folder, item) } // Add items and pop in order - q.Add("/buckets/b1/folder1", now) - q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) - q.Add("/buckets/b1/folder3", now.Add(2*time.Second)) + q.Add("/buckets/b1/folder1", "i1", now) + q.Add("/buckets/b1/folder2", "i2", now.Add(1*time.Second)) + q.Add("/buckets/b1/folder3", "i3", now.Add(2*time.Second)) - folder, ok = q.Pop() + folder, _, ok = q.Pop() if !ok || folder != "/buckets/b1/folder1" { t.Errorf("expected folder1, got %s (ok=%v)", folder, ok) } - folder, ok = q.Pop() + folder, _, ok = q.Pop() if !ok || folder != "/buckets/b1/folder2" { t.Errorf("expected folder2, got %s (ok=%v)", folder, ok) } - folder, ok = q.Pop() + folder, _, ok = q.Pop() if !ok || folder != "/buckets/b1/folder3" { t.Errorf("expected folder3, got %s (ok=%v)", folder, ok) } @@ -153,16 +163,16 @@ func TestCleanupQueue_Peek(t *testing.T) { now := time.Now() // Peek empty queue - folder, _, ok := q.Peek() + folder, item, _, ok := q.Peek() if ok { t.Error("expected Peek to return false for empty queue") } // Add item and peek - q.Add("/buckets/b1/folder1", now) - folder, queueTime, ok := q.Peek() - if !ok || folder != "/buckets/b1/folder1" { - t.Errorf("expected folder1, got %s (ok=%v)", folder, ok) + q.Add("/buckets/b1/folder1", "i1", now) + folder, item, queueTime, ok := q.Peek() + if !ok || folder != "/buckets/b1/folder1" || item != "i1" { + t.Errorf("expected folder1 by i1, got %s by %s (ok=%v)", folder, item, ok) } if queueTime != now { t.Errorf("expected queue time %v, got %v", now, queueTime) @@ -178,7 +188,7 @@ func TestCleanupQueue_Contains(t *testing.T) { q := NewCleanupQueue(100, 10*time.Minute) now := time.Now() - q.Add("/buckets/b1/folder1", now) + q.Add("/buckets/b1/folder1", "i1", now) if !q.Contains("/buckets/b1/folder1") { t.Error("expected Contains to return true") @@ -198,14 +208,14 @@ func TestCleanupQueue_ShouldProcess_MaxSize(t *testing.T) { } // Add items below max - q.Add("/buckets/b1/folder1", now) - q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) + q.Add("/buckets/b1/folder1", "i1", now) + 
q.Add("/buckets/b1/folder2", "i2", now.Add(1*time.Second)) if q.ShouldProcess() { t.Error("queue below max should not need processing") } // Add item to reach max - q.Add("/buckets/b1/folder3", now.Add(2*time.Second)) + q.Add("/buckets/b1/folder3", "i3", now.Add(2*time.Second)) if !q.ShouldProcess() { t.Error("queue at max should need processing") } @@ -216,7 +226,7 @@ func TestCleanupQueue_ShouldProcess_MaxAge(t *testing.T) { // Add item with old event time oldTime := time.Now().Add(-1 * time.Second) // 1 second ago - q.Add("/buckets/b1/folder1", oldTime) + q.Add("/buckets/b1/folder1", "i1", oldTime) // Item is older than maxAge, should need processing if !q.ShouldProcess() { @@ -225,7 +235,7 @@ func TestCleanupQueue_ShouldProcess_MaxAge(t *testing.T) { // Clear and add fresh item q.Clear() - q.Add("/buckets/b1/folder2", time.Now()) + q.Add("/buckets/b1/folder2", "i2", time.Now()) // Fresh item should not trigger processing if q.ShouldProcess() { @@ -237,9 +247,9 @@ func TestCleanupQueue_Clear(t *testing.T) { q := NewCleanupQueue(100, 10*time.Minute) now := time.Now() - q.Add("/buckets/b1/folder1", now) - q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) - q.Add("/buckets/b1/folder3", now.Add(2*time.Second)) + q.Add("/buckets/b1/folder1", "i1", now) + q.Add("/buckets/b1/folder2", "i2", now.Add(1*time.Second)) + q.Add("/buckets/b1/folder3", "i3", now.Add(2*time.Second)) q.Clear() @@ -261,7 +271,7 @@ func TestCleanupQueue_OldestAge(t *testing.T) { // Add item with time in the past oldTime := time.Now().Add(-5 * time.Minute) - q.Add("/buckets/b1/folder1", oldTime) + q.Add("/buckets/b1/folder1", "i1", oldTime) // Age should be approximately 5 minutes age := q.OldestAge() @@ -283,12 +293,12 @@ func TestCleanupQueue_TimeOrder(t *testing.T) { "/buckets/b1/e", } for i, item := range items { - q.Add(item, baseTime.Add(time.Duration(i)*time.Second)) + q.Add(item, "i", baseTime.Add(time.Duration(i)*time.Second)) } // Pop should return in time order for i, expected := range items { - got, ok := q.Pop() + got, _, ok := q.Pop() if !ok { t.Errorf("Pop %d: expected item, got empty", i) } @@ -303,17 +313,17 @@ func TestCleanupQueue_DuplicateWithNewerTime(t *testing.T) { baseTime := time.Now() // Add items - q.Add("/buckets/b1/folder1", baseTime) - q.Add("/buckets/b1/folder2", baseTime.Add(1*time.Second)) - q.Add("/buckets/b1/folder3", baseTime.Add(2*time.Second)) + q.Add("/buckets/b1/folder1", "i1", baseTime) + q.Add("/buckets/b1/folder2", "i2", baseTime.Add(1*time.Second)) + q.Add("/buckets/b1/folder3", "i3", baseTime.Add(2*time.Second)) // Add duplicate with newer time - should update and reposition - q.Add("/buckets/b1/folder1", baseTime.Add(3*time.Second)) + q.Add("/buckets/b1/folder1", "i1-new", baseTime.Add(3*time.Second)) // folder1 should now be at the back (newest time) - verify by popping expected := []string{"/buckets/b1/folder2", "/buckets/b1/folder3", "/buckets/b1/folder1"} for i, exp := range expected { - folder, ok := q.Pop() + folder, _, ok := q.Pop() if !ok || folder != exp { t.Errorf("at index %d: expected %s, got %s", i, exp, folder) } @@ -328,7 +338,7 @@ func TestCleanupQueue_Concurrent(t *testing.T) { // Concurrent adds go func() { for i := 0; i < 100; i++ { - q.Add("/buckets/b1/folder"+string(rune('A'+i%26)), now.Add(time.Duration(i)*time.Millisecond)) + q.Add("/buckets/b1/folder"+string(rune('A'+i%26)), "item", now.Add(time.Duration(i)*time.Millisecond)) } done <- true }() diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go 
b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go index d98dd5ee6..943d86c6f 100644 --- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go @@ -18,7 +18,7 @@ const ( DefaultMaxCountCheck = 1000 DefaultCacheExpiry = 5 * time.Minute DefaultQueueMaxSize = 1000 - DefaultQueueMaxAge = 10 * time.Minute + DefaultQueueMaxAge = 5 * time.Second DefaultProcessorSleep = 10 * time.Second // How often to check queue ) @@ -32,12 +32,17 @@ type FilerOperations interface { // folderState tracks the state of a folder for empty folder cleanup type folderState struct { roughCount int // Cached rough count (up to maxCountCheck) - isImplicit *bool // Tri-state boolean: nil (unknown), true (implicit), false (explicit) lastAddTime time.Time // Last time an item was added lastDelTime time.Time // Last time an item was deleted lastCheck time.Time // Last time we checked the actual count } +type bucketCleanupPolicyState struct { + autoRemove bool + attrValue string + lastCheck time.Time +} + // EmptyFolderCleaner handles asynchronous cleanup of empty folders // Each filer owns specific folders via consistent hashing based on the peer filer list type EmptyFolderCleaner struct { @@ -46,8 +51,9 @@ type EmptyFolderCleaner struct { host pb.ServerAddress // Folder state tracking - mu sync.RWMutex - folderCounts map[string]*folderState // Rough count cache + mu sync.RWMutex + folderCounts map[string]*folderState // Rough count cache + bucketCleanupPolicies map[string]*bucketCleanupPolicyState // bucket path -> cleanup policy cache // Cleanup queue (thread-safe, has its own lock) cleanupQueue *CleanupQueue @@ -66,17 +72,18 @@ type EmptyFolderCleaner struct { // NewEmptyFolderCleaner creates a new EmptyFolderCleaner func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string) *EmptyFolderCleaner { efc := &EmptyFolderCleaner{ - filer: filer, - lockRing: lockRing, - host: host, - folderCounts: make(map[string]*folderState), - cleanupQueue: NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge), - maxCountCheck: DefaultMaxCountCheck, - cacheExpiry: DefaultCacheExpiry, - processorSleep: DefaultProcessorSleep, - bucketPath: bucketPath, - enabled: true, - stopCh: make(chan struct{}), + filer: filer, + lockRing: lockRing, + host: host, + folderCounts: make(map[string]*folderState), + bucketCleanupPolicies: make(map[string]*bucketCleanupPolicyState), + cleanupQueue: NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge), + maxCountCheck: DefaultMaxCountCheck, + cacheExpiry: DefaultCacheExpiry, + processorSleep: DefaultProcessorSleep, + bucketPath: bucketPath, + enabled: true, + stopCh: make(chan struct{}), } go efc.cacheEvictionLoop() go efc.cleanupProcessor() @@ -162,8 +169,8 @@ func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string, } // Add to cleanup queue with event time (handles out-of-order events) - if efc.cleanupQueue.Add(directory, eventTime) { - glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory) + if efc.cleanupQueue.Add(directory, entryName, eventTime) { + glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup (triggered by %s)", directory, entryName) } } @@ -214,6 +221,10 @@ func (efc *EmptyFolderCleaner) cleanupProcessor() { func (efc *EmptyFolderCleaner) processCleanupQueue() { // Check if we should process if !efc.cleanupQueue.ShouldProcess() { + if efc.cleanupQueue.Len() > 0 { + glog.Infof("EmptyFolderCleaner: pending queue 
not processed yet (len=%d, oldest_age=%v, max_size=%d, max_age=%v)", + efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge(), efc.cleanupQueue.maxSize, efc.cleanupQueue.maxAge) + } return } @@ -228,35 +239,30 @@ func (efc *EmptyFolderCleaner) processCleanupQueue() { } // Pop the oldest item - folder, ok := efc.cleanupQueue.Pop() + folder, triggeredBy, ok := efc.cleanupQueue.Pop() if !ok { break } // Execute cleanup for this folder - efc.executeCleanup(folder) - - // If queue is no longer full and oldest item is not old enough, stop processing - if !efc.cleanupQueue.ShouldProcess() { - break - } + efc.executeCleanup(folder, triggeredBy) } } // executeCleanup performs the actual cleanup of an empty folder -func (efc *EmptyFolderCleaner) executeCleanup(folder string) { +func (efc *EmptyFolderCleaner) executeCleanup(folder string, triggeredBy string) { efc.mu.Lock() // Quick check: if we have cached count and it's > 0, skip if state, exists := efc.folderCounts[folder]; exists { if state.roughCount > 0 { - glog.V(3).Infof("EmptyFolderCleaner: skipping %s, cached count=%d", folder, state.roughCount) + glog.V(3).Infof("EmptyFolderCleaner: skipping %s (triggered by %s), cached count=%d", folder, triggeredBy, state.roughCount) efc.mu.Unlock() return } // If there was an add after our delete, skip if !state.lastAddTime.IsZero() && state.lastAddTime.After(state.lastDelTime) { - glog.V(3).Infof("EmptyFolderCleaner: skipping %s, add happened after delete", folder) + glog.V(3).Infof("EmptyFolderCleaner: skipping %s (triggered by %s), add happened after delete", folder, triggeredBy) efc.mu.Unlock() return } @@ -265,47 +271,23 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) { // Re-check ownership (topology might have changed) if !efc.ownsFolder(folder) { - glog.V(3).Infof("EmptyFolderCleaner: no longer owner of %s, skipping", folder) + glog.V(3).Infof("EmptyFolderCleaner: no longer owner of %s (triggered by %s), skipping", folder, triggeredBy) return } - // Check for explicit implicit_dir attribute - // First check cache ctx := context.Background() - efc.mu.RLock() - var cachedImplicit *bool - if state, exists := efc.folderCounts[folder]; exists { - cachedImplicit = state.isImplicit - } - efc.mu.RUnlock() - - var isImplicit bool - if cachedImplicit != nil { - isImplicit = *cachedImplicit - } else { - // Not cached, check filer - attrs, err := efc.filer.GetEntryAttributes(ctx, util.FullPath(folder)) - if err != nil { - if err == filer_pb.ErrNotFound { - return - } - glog.V(2).Infof("EmptyFolderCleaner: error getting attributes for %s: %v", folder, err) + bucketPath, autoRemove, source, attrValue, err := efc.getBucketCleanupPolicy(ctx, folder) + if err != nil { + if err == filer_pb.ErrNotFound { return } - - isImplicit = attrs != nil && string(attrs[s3_constants.ExtS3ImplicitDir]) == "true" - - // Update cache - efc.mu.Lock() - if _, exists := efc.folderCounts[folder]; !exists { - efc.folderCounts[folder] = &folderState{} - } - efc.folderCounts[folder].isImplicit = &isImplicit - efc.mu.Unlock() + glog.V(2).Infof("EmptyFolderCleaner: failed to load bucket cleanup policy for folder %s (triggered by %s): %v", folder, triggeredBy, err) + return } - if !isImplicit { - glog.V(4).Infof("EmptyFolderCleaner: folder %s is not marked as implicit, skipping", folder) + if !autoRemove { + glog.V(3).Infof("EmptyFolderCleaner: skipping folder %s (triggered by %s), bucket %s auto-remove-empty-folders disabled (source=%s attr=%s)", + folder, triggeredBy, bucketPath, source, attrValue) return } @@ -326,14 
+308,14 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) { efc.mu.Unlock() if count > 0 { - glog.V(3).Infof("EmptyFolderCleaner: folder %s has %d items, not empty", folder, count) + glog.Infof("EmptyFolderCleaner: folder %s (triggered by %s) has %d items, not empty", folder, triggeredBy, count) return } // Delete the empty folder - glog.V(2).Infof("EmptyFolderCleaner: deleting empty folder %s", folder) + glog.Infof("EmptyFolderCleaner: deleting empty folder %s (triggered by %s)", folder, triggeredBy) if err := efc.deleteFolder(ctx, folder); err != nil { - glog.V(2).Infof("EmptyFolderCleaner: failed to delete empty folder %s: %v", folder, err) + glog.V(2).Infof("EmptyFolderCleaner: failed to delete empty folder %s (triggered by %s): %v", folder, triggeredBy, err) return } @@ -357,6 +339,60 @@ func (efc *EmptyFolderCleaner) deleteFolder(ctx context.Context, folder string) return efc.filer.DeleteEntryMetaAndData(ctx, util.FullPath(folder), false, false, false, false, nil, 0) } +func (efc *EmptyFolderCleaner) getBucketCleanupPolicy(ctx context.Context, folder string) (bucketPath string, autoRemove bool, source string, attrValue string, err error) { + bucketPath, ok := util.ExtractBucketPath(efc.bucketPath, folder, true) + if !ok { + return "", true, "default", "", nil + } + + now := time.Now() + + efc.mu.RLock() + if state, found := efc.bucketCleanupPolicies[bucketPath]; found && now.Sub(state.lastCheck) <= efc.cacheExpiry { + efc.mu.RUnlock() + return bucketPath, state.autoRemove, "cache", state.attrValue, nil + } + efc.mu.RUnlock() + + attrs, err := efc.filer.GetEntryAttributes(ctx, util.FullPath(bucketPath)) + if err != nil { + return "", true, "", "", err + } + + autoRemove, attrValue = autoRemoveEmptyFoldersEnabled(attrs) + + efc.mu.Lock() + if efc.bucketCleanupPolicies == nil { + efc.bucketCleanupPolicies = make(map[string]*bucketCleanupPolicyState) + } + efc.bucketCleanupPolicies[bucketPath] = &bucketCleanupPolicyState{ + autoRemove: autoRemove, + attrValue: attrValue, + lastCheck: now, + } + efc.mu.Unlock() + + return bucketPath, autoRemove, "filer", attrValue, nil +} + +func autoRemoveEmptyFoldersEnabled(attrs map[string][]byte) (bool, string) { + if attrs == nil { + return true, "" + } + + value, found := attrs[s3_constants.ExtAllowEmptyFolders] + if !found { + return true, "" + } + + text := strings.TrimSpace(string(value)) + if text == "" { + return true, "" + } + + return !strings.EqualFold(text, "true"), text +} + // isUnderPath checks if child is under parent path func isUnderPath(child, parent string) bool { if parent == "" || parent == "/" { @@ -445,6 +481,12 @@ func (efc *EmptyFolderCleaner) evictStaleCacheEntries() { } } + for bucketPath, state := range efc.bucketCleanupPolicies { + if now.Sub(state.lastCheck) > efc.cacheExpiry { + delete(efc.bucketCleanupPolicies, bucketPath) + } + } + if expiredCount > 0 { glog.V(3).Infof("EmptyFolderCleaner: evicted %d stale cache entries", expiredCount) } @@ -460,6 +502,7 @@ func (efc *EmptyFolderCleaner) Stop() { efc.enabled = false efc.cleanupQueue.Clear() efc.folderCounts = make(map[string]*folderState) // Clear cache on stop + efc.bucketCleanupPolicies = make(map[string]*bucketCleanupPolicyState) } // GetPendingCleanupCount returns the number of pending cleanup tasks (for testing) diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go index 1c62a5dd1..25cf1ec68 100644 --- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go +++ 
b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go @@ -1,13 +1,43 @@ package empty_folder_cleanup import ( + "context" "testing" "time" "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util" ) +type mockFilerOps struct { + countFn func(path util.FullPath) (int, error) + deleteFn func(path util.FullPath) error + attrsFn func(path util.FullPath) (map[string][]byte, error) +} + +func (m *mockFilerOps) CountDirectoryEntries(_ context.Context, dirPath util.FullPath, _ int) (int, error) { + if m.countFn == nil { + return 0, nil + } + return m.countFn(dirPath) +} + +func (m *mockFilerOps) DeleteEntryMetaAndData(_ context.Context, p util.FullPath, _, _, _, _ bool, _ []int32, _ int64) error { + if m.deleteFn == nil { + return nil + } + return m.deleteFn(p) +} + +func (m *mockFilerOps) GetEntryAttributes(_ context.Context, p util.FullPath) (map[string][]byte, error) { + if m.attrsFn == nil { + return nil, nil + } + return m.attrsFn(p) +} + func Test_isUnderPath(t *testing.T) { tests := []struct { name string @@ -66,6 +96,56 @@ func Test_isUnderBucketPath(t *testing.T) { } } +func Test_autoRemoveEmptyFoldersEnabled(t *testing.T) { + tests := []struct { + name string + attrs map[string][]byte + enabled bool + attrValue string + }{ + { + name: "no attrs defaults enabled", + attrs: nil, + enabled: true, + attrValue: "", + }, + { + name: "missing key defaults enabled", + attrs: map[string][]byte{}, + enabled: true, + attrValue: "", + }, + { + name: "allow-empty disables cleanup", + attrs: map[string][]byte{ + s3_constants.ExtAllowEmptyFolders: []byte("true"), + }, + enabled: false, + attrValue: "true", + }, + { + name: "explicit false keeps cleanup enabled", + attrs: map[string][]byte{ + s3_constants.ExtAllowEmptyFolders: []byte("false"), + }, + enabled: true, + attrValue: "false", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + enabled, attrValue := autoRemoveEmptyFoldersEnabled(tt.attrs) + if enabled != tt.enabled { + t.Fatalf("expected enabled=%v, got %v", tt.enabled, enabled) + } + if attrValue != tt.attrValue { + t.Fatalf("expected attrValue=%q, got %q", tt.attrValue, attrValue) + } + }) + } +} + func TestEmptyFolderCleaner_ownsFolder(t *testing.T) { // Create a LockRing with multiple servers lockRing := lock_manager.NewLockRing(5 * time.Second) @@ -512,7 +592,7 @@ func TestEmptyFolderCleaner_cacheEviction_skipsEntriesInQueue(t *testing.T) { // Add a stale cache entry cleaner.folderCounts[folder] = &folderState{roughCount: 0, lastCheck: oldTime} // Also add to cleanup queue - cleaner.cleanupQueue.Add(folder, time.Now()) + cleaner.cleanupQueue.Add(folder, "item", time.Now()) // Run eviction cleaner.evictStaleCacheEntries() @@ -558,7 +638,7 @@ func TestEmptyFolderCleaner_queueFIFOOrder(t *testing.T) { // Verify time-sorted order by popping for i, expected := range folders { - folder, ok := cleaner.cleanupQueue.Pop() + folder, _, ok := cleaner.cleanupQueue.Pop() if !ok || folder != expected { t.Errorf("expected folder %s at index %d, got %s", expected, i, folder) } @@ -566,3 +646,90 @@ func TestEmptyFolderCleaner_queueFIFOOrder(t *testing.T) { cleaner.Stop() } + +func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + var deleted []string + mock := &mockFilerOps{ + 
countFn: func(_ util.FullPath) (int, error) {
+			return 0, nil
+		},
+		deleteFn: func(path util.FullPath) error {
+			deleted = append(deleted, string(path))
+			return nil
+		},
+	}
+
+	cleaner := &EmptyFolderCleaner{
+		filer:          mock,
+		lockRing:       lockRing,
+		host:           "filer1:8888",
+		bucketPath:     "/buckets",
+		enabled:        true,
+		folderCounts:   make(map[string]*folderState),
+		cleanupQueue:   NewCleanupQueue(2, time.Hour),
+		maxCountCheck:  1000,
+		cacheExpiry:    time.Minute,
+		processorSleep: time.Second,
+		stopCh:         make(chan struct{}),
+	}
+
+	now := time.Now()
+	cleaner.cleanupQueue.Add("/buckets/test/folder1", "i1", now)
+	cleaner.cleanupQueue.Add("/buckets/test/folder2", "i2", now.Add(time.Millisecond))
+	cleaner.cleanupQueue.Add("/buckets/test/folder3", "i3", now.Add(2*time.Millisecond))
+
+	cleaner.processCleanupQueue()
+
+	if got := cleaner.cleanupQueue.Len(); got != 0 {
+		t.Fatalf("expected queue to be drained, got len=%d", got)
+	}
+	if len(deleted) != 3 {
+		t.Fatalf("expected 3 deleted folders, got %d", len(deleted))
+	}
+}
+
+func TestEmptyFolderCleaner_executeCleanup_bucketPolicyDisabledSkips(t *testing.T) {
+	lockRing := lock_manager.NewLockRing(5 * time.Second)
+	lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	var deleted []string
+	mock := &mockFilerOps{
+		countFn: func(_ util.FullPath) (int, error) {
+			return 0, nil
+		},
+		deleteFn: func(path util.FullPath) error {
+			deleted = append(deleted, string(path))
+			return nil
+		},
+		attrsFn: func(path util.FullPath) (map[string][]byte, error) {
+			if string(path) == "/buckets/test" {
+				return map[string][]byte{s3_constants.ExtAllowEmptyFolders: []byte("true")}, nil
+			}
+			return nil, nil
+		},
+	}
+
+	cleaner := &EmptyFolderCleaner{
+		filer:          mock,
+		lockRing:       lockRing,
+		host:           "filer1:8888",
+		bucketPath:     "/buckets",
+		enabled:        true,
+		folderCounts:   make(map[string]*folderState),
+		cleanupQueue:   NewCleanupQueue(1000, time.Minute),
+		maxCountCheck:  1000,
+		cacheExpiry:    time.Minute,
+		processorSleep: time.Second,
+		stopCh:         make(chan struct{}),
+	}
+
+	folder := "/buckets/test/folder"
+	cleaner.executeCleanup(folder, "triggered_item")
+
+	if len(deleted) != 0 {
+		t.Fatalf("expected folder %s to be skipped, got deletions %v", folder, deleted)
+	}
+}
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 9f58d3f9d..042400c74 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -9,7 +9,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
 
 	"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
@@ -303,12 +302,8 @@ func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, di
 				GroupNames: entry.GroupNames,
 			},
 		}
-		// level > 3 corresponds to a path depth greater than "/buckets/",
-		// ensuring we only mark subdirectories within a bucket as implicit.
 		if isUnderBuckets && level > 3 {
-			dirEntry.Extended = map[string][]byte{
-				s3_constants.ExtS3ImplicitDir: []byte("true"),
-			}
+			// Parent directories under buckets are created implicitly; they no longer need a per-directory marker.
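+			// Cleanup eligibility is now tracked per bucket via the Seaweed-X-Amz-Allow-Empty-Folders extended attribute instead.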
} glog.V(2).InfofCtx(ctx, "create directory: %s %v", dirPath, dirEntry.Mode) diff --git a/weed/s3api/s3_constants/extend_key.go b/weed/s3api/s3_constants/extend_key.go index b4c030d49..bb3d637d1 100644 --- a/weed/s3api/s3_constants/extend_key.go +++ b/weed/s3api/s3_constants/extend_key.go @@ -11,7 +11,7 @@ const ( ExtETagKey = "Seaweed-X-Amz-ETag" ExtLatestVersionIdKey = "Seaweed-X-Amz-Latest-Version-Id" ExtLatestVersionFileNameKey = "Seaweed-X-Amz-Latest-Version-File-Name" - ExtS3ImplicitDir = "Seaweed-X-Amz-Implicit-Dir" + ExtAllowEmptyFolders = "Seaweed-X-Amz-Allow-Empty-Folders" // Cached list metadata in .versions directory for single-scan efficiency ExtLatestVersionSizeKey = "Seaweed-X-Amz-Latest-Version-Size" ExtLatestVersionETagKey = "Seaweed-X-Amz-Latest-Version-ETag" diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 8bf10646b..ea51f61e0 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -2,7 +2,6 @@ package s3api import ( "encoding/xml" - "fmt" "io" "net/http" "strings" @@ -23,7 +22,7 @@ const ( func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { bucket, object := s3_constants.GetBucketAndObject(r) - glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object) + glog.Infof("DeleteObjectHandler %s %s", bucket, object) if err := s3a.validateTableBucketObjectPath(bucket, object); err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) return @@ -125,7 +124,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque return } - target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object)) + // Normalize trailing-slash object keys (e.g. "path/") to the + // underlying directory entry path so DeleteEntry gets a valid name. 
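+	// For example, a key of "path/sub/" maps to the directory entry "path/sub", so DirAndName() below yields a non-empty entry name.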
+ target := util.NewFullPath(s3a.bucketDir(bucket), object) dir, name := target.DirAndName() err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { @@ -340,13 +341,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } } else { // Handle non-versioned delete (original logic) - lastSeparator := strings.LastIndex(object.Key, "/") - parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false - if lastSeparator > 0 && lastSeparator+1 < len(object.Key) { - entryName = object.Key[lastSeparator+1:] - parentDirectoryPath = object.Key[:lastSeparator] - } - parentDirectoryPath = fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), parentDirectoryPath) + target := util.NewFullPath(s3a.bucketDir(bucket), object.Key) + parentDirectoryPath, entryName := target.DirAndName() + isDeleteData, isRecursive := true, false err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err == nil { diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 254885f22..7903bd6a1 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -139,6 +139,8 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) fullDirPath = fullDirPath + "/" + dirName } + glog.Infof("PutObjectHandler: explicit directory marker %s/%s (contentType=%q, len=%d)", + bucket, object, objectContentType, r.ContentLength) if err := s3a.mkdir( fullDirPath, entryName, func(entry *filer_pb.Entry) { diff --git a/weed/util/buckets.go b/weed/util/buckets.go new file mode 100644 index 000000000..8a4189a82 --- /dev/null +++ b/weed/util/buckets.go @@ -0,0 +1,33 @@ +package util + +import "strings" + +// ExtractBucketPath returns the bucket path under basePath that contains target. +// If requireChild is true, the target must include additional segments beyond the bucket itself. 
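+// For example, with basePath "/buckets": target "/buckets/b1/dir/file" yields ("/buckets/b1", true),
+// while target "/buckets/b1" yields ("/buckets/b1", true) only when requireChild is false.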
+func ExtractBucketPath(basePath, target string, requireChild bool) (string, bool) { + cleanBase := strings.TrimSuffix(basePath, "/") + if cleanBase == "" { + return "", false + } + + prefix := cleanBase + "/" + if !strings.HasPrefix(target, prefix) { + return "", false + } + + rest := strings.TrimPrefix(target, prefix) + if rest == "" { + return "", false + } + + bucketName, _, found := strings.Cut(rest, "/") + if bucketName == "" { + return "", false + } + + if requireChild && !found { + return "", false + } + + return prefix + bucketName, true +} diff --git a/weed/util/buckets_test.go b/weed/util/buckets_test.go new file mode 100644 index 000000000..3ae9d9f51 --- /dev/null +++ b/weed/util/buckets_test.go @@ -0,0 +1,63 @@ +package util + +import "testing" + +func TestExtractBucketPath(t *testing.T) { + for _, tt := range []struct { + name string + base string + target string + requireChild bool + expected string + ok bool + }{ + { + name: "child paths return bucket", + base: "/buckets", + target: "/buckets/test/folder/file", + requireChild: true, + expected: "/buckets/test", + ok: true, + }, + { + name: "bucket root without child fails when required", + base: "/buckets", + target: "/buckets/test", + requireChild: true, + ok: false, + }, + { + name: "bucket root allowed when not required", + base: "/buckets", + target: "/buckets/test", + requireChild: false, + expected: "/buckets/test", + ok: true, + }, + { + name: "path outside buckets fails", + base: "/buckets", + target: "/data/test/folder", + requireChild: true, + ok: false, + }, + { + name: "trailing slash on base is normalized", + base: "/buckets/", + target: "/buckets/test/sub", + requireChild: true, + expected: "/buckets/test", + ok: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + got, ok := ExtractBucketPath(tt.base, tt.target, tt.requireChild) + if ok != tt.ok { + t.Fatalf("expected ok=%v, got %v", tt.ok, ok) + } + if got != tt.expected { + t.Fatalf("expected path %q, got %q", tt.expected, got) + } + }) + } +}