diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 791e5b090..af5ef994e 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"os"
 	"os/user"
+	"path"
 	"runtime"
 	"strconv"
 	"strings"
@@ -26,6 +27,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/security"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"google.golang.org/grpc/reflection"
@@ -59,6 +61,63 @@ func runMount(cmd *Command, args []string) bool {
 	return RunMount(&mountOptions, os.FileMode(umask))
 }
 
+func ensureBucketAutoRemoveEmptyFoldersDisabled(ctx context.Context, filerClient filer_pb.FilerClient, mountRoot, bucketRootPath string) error {
+	bucketPath, isBucketRootMount := bucketPathForMountRoot(mountRoot, bucketRootPath)
+	if !isBucketRootMount {
+		return nil
+	}
+
+	entry, err := filer_pb.GetEntry(ctx, filerClient, util.FullPath(bucketPath))
+	if err != nil {
+		return err
+	}
+	if entry == nil {
+		return fmt.Errorf("bucket %s not found", bucketPath)
+	}
+
+	if entry.Extended == nil {
+		entry.Extended = make(map[string][]byte)
+	}
+	if strings.EqualFold(strings.TrimSpace(string(entry.Extended[s3_constants.ExtAutoRemoveEmptyFolders])), "false") {
+		return nil
+	}
+
+	entry.Extended[s3_constants.ExtAutoRemoveEmptyFolders] = []byte("false")
+
+	bucketFullPath := util.FullPath(bucketPath)
+	parent, _ := bucketFullPath.DirAndName()
+	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		return filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
+			Directory: parent,
+			Entry:     entry,
+		})
+	}); err != nil {
+		return err
+	}
+
+	glog.Infof("RunMount: set bucket %s %s=false", bucketPath, s3_constants.ExtAutoRemoveEmptyFolders)
+	return nil
+}
+
+func bucketPathForMountRoot(mountRoot, bucketRootPath string) (string, bool) {
+	cleanPath := path.Clean("/" + strings.TrimPrefix(mountRoot, "/"))
+	cleanBucketRoot := path.Clean("/" + strings.TrimPrefix(bucketRootPath, "/"))
+	if cleanBucketRoot == "/" {
+		return "", false
+	}
+	prefix := cleanBucketRoot + "/"
+	if !strings.HasPrefix(cleanPath, prefix) {
+		return "", false
+	}
+	rest := strings.TrimPrefix(cleanPath, prefix)
+
+	bucketParts := strings.Split(rest, "/")
+	if len(bucketParts) != 1 || bucketParts[0] == "" {
+		return "", false
+	}
+	return cleanBucketRoot + "/" + bucketParts[0], true
+}
+
 func RunMount(option *MountOptions, umask os.FileMode) bool {
 
 	// basic checks
@@ -73,6 +132,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 	util.LoadSecurityConfiguration()
 	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
 	var cipher bool
+	var bucketRootPath string
 	var err error
 	for i := 0; i < 10; i++ {
 		err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -81,6 +141,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 				return fmt.Errorf("get filer grpc address %v configuration: %w", filerAddresses, err)
 			}
 			cipher = resp.Cipher
+			bucketRootPath = resp.DirBuckets
 			return nil
 		})
 		if err != nil {
@@ -93,6 +154,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 		glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
 		return true
 	}
+	if bucketRootPath == "" {
+		bucketRootPath = "/buckets"
+	}
 
 	filerMountRootPath := *option.filerMountRootPath
@@ -287,6 +351,10 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 		fmt.Printf("failed to create dir %s on filer %s: %v\n", mountRoot, filerAddresses, err)
 		return false
 	}
+	if err := ensureBucketAutoRemoveEmptyFoldersDisabled(context.Background(), seaweedFileSystem, mountRoot, bucketRootPath); err != nil {
+		fmt.Printf("failed to set bucket auto-remove-empty-folders policy for %s: %v\n", mountRoot, err)
+		return false
+	}
 
 	server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
 	if err != nil {
diff --git a/weed/command/mount_std_test.go b/weed/command/mount_std_test.go
new file mode 100644
index 000000000..e4b8e7598
--- /dev/null
+++ b/weed/command/mount_std_test.go
@@ -0,0 +1,52 @@
+//go:build linux || darwin || freebsd
+// +build linux darwin freebsd
+
+package command
+
+import "testing"
+
+func Test_bucketPathForMountRoot(t *testing.T) {
+	tests := []struct {
+		name      string
+		mountRoot string
+		expected  string
+		ok        bool
+	}{
+		{
+			name:      "bucket root mount",
+			mountRoot: "/buckets/test",
+			expected:  "/buckets/test",
+			ok:        true,
+		},
+		{
+			name:      "bucket root with trailing slash",
+			mountRoot: "/buckets/test/",
+			expected:  "/buckets/test",
+			ok:        true,
+		},
+		{
+			name:      "subdirectory mount",
+			mountRoot: "/buckets/test/data",
+			expected:  "",
+			ok:        false,
+		},
+		{
+			name:      "non-bucket mount",
+			mountRoot: "/data/test",
+			expected:  "",
+			ok:        false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			gotPath, gotOK := bucketPathForMountRoot(tt.mountRoot, "/buckets")
+			if gotOK != tt.ok {
+				t.Fatalf("expected ok=%v, got %v", tt.ok, gotOK)
+			}
+			if gotPath != tt.expected {
+				t.Fatalf("expected path %q, got %q", tt.expected, gotPath)
+			}
+		})
+	}
+}
diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
index a915b7fa1..f011cbf88 100644
--- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
+++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
@@ -32,12 +32,17 @@ type FilerOperations interface {
 // folderState tracks the state of a folder for empty folder cleanup
 type folderState struct {
 	roughCount  int       // Cached rough count (up to maxCountCheck)
-	isImplicit  *bool     // Tri-state boolean: nil (unknown), true (implicit), false (explicit)
 	lastAddTime time.Time // Last time an item was added
 	lastDelTime time.Time // Last time an item was deleted
 	lastCheck   time.Time // Last time we checked the actual count
 }
 
+type bucketCleanupPolicyState struct {
+	autoRemove bool
+	attrValue  string
+	lastCheck  time.Time
+}
+
 // EmptyFolderCleaner handles asynchronous cleanup of empty folders
 // Each filer owns specific folders via consistent hashing based on the peer filer list
 type EmptyFolderCleaner struct {
@@ -46,8 +51,9 @@ type EmptyFolderCleaner struct {
 	host     pb.ServerAddress
 
 	// Folder state tracking
-	mu           sync.RWMutex
-	folderCounts map[string]*folderState // Rough count cache
+	mu                    sync.RWMutex
+	folderCounts          map[string]*folderState              // Rough count cache
+	bucketCleanupPolicies map[string]*bucketCleanupPolicyState // bucket path -> cleanup policy cache
 
 	// Cleanup queue (thread-safe, has its own lock)
 	cleanupQueue *CleanupQueue
@@ -66,17 +72,18 @@ type EmptyFolderCleaner struct {
 // NewEmptyFolderCleaner creates a new EmptyFolderCleaner
 func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string) *EmptyFolderCleaner {
 	efc := &EmptyFolderCleaner{
-		filer:          filer,
-		lockRing:       lockRing,
-		host:           host,
-		folderCounts:   make(map[string]*folderState),
-		cleanupQueue:   NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge),
-		maxCountCheck:  DefaultMaxCountCheck,
-		cacheExpiry:    DefaultCacheExpiry,
-		processorSleep: DefaultProcessorSleep,
-		bucketPath:     bucketPath,
-		enabled:        true,
-		stopCh:         make(chan struct{}),
+		filer:                 filer,
+		lockRing:              lockRing,
+		host:                  host,
+		folderCounts:          make(map[string]*folderState),
+		bucketCleanupPolicies: make(map[string]*bucketCleanupPolicyState),
+		cleanupQueue:          NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge),
+		maxCountCheck:         DefaultMaxCountCheck,
+		cacheExpiry:           DefaultCacheExpiry,
+		processorSleep:        DefaultProcessorSleep,
+		bucketPath:            bucketPath,
+		enabled:               true,
+		stopCh:                make(chan struct{}),
 	}
 	go efc.cacheEvictionLoop()
 	go efc.cleanupProcessor()
@@ -268,58 +275,19 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string, triggeredBy string)
 		return
 	}
 
-	// Check for explicit implicit_dir attribute
-	// First check cache
 	ctx := context.Background()
-	efc.mu.RLock()
-	var cachedImplicit *bool
-	if state, exists := efc.folderCounts[folder]; exists {
-		cachedImplicit = state.isImplicit
-	}
-	efc.mu.RUnlock()
-
-	var isImplicit bool
-	implicitSource := "cache"
-	implicitAttr := ""
-	if cachedImplicit != nil {
-		isImplicit = *cachedImplicit
-		if isImplicit {
-			implicitAttr = "true"
-		} else {
-			implicitAttr = "false"
-		}
-	} else {
-		implicitSource = "filer"
-		// Not cached, check filer
-		attrs, err := efc.filer.GetEntryAttributes(ctx, util.FullPath(folder))
-		if err != nil {
-			if err == filer_pb.ErrNotFound {
-				return
-			}
-			glog.V(2).Infof("EmptyFolderCleaner: error getting attributes for %s: %v", folder, err)
+	bucketPath, autoRemove, source, attrValue, err := efc.getBucketCleanupPolicy(ctx, folder)
+	if err != nil {
+		if err == filer_pb.ErrNotFound {
 			return
 		}
-
-		isImplicit = attrs != nil && string(attrs[s3_constants.ExtS3ImplicitDir]) == "true"
-		if attrs == nil {
-			implicitAttr = ""
-		} else if value, found := attrs[s3_constants.ExtS3ImplicitDir]; found {
-			implicitAttr = string(value)
-		} else {
-			implicitAttr = ""
-		}
-
-		// Update cache
-		efc.mu.Lock()
-		if _, exists := efc.folderCounts[folder]; !exists {
-			efc.folderCounts[folder] = &folderState{}
-		}
-		efc.folderCounts[folder].isImplicit = &isImplicit
-		efc.mu.Unlock()
+		glog.V(2).Infof("EmptyFolderCleaner: failed to load bucket cleanup policy for folder %s (triggered by %s): %v", folder, triggeredBy, err)
+		return
 	}
-	if !isImplicit {
-		glog.Infof("EmptyFolderCleaner: folder %s (triggered by %s) is not marked as implicit (source=%s attr=%s), skipping", folder, triggeredBy, implicitSource, implicitAttr)
+	if !autoRemove {
+		glog.Infof("EmptyFolderCleaner: skipping folder %s (triggered by %s), bucket %s auto-remove-empty-folders disabled (source=%s attr=%s)",
+			folder, triggeredBy, bucketPath, source, attrValue)
 		return
 	}
@@ -371,6 +339,80 @@ func (efc *EmptyFolderCleaner) deleteFolder(ctx context.Context, folder string)
 	return efc.filer.DeleteEntryMetaAndData(ctx, util.FullPath(folder), false, false, false, false, nil, 0)
 }
 
+func (efc *EmptyFolderCleaner) getBucketCleanupPolicy(ctx context.Context, folder string) (bucketPath string, autoRemove bool, source string, attrValue string, err error) {
+	bucketPath, ok := extractBucketPath(folder, efc.bucketPath)
+	if !ok {
+		return "", true, "default", "", nil
+	}
+
+	now := time.Now()
+
+	efc.mu.RLock()
+	if state, found := efc.bucketCleanupPolicies[bucketPath]; found && now.Sub(state.lastCheck) <= efc.cacheExpiry {
+		efc.mu.RUnlock()
+		return bucketPath, state.autoRemove, "cache", state.attrValue, nil
+	}
+	efc.mu.RUnlock()
+
+	attrs, err := efc.filer.GetEntryAttributes(ctx, util.FullPath(bucketPath))
+	if err != nil {
+		return "", true, "", "", err
+	}
+
+	autoRemove, attrValue = autoRemoveEmptyFoldersEnabled(attrs)
+
+	efc.mu.Lock()
+	if efc.bucketCleanupPolicies == nil {
+		efc.bucketCleanupPolicies = make(map[string]*bucketCleanupPolicyState)
+	}
+	efc.bucketCleanupPolicies[bucketPath] = &bucketCleanupPolicyState{
+		autoRemove: autoRemove,
+		attrValue:  attrValue,
+		lastCheck:  now,
+	}
+	efc.mu.Unlock()
+
+	return bucketPath, autoRemove, "filer", attrValue, nil
+}
+
+func extractBucketPath(folder string, bucketPath string) (string, bool) {
+	if bucketPath == "" {
+		return "", false
+	}
+
+	cleanBucketPath := strings.TrimSuffix(bucketPath, "/")
+	prefix := cleanBucketPath + "/"
+	if !strings.HasPrefix(folder, prefix) {
+		return "", false
+	}
+
+	rest := strings.TrimPrefix(folder, prefix)
+	bucketName, _, found := strings.Cut(rest, "/")
+	if !found || bucketName == "" {
+		return "", false
+	}
+
+	return prefix + bucketName, true
+}
+
+func autoRemoveEmptyFoldersEnabled(attrs map[string][]byte) (bool, string) {
+	if attrs == nil {
+		return true, ""
+	}
+
+	value, found := attrs[s3_constants.ExtAutoRemoveEmptyFolders]
+	if !found {
+		return true, ""
+	}
+
+	text := strings.TrimSpace(string(value))
+	if text == "" {
+		return true, ""
+	}
+
+	return !strings.EqualFold(text, "false"), text
+}
+
 // isUnderPath checks if child is under parent path
 func isUnderPath(child, parent string) bool {
 	if parent == "" || parent == "/" {
@@ -459,6 +501,12 @@ func (efc *EmptyFolderCleaner) evictStaleCacheEntries() {
 		}
 	}
 
+	for bucketPath, state := range efc.bucketCleanupPolicies {
+		if now.Sub(state.lastCheck) > efc.cacheExpiry {
+			delete(efc.bucketCleanupPolicies, bucketPath)
+		}
+	}
+
 	if expiredCount > 0 {
 		glog.V(3).Infof("EmptyFolderCleaner: evicted %d stale cache entries", expiredCount)
 	}
@@ -474,6 +522,7 @@ func (efc *EmptyFolderCleaner) Stop() {
 	efc.enabled = false
 	efc.cleanupQueue.Clear()
 	efc.folderCounts = make(map[string]*folderState) // Clear cache on stop
+	efc.bucketCleanupPolicies = make(map[string]*bucketCleanupPolicyState)
 }
 
 // GetPendingCleanupCount returns the number of pending cleanup tasks (for testing)
diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
index 9ddb61cde..2d1a62933 100644
--- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
+++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
@@ -96,6 +96,100 @@ func Test_isUnderBucketPath(t *testing.T) {
 	}
 }
 
+func Test_extractBucketPath(t *testing.T) {
+	tests := []struct {
+		name       string
+		folder     string
+		bucketRoot string
+		expected   string
+		ok         bool
+	}{
+		{
+			name:       "folder under bucket",
+			folder:     "/buckets/test/a/b",
+			bucketRoot: "/buckets",
+			expected:   "/buckets/test",
+			ok:         true,
+		},
+		{
+			name:       "bucket root folder should not match",
+			folder:     "/buckets/test",
+			bucketRoot: "/buckets",
+			expected:   "",
+			ok:         false,
+		},
+		{
+			name:       "outside buckets",
+			folder:     "/data/test/a",
+			bucketRoot: "/buckets",
+			expected:   "",
+			ok:         false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			gotPath, gotOK := extractBucketPath(tt.folder, tt.bucketRoot)
+			if gotOK != tt.ok {
+				t.Fatalf("expected ok=%v, got %v", tt.ok, gotOK)
+			}
+			if gotPath != tt.expected {
+				t.Fatalf("expected path %q, got %q", tt.expected, gotPath)
+			}
+		})
+	}
+}
+
+func Test_autoRemoveEmptyFoldersEnabled(t *testing.T) {
+	tests := []struct {
+		name      string
+		attrs     map[string][]byte
+		enabled   bool
+		attrValue string
+	}{
+		{
+			name:      "no attrs defaults enabled",
+			attrs:     nil,
+			enabled:   true,
+			attrValue: "",
+		},
+		{
+			name:      "missing key defaults enabled",
+			attrs:     map[string][]byte{},
+			enabled:   true,
+			attrValue: "",
+		},
+		{
+			name: "false disables cleanup",
+			attrs: map[string][]byte{
+				s3_constants.ExtAutoRemoveEmptyFolders: []byte("false"),
+			},
+			enabled:   false,
+			attrValue: "false",
+		},
+		{
+			name: "true enables cleanup",
+			attrs: map[string][]byte{
+				s3_constants.ExtAutoRemoveEmptyFolders: []byte("true"),
+			},
+			enabled:   true,
+			attrValue: "true",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			enabled, attrValue := autoRemoveEmptyFoldersEnabled(tt.attrs)
+			if enabled != tt.enabled {
+				t.Fatalf("expected enabled=%v, got %v", tt.enabled, enabled)
+			}
+			if attrValue != tt.attrValue {
+				t.Fatalf("expected attrValue=%q, got %q", tt.attrValue, attrValue)
+			}
+		})
+	}
+}
+
 func TestEmptyFolderCleaner_ownsFolder(t *testing.T) {
 	// Create a LockRing with multiple servers
 	lockRing := lock_manager.NewLockRing(5 * time.Second)
@@ -610,9 +704,6 @@ func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testin
 			deleted = append(deleted, string(path))
 			return nil
 		},
-		attrsFn: func(_ util.FullPath) (map[string][]byte, error) {
-			return map[string][]byte{s3_constants.ExtS3ImplicitDir: []byte("true")}, nil
-		},
 	}
 
 	cleaner := &EmptyFolderCleaner{
@@ -644,7 +735,7 @@ func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testin
 	}
 }
 
-func TestEmptyFolderCleaner_executeCleanup_missingImplicitAttributeSkips(t *testing.T) {
+func TestEmptyFolderCleaner_executeCleanup_bucketPolicyDisabledSkips(t *testing.T) {
 	lockRing := lock_manager.NewLockRing(5 * time.Second)
 	lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
@@ -657,8 +748,11 @@ func TestEmptyFolderCleaner_executeCleanup_bucketPolicyDisabledSkips(t *test
 			deleted = append(deleted, string(path))
 			return nil
 		},
-		attrsFn: func(_ util.FullPath) (map[string][]byte, error) {
-			return map[string][]byte{}, nil
+		attrsFn: func(path util.FullPath) (map[string][]byte, error) {
+			if string(path) == "/buckets/test" {
+				return map[string][]byte{s3_constants.ExtAutoRemoveEmptyFolders: []byte("false")}, nil
+			}
+			return nil, nil
 		},
 	}
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 0a135d915..8c9bf57eb 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -9,7 +9,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
 
 	"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
@@ -303,13 +302,8 @@ func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, di
 				GroupNames: entry.GroupNames,
 			},
 		}
-		// level > 3 corresponds to a path depth greater than "/buckets/",
-		// ensuring we only mark subdirectories within a bucket as implicit.
 		if isUnderBuckets && level > 3 {
-			dirEntry.Extended = map[string][]byte{
-				s3_constants.ExtS3ImplicitDir: []byte("true"),
-			}
-			glog.InfofCtx(ctx, "ensureParentDirectoryEntry: implicit directory created %s", dirPath)
+			glog.InfofCtx(ctx, "ensureParentDirectoryEntry: auto-created parent directory %s", dirPath)
 		}
 
 		glog.V(2).InfofCtx(ctx, "create directory: %s %v", dirPath, dirEntry.Mode)
diff --git a/weed/s3api/s3_constants/extend_key.go b/weed/s3api/s3_constants/extend_key.go
index b4c030d49..612a6416b 100644
--- a/weed/s3api/s3_constants/extend_key.go
+++ b/weed/s3api/s3_constants/extend_key.go
@@ -11,7 +11,7 @@ const (
 	ExtETagKey                   = "Seaweed-X-Amz-ETag"
 	ExtLatestVersionIdKey        = "Seaweed-X-Amz-Latest-Version-Id"
 	ExtLatestVersionFileNameKey  = "Seaweed-X-Amz-Latest-Version-File-Name"
-	ExtS3ImplicitDir             = "Seaweed-X-Amz-Implicit-Dir"
+	ExtAutoRemoveEmptyFolders    = "Seaweed-X-Amz-Auto-Remove-Empty-Folders"
 	// Cached list metadata in .versions directory for single-scan efficiency
 	ExtLatestVersionSizeKey      = "Seaweed-X-Amz-Latest-Version-Size"
 	ExtLatestVersionETagKey      = "Seaweed-X-Amz-Latest-Version-ETag"
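
Illustrative sketch (not part of the patch): the attribute semantics introduced above default to cleanup being enabled, and only a case-insensitive "false" stored under the Seaweed-X-Amz-Auto-Remove-Empty-Folders extended attribute disables it. The standalone Go program below mirrors the autoRemoveEmptyFoldersEnabled helper under that assumption; the constant and function names are local to the example, not repository code.

package main

import (
	"fmt"
	"strings"
)

// extAutoRemoveEmptyFolders mirrors s3_constants.ExtAutoRemoveEmptyFolders from the patch.
const extAutoRemoveEmptyFolders = "Seaweed-X-Amz-Auto-Remove-Empty-Folders"

// cleanupEnabled reproduces the default-on rule: cleanup stays enabled unless the
// attribute is present and equals "false" (case-insensitive, surrounding whitespace ignored).
func cleanupEnabled(extended map[string][]byte) bool {
	value := strings.TrimSpace(string(extended[extAutoRemoveEmptyFolders]))
	if value == "" {
		return true // attribute absent or blank: keep the default behavior
	}
	return !strings.EqualFold(value, "false")
}

func main() {
	fmt.Println(cleanupEnabled(nil))                                                           // true
	fmt.Println(cleanupEnabled(map[string][]byte{extAutoRemoveEmptyFolders: []byte("FALSE")})) // false
	fmt.Println(cleanupEnabled(map[string][]byte{extAutoRemoveEmptyFolders: []byte("true")}))  // true
}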