From dbeeda812376eda39997cd814c3e7eefaf4ea686 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 14 Feb 2022 01:09:31 -0800 Subject: [PATCH] listen for metadata updates --- weed/command/mount2_std.go | 4 +- weed/mount/directory_read.go | 2 +- weed/mount/inode_to_path.go | 65 ++++--- weed/mount/meta_cache/cache_config.go | 32 ++++ weed/mount/meta_cache/id_mapper.go | 101 +++++++++++ weed/mount/meta_cache/meta_cache.go | 160 ++++++++++++++++++ weed/mount/meta_cache/meta_cache_init.go | 67 ++++++++ weed/mount/meta_cache/meta_cache_subscribe.go | 68 ++++++++ weed/mount/weedfs.go | 13 +- weed/mount/weedfs_dir_lookup.go | 4 +- weed/mount/weedfs_dir_mkrm.go | 2 +- weed/mount/weedfs_dir_read.go | 2 +- weed/mount/weedfs_file_mkrm.go | 4 +- weed/mount/weedfs_forget.go | 9 +- weed/mount/weedfs_link.go | 2 +- weed/mount/weedfs_rename.go | 2 - weed/mount/weedfs_symlink.go | 2 +- 17 files changed, 499 insertions(+), 40 deletions(-) create mode 100644 weed/mount/meta_cache/cache_config.go create mode 100644 weed/mount/meta_cache/id_mapper.go create mode 100644 weed/mount/meta_cache/meta_cache.go create mode 100644 weed/mount/meta_cache/meta_cache_init.go create mode 100644 weed/mount/meta_cache/meta_cache_subscribe.go diff --git a/weed/command/mount2_std.go b/weed/command/mount2_std.go index cb2b46556..584a72fc1 100644 --- a/weed/command/mount2_std.go +++ b/weed/command/mount2_std.go @@ -3,9 +3,9 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/mount" + "github.com/chrislusf/seaweedfs/weed/mount/meta_cache" "github.com/chrislusf/seaweedfs/weed/mount/unmount" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -200,6 +200,8 @@ func RunMount2(option *Mount2Options, umask os.FileMode) bool { unmount.Unmount(dir) }) + seaweedFileSystem.StartBackgroundTasks() + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) server.Serve() diff --git a/weed/mount/directory_read.go b/weed/mount/directory_read.go index 51c51ae16..6034856f0 100644 --- a/weed/mount/directory_read.go +++ b/weed/mount/directory_read.go @@ -3,8 +3,8 @@ package mount import ( "context" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mount/meta_cache" "github.com/chrislusf/seaweedfs/weed/util" "github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fuse" diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go index 590531397..ffb0cc02f 100644 --- a/weed/mount/inode_to_path.go +++ b/weed/mount/inode_to_path.go @@ -14,21 +14,23 @@ type InodeToPath struct { } type InodeEntry struct { util.FullPath - nlookup uint64 + nlookup uint64 + isDirectory bool + isChildrenCached bool } func NewInodeToPath() *InodeToPath { - return &InodeToPath{ + t := &InodeToPath{ inode2path: make(map[uint64]*InodeEntry), path2inode: make(map[util.FullPath]uint64), nextInodeId: 2, // the root inode id is 1 } + t.inode2path[1] = &InodeEntry{"/", 1, true, false} + t.path2inode["/"] = 1 + return t } -func (i *InodeToPath) Lookup(path util.FullPath) uint64 { - if path == "/" { - return 1 - } +func (i *InodeToPath) Lookup(path util.FullPath, isDirectory bool) uint64 { i.Lock() defer i.Unlock() inode, found := i.path2inode[path] @@ -36,7 +38,7 @@ func (i *InodeToPath) Lookup(path util.FullPath) uint64 { inode = i.nextInodeId i.nextInodeId++ i.path2inode[path] = inode - i.inode2path[inode] = &InodeEntry{path, 1} + i.inode2path[inode] = &InodeEntry{path, 1, isDirectory, false} } else { i.inode2path[inode].nlookup++ } @@ -58,9 +60,6 @@ func (i *InodeToPath) GetInode(path util.FullPath) uint64 { } func (i *InodeToPath) GetPath(inode uint64) util.FullPath { - if inode == 1 { - return "/" - } i.RLock() defer i.RUnlock() path, found := i.inode2path[inode] @@ -71,15 +70,37 @@ func (i *InodeToPath) GetPath(inode uint64) util.FullPath { } func (i *InodeToPath) HasPath(path util.FullPath) bool { - if path == "/" { - return true - } i.RLock() defer i.RUnlock() _, found := i.path2inode[path] return found } +func (i *InodeToPath) MarkChildrenCached(fullpath util.FullPath) { + i.RLock() + defer i.RUnlock() + inode, found := i.path2inode[fullpath] + if !found { + glog.Fatalf("MarkChildrenCached not found inode %v", fullpath) + } + path, found := i.inode2path[inode] + path.isChildrenCached = true +} + +func (i *InodeToPath) IsChildrenCached(fullpath util.FullPath) bool { + i.RLock() + defer i.RUnlock() + inode, found := i.path2inode[fullpath] + if !found { + return false + } + path, found := i.inode2path[inode] + if found { + return path.isChildrenCached + } + return false +} + func (i *InodeToPath) HasInode(inode uint64) bool { if inode == 1 { return true @@ -91,9 +112,6 @@ func (i *InodeToPath) HasInode(inode uint64) bool { } func (i *InodeToPath) RemovePath(path util.FullPath) { - if path == "/" { - return - } i.Lock() defer i.Unlock() inode, found := i.path2inode[path] @@ -104,9 +122,6 @@ func (i *InodeToPath) RemovePath(path util.FullPath) { } func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) { - if sourcePath == "/" || targetPath == "/" { - return - } i.Lock() defer i.Unlock() sourceInode, sourceFound := i.path2inode[sourcePath] @@ -127,12 +142,8 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) { } } -func (i *InodeToPath) Forget(inode, nlookup uint64) { - if inode == 1 { - return - } +func (i *InodeToPath) Forget(inode, nlookup uint64, onForgetDir func(dir util.FullPath)) { i.Lock() - defer i.Unlock() path, found := i.inode2path[inode] if found { path.nlookup -= nlookup @@ -141,4 +152,10 @@ func (i *InodeToPath) Forget(inode, nlookup uint64) { delete(i.inode2path, inode) } } + i.Unlock() + if found { + if path.isDirectory && onForgetDir != nil { + onForgetDir(path.FullPath) + } + } } diff --git a/weed/mount/meta_cache/cache_config.go b/weed/mount/meta_cache/cache_config.go new file mode 100644 index 000000000..e6593ebde --- /dev/null +++ b/weed/mount/meta_cache/cache_config.go @@ -0,0 +1,32 @@ +package meta_cache + +import "github.com/chrislusf/seaweedfs/weed/util" + +var ( + _ = util.Configuration(&cacheConfig{}) +) + +// implementing util.Configuraion +type cacheConfig struct { + dir string +} + +func (c cacheConfig) GetString(key string) string { + return c.dir +} + +func (c cacheConfig) GetBool(key string) bool { + panic("implement me") +} + +func (c cacheConfig) GetInt(key string) int { + panic("implement me") +} + +func (c cacheConfig) GetStringSlice(key string) []string { + panic("implement me") +} + +func (c cacheConfig) SetDefault(key string, value interface{}) { + panic("implement me") +} diff --git a/weed/mount/meta_cache/id_mapper.go b/weed/mount/meta_cache/id_mapper.go new file mode 100644 index 000000000..4a2179f31 --- /dev/null +++ b/weed/mount/meta_cache/id_mapper.go @@ -0,0 +1,101 @@ +package meta_cache + +import ( + "fmt" + "strconv" + "strings" +) + +type UidGidMapper struct { + uidMapper *IdMapper + gidMapper *IdMapper +} + +type IdMapper struct { + localToFiler map[uint32]uint32 + filerToLocal map[uint32]uint32 +} + +// UidGidMapper translates local uid/gid to filer uid/gid +// The local storage always persists the same as the filer. +// The local->filer translation happens when updating the filer first and later saving to meta_cache. +// And filer->local happens when reading from the meta_cache. +func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) { + uidMapper, err := newIdMapper(uidPairsStr) + if err != nil { + return nil, err + } + gidMapper, err := newIdMapper(gidPairStr) + if err != nil { + return nil, err + } + + return &UidGidMapper{ + uidMapper: uidMapper, + gidMapper: gidMapper, + }, nil +} + +func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) { + return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid) +} +func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) { + return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid) +} + +func (m *IdMapper) LocalToFiler(id uint32) uint32 { + value, found := m.localToFiler[id] + if found { + return value + } + return id +} +func (m *IdMapper) FilerToLocal(id uint32) uint32 { + value, found := m.filerToLocal[id] + if found { + return value + } + return id +} + +func newIdMapper(pairsStr string) (*IdMapper, error) { + + localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr) + if err != nil { + return nil, err + } + + return &IdMapper{ + localToFiler: localToFiler, + filerToLocal: filerToLocal, + }, nil + +} + +func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) { + + if pairsStr == "" { + return + } + + localToFiler = make(map[uint32]uint32) + filerToLocal = make(map[uint32]uint32) + for _, pairStr := range strings.Split(pairsStr, ",") { + pair := strings.Split(pairStr, ":") + localUidStr, filerUidStr := pair[0], pair[1] + localUid, localUidErr := strconv.Atoi(localUidStr) + if localUidErr != nil { + err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr) + return + } + filerUid, filerUidErr := strconv.Atoi(filerUidStr) + if filerUidErr != nil { + err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr) + return + } + localToFiler[uint32(localUid)] = uint32(filerUid) + filerToLocal[uint32(filerUid)] = uint32(localUid) + } + + return +} diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go new file mode 100644 index 000000000..7f997c5b0 --- /dev/null +++ b/weed/mount/meta_cache/meta_cache.go @@ -0,0 +1,160 @@ +package meta_cache + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "os" +) + +// need to have logic similar to FilerStoreWrapper +// e.g. fill fileId field for chunks + +type MetaCache struct { + localStore filer.VirtualFilerStore + // sync.RWMutex + uidGidMapper *UidGidMapper + markCachedFn func(fullpath util.FullPath) + isCachedFn func(fullpath util.FullPath) bool + invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry) +} + +func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache { + return &MetaCache{ + localStore: openMetaStore(dbFolder), + markCachedFn: markCachedFn, + isCachedFn: isCachedFn, + uidGidMapper: uidGidMapper, + invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) { + invalidateFunc(fullpath, entry) + }, + } +} + +func openMetaStore(dbFolder string) filer.VirtualFilerStore { + + os.RemoveAll(dbFolder) + os.MkdirAll(dbFolder, 0755) + + store := &leveldb.LevelDBStore{} + config := &cacheConfig{ + dir: dbFolder, + } + + if err := store.Initialize(config, ""); err != nil { + glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) + } + + return filer.NewFilerStoreWrapper(store) + +} + +func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error { + //mc.Lock() + //defer mc.Unlock() + return mc.doInsertEntry(ctx, entry) +} + +func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error { + return mc.localStore.InsertEntry(ctx, entry) +} + +func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { + //mc.Lock() + //defer mc.Unlock() + + oldDir, _ := oldPath.DirAndName() + if mc.isCachedFn(util.FullPath(oldDir)) { + if oldPath != "" { + if newEntry != nil && oldPath == newEntry.FullPath { + // skip the unnecessary deletion + // leave the update to the following InsertEntry operation + } else { + glog.V(3).Infof("DeleteEntry %s", oldPath) + if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { + return err + } + } + } + } else { + // println("unknown old directory:", oldDir) + } + + if newEntry != nil { + newDir, _ := newEntry.DirAndName() + if mc.isCachedFn(util.FullPath(newDir)) { + glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name()) + if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil { + return err + } + } + } + return nil +} + +func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error { + //mc.Lock() + //defer mc.Unlock() + return mc.localStore.UpdateEntry(ctx, entry) +} + +func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) { + //mc.RLock() + //defer mc.RUnlock() + entry, err = mc.localStore.FindEntry(ctx, fp) + if err != nil { + return nil, err + } + mc.mapIdFromFilerToLocal(entry) + return +} + +func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + //mc.Lock() + //defer mc.Unlock() + return mc.localStore.DeleteEntry(ctx, fp) +} +func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { + //mc.Lock() + //defer mc.Unlock() + return mc.localStore.DeleteFolderChildren(ctx, fp) +} + +func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error { + //mc.RLock() + //defer mc.RUnlock() + + if !mc.isCachedFn(dirPath) { + // if this request comes after renaming, it should be fine + glog.Warningf("unsynchronized dir: %v", dirPath) + } + + _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { + mc.mapIdFromFilerToLocal(entry) + return eachEntryFunc(entry) + }) + if err != nil { + return err + } + return err +} + +func (mc *MetaCache) Shutdown() { + //mc.Lock() + //defer mc.Unlock() + mc.localStore.Shutdown() +} + +func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) { + entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid) +} + +func (mc *MetaCache) Debug() { + if debuggable, ok := mc.localStore.(filer.Debuggable); ok { + println("start debugging") + debuggable.Debug(os.Stderr) + } +} diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go new file mode 100644 index 000000000..cd9c71668 --- /dev/null +++ b/weed/mount/meta_cache/meta_cache_init.go @@ -0,0 +1,67 @@ +package meta_cache + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { + + for { + + // the directory children are already cached + // so no need for this and upper directories + if mc.isCachedFn(dirPath) { + return nil + } + + if err := doEnsureVisited(mc, client, dirPath); err != nil { + return err + } + + // continue to parent directory + if dirPath != "/" { + parent, _ := dirPath.DirAndName() + dirPath = util.FullPath(parent) + } else { + break + } + } + + return nil + +} + +func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error { + + glog.V(4).Infof("ReadDirAllEntries %s ...", path) + + err := util.Retry("ReadDirAllEntries", func() error { + return filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { + entry := filer.FromPbEntry(string(path), pbEntry) + if IsHiddenSystemEntry(string(path), entry.Name()) { + return nil + } + if err := mc.doInsertEntry(context.Background(), entry); err != nil { + glog.V(0).Infof("read %s: %v", entry.FullPath, err) + return err + } + return nil + }) + }) + + if err != nil { + err = fmt.Errorf("list %s: %v", path, err) + } + mc.markCachedFn(path) + return err +} + +func IsHiddenSystemEntry(dir, name string) bool { + return dir == "/" && (name == "topics" || name == "etc") +} diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go new file mode 100644 index 000000000..881fee08f --- /dev/null +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -0,0 +1,68 @@ +package meta_cache + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { + + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + + for _, sig := range message.Signatures { + if sig == selfSignature && selfSignature != 0 { + return nil + } + } + + dir := resp.Directory + var oldPath util.FullPath + var newEntry *filer.Entry + if message.OldEntry != nil { + oldPath = util.NewFullPath(dir, message.OldEntry.Name) + glog.V(4).Infof("deleting %v", oldPath) + } + + if message.NewEntry != nil { + if message.NewParentPath != "" { + dir = message.NewParentPath + } + key := util.NewFullPath(dir, message.NewEntry.Name) + glog.V(4).Infof("creating %v", key) + newEntry = filer.FromPbEntry(dir, message.NewEntry) + } + err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) + if err == nil { + if message.OldEntry != nil && message.NewEntry != nil { + oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) + mc.invalidateFunc(oldKey, message.OldEntry) + if message.OldEntry.Name != message.NewEntry.Name { + newKey := util.NewFullPath(dir, message.NewEntry.Name) + mc.invalidateFunc(newKey, message.NewEntry) + } + } else if message.OldEntry == nil && message.NewEntry != nil { + // no need to invaalidate + } else if message.OldEntry != nil && message.NewEntry == nil { + oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) + mc.invalidateFunc(oldKey, message.OldEntry) + } + } + + return err + + } + + util.RetryForever("followMetaUpdates", func() error { + return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true) + }, func(err error) bool { + glog.Errorf("follow metadata updates: %v", err) + return true + }) + + return nil +} diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index b7f50cd13..0fdd9bd28 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -3,7 +3,7 @@ package mount import ( "context" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/mount/meta_cache" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/storage/types" @@ -91,7 +91,11 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024) } - wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) { + wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper, func(path util.FullPath) { + wfs.inodeToPath.MarkChildrenCached(path) + }, func(path util.FullPath) bool { + return wfs.inodeToPath.IsChildrenCached(path) + }, func(filePath util.FullPath, entry *filer_pb.Entry) { }) grace.OnInterrupt(func() { wfs.metaCache.Shutdown() @@ -103,6 +107,11 @@ func NewSeaweedFileSystem(option *Option) *WFS { return wfs } +func (wfs *WFS) StartBackgroundTasks() { + startTime := time.Now() + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) +} + func (wfs *WFS) Root() *Directory { return &wfs.root } diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go index 733e31908..30b61d75f 100644 --- a/weed/mount/weedfs_dir_lookup.go +++ b/weed/mount/weedfs_dir_lookup.go @@ -3,8 +3,8 @@ package mount import ( "context" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mount/meta_cache" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/hanwen/go-fuse/v2/fuse" ) @@ -50,7 +50,7 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin return fuse.ENOENT } - inode := wfs.inodeToPath.Lookup(fullFilePath) + inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.IsDirectory()) wfs.outputFilerEntry(out, inode, localEntry) diff --git a/weed/mount/weedfs_dir_mkrm.go b/weed/mount/weedfs_dir_mkrm.go index 4efab078f..839fa493b 100644 --- a/weed/mount/weedfs_dir_mkrm.go +++ b/weed/mount/weedfs_dir_mkrm.go @@ -71,7 +71,7 @@ func (wfs *WFS) Mkdir(cancel <-chan struct{}, in *fuse.MkdirIn, name string, out return fuse.EIO } - inode := wfs.inodeToPath.Lookup(entryFullPath) + inode := wfs.inodeToPath.Lookup(entryFullPath, true) wfs.outputPbEntry(out, inode, newEntry) diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index 3a187aa1c..0177c1863 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -3,8 +3,8 @@ package mount import ( "context" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mount/meta_cache" "github.com/chrislusf/seaweedfs/weed/util" "github.com/hanwen/go-fuse/v2/fuse" "math" diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go index 218ce24f1..c3fd04661 100644 --- a/weed/mount/weedfs_file_mkrm.go +++ b/weed/mount/weedfs_file_mkrm.go @@ -88,7 +88,7 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out return fuse.EIO } - inode := wfs.inodeToPath.Lookup(entryFullPath) + inode := wfs.inodeToPath.Lookup(entryFullPath, false) wfs.outputPbEntry(out, inode, newEntry) @@ -125,8 +125,6 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin wfs.metaCache.DeleteEntry(context.Background(), entryFullPath) wfs.inodeToPath.RemovePath(entryFullPath) - // TODO handle open files, hardlink - return fuse.OK } diff --git a/weed/mount/weedfs_forget.go b/weed/mount/weedfs_forget.go index 14b39882e..62946b216 100644 --- a/weed/mount/weedfs_forget.go +++ b/weed/mount/weedfs_forget.go @@ -1,5 +1,10 @@ package mount +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/util" +) + // Forget is called when the kernel discards entries from its // dentry cache. This happens on unmount, and when the kernel // is short on memory. Since it is not guaranteed to occur at @@ -57,5 +62,7 @@ Side effects: increments the lookup count on success */ func (wfs *WFS) Forget(nodeid, nlookup uint64) { - wfs.inodeToPath.Forget(nodeid, nlookup) + wfs.inodeToPath.Forget(nodeid, nlookup, func(dir util.FullPath) { + wfs.metaCache.DeleteFolderChildren(context.Background(), dir) + }) } diff --git a/weed/mount/weedfs_link.go b/weed/mount/weedfs_link.go index 05710e5a0..ca252d639 100644 --- a/weed/mount/weedfs_link.go +++ b/weed/mount/weedfs_link.go @@ -85,7 +85,7 @@ func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out * return fuse.EIO } - inode := wfs.inodeToPath.Lookup(newEntryPath) + inode := wfs.inodeToPath.Lookup(newEntryPath, false) wfs.outputPbEntry(out, inode, request.Entry) diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go index a4054b64a..9e461abce 100644 --- a/weed/mount/weedfs_rename.go +++ b/weed/mount/weedfs_rename.go @@ -223,8 +223,6 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR wfs.inodeToPath.MovePath(oldPath, newPath) - // TODO change file handle - } else if resp.EventNotification.OldEntry != nil { // without new entry, only old entry name exists. This is the second step to delete old entry if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil { diff --git a/weed/mount/weedfs_symlink.go b/weed/mount/weedfs_symlink.go index 86a7b50e4..c47ad0a2e 100644 --- a/weed/mount/weedfs_symlink.go +++ b/weed/mount/weedfs_symlink.go @@ -56,7 +56,7 @@ func (wfs *WFS) Symlink(cancel <-chan struct{}, header *fuse.InHeader, target st return fuse.EIO } - inode := wfs.inodeToPath.Lookup(entryFullPath) + inode := wfs.inodeToPath.Lookup(entryFullPath, false) wfs.outputPbEntry(out, inode, request.Entry)