Browse Source

mount: support extended attributes

pull/1158/head
Chris Lu 5 years ago
parent
commit
33b4b1868b
  1. 139
      weed/filesys/dir.go
  2. 2
      weed/filesys/dir_link.go
  3. 117
      weed/filesys/file.go
  4. 134
      weed/filesys/xattr.go

139
weed/filesys/dir.go

@ -16,7 +16,7 @@ import (
type Dir struct { type Dir struct {
Path string Path string
wfs *WFS wfs *WFS
attributes *filer_pb.FuseAttributes
entry *filer_pb.Entry
} }
var _ = fs.Node(&Dir{}) var _ = fs.Node(&Dir{})
@ -27,6 +27,10 @@ var _ = fs.HandleReadDirAller(&Dir{})
var _ = fs.NodeRemover(&Dir{}) var _ = fs.NodeRemover(&Dir{})
var _ = fs.NodeRenamer(&Dir{}) var _ = fs.NodeRenamer(&Dir{})
var _ = fs.NodeSetattrer(&Dir{}) var _ = fs.NodeSetattrer(&Dir{})
var _ = fs.NodeGetxattrer(&Dir{})
var _ = fs.NodeSetxattrer(&Dir{})
var _ = fs.NodeRemovexattrer(&Dir{})
var _ = fs.NodeListxattrer(&Dir{})
func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
@ -40,46 +44,28 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
return nil return nil
} }
item := dir.wfs.listDirectoryEntriesCache.Get(dir.Path)
var entry *filer_pb.Entry
if item != nil && !item.Expired() {
entry = item.Value().(*filer_pb.Entry)
if err := dir.maybeLoadEntry(ctx); err != nil {
return err
} }
if entry != nil {
entry := item.Value().(*filer_pb.Entry)
attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
attr.Mode = os.FileMode(entry.Attributes.FileMode)
attr.Gid = entry.Attributes.Gid
attr.Uid = entry.Attributes.Uid
attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
attr.Ctime = time.Unix(dir.entry.Attributes.Crtime, 0)
attr.Gid = dir.entry.Attributes.Gid
attr.Uid = dir.entry.Attributes.Uid
return nil return nil
} }
glog.V(3).Infof("dir Attr cache miss %s", dir.Path)
func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
entry, err := filer2.GetEntry(ctx, dir.wfs, dir.Path)
if err != nil {
glog.V(2).Infof("read dir %s attr: %v, error: %v", dir.Path, dir.attributes, err)
glog.V(4).Infof("dir Getxattr %s", dir.Path)
if err := dir.maybeLoadEntry(ctx); err != nil {
return err return err
} }
if entry == nil {
return fuse.ENOENT
}
dir.attributes = entry.Attributes
glog.V(2).Infof("dir %s: %v perm: %v", dir.Path, dir.attributes, os.FileMode(dir.attributes.FileMode))
attr.Mode = os.FileMode(dir.attributes.FileMode) | os.ModeDir
attr.Mtime = time.Unix(dir.attributes.Mtime, 0)
attr.Ctime = time.Unix(dir.attributes.Crtime, 0)
attr.Gid = dir.attributes.Gid
attr.Uid = dir.attributes.Uid
return nil
return getxattr(dir.entry, req, resp)
} }
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
@ -207,7 +193,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
if entry != nil { if entry != nil {
if entry.IsDirectory { if entry.IsDirectory {
node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, attributes: entry.Attributes}
node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, entry: entry}
} else { } else {
node = dir.newFile(req.Name, entry) node = dir.newFile(req.Name, entry)
} }
@ -316,48 +302,111 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error
func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
if dir.attributes == nil {
return nil
if err := dir.maybeLoadEntry(ctx); err != nil {
return err
} }
glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle) glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle)
if req.Valid.Mode() { if req.Valid.Mode() {
dir.attributes.FileMode = uint32(req.Mode)
dir.entry.Attributes.FileMode = uint32(req.Mode)
} }
if req.Valid.Uid() { if req.Valid.Uid() {
dir.attributes.Uid = req.Uid
dir.entry.Attributes.Uid = req.Uid
} }
if req.Valid.Gid() { if req.Valid.Gid() {
dir.attributes.Gid = req.Gid
dir.entry.Attributes.Gid = req.Gid
} }
if req.Valid.Mtime() { if req.Valid.Mtime() {
dir.attributes.Mtime = req.Mtime.Unix()
dir.entry.Attributes.Mtime = req.Mtime.Unix()
} }
dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) dir.wfs.listDirectoryEntriesCache.Delete(dir.Path)
return dir.saveEntry(ctx)
}
func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
glog.V(4).Infof("dir Setxattr %s: %s", dir.Path, req.Name)
if err := dir.maybeLoadEntry(ctx); err != nil {
return err
}
if err := setxattr(dir.entry, req); err != nil {
return err
}
return dir.saveEntry(ctx)
}
func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
glog.V(4).Infof("dir Removexattr %s: %s", dir.Path, req.Name)
if err := dir.maybeLoadEntry(ctx); err != nil {
return err
}
if err := removexattr(dir.entry, req); err != nil {
return err
}
return dir.saveEntry(ctx)
}
func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
glog.V(4).Infof("dir Listxattr %s: %s", dir.Path)
if err := dir.maybeLoadEntry(ctx); err != nil {
return err
}
if err := listxattr(dir.entry, req, resp); err != nil {
return err
}
return dir.saveEntry(ctx)
}
func (dir *Dir) maybeLoadEntry(ctx context.Context) error {
if dir.entry == nil {
parentDirPath, name := filer2.FullPath(dir.Path).DirAndName()
entry, err := dir.wfs.maybeLoadEntry(ctx, parentDirPath, name)
dir.entry = entry
if err != nil {
return err
}
}
return nil
}
func (dir *Dir) saveEntry(ctx context.Context) error {
parentDir, name := filer2.FullPath(dir.Path).DirAndName() parentDir, name := filer2.FullPath(dir.Path).DirAndName()
return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{ request := &filer_pb.UpdateEntryRequest{
Directory: parentDir, Directory: parentDir,
Entry: &filer_pb.Entry{
Name: name,
Attributes: dir.attributes,
},
Entry: dir.entry,
} }
glog.V(1).Infof("set attr directory entry: %v", request)
glog.V(1).Infof("save dir entry: %v", request)
_, err := client.UpdateEntry(ctx, request) _, err := client.UpdateEntry(ctx, request)
if err != nil { if err != nil {
glog.V(0).Infof("UpdateEntry %s: %v", dir.Path, err)
glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.EIO return fuse.EIO
} }
return nil return nil
}) })
} }

2
weed/filesys/dir_link.go

@ -51,7 +51,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) { func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) {
if err := file.maybeLoadAttributes(ctx); err != nil {
if err := file.maybeLoadEntry(ctx); err != nil {
return "", err return "", err
} }

117
weed/filesys/file.go

@ -20,6 +20,10 @@ var _ = fs.Node(&File{})
var _ = fs.NodeOpener(&File{}) var _ = fs.NodeOpener(&File{})
var _ = fs.NodeFsyncer(&File{}) var _ = fs.NodeFsyncer(&File{})
var _ = fs.NodeSetattrer(&File{}) var _ = fs.NodeSetattrer(&File{})
var _ = fs.NodeGetxattrer(&File{})
var _ = fs.NodeSetxattrer(&File{})
var _ = fs.NodeRemovexattrer(&File{})
var _ = fs.NodeListxattrer(&File{})
type File struct { type File struct {
Name string Name string
@ -38,7 +42,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
glog.V(4).Infof("file Attr %s", file.fullpath()) glog.V(4).Infof("file Attr %s", file.fullpath())
if err := file.maybeLoadAttributes(ctx); err != nil {
if err := file.maybeLoadEntry(ctx); err != nil {
return err return err
} }
@ -54,6 +58,17 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
} }
func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
glog.V(4).Infof("file Getxattr %s", file.fullpath())
if err := file.maybeLoadEntry(ctx); err != nil {
return err
}
return getxattr(file.entry, req, resp)
}
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
glog.V(4).Infof("file %v open %+v", file.fullpath(), req) glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
@ -72,7 +87,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
if err := file.maybeLoadAttributes(ctx); err != nil {
if err := file.maybeLoadEntry(ctx); err != nil {
return err return err
} }
@ -113,75 +128,74 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.wfs.listDirectoryEntriesCache.Delete(file.fullpath()) file.wfs.listDirectoryEntriesCache.Delete(file.fullpath())
return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
return file.saveEntry(ctx)
request := &filer_pb.UpdateEntryRequest{
Directory: file.dir.Path,
Entry: file.entry,
} }
glog.V(1).Infof("set attr file entry: %v", request)
_, err := client.UpdateEntry(ctx, request)
if err != nil {
glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err)
return fuse.EIO
func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name)
if err := file.maybeLoadEntry(ctx); err != nil {
return err
} }
return nil
})
if err := setxattr(file.entry, req); err != nil {
return err
}
return file.saveEntry(ctx)
} }
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
// write the file chunks to the filerGrpcAddress
glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
return nil
glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name)
if err := file.maybeLoadEntry(ctx); err != nil {
return err
} }
func (file *File) maybeLoadAttributes(ctx context.Context) error {
if file.entry == nil || !file.isOpen {
item := file.wfs.listDirectoryEntriesCache.Get(file.fullpath())
var entry *filer_pb.Entry
if item != nil && !item.Expired() {
entry = item.Value().(*filer_pb.Entry)
if err := removexattr(file.entry, req); err != nil {
return err
} }
if entry != nil {
glog.V(4).Infof("file read attr cache hit %s", file.fullpath())
return file.saveEntry(ctx)
file.setEntry(entry)
// glog.V(1).Infof("file attr read cached %v attributes", file.Name)
} else {
}
glog.V(3).Infof("file read attr cache miss %s", file.fullpath())
func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
err := file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("file Listxattr %s: %s", file.fullpath())
request := &filer_pb.LookupDirectoryEntryRequest{
Name: file.Name,
Directory: file.dir.Path,
if err := file.maybeLoadEntry(ctx); err != nil {
return err
} }
resp, err := client.LookupDirectoryEntry(ctx, request)
if err != nil {
glog.V(3).Infof("file attr read file %v: %v", request, err)
return fuse.ENOENT
if err := listxattr(file.entry, req, resp); err != nil {
return err
} }
file.setEntry(resp.Entry)
return file.saveEntry(ctx)
glog.V(3).Infof("file attr %v %+v: %d", file.fullpath(), file.entry.Attributes, filer2.TotalSize(file.entry.Chunks))
}
// file.wfs.listDirectoryEntriesCache.Set(file.fullpath(), file.entry, file.wfs.option.EntryCacheTtl)
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
// write the file chunks to the filerGrpcAddress
glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
return nil return nil
})
}
func (file *File) maybeLoadEntry(ctx context.Context) error {
if file.entry == nil || !file.isOpen {
entry, err := file.wfs.maybeLoadEntry(ctx, file.dir.Path, file.Name)
if err != nil { if err != nil {
return err return err
} }
if entry != nil {
file.setEntry(entry)
} }
} }
return nil return nil
@ -216,3 +230,22 @@ func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry file.entry = entry
file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
} }
func (file *File) saveEntry(ctx context.Context) error {
return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: file.dir.Path,
Entry: file.entry,
}
glog.V(1).Infof("save file entry: %v", request)
_, err := client.UpdateEntry(ctx, request)
if err != nil {
glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err)
return fuse.EIO
}
return nil
})
}

134
weed/filesys/xattr.go

@ -0,0 +1,134 @@
package filesys
import (
"context"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
)
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
if entry == nil {
return fuse.ErrNoXattr
}
if entry.Extended == nil {
return fuse.ErrNoXattr
}
data, found := entry.Extended[req.Name]
if !found {
return fuse.ErrNoXattr
}
if req.Position < uint32(len(data)) {
size := req.Size
if req.Position+size >= uint32(len(data)) {
size = uint32(len(data)) - req.Position
}
resp.Xattr = data[req.Position : req.Position+size]
}
return nil
}
func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error {
if entry == nil {
return fuse.EIO
}
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
data, _ := entry.Extended[req.Name]
newData := make([]byte, int(req.Position)+len(req.Xattr))
copy(newData, data)
copy(newData[int(req.Position):], req.Xattr)
entry.Extended[req.Name] = newData
return nil
}
func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error {
if entry == nil {
return fuse.ErrNoXattr
}
if entry.Extended == nil {
return fuse.ErrNoXattr
}
_, found := entry.Extended[req.Name]
if !found {
return fuse.ErrNoXattr
}
delete(entry.Extended, req.Name)
return nil
}
func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
if entry == nil {
return fuse.EIO
}
for k := range entry.Extended {
resp.Append(k)
}
size := req.Size
if req.Position+size >= uint32(len(resp.Xattr)) {
size = uint32(len(resp.Xattr)) - req.Position
}
resp.Xattr = resp.Xattr[req.Position : req.Position+size]
return nil
}
func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *filer_pb.Entry, err error) {
fullpath := filepath.Join(dir, name)
item := wfs.listDirectoryEntriesCache.Get(fullpath)
if item != nil && !item.Expired() {
entry = item.Value().(*filer_pb.Entry)
return
}
glog.V(3).Infof("read entry cache miss %s", fullpath)
err = wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
resp, err := client.LookupDirectoryEntry(ctx, request)
if err != nil {
glog.V(3).Infof("file attr read file %v: %v", request, err)
return fuse.ENOENT
}
entry = resp.Entry
if entry != nil {
wfs.listDirectoryEntriesCache.Set(fullpath, entry, wfs.option.EntryCacheTtl)
}
return nil
})
return
}
Loading…
Cancel
Save