diff --git a/weed/admin/dash/file_browser_data.go b/weed/admin/dash/file_browser_data.go index d566033e7..4126b2ac8 100644 --- a/weed/admin/dash/file_browser_data.go +++ b/weed/admin/dash/file_browser_data.go @@ -283,17 +283,17 @@ func (s *AdminServer) generateBreadcrumbs(dir string) []BreadcrumbItem { } currentPath += "/" + part - // Special handling for bucket paths - displayName := part - if len(breadcrumbs) == 1 && part == "buckets" { - displayName = "Object Store Buckets" - } else if len(breadcrumbs) == 1 && part == "table-buckets" { - displayName = "Table Buckets" - } else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/buckets/") { - displayName = "📦 " + part // Add bucket icon to bucket name - } else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/table-buckets/") { - displayName = "🧊 " + part - } + // Special handling for bucket paths + displayName := part + if len(breadcrumbs) == 1 && part == "buckets" { + displayName = "Object Store Buckets" + } else if len(breadcrumbs) == 1 && part == "table-buckets" { + displayName = "Table Buckets" + } else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/buckets/") { + displayName = "📦 " + part // Add bucket icon to bucket name + } else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/table-buckets/") { + displayName = "🧊 " + part + } breadcrumbs = append(breadcrumbs, BreadcrumbItem{ Name: displayName, diff --git a/weed/command/fuse_std.go b/weed/command/fuse_std.go index 5f76089c0..60c3402de 100644 --- a/weed/command/fuse_std.go +++ b/weed/command/fuse_std.go @@ -157,6 +157,13 @@ func runFuse(cmd *Command, args []string) bool { } else { panic(fmt.Errorf("cacheMetaTtlSec: %s", err)) } + case "dirIdleEvictSec": + if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil { + intValue := int(parsed) + mountOptions.dirIdleEvictSec = &intValue + } else { + panic(fmt.Errorf("dirIdleEvictSec: %s", err)) + } case "concurrentWriters": i++ if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil { diff --git a/weed/command/mount.go b/weed/command/mount.go index ddd429ccc..7407ad908 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -48,6 +48,8 @@ type MountOptions struct { rdmaMaxConcurrent *int rdmaTimeoutMs *int + dirIdleEvictSec *int + // FUSE performance options writebackCache *bool asyncDio *bool @@ -107,6 +109,8 @@ func init() { mountOptions.rdmaMaxConcurrent = cmdMount.Flag.Int("rdma.maxConcurrent", 64, "max concurrent RDMA operations") mountOptions.rdmaTimeoutMs = cmdMount.Flag.Int("rdma.timeoutMs", 5000, "RDMA operation timeout in milliseconds") + mountOptions.dirIdleEvictSec = cmdMount.Flag.Int("dirIdleEvictSec", 600, "seconds to evict idle cached directories (0 to disable)") + mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") mountReadRetryTime = cmdMount.Flag.Duration("readRetryTime", 6*time.Second, "maximum read retry wait time") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index a31645a13..369f2e7c7 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -270,6 +270,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { RdmaReadOnly: *option.rdmaReadOnly, RdmaMaxConcurrent: *option.rdmaMaxConcurrent, RdmaTimeoutMs: *option.rdmaTimeoutMs, + DirIdleEvictSec: *option.dirIdleEvictSec, }) // create mount root diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go index 5c0eaf365..e23a9c1e9 100644 --- a/weed/mount/inode_to_path.go +++ b/weed/mount/inode_to_path.go @@ -22,6 +22,19 @@ type InodeEntry struct { isDirectory bool isChildrenCached bool cachedExpiresTime time.Time + lastAccess time.Time + lastRefresh time.Time + updateWindowStart time.Time + updateCount int + needsRefresh bool +} + +func (ie *InodeEntry) resetCacheState() { + ie.isChildrenCached = false + ie.cachedExpiresTime = time.Time{} + ie.needsRefresh = false + ie.updateCount = 0 + ie.updateWindowStart = time.Time{} } func (ie *InodeEntry) removeOnePath(p util.FullPath) bool { @@ -51,7 +64,12 @@ func NewInodeToPath(root util.FullPath, ttlSec int) *InodeToPath { path2inode: make(map[util.FullPath]uint64), cacheMetaTtlSec: time.Second * time.Duration(ttlSec), } - t.inode2path[1] = &InodeEntry{[]util.FullPath{root}, 1, true, false, time.Time{}} + t.inode2path[1] = &InodeEntry{ + paths: []util.FullPath{root}, + nlookup: 1, + isDirectory: true, + lastAccess: time.Now(), + } t.path2inode[root] = 1 return t @@ -94,9 +112,16 @@ func (i *InodeToPath) Lookup(path util.FullPath, unixTime int64, isDirectory boo } } else { if !isLookup { - i.inode2path[inode] = &InodeEntry{[]util.FullPath{path}, 0, isDirectory, false, time.Time{}} + i.inode2path[inode] = &InodeEntry{ + paths: []util.FullPath{path}, + isDirectory: isDirectory, + } } else { - i.inode2path[inode] = &InodeEntry{[]util.FullPath{path}, 1, isDirectory, false, time.Time{}} + i.inode2path[inode] = &InodeEntry{ + paths: []util.FullPath{path}, + nlookup: 1, + isDirectory: isDirectory, + } } } @@ -163,8 +188,14 @@ func (i *InodeToPath) MarkChildrenCached(fullpath util.FullPath) { return } path.isChildrenCached = true + now := time.Now() + path.lastAccess = now + path.lastRefresh = now + path.updateCount = 0 + path.needsRefresh = false + path.updateWindowStart = time.Time{} if i.cacheMetaTtlSec > 0 { - path.cachedExpiresTime = time.Now().Add(i.cacheMetaTtlSec) + path.cachedExpiresTime = now.Add(i.cacheMetaTtlSec) } } @@ -200,10 +231,118 @@ func (i *InodeToPath) InvalidateAllChildrenCache() { defer i.Unlock() for _, entry := range i.inode2path { if entry.isDirectory && entry.isChildrenCached { - entry.isChildrenCached = false - entry.cachedExpiresTime = time.Time{} + entry.resetCacheState() + } + } +} + +func (i *InodeToPath) InvalidateChildrenCache(fullpath util.FullPath) { + i.Lock() + defer i.Unlock() + inode, found := i.path2inode[fullpath] + if !found { + return + } + entry, found := i.inode2path[inode] + if !found { + return + } + entry.resetCacheState() +} + +func (i *InodeToPath) TouchDirectory(fullpath util.FullPath) { + i.Lock() + defer i.Unlock() + inode, found := i.path2inode[fullpath] + if !found { + return + } + entry, found := i.inode2path[inode] + if !found || !entry.isDirectory { + return + } + entry.lastAccess = time.Now() +} + +func (i *InodeToPath) RecordDirectoryUpdate(fullpath util.FullPath, now time.Time, window time.Duration, threshold int) bool { + if threshold <= 0 || window <= 0 { + return false + } + i.Lock() + defer i.Unlock() + inode, found := i.path2inode[fullpath] + if !found { + return false + } + entry, found := i.inode2path[inode] + if !found || !entry.isDirectory || !entry.isChildrenCached { + return false + } + if entry.updateWindowStart.IsZero() || now.Sub(entry.updateWindowStart) > window { + entry.updateWindowStart = now + entry.updateCount = 0 + } + entry.updateCount++ + if entry.updateCount >= threshold { + entry.needsRefresh = true + return true + } + return false +} + +func (i *InodeToPath) NeedsRefresh(fullpath util.FullPath) bool { + i.RLock() + defer i.RUnlock() + inode, found := i.path2inode[fullpath] + if !found { + return false + } + entry, found := i.inode2path[inode] + if !found || !entry.isDirectory { + return false + } + return entry.isChildrenCached && entry.needsRefresh +} + +func (i *InodeToPath) MarkDirectoryRefreshed(fullpath util.FullPath, now time.Time) { + i.Lock() + defer i.Unlock() + inode, found := i.path2inode[fullpath] + if !found { + return + } + entry, found := i.inode2path[inode] + if !found || !entry.isDirectory { + return + } + entry.lastRefresh = now + entry.lastAccess = now + entry.updateCount = 0 + entry.needsRefresh = false + entry.updateWindowStart = time.Time{} + if i.cacheMetaTtlSec > 0 { + entry.cachedExpiresTime = now.Add(i.cacheMetaTtlSec) + } +} + +func (i *InodeToPath) CollectEvictableDirs(now time.Time, idle time.Duration) []util.FullPath { + if idle <= 0 { + return nil + } + i.Lock() + defer i.Unlock() + var dirs []util.FullPath + for _, entry := range i.inode2path { + if !entry.isDirectory || !entry.isChildrenCached { + continue + } + if entry.lastAccess.IsZero() || now.Sub(entry.lastAccess) < idle { + continue } + entry.resetCacheState() + dirs = append(dirs, entry.paths...) } + return dirs } func (i *InodeToPath) AddPath(inode uint64, path util.FullPath) { @@ -217,10 +356,9 @@ func (i *InodeToPath) AddPath(inode uint64, path util.FullPath) { ie.nlookup++ } else { i.inode2path[inode] = &InodeEntry{ - paths: []util.FullPath{path}, - nlookup: 1, - isDirectory: false, - isChildrenCached: false, + paths: []util.FullPath{path}, + nlookup: 1, + isDirectory: false, } } } @@ -268,7 +406,7 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) (sourceInod entry.paths[i] = targetPath } } - entry.isChildrenCached = false + entry.resetCacheState() } else { glog.Errorf("MovePath %s to %s: sourceInode %d not found", sourcePath, targetPath, sourceInode) } diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index 0b0794998..e08ba5c2d 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -23,23 +23,25 @@ type MetaCache struct { localStore filer.VirtualFilerStore leveldbStore *leveldb.LevelDBStore // direct reference for batch operations sync.RWMutex - uidGidMapper *UidGidMapper - markCachedFn func(fullpath util.FullPath) - isCachedFn func(fullpath util.FullPath) bool - invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry) - visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path + uidGidMapper *UidGidMapper + markCachedFn func(fullpath util.FullPath) + isCachedFn func(fullpath util.FullPath) bool + invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry) + onDirectoryUpdate func(dir util.FullPath) + visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path } func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath, - markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache { + markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry), onDirectoryUpdate func(dir util.FullPath)) *MetaCache { leveldbStore, virtualStore := openMetaStore(dbFolder) return &MetaCache{ - root: root, - localStore: virtualStore, - leveldbStore: leveldbStore, - markCachedFn: markCachedFn, - isCachedFn: isCachedFn, - uidGidMapper: uidGidMapper, + root: root, + localStore: virtualStore, + leveldbStore: leveldbStore, + markCachedFn: markCachedFn, + isCachedFn: isCachedFn, + uidGidMapper: uidGidMapper, + onDirectoryUpdate: onDirectoryUpdate, invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) { invalidateFunc(fullpath, entry) }, @@ -193,3 +195,9 @@ func (mc *MetaCache) Debug() { func (mc *MetaCache) IsDirectoryCached(dirPath util.FullPath) bool { return mc.isCachedFn(dirPath) } + +func (mc *MetaCache) noteDirectoryUpdate(dirPath util.FullPath) { + if mc.onDirectoryUpdate != nil { + mc.onDirectoryUpdate(dirPath) + } +} diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index 0a516f6f5..fe5f75ba9 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -77,6 +77,24 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) if err == nil { + if message.NewEntry != nil || message.OldEntry != nil { + dirsToNotify := make(map[util.FullPath]struct{}) + if oldPath != "" { + parent, _ := oldPath.DirAndName() + dirsToNotify[util.FullPath(parent)] = struct{}{} + } + if newEntry != nil { + newParent, _ := newEntry.DirAndName() + dirsToNotify[util.FullPath(newParent)] = struct{}{} + } + if message.NewEntry != nil && message.NewEntry.IsDirectory { + childPath := util.NewFullPath(dir, message.NewEntry.Name) + dirsToNotify[childPath] = struct{}{} + } + for dirPath := range dirsToNotify { + mc.noteDirectoryUpdate(dirPath) + } + } if message.OldEntry != nil && message.NewEntry != nil { oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) mc.invalidateFunc(oldKey, message.OldEntry) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 921e19840..db60c9deb 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -75,6 +75,9 @@ type Option struct { RdmaMaxConcurrent int RdmaTimeoutMs int + // Directory cache refresh/eviction controls + DirIdleEvictSec int + uniqueCacheDirForRead string uniqueCacheDirForWrite string } @@ -102,8 +105,19 @@ type WFS struct { rdmaClient *RDMAMountClient FilerConf *filer.FilerConf filerClient *wdclient.FilerClient // Cached volume location client + refreshMu sync.Mutex + refreshingDirs map[util.FullPath]struct{} + dirHotWindow time.Duration + dirHotThreshold int + dirIdleEvict time.Duration } +const ( + defaultDirHotWindow = 2 * time.Second + defaultDirHotThreshold = 64 + defaultDirIdleEvict = 10 * time.Minute +) + func NewSeaweedFileSystem(option *Option) *WFS { // Only create FilerClient for direct volume access modes // When VolumeServerAccess == "filerProxy", all reads go through filer, so no volume lookup needed @@ -127,15 +141,28 @@ func NewSeaweedFileSystem(option *Option) *WFS { ) } + dirHotWindow := defaultDirHotWindow + dirHotThreshold := defaultDirHotThreshold + dirIdleEvict := defaultDirIdleEvict + if option.DirIdleEvictSec != 0 { + dirIdleEvict = time.Duration(option.DirIdleEvictSec) * time.Second + } else { + dirIdleEvict = 0 + } + wfs := &WFS{ - RawFileSystem: fuse.NewDefaultRawFileSystem(), - option: option, - signature: util.RandomInt32(), - inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec), - fhMap: NewFileHandleToInode(), - dhMap: NewDirectoryHandleToInode(), - filerClient: filerClient, // nil for proxy mode, initialized for direct access - fhLockTable: util.NewLockTable[FileHandleId](), + RawFileSystem: fuse.NewDefaultRawFileSystem(), + option: option, + signature: util.RandomInt32(), + inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec), + fhMap: NewFileHandleToInode(), + dhMap: NewDirectoryHandleToInode(), + filerClient: filerClient, // nil for proxy mode, initialized for direct access + fhLockTable: util.NewLockTable[FileHandleId](), + refreshingDirs: make(map[util.FullPath]struct{}), + dirHotWindow: dirHotWindow, + dirHotThreshold: dirHotThreshold, + dirIdleEvict: dirIdleEvict, } wfs.option.filerIndex = int32(rand.IntN(len(option.FilerAddresses))) @@ -171,6 +198,10 @@ func NewSeaweedFileSystem(option *Option) *WFS { } } } + }, func(dirPath util.FullPath) { + if wfs.inodeToPath.RecordDirectoryUpdate(dirPath, time.Now(), wfs.dirHotWindow, wfs.dirHotThreshold) { + wfs.maybeRefreshDirectory(dirPath) + } }) grace.OnInterrupt(func() { wfs.metaCache.Shutdown() @@ -224,6 +255,7 @@ func (wfs *WFS) StartBackgroundTasks() error { }, follower) go wfs.loopCheckQuota() go wfs.loopFlushDirtyMetadata() + go wfs.loopEvictIdleDirCache() return nil } @@ -339,6 +371,49 @@ func (wfs *WFS) ClearCacheDir() { os.RemoveAll(wfs.option.getUniqueCacheDirForRead()) } +func (wfs *WFS) maybeRefreshDirectory(dirPath util.FullPath) { + if !wfs.inodeToPath.NeedsRefresh(dirPath) { + return + } + wfs.refreshMu.Lock() + if _, exists := wfs.refreshingDirs[dirPath]; exists { + wfs.refreshMu.Unlock() + return + } + wfs.refreshingDirs[dirPath] = struct{}{} + wfs.refreshMu.Unlock() + + go func() { + defer func() { + wfs.refreshMu.Lock() + delete(wfs.refreshingDirs, dirPath) + wfs.refreshMu.Unlock() + }() + wfs.inodeToPath.InvalidateChildrenCache(dirPath) + if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { + glog.Warningf("refresh dir cache %s: %v", dirPath, err) + return + } + wfs.inodeToPath.MarkDirectoryRefreshed(dirPath, time.Now()) + }() +} + +func (wfs *WFS) loopEvictIdleDirCache() { + if wfs.dirIdleEvict <= 0 { + return + } + ticker := time.NewTicker(wfs.dirIdleEvict / 2) + defer ticker.Stop() + for range ticker.C { + dirs := wfs.inodeToPath.CollectEvictableDirs(time.Now(), wfs.dirIdleEvict) + for _, dir := range dirs { + if err := wfs.metaCache.DeleteFolderChildren(context.Background(), dir); err != nil { + glog.V(2).Infof("evict dir cache %s: %v", dir, err) + } + } + } +} + func (option *Option) setupUniqueCacheDirectory() { cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + version.Version()))[0:8] option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId) diff --git a/weed/mount/weedfs_dir_mkrm.go b/weed/mount/weedfs_dir_mkrm.go index 654f3dee5..0ede18397 100644 --- a/weed/mount/weedfs_dir_mkrm.go +++ b/weed/mount/weedfs_dir_mkrm.go @@ -71,6 +71,7 @@ func (wfs *WFS) Mkdir(cancel <-chan struct{}, in *fuse.MkdirIn, name string, out // Only cache the entry if the parent directory is already cached. // This avoids polluting the cache with partial directory data. if wfs.metaCache.IsDirectoryCached(dirFullPath) { + wfs.inodeToPath.TouchDirectory(dirFullPath) if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { return fmt.Errorf("local mkdir dir %s: %w", entryFullPath, err) } @@ -122,6 +123,7 @@ func (wfs *WFS) Rmdir(cancel <-chan struct{}, header *fuse.InHeader, name string wfs.metaCache.DeleteEntry(context.Background(), entryFullPath) wfs.inodeToPath.RemovePath(entryFullPath) + wfs.inodeToPath.TouchDirectory(dirFullPath) return fuse.OK diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index 0d749ad97..274f2c185 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -163,6 +163,8 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl if code != fuse.OK { return code } + wfs.inodeToPath.TouchDirectory(dirPath) + wfs.maybeRefreshDirectory(dirPath) var dirEntry fuse.DirEntry diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go index 3ec3ed91e..f7306f7a3 100644 --- a/weed/mount/weedfs_file_mkrm.go +++ b/weed/mount/weedfs_file_mkrm.go @@ -91,6 +91,7 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out // Only cache the entry if the parent directory is already cached. // This avoids polluting the cache with partial directory data. if wfs.metaCache.IsDirectoryCached(dirFullPath) { + wfs.inodeToPath.TouchDirectory(dirFullPath) if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { return fmt.Errorf("local mknod %s: %w", entryFullPath, err) } @@ -153,6 +154,7 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin glog.V(3).Infof("local DeleteEntry %s: %v", entryFullPath, err) return fuse.EIO } + wfs.inodeToPath.TouchDirectory(dirFullPath) wfs.inodeToPath.RemovePath(entryFullPath) diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go index 9e5898093..42567b60d 100644 --- a/weed/mount/weedfs_rename.go +++ b/weed/mount/weedfs_rename.go @@ -220,6 +220,8 @@ func (wfs *WFS) Rename(cancel <-chan struct{}, in *fuse.RenameIn, oldName string glog.V(0).Infof("Link: %v", err) return } + wfs.inodeToPath.TouchDirectory(oldDir) + wfs.inodeToPath.TouchDirectory(newDir) return fuse.OK