From 9033a7d3692572fc10ddf47d992a91271d693f52 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Jun 2020 10:18:32 -0700 Subject: [PATCH] removed async option --- weed/command/mount.go | 2 -- weed/command/mount_std.go | 1 - weed/filesys/dir.go | 54 +++++++++++--------------------------- weed/filesys/dir_link.go | 4 +-- weed/filesys/file.go | 4 +-- weed/filesys/filehandle.go | 4 +-- weed/filesys/wfs.go | 30 ++++++++++----------- weed/filesys/xattr.go | 39 +++++---------------------- 8 files changed, 40 insertions(+), 98 deletions(-) diff --git a/weed/command/mount.go b/weed/command/mount.go index 97207f7f9..440aca8c6 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -21,7 +21,6 @@ type MountOptions struct { umaskString *string nonempty *bool outsideContainerClusterMode *bool - asyncMetaDataCaching *bool } var ( @@ -50,7 +49,6 @@ func init() { mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system") - mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.") } var cmdMount = &Command{ diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index abcf85110..915754166 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -170,7 +170,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { MountMtime: time.Now(), Umask: umask, OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode, - AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching, Cipher: cipher, }) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 2a4a6a1a5..bb4500531 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -141,9 +141,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }); err != nil { @@ -192,9 +190,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err return err } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }) @@ -219,13 +215,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. dirPath := util.FullPath(dir.FullPath()) meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath)) - if dir.wfs.option.AsyncMetaDataCaching { - cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - entry = cachedEntry.ToProtoEntry() + cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT } + entry = cachedEntry.ToProtoEntry() if entry == nil { // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) @@ -283,25 +277,15 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { dirPath := util.FullPath(dir.FullPath()) meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) - if dir.wfs.option.AsyncMetaDataCaching { - listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) - return nil, fuse.EIO - } - for _, cachedEntry := range listedEntries { - processEachEntryFn(cachedEntry.ToProtoEntry(), false) - } - return + listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return nil, fuse.EIO } - - readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn) - if readErr != nil { - glog.V(0).Infof("list %s: %v", dir.FullPath(), err) - return ret, fuse.EIO + for _, cachedEntry := range listedEntries { + processEachEntryFn(cachedEntry.ToProtoEntry(), false) } - - return ret, err + return } func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { @@ -330,9 +314,7 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { dir.wfs.cacheDelete(filePath) dir.wfs.fsNodeCache.DeleteFsNode(filePath) - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) - } + dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) glog.V(3).Infof("remove file: %v", req) err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false) @@ -351,9 +333,7 @@ func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { dir.wfs.cacheDelete(t) dir.wfs.fsNodeCache.DeleteFsNode(t) - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.DeleteEntry(context.Background(), t) - } + dir.wfs.metaCache.DeleteEntry(context.Background(), t) glog.V(3).Infof("remove directory entry: %v", req) err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false) @@ -484,9 +464,7 @@ func (dir *Dir) saveEntry() error { return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index d1858e99b..4990e743c 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -42,9 +42,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }) diff --git a/weed/filesys/file.go b/weed/filesys/file.go index bafbd7cc8..31059112f 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -278,9 +278,7 @@ func (file *File) saveEntry() error { return fuse.EIO } - if file.wfs.option.AsyncMetaDataCaching { - file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 83daccad1..9b9df916c 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -215,9 +215,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) } - if fh.f.wfs.option.AsyncMetaDataCaching { - fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) fh.f.wfs.deleteFileChunks(garbages) for i, chunk := range garbages { diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 2b0ef64c2..7fbab271c 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -10,18 +10,20 @@ import ( "sync" "time" - "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/karlseguin/ccache" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type Option struct { @@ -47,7 +49,6 @@ type Option struct { OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers Cipher bool // whether encrypt data on volume server - AsyncMetaDataCaching bool // whether asynchronously cache meta data } @@ -95,17 +96,16 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.chunkCache.Shutdown() }) } - if wfs.option.AsyncMetaDataCaching { - wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) - startTime := time.Now() - if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { - glog.V(0).Infof("failed to init meta cache: %v", err) - } else { - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) - grace.OnInterrupt(func() { - wfs.metaCache.Shutdown() - }) - } + + wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) + startTime := time.Now() + if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { + glog.V(0).Infof("failed to init meta cache: %v", err) + } else { + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) } wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 9603ece79..b6bce6978 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -3,11 +3,11 @@ package filesys import ( "context" + "github.com/seaweedfs/fuse" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" ) func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { @@ -119,36 +119,9 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err // read from async meta cache meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) - if wfs.option.AsyncMetaDataCaching { - cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - return cachedEntry.ToProtoEntry(), nil + cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT } - - err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Name: name, - Directory: dir, - } - - resp, err := filer_pb.LookupEntry(client, request) - if err != nil { - if err == filer_pb.ErrNotFound { - glog.V(3).Infof("file attr read not found file %v: %v", request, err) - return fuse.ENOENT - } - glog.V(3).Infof("attr read %v: %v", request, err) - return fuse.EIO - } - - entry = resp.Entry - wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl) - - return nil - }) - - return + return cachedEntry.ToProtoEntry(), nil }