diff --git a/weed/command/mount.go b/weed/command/mount.go index 440aca8c6..97207f7f9 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -21,6 +21,7 @@ type MountOptions struct { umaskString *string nonempty *bool outsideContainerClusterMode *bool + asyncMetaDataCaching *bool } var ( @@ -49,6 +50,7 @@ func init() { mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system") + mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.") } var cmdMount = &Command{ diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 915754166..abcf85110 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -170,6 +170,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { MountMtime: time.Now(), Umask: umask, OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode, + AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching, Cipher: cipher, }) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 18d21cf7f..2a4a6a1a5 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -7,13 +7,14 @@ import ( "strings" "time" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type Dir struct { @@ -90,18 +91,22 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { } func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - entry: entry, - entryViewCache: nil, - } + return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { + return &File{ + Name: name, + dir: dir, + wfs: dir.wfs, + entry: entry, + entryViewCache: nil, + } + }) } func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { - return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} + return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { + return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} + }) } @@ -136,7 +141,9 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, return fuse.EIO } - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } return nil }); err != nil { @@ -185,7 +192,9 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err return err } - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } return nil }) @@ -205,15 +214,18 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) - dirPath := dir.FullPath() - fullFilePath := util.NewFullPath(dirPath, req.Name) + fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) + entry := dir.wfs.cacheGet(fullFilePath) + dirPath := util.FullPath(dir.FullPath()) meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath)) - cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT + if dir.wfs.option.AsyncMetaDataCaching { + cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + entry = cachedEntry.ToProtoEntry() } - entry := cachedEntry.ToProtoEntry() if entry == nil { // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) @@ -222,6 +234,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) return nil, fuse.ENOENT } + dir.wfs.cacheSet(fullFilePath, entry, 5*time.Minute) } else { glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) } @@ -253,6 +266,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath()) + cacheTtl := 5 * time.Minute processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { fullpath := util.NewFullPath(dir.FullPath(), entry.Name) inode := fullpath.AsInode() @@ -263,21 +277,31 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File} ret = append(ret, dirent) } + dir.wfs.cacheSet(fullpath, entry, cacheTtl) return nil } dirPath := util.FullPath(dir.FullPath()) meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) - listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int(dir.wfs.option.DirListCacheLimit)) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) - return nil, fuse.EIO + if dir.wfs.option.AsyncMetaDataCaching { + listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return nil, fuse.EIO + } + for _, cachedEntry := range listedEntries { + processEachEntryFn(cachedEntry.ToProtoEntry(), false) + } + return } - for _, cachedEntry := range listedEntries { - processEachEntryFn(cachedEntry.ToProtoEntry(), false) + + readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn) + if readErr != nil { + glog.V(0).Infof("list %s: %v", dir.FullPath(), err) + return ret, fuse.EIO } - return + return ret, err } func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { @@ -303,7 +327,12 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { dir.wfs.deleteFileChunks(entry.Chunks) - dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) + dir.wfs.cacheDelete(filePath) + dir.wfs.fsNodeCache.DeleteFsNode(filePath) + + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) + } glog.V(3).Infof("remove file: %v", req) err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false) @@ -319,8 +348,12 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { t := util.NewFullPath(dir.FullPath(), req.Name) + dir.wfs.cacheDelete(t) + dir.wfs.fsNodeCache.DeleteFsNode(t) - dir.wfs.metaCache.DeleteEntry(context.Background(), t) + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.DeleteEntry(context.Background(), t) + } glog.V(3).Infof("remove directory entry: %v", req) err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false) @@ -357,6 +390,8 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus dir.entry.Attributes.Mtime = req.Mtime.Unix() } + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) + return dir.saveEntry() } @@ -373,6 +408,8 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { return err } + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) + return dir.saveEntry() } @@ -389,6 +426,8 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e return err } + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) + return dir.saveEntry() } @@ -411,6 +450,8 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp func (dir *Dir) Forget() { glog.V(3).Infof("Forget dir %s", dir.FullPath()) + + dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } func (dir *Dir) maybeLoadEntry() error { @@ -443,7 +484,9 @@ func (dir *Dir) saveEntry() error { return fuse.EIO } - dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } return nil }) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 4990e743c..d1858e99b 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -42,7 +42,9 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, return fuse.EIO } - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } return nil }) diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 92d667c57..ea40f5c31 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -38,5 +38,14 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector }) + if err == nil { + dir.wfs.cacheDelete(newPath) + dir.wfs.cacheDelete(oldPath) + + // fmt.Printf("rename path: %v => %v\n", oldPath, newPath) + dir.wfs.fsNodeCache.Move(oldPath, newPath) + + } + return err } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 0f788a888..bafbd7cc8 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -150,6 +150,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } + file.wfs.cacheDelete(file.fullpath()) + return file.saveEntry() } @@ -166,6 +168,8 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error return err } + file.wfs.cacheDelete(file.fullpath()) + return file.saveEntry() } @@ -182,6 +186,8 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) return err } + file.wfs.cacheDelete(file.fullpath()) + return file.saveEntry() } @@ -212,7 +218,8 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { func (file *File) Forget() { t := util.NewFullPath(file.dir.FullPath(), file.Name) - glog.V(4).Infof("Forget file %s", t) + glog.V(3).Infof("Forget file %s", t) + file.wfs.fsNodeCache.DeleteFsNode(t) } func (file *File) maybeLoadEntry(ctx context.Context) error { @@ -271,7 +278,9 @@ func (file *File) saveEntry() error { return fuse.EIO } - file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + if file.wfs.option.AsyncMetaDataCaching { + file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } return nil }) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 9b9df916c..83daccad1 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -215,7 +215,9 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) } - fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + if fh.f.wfs.option.AsyncMetaDataCaching { + fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } fh.f.wfs.deleteFileChunks(garbages) for i, chunk := range garbages { diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go new file mode 100644 index 000000000..b146f0615 --- /dev/null +++ b/weed/filesys/fscache.go @@ -0,0 +1,207 @@ +package filesys + +import ( + "sync" + + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/fuse/fs" +) + +type FsCache struct { + root *FsNode + sync.RWMutex +} +type FsNode struct { + parent *FsNode + node fs.Node + name string + childrenLock sync.RWMutex + children map[string]*FsNode +} + +func newFsCache(root fs.Node) *FsCache { + return &FsCache{ + root: &FsNode{ + node: root, + }, + } +} + +func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { + + c.RLock() + defer c.RUnlock() + + return c.doGetFsNode(path) +} + +func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node { + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return nil + } + } + return t.node +} + +func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { + + c.Lock() + defer c.Unlock() + + c.doSetFsNode(path, node) +} + +func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { + t := c.root + for _, p := range path.Split() { + t = t.ensureChild(p) + } + t.node = node +} + +func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { + + c.Lock() + defer c.Unlock() + + t := c.doGetFsNode(path) + if t != nil { + return t + } + t = genNodeFn() + c.doSetFsNode(path, t) + return t +} + +func (c *FsCache) DeleteFsNode(path util.FullPath) { + + c.Lock() + defer c.Unlock() + + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return + } + } + if t.parent != nil { + t.parent.disconnectChild(t) + } + t.deleteSelf() +} + +// oldPath and newPath are full path including the new name +func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { + + c.Lock() + defer c.Unlock() + + // find old node + src := c.root + for _, p := range oldPath.Split() { + src = src.findChild(p) + if src == nil { + return src + } + } + if src.parent != nil { + src.parent.disconnectChild(src) + } + + // find new node + target := c.root + for _, p := range newPath.Split() { + target = target.ensureChild(p) + } + parent := target.parent + src.name = target.name + if dir, ok := src.node.(*Dir); ok { + dir.name = target.name // target is not Dir, but a shortcut + } + if f, ok := src.node.(*File); ok { + f.Name = target.name + if f.entry != nil { + f.entry.Name = f.Name + } + } + parent.disconnectChild(target) + + target.deleteSelf() + + src.connectToParent(parent) + + return src +} + +func (n *FsNode) connectToParent(parent *FsNode) { + n.parent = parent + oldNode := parent.findChild(n.name) + if oldNode != nil { + oldNode.deleteSelf() + } + if dir, ok := n.node.(*Dir); ok { + dir.parent = parent.node.(*Dir) + } + if f, ok := n.node.(*File); ok { + f.dir = parent.node.(*Dir) + } + n.childrenLock.Lock() + parent.children[n.name] = n + n.childrenLock.Unlock() +} + +func (n *FsNode) findChild(name string) *FsNode { + n.childrenLock.RLock() + defer n.childrenLock.RUnlock() + + child, found := n.children[name] + if found { + return child + } + return nil +} + +func (n *FsNode) ensureChild(name string) *FsNode { + n.childrenLock.Lock() + defer n.childrenLock.Unlock() + + if n.children == nil { + n.children = make(map[string]*FsNode) + } + child, found := n.children[name] + if found { + return child + } + t := &FsNode{ + parent: n, + node: nil, + name: name, + children: nil, + } + n.children[name] = t + return t +} + +func (n *FsNode) disconnectChild(child *FsNode) { + n.childrenLock.Lock() + delete(n.children, child.name) + n.childrenLock.Unlock() + child.parent = nil +} + +func (n *FsNode) deleteSelf() { + n.childrenLock.Lock() + for _, child := range n.children { + child.deleteSelf() + } + n.children = nil + n.childrenLock.Unlock() + + n.node = nil + n.parent = nil + +} diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go new file mode 100644 index 000000000..67f9aacc8 --- /dev/null +++ b/weed/filesys/fscache_test.go @@ -0,0 +1,96 @@ +package filesys + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/util" +) + +func TestPathSplit(t *testing.T) { + parts := util.FullPath("/").Split() + if len(parts) != 0 { + t.Errorf("expecting an empty list, but getting %d", len(parts)) + } + + parts = util.FullPath("/readme.md").Split() + if len(parts) != 1 { + t.Errorf("expecting an empty list, but getting %d", len(parts)) + } + +} + +func TestFsCache(t *testing.T) { + + cache := newFsCache(nil) + + x := cache.GetFsNode(util.FullPath("/y/x")) + if x != nil { + t.Errorf("wrong node!") + } + + p := util.FullPath("/a/b/c") + cache.SetFsNode(p, &File{Name: "cc"}) + tNode := cache.GetFsNode(p) + tFile := tNode.(*File) + if tFile.Name != "cc" { + t.Errorf("expecting a FsNode") + } + + cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) + cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) + cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"}) + cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) + cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) + + b := cache.GetFsNode(util.FullPath("/a/b")) + if b != nil { + t.Errorf("unexpected node!") + } + + a := cache.GetFsNode(util.FullPath("/a")) + if a == nil { + t.Errorf("missing node!") + } + + cache.DeleteFsNode(util.FullPath("/a")) + if b != nil { + t.Errorf("unexpected node!") + } + + a = cache.GetFsNode(util.FullPath("/a")) + if a != nil { + t.Errorf("wrong DeleteFsNode!") + } + + z := cache.GetFsNode(util.FullPath("/z")) + if z == nil { + t.Errorf("missing node!") + } + + y := cache.GetFsNode(util.FullPath("/x/y")) + if y != nil { + t.Errorf("wrong node!") + } + +} + +func TestFsCacheMove(t *testing.T) { + + cache := newFsCache(nil) + + cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) + cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) + cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) + cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) + + cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x")) + + d := cache.GetFsNode(util.FullPath("/z/x/d")) + if d == nil { + t.Errorf("unexpected nil node!") + } + if d.(*File).Name != "dd" { + t.Errorf("unexpected non dd node!") + } + +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index ee4dcc916..2b0ef64c2 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -10,12 +10,9 @@ import ( "sync" "time" - "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/util/grace" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" + "github.com/karlseguin/ccache" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" @@ -23,6 +20,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" ) type Option struct { @@ -48,6 +47,7 @@ type Option struct { OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers Cipher bool // whether encrypt data on volume server + AsyncMetaDataCaching bool // whether asynchronously cache meta data } @@ -56,6 +56,7 @@ var _ = fs.FSStatfser(&WFS{}) type WFS struct { option *Option + listDirectoryEntriesCache *ccache.Cache // contains all open handles, protected by handlesLock handlesLock sync.Mutex @@ -66,6 +67,7 @@ type WFS struct { stats statsCache root fs.Node + fsNodeCache *FsCache chunkCache *chunk_cache.ChunkCache metaCache *meta_cache.MetaCache @@ -78,6 +80,7 @@ type statsCache struct { func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, + listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), handles: make(map[uint64]*FileHandle), bufPool: sync.Pool{ New: func() interface{} { @@ -92,18 +95,21 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.chunkCache.Shutdown() }) } - wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) - startTime := time.Now() - if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { - glog.V(0).Infof("failed to init meta cache: %v", err) - } else { - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) - grace.OnInterrupt(func() { - wfs.metaCache.Shutdown() - }) + if wfs.option.AsyncMetaDataCaching { + wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) + startTime := time.Now() + if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { + glog.V(0).Infof("failed to init meta cache: %v", err) + } else { + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) + } } wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.fsNodeCache = newFsCache(wfs.root) return wfs } @@ -223,6 +229,24 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. return nil } +func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry { + item := wfs.listDirectoryEntriesCache.Get(string(path)) + if item != nil && !item.Expired() { + return item.Value().(*filer_pb.Entry) + } + return nil +} +func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) { + if entry == nil { + wfs.listDirectoryEntriesCache.Delete(string(path)) + } else { + wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl) + } +} +func (wfs *WFS) cacheDelete(path util.FullPath) { + wfs.listDirectoryEntriesCache.Delete(string(path)) +} + func (wfs *WFS) AdjustedUrl(hostAndPort string) string { if !wfs.option.OutsideContainerClusterMode { return hostAndPort diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 870a72ebe..9603ece79 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -3,11 +3,11 @@ package filesys import ( "context" - "github.com/seaweedfs/fuse" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/fuse" ) func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { @@ -111,14 +111,44 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) { fullpath := util.NewFullPath(dir, name) + entry = wfs.cacheGet(fullpath) + if entry != nil { + return + } // glog.V(3).Infof("read entry cache miss %s", fullpath) // read from async meta cache meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) - cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT + if wfs.option.AsyncMetaDataCaching { + cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + return cachedEntry.ToProtoEntry(), nil } - return cachedEntry.ToProtoEntry(), nil + err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Name: name, + Directory: dir, + } + + resp, err := filer_pb.LookupEntry(client, request) + if err != nil { + if err == filer_pb.ErrNotFound { + glog.V(3).Infof("file attr read not found file %v: %v", request, err) + return fuse.ENOENT + } + glog.V(3).Infof("attr read %v: %v", request, err) + return fuse.EIO + } + + entry = resp.Entry + wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl) + + return nil + }) + + return }