From 7f0d87b2065ffb5e7be1bd0591416b9649839c9e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 26 Mar 2020 00:08:14 -0700 Subject: [PATCH] tree structured fs cache FsCache for FsNode, wrapping fs.Node --- go.mod | 1 + weed/filesys/dir.go | 106 +++++++++++++++++++++++-------------- weed/filesys/dir_link.go | 8 +-- weed/filesys/dir_rename.go | 11 ++-- weed/filesys/dir_test.go | 34 ++++++++++++ weed/filesys/dirty_page.go | 4 +- weed/filesys/file.go | 14 ++--- weed/filesys/filehandle.go | 2 +- weed/filesys/fscache.go | 33 ++++++++++-- weed/filesys/wfs.go | 6 +-- 10 files changed, 150 insertions(+), 69 deletions(-) create mode 100644 weed/filesys/dir_test.go diff --git a/go.mod b/go.mod index 61716fe36..15d7a3759 100644 --- a/go.mod +++ b/go.mod @@ -70,6 +70,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.4.0 github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 // indirect + github.com/stretchr/testify v1.3.0 github.com/syndtr/goleveldb v1.0.0 github.com/tidwall/gjson v1.3.2 github.com/tidwall/match v1.0.1 diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index b4a771f37..0b330fd4f 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -1,6 +1,7 @@ package filesys import ( + "bytes" "context" "os" "strings" @@ -14,9 +15,10 @@ import ( ) type Dir struct { - Path string - wfs *WFS - entry *filer_pb.Entry + name string + wfs *WFS + entry *filer_pb.Entry + parent *Dir } var _ = fs.Node(&Dir{}) @@ -38,32 +40,32 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { // https://github.com/bazil/fuse/issues/196 attr.Valid = time.Second - if dir.Path == dir.wfs.option.FilerMountRootPath { + if dir.FullPath() == dir.wfs.option.FilerMountRootPath { dir.setRootDirAttributes(attr) - glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.Path, attr) + glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil } if err := dir.maybeLoadEntry(); err != nil { - glog.V(3).Infof("dir Attr %s,err: %+v", dir.Path, err) + glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err) return err } - attr.Inode = util.FullPath(dir.Path).AsInode() + attr.Inode = util.FullPath(dir.FullPath()).AsInode() attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0) attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0) attr.Gid = dir.entry.Attributes.Gid attr.Uid = dir.entry.Attributes.Uid - glog.V(3).Infof("dir Attr %s, attr: %+v", dir.Path, attr) + glog.V(3).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil } func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - glog.V(4).Infof("dir Getxattr %s", dir.Path) + glog.V(4).Infof("dir Getxattr %s", dir.FullPath()) if err := dir.maybeLoadEntry(); err != nil { return err @@ -86,7 +88,7 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { } func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.Path, name), func() fs.Node { + return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { return &File{ Name: name, dir: dir, @@ -100,7 +102,7 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { - return &Dir{Path: string(fullpath), wfs: dir.wfs, entry: entry} + return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir,} }) } @@ -109,7 +111,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, + Directory: dir.FullPath(), Entry: &filer_pb.Entry{ Name: req.Name, IsDirectory: req.Mode&os.ModeDir > 0, @@ -126,7 +128,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, }, OExcl: req.Flags&fuse.OpenExclusive != 0, } - glog.V(1).Infof("create %s/%s: %v", dir.Path, req.Name, req.Flags) + glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags) if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if err := filer_pb.CreateEntry(client, request); err != nil { @@ -141,7 +143,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, } var node fs.Node if request.Entry.IsDirectory { - node = dir.newDirectory(util.NewFullPath(dir.Path, req.Name), request.Entry) + node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry) return node, nil, nil } @@ -170,13 +172,13 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, + Directory: dir.FullPath(), Entry: newEntry, } glog.V(1).Infof("mkdir: %v", request) if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("mkdir %s/%s: %v", dir.Path, req.Name, err) + glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err) return err } @@ -184,7 +186,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err }) if err == nil { - node := dir.newDirectory(util.NewFullPath(dir.Path(), req.Name), newEntry) + node := dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), newEntry) return node, nil } @@ -193,9 +195,9 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name) + glog.V(4).Infof("dir Lookup %s: %s", dir.FullPath(), req.Name) - fullFilePath := util.NewFullPath(dir.Path, req.Name) + fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) entry := dir.wfs.cacheGet(fullFilePath) if entry == nil { @@ -235,12 +237,12 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - glog.V(3).Infof("dir ReadDirAll %s", dir.Path) + glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath()) cacheTtl := 5 * time.Minute - readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.Path), "", func(entry *filer_pb.Entry, isLast bool) { - fullpath := util.NewFullPath(dir.Path, entry.Name) + readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", func(entry *filer_pb.Entry, isLast bool) { + fullpath := util.NewFullPath(dir.FullPath(), entry.Name) inode := fullpath.AsInode() if entry.IsDirectory { dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir} @@ -252,7 +254,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { dir.wfs.cacheSet(fullpath, entry, cacheTtl) }) if readErr != nil { - glog.V(0).Infof("list %s: %v", dir.Path, err) + glog.V(0).Infof("list %s: %v", dir.FullPath(), err) return ret, fuse.EIO } @@ -271,7 +273,7 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { - filePath := util.NewFullPath(dir.Path, req.Name) + filePath := util.NewFullPath(dir.FullPath(), req.Name) entry, err := filer_pb.GetEntry(dir.wfs, filePath) if err != nil { return err @@ -285,9 +287,9 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { dir.wfs.cacheDelete(filePath) glog.V(3).Infof("remove file: %v", req) - err = filer_pb.Remove(dir.wfs, dir.Path, req.Name, false, false, false) + err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false) if err != nil { - glog.V(3).Infof("not found remove file %s/%s: %v", dir.Path, req.Name, err) + glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err) return fuse.ENOENT } @@ -297,12 +299,12 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { - dir.wfs.cacheDelete(util.NewFullPath(dir.Path, req.Name)) + dir.wfs.cacheDelete(util.NewFullPath(dir.FullPath(), req.Name)) glog.V(3).Infof("remove directory entry: %v", req) - err := filer_pb.Remove(dir.wfs, dir.Path, req.Name, true, false, false) + err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false) if err != nil { - glog.V(3).Infof("not found remove %s/%s: %v", dir.Path, req.Name, err) + glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err) return fuse.ENOENT } return nil @@ -311,7 +313,7 @@ func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - glog.V(3).Infof("%v dir setattr %+v", dir.Path, req) + glog.V(3).Infof("%v dir setattr %+v", dir.FullPath(), req) if err := dir.maybeLoadEntry(); err != nil { return err @@ -333,7 +335,7 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus dir.entry.Attributes.Mtime = req.Mtime.Unix() } - dir.wfs.cacheDelete(util.FullPath(dir.Path)) + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) return dir.saveEntry() @@ -341,7 +343,7 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { - glog.V(4).Infof("dir Setxattr %s: %s", dir.Path, req.Name) + glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name) if err := dir.maybeLoadEntry(); err != nil { return err @@ -351,7 +353,7 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { return err } - dir.wfs.cacheDelete(util.FullPath(dir.Path)) + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) return dir.saveEntry() @@ -359,7 +361,7 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { - glog.V(4).Infof("dir Removexattr %s: %s", dir.Path, req.Name) + glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name) if err := dir.maybeLoadEntry(); err != nil { return err @@ -369,7 +371,7 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e return err } - dir.wfs.cacheDelete(util.FullPath(dir.Path)) + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) return dir.saveEntry() @@ -377,7 +379,7 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - glog.V(4).Infof("dir Listxattr %s", dir.Path) + glog.V(4).Infof("dir Listxattr %s", dir.FullPath()) if err := dir.maybeLoadEntry(); err != nil { return err @@ -392,14 +394,14 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp } func (dir *Dir) Forget() { - glog.V(3).Infof("Forget dir %s", dir.Path) + glog.V(3).Infof("Forget dir %s", dir.FullPath()) - dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.Path)) + dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } func (dir *Dir) maybeLoadEntry() error { if dir.entry == nil { - parentDirPath, name := util.FullPath(dir.Path).DirAndName() + parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName() entry, err := dir.wfs.maybeLoadEntry(parentDirPath, name) if err != nil { return err @@ -411,7 +413,7 @@ func (dir *Dir) maybeLoadEntry() error { func (dir *Dir) saveEntry() error { - parentDir, name := util.FullPath(dir.Path).DirAndName() + parentDir, name := util.FullPath(dir.FullPath()).DirAndName() return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { @@ -430,3 +432,27 @@ func (dir *Dir) saveEntry() error { return nil }) } + +func (dir *Dir) FullPath() string { + var parts []string + for p := dir; p != nil; p = p.parent { + if strings.HasPrefix(p.name, "/") { + if len(p.name) > 1 { + parts = append(parts, p.name[1:]) + } + } else { + parts = append(parts, p.name) + } + } + + if len(parts) == 0 { + return "/" + } + + var buf bytes.Buffer + for i := len(parts) - 1; i >= 0; i-- { + buf.WriteString("/") + buf.WriteString(parts[i]) + } + return buf.String() +} diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 61ed04c26..3c415c3a6 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -17,10 +17,10 @@ var _ = fs.NodeReadlinker(&File{}) func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) { - glog.V(3).Infof("Symlink: %v/%v to %v", dir.Path, req.NewName, req.Target) + glog.V(3).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target) request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, + Directory: dir.FullPath(), Entry: &filer_pb.Entry{ Name: req.NewName, IsDirectory: false, @@ -37,7 +37,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err) + glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err) return fuse.EIO } return nil @@ -59,7 +59,7 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri return "", fuse.Errno(syscall.EINVAL) } - glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.Path, file.Name, file.entry.Attributes.SymlinkTarget) + glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget) return file.entry.Attributes.SymlinkTarget, nil diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index f0b3e2118..f8dbdbb6a 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -13,18 +13,19 @@ import ( func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { - newPath := util.NewFullPath(newDir.Path, req.NewName) - oldPath := util.NewFullPath(dir.Path, req.OldName) - newDir := newDirectory.(*Dir) + + newPath := util.NewFullPath(newDir.FullPath(), req.NewName) + oldPath := util.NewFullPath(dir.FullPath(), req.OldName) + glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ - OldDirectory: dir.Path, + OldDirectory: dir.FullPath(), OldName: req.OldName, - NewDirectory: newDir.Path, + NewDirectory: newDir.FullPath(), NewName: req.NewName, } diff --git a/weed/filesys/dir_test.go b/weed/filesys/dir_test.go new file mode 100644 index 000000000..7c439756f --- /dev/null +++ b/weed/filesys/dir_test.go @@ -0,0 +1,34 @@ +package filesys + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDirPath(t *testing.T) { + + p := &Dir{name:"/some"} + p = &Dir{name:"path", parent: p} + p = &Dir{name:"to", parent: p} + p = &Dir{name:"a", parent: p} + p = &Dir{name:"file", parent: p} + + assert.Equal(t, "/some/path/to/a/file", p.FullPath()) + + p = &Dir{name:"/some"} + assert.Equal(t, "/some", p.FullPath()) + + p = &Dir{name:"/"} + assert.Equal(t, "/", p.FullPath()) + + p = &Dir{name:"/"} + p = &Dir{name:"path", parent: p} + assert.Equal(t, "/path", p.FullPath()) + + p = &Dir{name:"/"} + p = &Dir{name:"path", parent: p} + p = &Dir{name:"to", parent: p} + assert.Equal(t, "/path/to", p.FullPath()) + +} diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 7e33c97a7..aa37b0841 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -78,11 +78,11 @@ func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chun // flush the new page if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil { if chunk != nil { - glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) + glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) chunks = append(chunks, chunk) } } else { - glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 148f767ac..6f67081a6 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -38,7 +38,7 @@ type File struct { } func (file *File) fullpath() util.FullPath { - return util.NewFullPath(file.dir.Path, file.Name) + return util.NewFullPath(file.dir.FullPath(), file.Name) } func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { @@ -211,21 +211,21 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { // fsync works at OS level // write the file chunks to the filerGrpcAddress - glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req) + glog.V(3).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req) return nil } func (file *File) Forget() { - glog.V(3).Infof("Forget file %s/%s", file.dir.Path, file.Name) + glog.V(3).Infof("Forget file %s/%s", file.dir.FullPath(), file.Name) - file.wfs.fsNodeCache.DeleteFsNode(util.NewFullPath(file.dir.Path, file.Name)) + file.wfs.fsNodeCache.DeleteFsNode(util.NewFullPath(file.dir.FullPath(), file.Name)) } func (file *File) maybeLoadEntry(ctx context.Context) error { if file.entry == nil || file.isOpen <= 0 { - entry, err := file.wfs.maybeLoadEntry(file.dir.Path, file.Name) + entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) if err != nil { return err } @@ -266,14 +266,14 @@ func (file *File) saveEntry() error { return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ - Directory: file.dir.Path, + Directory: file.dir.FullPath(), Entry: file.entry, } glog.V(1).Infof("save file entry: %v", request) _, err := client.UpdateEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err) + glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) return fuse.EIO } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 1dfaf5944..e1e495b52 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -191,7 +191,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { } request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.Path, + Directory: fh.f.dir.FullPath(), Entry: fh.f.entry, } diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go index 974f378c7..08bd641a5 100644 --- a/weed/filesys/fscache.go +++ b/weed/filesys/fscache.go @@ -1,6 +1,8 @@ package filesys import ( + "sync" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse/fs" ) @@ -9,10 +11,11 @@ type FsCache struct { root *FsNode } type FsNode struct { - parent *FsNode - node fs.Node - name string - children map[string]*FsNode + parent *FsNode + node fs.Node + name string + childrenLock sync.RWMutex + children map[string]*FsNode } func newFsCache(root fs.Node) *FsCache { @@ -89,6 +92,9 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { } parent := target.parent src.name = target.name + if dir, ok := src.node.(*Dir); ok { + dir.name = target.name // target is not Dir, but a shortcut + } parent.deleteChild(target.name) target.deleteSelf() @@ -104,10 +110,18 @@ func (n *FsNode) connectToParent(parent *FsNode) { if oldNode != nil { oldNode.deleteSelf() } + if dir, ok := n.node.(*Dir); ok { + dir.parent = parent.node.(*Dir) + } + n.childrenLock.Lock() parent.children[n.name] = n + n.childrenLock.Unlock() } func (n *FsNode) findChild(name string) *FsNode { + n.childrenLock.RLock() + defer n.childrenLock.RUnlock() + child, found := n.children[name] if found { return child @@ -116,6 +130,9 @@ func (n *FsNode) findChild(name string) *FsNode { } func (n *FsNode) ensureChild(name string) *FsNode { + n.childrenLock.Lock() + defer n.childrenLock.Unlock() + if n.children == nil { n.children = make(map[string]*FsNode) } @@ -134,14 +151,20 @@ func (n *FsNode) ensureChild(name string) *FsNode { } func (n *FsNode) deleteChild(name string) { + n.childrenLock.Lock() delete(n.children, name) + n.childrenLock.Unlock() } func (n *FsNode) deleteSelf() { + n.childrenLock.Lock() for _, child := range n.children { child.deleteSelf() } + n.children = nil + n.childrenLock.Unlock() + n.node = nil n.parent = nil - n.children = nil + } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 5d3dd984a..5075687e3 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -60,9 +60,6 @@ type WFS struct { stats statsCache - // nodes, protected by nodesLock - nodesLock sync.Mutex - nodes map[uint64]fs.Node root fs.Node fsNodeCache *FsCache } @@ -81,10 +78,9 @@ func NewSeaweedFileSystem(option *Option) *WFS { return make([]byte, option.ChunkSizeLimit) }, }, - nodes: make(map[uint64]fs.Node), } - wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} wfs.fsNodeCache = newFsCache(wfs.root) return wfs