Browse Source

removed async option

pull/1377/head
Chris Lu 5 years ago
parent
commit
9033a7d369
  1. 2
      weed/command/mount.go
  2. 1
      weed/command/mount_std.go
  3. 54
      weed/filesys/dir.go
  4. 4
      weed/filesys/dir_link.go
  5. 4
      weed/filesys/file.go
  6. 4
      weed/filesys/filehandle.go
  7. 30
      weed/filesys/wfs.go
  8. 39
      weed/filesys/xattr.go

2
weed/command/mount.go

@ -21,7 +21,6 @@ type MountOptions struct {
umaskString *string umaskString *string
nonempty *bool nonempty *bool
outsideContainerClusterMode *bool outsideContainerClusterMode *bool
asyncMetaDataCaching *bool
} }
var ( var (
@ -50,7 +49,6 @@ func init() {
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system") mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.")
} }
var cmdMount = &Command{ var cmdMount = &Command{

1
weed/command/mount_std.go

@ -170,7 +170,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
MountMtime: time.Now(), MountMtime: time.Now(),
Umask: umask, Umask: umask,
OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode, OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching,
Cipher: cipher, Cipher: cipher,
}) })

54
weed/filesys/dir.go

@ -141,9 +141,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
return fuse.EIO return fuse.EIO
} }
if dir.wfs.option.AsyncMetaDataCaching { dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
}
return nil return nil
}); err != nil { }); err != nil {
@ -192,9 +190,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
return err return err
} }
if dir.wfs.option.AsyncMetaDataCaching { dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
}
return nil return nil
}) })
@ -219,13 +215,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
dirPath := util.FullPath(dir.FullPath()) dirPath := util.FullPath(dir.FullPath())
meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath)) meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath))
if dir.wfs.option.AsyncMetaDataCaching { cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) if cacheErr == filer_pb.ErrNotFound {
if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT
return nil, fuse.ENOENT
}
entry = cachedEntry.ToProtoEntry()
} }
entry = cachedEntry.ToProtoEntry()
if entry == nil { if entry == nil {
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
@ -283,25 +277,15 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
dirPath := util.FullPath(dir.FullPath()) dirPath := util.FullPath(dir.FullPath())
meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
if dir.wfs.option.AsyncMetaDataCaching { listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) if listErr != nil {
if listErr != nil { glog.Errorf("list meta cache: %v", listErr)
glog.Errorf("list meta cache: %v", listErr) return nil, fuse.EIO
return nil, fuse.EIO
}
for _, cachedEntry := range listedEntries {
processEachEntryFn(cachedEntry.ToProtoEntry(), false)
}
return
} }
for _, cachedEntry := range listedEntries {
readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn) processEachEntryFn(cachedEntry.ToProtoEntry(), false)
if readErr != nil {
glog.V(0).Infof("list %s: %v", dir.FullPath(), err)
return ret, fuse.EIO
} }
return
return ret, err
} }
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
@ -330,9 +314,7 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
dir.wfs.cacheDelete(filePath) dir.wfs.cacheDelete(filePath)
dir.wfs.fsNodeCache.DeleteFsNode(filePath) dir.wfs.fsNodeCache.DeleteFsNode(filePath)
if dir.wfs.option.AsyncMetaDataCaching { dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
}
glog.V(3).Infof("remove file: %v", req) glog.V(3).Infof("remove file: %v", req)
err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false) err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false)
@ -351,9 +333,7 @@ func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
dir.wfs.cacheDelete(t) dir.wfs.cacheDelete(t)
dir.wfs.fsNodeCache.DeleteFsNode(t) dir.wfs.fsNodeCache.DeleteFsNode(t)
if dir.wfs.option.AsyncMetaDataCaching { dir.wfs.metaCache.DeleteEntry(context.Background(), t)
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
}
glog.V(3).Infof("remove directory entry: %v", req) glog.V(3).Infof("remove directory entry: %v", req)
err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false) err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false)
@ -484,9 +464,7 @@ func (dir *Dir) saveEntry() error {
return fuse.EIO return fuse.EIO
} }
if dir.wfs.option.AsyncMetaDataCaching { dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
}
return nil return nil
}) })

4
weed/filesys/dir_link.go

@ -42,9 +42,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
return fuse.EIO return fuse.EIO
} }
if dir.wfs.option.AsyncMetaDataCaching { dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
}
return nil return nil
}) })

4
weed/filesys/file.go

@ -278,9 +278,7 @@ func (file *File) saveEntry() error {
return fuse.EIO return fuse.EIO
} }
if file.wfs.option.AsyncMetaDataCaching { file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
}
return nil return nil
}) })

4
weed/filesys/filehandle.go

@ -215,9 +215,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
} }
if fh.f.wfs.option.AsyncMetaDataCaching { fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
}
fh.f.wfs.deleteFileChunks(garbages) fh.f.wfs.deleteFileChunks(garbages)
for i, chunk := range garbages { for i, chunk := range garbages {

30
weed/filesys/wfs.go

@ -10,18 +10,20 @@ import (
"sync" "sync"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/karlseguin/ccache" "github.com/karlseguin/ccache"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
) )
type Option struct { type Option struct {
@ -47,7 +49,6 @@ type Option struct {
OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
Cipher bool // whether encrypt data on volume server Cipher bool // whether encrypt data on volume server
AsyncMetaDataCaching bool // whether asynchronously cache meta data
} }
@ -95,17 +96,16 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache.Shutdown() wfs.chunkCache.Shutdown()
}) })
} }
if wfs.option.AsyncMetaDataCaching { wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) startTime := time.Now()
startTime := time.Now() if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { glog.V(0).Infof("failed to init meta cache: %v", err)
glog.V(0).Infof("failed to init meta cache: %v", err) } else {
} else { go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) grace.OnInterrupt(func() {
grace.OnInterrupt(func() { wfs.metaCache.Shutdown()
wfs.metaCache.Shutdown() })
})
}
} }
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}

39
weed/filesys/xattr.go

@ -3,11 +3,11 @@ package filesys
import ( import (
"context" "context"
"github.com/seaweedfs/fuse"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
) )
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
@ -119,36 +119,9 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err
// read from async meta cache // read from async meta cache
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
if wfs.option.AsyncMetaDataCaching { cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) if cacheErr == filer_pb.ErrNotFound {
if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT
return nil, fuse.ENOENT
}
return cachedEntry.ToProtoEntry(), nil
} }
return cachedEntry.ToProtoEntry(), nil
err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
resp, err := filer_pb.LookupEntry(client, request)
if err != nil {
if err == filer_pb.ErrNotFound {
glog.V(3).Infof("file attr read not found file %v: %v", request, err)
return fuse.ENOENT
}
glog.V(3).Infof("attr read %v: %v", request, err)
return fuse.EIO
}
entry = resp.Entry
wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl)
return nil
})
return
} }
|||||||
100:0
Loading…
Cancel
Save