Browse Source

filer: configurable directory list cache size

pull/1158/head
Chris Lu 5 years ago
parent
commit
05c3b795dc
  1. 4
      weed/command/mount.go
  2. 2
      weed/command/mount_std.go
  3. 18
      weed/filesys/dir.go
  4. 10
      weed/filesys/file.go
  5. 9
      weed/filesys/wfs.go

4
weed/command/mount.go

@ -10,7 +10,7 @@ type MountOptions struct {
filer *string
filerMountRootPath *string
dir *string
dirListingLimit *int
dirListingLimit *int64
collection *string
replication *string
ttlSec *int
@ -31,7 +31,7 @@ func init() {
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
mountOptions.dirListingLimit = cmdMount.Flag.Int("dirListLimit", 100000, "limit directory listing size")
mountOptions.dirListingLimit = cmdMount.Flag.Int64("dirListLimit", 1000000, "limit directory listing size")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")

2
weed/command/mount_std.go

@ -49,7 +49,7 @@ func runMount(cmd *Command, args []string) bool {
}
func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int,
allowOthers bool, ttlSec int, dirListingLimit int, umask os.FileMode) bool {
allowOthers bool, ttlSec int, dirListingLimit int64, umask os.FileMode) bool {
util.LoadConfiguration("security", false)

18
weed/filesys/dir.go

@ -30,6 +30,8 @@ var _ = fs.NodeSetattrer(&Dir{})
func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
glog.V(3).Infof("dir Attr %s", dir.Path)
// https://github.com/bazil/fuse/issues/196
attr.Valid = time.Second
@ -40,6 +42,9 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
item := dir.wfs.listDirectoryEntriesCache.Get(dir.Path)
if item != nil && !item.Expired() {
glog.V(4).Infof("dir Attr cache hit %s", dir.Path)
entry := item.Value().(*filer_pb.Entry)
attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
@ -51,6 +56,8 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
return nil
}
glog.V(3).Infof("dir Attr cache miss %s", dir.Path)
entry, err := filer2.GetEntry(ctx, dir.wfs, dir.Path)
if err != nil {
glog.V(2).Infof("read dir %s attr: %v, error: %v", dir.Path, dir.attributes, err)
@ -175,6 +182,8 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name)
var entry *filer_pb.Entry
fullFilePath := path.Join(dir.Path, req.Name)
@ -184,10 +193,14 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
}
if entry == nil {
glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath)
if err != nil {
return nil, err
}
dir.wfs.listDirectoryEntriesCache.Set(fullFilePath, entry, 5*time.Second)
} else {
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
}
if entry != nil {
@ -212,7 +225,9 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
cacheTtl := 3 * time.Second
glog.V(3).Infof("dir ReadDirAll %s", dir.Path)
cacheTtl := 10 * time.Minute
readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, dir.Path, "", func(entry *filer_pb.Entry, isLast bool) {
if entry.IsDirectory {
@ -222,7 +237,6 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
ret = append(ret, dirent)
}
cacheTtl = cacheTtl + 2 * time.Millisecond
dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl)
})
if readErr != nil {

10
weed/filesys/file.go

@ -36,6 +36,8 @@ func (file *File) fullpath() string {
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
glog.V(4).Infof("file Attr %s", file.fullpath())
if err := file.maybeLoadAttributes(ctx); err != nil {
return err
}
@ -54,7 +56,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
glog.V(3).Infof("%v file open %+v", file.fullpath(), req)
glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
file.isOpen = true
@ -140,10 +142,16 @@ func (file *File) maybeLoadAttributes(ctx context.Context) error {
if file.entry == nil || !file.isOpen {
item := file.wfs.listDirectoryEntriesCache.Get(file.fullpath())
if item != nil && !item.Expired() {
glog.V(4).Infof("file read attr cache hit %s", file.fullpath())
entry := item.Value().(*filer_pb.Entry)
file.setEntry(entry)
// glog.V(1).Infof("file attr read cached %v attributes", file.Name)
} else {
glog.V(3).Infof("file read attr cache miss %s", file.fullpath())
err := file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{

9
weed/filesys/wfs.go

@ -8,13 +8,14 @@ import (
"sync"
"time"
"github.com/karlseguin/ccache"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/karlseguin/ccache"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"google.golang.org/grpc"
)
type Option struct {
@ -26,7 +27,7 @@ type Option struct {
TtlSec int32
ChunkSizeLimit int64
DataCenter string
DirListingLimit int
DirListingLimit int64
EntryCacheTtl time.Duration
Umask os.FileMode
@ -60,7 +61,7 @@ type statsCache struct {
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)),
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListingLimit * 3).ItemsToPrune(100)),
pathToHandleIndex: make(map[string]int),
bufPool: sync.Pool{
New: func() interface{} {

Loading…
Cancel
Save