Browse Source

fix fscache move

pull/1255/head
Chris Lu 5 years ago
parent
commit
2e4fadd10a
  1. 10
      weed/filesys/dir.go
  2. 29
      weed/filesys/dir_rename.go
  3. 2
      weed/filesys/file.go
  4. 147
      weed/filesys/fscache.go
  5. 90
      weed/filesys/fscache_test.go
  6. 33
      weed/filesys/wfs.go
  7. 8
      weed/util/fullpath.go

10
weed/filesys/dir.go

@ -86,7 +86,7 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
} }
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
return dir.wfs.getNode(util.NewFullPath(dir.Path, name), func() fs.Node {
return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.Path, name), func() fs.Node {
return &File{ return &File{
Name: name, Name: name,
dir: dir, dir: dir,
@ -98,9 +98,11 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
} }
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
return dir.wfs.getNode(fullpath, func() fs.Node {
return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
return &Dir{Path: string(fullpath), wfs: dir.wfs, entry: entry} return &Dir{Path: string(fullpath), wfs: dir.wfs, entry: entry}
}) })
} }
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
@ -182,7 +184,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
}) })
if err == nil { if err == nil {
node := dir.newDirectory(util.NewFullPath(dir.Path, req.Name), newEntry)
node := dir.newDirectory(util.NewFullPath(dir.Path(), req.Name), newEntry)
return node, nil return node, nil
} }
@ -392,7 +394,7 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
func (dir *Dir) Forget() { func (dir *Dir) Forget() {
glog.V(3).Infof("Forget dir %s", dir.Path) glog.V(3).Infof("Forget dir %s", dir.Path)
dir.wfs.forgetNode(util.FullPath(dir.Path))
dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.Path))
} }
func (dir *Dir) maybeLoadEntry() error { func (dir *Dir) maybeLoadEntry() error {

29
weed/filesys/dir_rename.go

@ -2,6 +2,7 @@ package filesys
import ( import (
"context" "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@ -12,8 +13,11 @@ import (
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
newPath := util.NewFullPath(newDir.Path, req.NewName)
oldPath := util.NewFullPath(dir.Path, req.OldName)
newDir := newDirectory.(*Dir) newDir := newDirectory.(*Dir)
glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName)
glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
@ -26,7 +30,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
_, err := client.AtomicRenameEntry(context.Background(), request) _, err := client.AtomicRenameEntry(context.Background(), request)
if err != nil { if err != nil {
glog.V(0).Infof("dir Rename %s/%s => %s/%s : %v", dir.Path, req.OldName, newDir.Path, req.NewName, err)
glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
return fuse.EIO return fuse.EIO
} }
@ -35,29 +39,12 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
}) })
if err == nil { if err == nil {
newPath := util.NewFullPath(newDir.Path, req.NewName)
oldPath := util.NewFullPath(dir.Path, req.OldName)
dir.wfs.cacheDelete(newPath) dir.wfs.cacheDelete(newPath)
dir.wfs.cacheDelete(oldPath) dir.wfs.cacheDelete(oldPath)
oldFileNode := dir.wfs.getNode(oldPath, func() fs.Node {
return nil
})
newDirNode := dir.wfs.getNode(util.FullPath(newDir.Path), func() fs.Node {
return nil
})
// fmt.Printf("new path: %v dir: %v node:%+v\n", newPath, newDir.Path, newDirNode)
dir.wfs.forgetNode(newPath)
dir.wfs.forgetNode(oldPath)
if oldFileNode != nil && newDirNode != nil {
oldFile := oldFileNode.(*File)
oldFile.Name = req.NewName
oldFile.dir = newDirNode.(*Dir)
dir.wfs.getNode(newPath, func() fs.Node {
return oldFile
})
fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
dir.wfs.fsNodeCache.Move(oldPath, newPath)
}
} }
return err return err

2
weed/filesys/file.go

@ -219,7 +219,7 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
func (file *File) Forget() { func (file *File) Forget() {
glog.V(3).Infof("Forget file %s/%s", file.dir.Path, file.Name) glog.V(3).Infof("Forget file %s/%s", file.dir.Path, file.Name)
file.wfs.forgetNode(util.NewFullPath(file.dir.Path, file.Name))
file.wfs.fsNodeCache.DeleteFsNode(util.NewFullPath(file.dir.Path, file.Name))
} }

147
weed/filesys/fscache.go

@ -0,0 +1,147 @@
package filesys
import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse/fs"
)
type FsCache struct {
root *FsNode
}
type FsNode struct {
parent *FsNode
node fs.Node
name string
children map[string]*FsNode
}
func newFsCache(root fs.Node) *FsCache {
return &FsCache{
root: &FsNode{
node: root,
},
}
}
func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
t := c.root
for _, p := range path.Split() {
t = t.findChild(p)
if t == nil {
return nil
}
}
return t.node
}
func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
t := c.root
for _, p := range path.Split() {
t = t.ensureChild(p)
}
t.node = node
}
func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
t := c.GetFsNode(path)
if t != nil {
return t
}
t = genNodeFn()
c.SetFsNode(path, t)
return t
}
func (c *FsCache) DeleteFsNode(path util.FullPath) {
t := c.root
for _, p := range path.Split() {
t = t.findChild(p)
if t == nil {
return
}
}
if t.parent != nil {
t.parent.deleteChild(t.name)
}
t.deleteSelf()
}
// oldPath and newPath are full path including the new name
func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
// find old node
src := c.root
for _, p := range oldPath.Split() {
src = src.findChild(p)
if src == nil {
return src
}
}
if src.parent != nil {
src.parent.deleteChild(src.name)
}
src.parent = nil
// find new node
target := c.root
for _, p := range newPath.Split() {
target = target.ensureChild(p)
}
parent := target.parent
src.name = target.name
parent.deleteChild(target.name)
target.deleteSelf()
src.connectToParent(parent)
return src
}
func (n *FsNode) connectToParent(parent *FsNode) {
n.parent = parent
oldNode := parent.findChild(n.name)
if oldNode != nil {
oldNode.deleteSelf()
}
parent.children[n.name] = n
}
func (n *FsNode) findChild(name string) *FsNode {
child, found := n.children[name]
if found {
return child
}
return nil
}
func (n *FsNode) ensureChild(name string) *FsNode {
if n.children == nil {
n.children = make(map[string]*FsNode)
}
child, found := n.children[name]
if found {
return child
}
t := &FsNode{
parent: n,
node: nil,
name: name,
children: nil,
}
n.children[name] = t
return t
}
func (n *FsNode) deleteChild(name string) {
delete(n.children, name)
}
func (n *FsNode) deleteSelf() {
for _, child := range n.children {
child.deleteSelf()
}
n.node = nil
n.parent = nil
n.children = nil
}

90
weed/filesys/fscache_test.go

@ -0,0 +1,90 @@
package filesys
import (
"testing"
"github.com/chrislusf/seaweedfs/weed/util"
)
func TestPathSplit(t *testing.T) {
parts := util.FullPath("/").Split()
if len(parts) != 0 {
t.Errorf("expecting an empty list, but getting %d", len(parts))
}
}
func TestFsCache(t *testing.T) {
cache := newFsCache(nil)
x := cache.GetFsNode(util.FullPath("/y/x"))
if x != nil {
t.Errorf("wrong node!")
}
p := util.FullPath("/a/b/c")
cache.SetFsNode(p, &File{Name: "cc"})
tNode := cache.GetFsNode(p)
tFile := tNode.(*File)
if tFile.Name != "cc" {
t.Errorf("expecting a FsNode")
}
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
b := cache.GetFsNode(util.FullPath("/a/b"))
if b != nil {
t.Errorf("unexpected node!")
}
a := cache.GetFsNode(util.FullPath("/a"))
if a == nil {
t.Errorf("missing node!")
}
cache.DeleteFsNode(util.FullPath("/a"))
if b != nil {
t.Errorf("unexpected node!")
}
a = cache.GetFsNode(util.FullPath("/a"))
if a != nil {
t.Errorf("wrong DeleteFsNode!")
}
z := cache.GetFsNode(util.FullPath("/z"))
if z == nil {
t.Errorf("missing node!")
}
y := cache.GetFsNode(util.FullPath("/x/y"))
if y != nil {
t.Errorf("wrong node!")
}
}
func TestFsCacheMove(t *testing.T) {
cache := newFsCache(nil)
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
d := cache.GetFsNode(util.FullPath("/z/x/d"))
if d == nil {
t.Errorf("unexpected nil node!")
}
if d.(*File).Name != "dd" {
t.Errorf("unexpected non dd node!")
}
}

33
weed/filesys/wfs.go

@ -61,9 +61,10 @@ type WFS struct {
stats statsCache stats statsCache
// nodes, protected by nodesLock // nodes, protected by nodesLock
nodesLock sync.Mutex
nodes map[uint64]fs.Node
root fs.Node
nodesLock sync.Mutex
nodes map[uint64]fs.Node
root fs.Node
fsNodeCache *FsCache
} }
type statsCache struct { type statsCache struct {
filer_pb.StatisticsResponse filer_pb.StatisticsResponse
@ -84,9 +85,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
} }
wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs} wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}
wfs.getNode(util.FullPath(wfs.option.FilerMountRootPath), func() fs.Node {
return wfs.root
})
wfs.fsNodeCache = newFsCache(wfs.root)
return wfs return wfs
} }
@ -235,28 +234,6 @@ func (wfs *WFS) cacheDelete(path util.FullPath) {
wfs.listDirectoryEntriesCache.Delete(string(path)) wfs.listDirectoryEntriesCache.Delete(string(path))
} }
func (wfs *WFS) getNode(fullpath util.FullPath, fn func() fs.Node) fs.Node {
wfs.nodesLock.Lock()
defer wfs.nodesLock.Unlock()
node, found := wfs.nodes[fullpath.AsInode()]
if found {
return node
}
node = fn()
if node != nil {
wfs.nodes[fullpath.AsInode()] = node
}
return node
}
func (wfs *WFS) forgetNode(fullpath util.FullPath) {
wfs.nodesLock.Lock()
defer wfs.nodesLock.Unlock()
delete(wfs.nodes, fullpath.AsInode())
}
func (wfs *WFS) AdjustedUrl(hostAndPort string) string { func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
if !wfs.option.OutsideContainerClusterMode { if !wfs.option.OutsideContainerClusterMode {
return hostAndPort return hostAndPort

8
weed/util/fullpath.go

@ -38,3 +38,11 @@ func (fp FullPath) Child(name string) FullPath {
func (fp FullPath) AsInode() uint64 { func (fp FullPath) AsInode() uint64 {
return uint64(HashStringToLong(string(fp))) return uint64(HashStringToLong(string(fp)))
} }
// split, but skipping the root
func (fp FullPath) Split() []string {
if fp == "" || fp == "/"{
return []string{}
}
return strings.Split(string(fp)[1:], "/")
}
Loading…
Cancel
Save