Browse Source

mount: internals switch to filer.Entry instead of protobuf

test_udp
Chris Lu 4 years ago
parent
commit
7d57664c2d
  1. 4
      weed/filer/filechunks.go
  2. 58
      weed/filesys/dir.go
  3. 10
      weed/filesys/dir_link.go
  4. 2
      weed/filesys/dirty_page.go
  5. 46
      weed/filesys/file.go
  6. 37
      weed/filesys/filehandle.go
  7. 2
      weed/filesys/fscache.go
  8. 2
      weed/filesys/wfs.go
  9. 13
      weed/filesys/xattr.go

4
weed/filer/filechunks.go

@ -23,6 +23,10 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
return return
} }
func FileSize2(entry *Entry) (size uint64) {
return maxUint64(TotalSize(entry.Chunks), entry.Attr.FileSize)
}
func FileSize(entry *filer_pb.Entry) (size uint64) { func FileSize(entry *filer_pb.Entry) (size uint64) {
return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize) return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
} }

58
weed/filesys/dir.go

@ -1,7 +1,6 @@
package filesys package filesys
import ( import (
"bytes"
"context" "context"
"math" "math"
"os" "os"
@ -22,7 +21,7 @@ import (
type Dir struct { type Dir struct {
name string name string
wfs *WFS wfs *WFS
entry *filer_pb.Entry
entry *filer.Entry
parent *Dir parent *Dir
} }
@ -59,11 +58,11 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
} }
// attr.Inode = util.FullPath(dir.FullPath()).AsInode() // attr.Inode = util.FullPath(dir.FullPath()).AsInode()
attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0)
attr.Gid = dir.entry.Attributes.Gid
attr.Uid = dir.entry.Attributes.Uid
attr.Mode = dir.entry.Attr.Mode | os.ModeDir
attr.Mtime = dir.entry.Attr.Mtime
attr.Crtime = dir.entry.Attr.Crtime
attr.Gid = dir.entry.Attr.Gid
attr.Uid = dir.entry.Attr.Uid
glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
@ -103,12 +102,13 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
} }
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
dirPath := dir.FullPath()
f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dirPath, name), func() fs.Node {
return &File{ return &File{
Name: name, Name: name,
dir: dir, dir: dir,
wfs: dir.wfs, wfs: dir.wfs,
entry: entry,
entry: filer.FromPbEntry(dirPath, entry),
entryViewCache: nil, entryViewCache: nil,
} }
}) })
@ -119,7 +119,7 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
return &Dir{name: entry.Name, wfs: dir.wfs, entry: filer.FromPbEntry(dir.FullPath(), entry), parent: dir}
}) })
d.(*Dir).parent = dir // in case dir node was created later d.(*Dir).parent = dir // in case dir node was created later
return d return d
@ -436,19 +436,19 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
} }
if req.Valid.Mode() { if req.Valid.Mode() {
dir.entry.Attributes.FileMode = uint32(req.Mode)
dir.entry.Attr.Mode = req.Mode
} }
if req.Valid.Uid() { if req.Valid.Uid() {
dir.entry.Attributes.Uid = req.Uid
dir.entry.Attr.Uid = req.Uid
} }
if req.Valid.Gid() { if req.Valid.Gid() {
dir.entry.Attributes.Gid = req.Gid
dir.entry.Attr.Gid = req.Gid
} }
if req.Valid.Mtime() { if req.Valid.Mtime() {
dir.entry.Attributes.Mtime = req.Mtime.Unix()
dir.entry.Attr.Mtime = req.Mtime
} }
return dir.saveEntry() return dir.saveEntry()
@ -527,12 +527,14 @@ func (dir *Dir) saveEntry() error {
return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(dir.entry)
defer dir.wfs.mapPbIdFromFilerToLocal(dir.entry)
pbEntry := dir.entry.ToProtoEntry()
dir.wfs.mapPbIdFromLocalToFiler(pbEntry)
defer dir.wfs.mapPbIdFromFilerToLocal(pbEntry)
request := &filer_pb.UpdateEntryRequest{ request := &filer_pb.UpdateEntryRequest{
Directory: parentDir, Directory: parentDir,
Entry: dir.entry,
Entry: pbEntry,
Signatures: []int32{dir.wfs.signature}, Signatures: []int32{dir.wfs.signature},
} }
@ -550,25 +552,5 @@ func (dir *Dir) saveEntry() error {
} }
func (dir *Dir) FullPath() string { func (dir *Dir) FullPath() string {
var parts []string
for p := dir; p != nil; p = p.parent {
if strings.HasPrefix(p.name, "/") {
if len(p.name) > 1 {
parts = append(parts, p.name[1:])
}
} else {
parts = append(parts, p.name)
}
}
if len(parts) == 0 {
return "/"
}
var buf bytes.Buffer
for i := len(parts) - 1; i >= 0; i-- {
buf.WriteString("/")
buf.WriteString(parts[i])
}
return buf.String()
return string(dir.entry.FullPath)
} }

10
weed/filesys/dir_link.go

@ -48,7 +48,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
oldEntry.HardLinkCounter++ oldEntry.HardLinkCounter++
updateOldEntryRequest := &filer_pb.UpdateEntryRequest{ updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: oldFile.dir.FullPath(), Directory: oldFile.dir.FullPath(),
Entry: oldEntry,
Entry: oldEntry.ToProtoEntry(),
Signatures: []int32{dir.wfs.signature}, Signatures: []int32{dir.wfs.signature},
} }
@ -58,7 +58,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
Entry: &filer_pb.Entry{ Entry: &filer_pb.Entry{
Name: req.NewName, Name: req.NewName,
IsDirectory: false, IsDirectory: false,
Attributes: oldEntry.Attributes,
Attributes: filer.EntryAttributeToPb(oldEntry),
Chunks: oldEntry.Chunks, Chunks: oldEntry.Chunks,
Extended: oldEntry.Extended, Extended: oldEntry.Extended,
HardLinkId: oldEntry.HardLinkId, HardLinkId: oldEntry.HardLinkId,
@ -152,12 +152,12 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri
return "", err return "", err
} }
if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 {
if entry.Attr.Mode&os.ModeSymlink == 0 {
return "", fuse.Errno(syscall.EINVAL) return "", fuse.Errno(syscall.EINVAL)
} }
glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget)
glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attr.SymlinkTarget)
return entry.Attributes.SymlinkTarget, nil
return entry.Attr.SymlinkTarget, nil
} }

2
weed/filesys/dirty_page.go

@ -74,7 +74,7 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
return false return false
} }
fileSize := int64(entry.Attributes.FileSize)
fileSize := int64(entry.Attr.FileSize)
chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
if chunkSize == 0 { if chunkSize == 0 {

46
weed/filesys/file.go

@ -33,7 +33,7 @@ type File struct {
Name string Name string
dir *Dir dir *Dir
wfs *WFS wfs *WFS
entry *filer_pb.Entry
entry *filer.Entry
entryLock sync.RWMutex entryLock sync.RWMutex
entryViewCache []filer.VisibleInterval entryViewCache []filer.VisibleInterval
isOpen int isOpen int
@ -58,16 +58,16 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
// attr.Inode = file.fullpath().AsInode() // attr.Inode = file.fullpath().AsInode()
attr.Valid = time.Second attr.Valid = time.Second
attr.Mode = os.FileMode(entry.Attributes.FileMode)
attr.Size = filer.FileSize(entry)
attr.Mode = os.FileMode(entry.Attr.Mode)
attr.Size = filer.FileSize2(entry)
if file.isOpen > 0 { if file.isOpen > 0 {
attr.Size = entry.Attributes.FileSize
attr.Size = entry.Attr.FileSize
glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size) glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
} }
attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
attr.Gid = entry.Attributes.Gid
attr.Uid = entry.Attributes.Uid
attr.Crtime = entry.Attr.Crtime
attr.Mtime = entry.Attr.Mtime
attr.Gid = entry.Attr.Gid
attr.Uid = entry.Attr.Uid
attr.Blocks = attr.Size/blockSize + 1 attr.Blocks = attr.Size/blockSize + 1
attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit) attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit)
if entry.HardLinkCounter > 0 { if entry.HardLinkCounter > 0 {
@ -126,7 +126,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
if req.Valid.Size() { if req.Valid.Size() {
glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks)) glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks))
if req.Size < filer.FileSize(entry) {
if req.Size < filer.FileSize2(entry) {
// fmt.Printf("truncate %v \n", fullPath) // fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk var chunks []*filer_pb.FileChunk
var truncatedChunks []*filer_pb.FileChunk var truncatedChunks []*filer_pb.FileChunk
@ -149,32 +149,32 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks) file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
file.reader = nil file.reader = nil
} }
entry.Attributes.FileSize = req.Size
entry.Attr.FileSize = req.Size
file.dirtyMetadata = true file.dirtyMetadata = true
} }
if req.Valid.Mode() { if req.Valid.Mode() {
entry.Attributes.FileMode = uint32(req.Mode)
entry.Attr.Mode = req.Mode
file.dirtyMetadata = true file.dirtyMetadata = true
} }
if req.Valid.Uid() { if req.Valid.Uid() {
entry.Attributes.Uid = req.Uid
entry.Attr.Uid = req.Uid
file.dirtyMetadata = true file.dirtyMetadata = true
} }
if req.Valid.Gid() { if req.Valid.Gid() {
entry.Attributes.Gid = req.Gid
entry.Attr.Gid = req.Gid
file.dirtyMetadata = true file.dirtyMetadata = true
} }
if req.Valid.Crtime() { if req.Valid.Crtime() {
entry.Attributes.Crtime = req.Crtime.Unix()
entry.Attr.Crtime = req.Crtime
file.dirtyMetadata = true file.dirtyMetadata = true
} }
if req.Valid.Mtime() { if req.Valid.Mtime() {
entry.Attributes.Mtime = req.Mtime.Unix()
entry.Attr.Mtime = req.Mtime
file.dirtyMetadata = true file.dirtyMetadata = true
} }
@ -259,7 +259,7 @@ func (file *File) Forget() {
file.wfs.fsNodeCache.DeleteFsNode(t) file.wfs.fsNodeCache.DeleteFsNode(t)
} }
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer.Entry, err error) {
entry = file.getEntry() entry = file.getEntry()
if file.isOpen > 0 { if file.isOpen > 0 {
return entry, nil return entry, nil
@ -330,7 +330,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
entry.Chunks = append(entry.Chunks, newChunks...) entry.Chunks = append(entry.Chunks, newChunks...)
} }
func (file *File) setEntry(entry *filer_pb.Entry) {
func (file *File) setEntry(entry *filer.Entry) {
file.entryLock.Lock() file.entryLock.Lock()
defer file.entryLock.Unlock() defer file.entryLock.Unlock()
file.entry = entry file.entry = entry
@ -346,15 +346,17 @@ func (file *File) clearEntry() {
file.reader = nil file.reader = nil
} }
func (file *File) saveEntry(entry *filer_pb.Entry) error {
func (file *File) saveEntry(entry *filer.Entry) error {
return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
file.wfs.mapPbIdFromLocalToFiler(entry)
defer file.wfs.mapPbIdFromFilerToLocal(entry)
pbEntry := entry.ToProtoEntry()
file.wfs.mapPbIdFromLocalToFiler(pbEntry)
defer file.wfs.mapPbIdFromFilerToLocal(pbEntry)
request := &filer_pb.UpdateEntryRequest{ request := &filer_pb.UpdateEntryRequest{
Directory: file.dir.FullPath(), Directory: file.dir.FullPath(),
Entry: entry,
Entry: pbEntry,
Signatures: []int32{file.wfs.signature}, Signatures: []int32{file.wfs.signature},
} }
@ -371,7 +373,7 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
}) })
} }
func (file *File) getEntry() *filer_pb.Entry {
func (file *File) getEntry() *filer.Entry {
file.entryLock.RLock() file.entryLock.RLock()
defer file.entryLock.RUnlock() defer file.entryLock.RUnlock()
return file.entry return file.entry

37
weed/filesys/filehandle.go

@ -6,7 +6,6 @@ import (
"io" "io"
"math" "math"
"net/http" "net/http"
"os"
"sync" "sync"
"time" "time"
@ -42,7 +41,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
} }
entry := fh.f.getEntry() entry := fh.f.getEntry()
if entry != nil { if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
entry.Attr.FileSize = filer.FileSize2(entry)
} }
return fh return fh
@ -110,7 +109,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
return 0, io.EOF return 0, io.EOF
} }
fileSize := int64(filer.FileSize(entry))
fileSize := int64(filer.FileSize2(entry))
fileFullPath := fh.f.fullpath() fileFullPath := fh.f.fullpath()
if fileSize == 0 { if fileSize == 0 {
@ -171,7 +170,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
} }
entry.Content = nil entry.Content = nil
entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
entry.Attr.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attr.FileSize)))
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data) fh.dirtyPages.AddPage(req.Offset, data)
@ -259,26 +258,24 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
return nil return nil
} }
if entry.Attributes != nil {
entry.Attributes.Mime = fh.contentType
if entry.Attributes.Uid == 0 {
entry.Attributes.Uid = header.Uid
}
if entry.Attributes.Gid == 0 {
entry.Attributes.Gid = header.Gid
}
if entry.Attributes.Crtime == 0 {
entry.Attributes.Crtime = time.Now().Unix()
}
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
entry.Attributes.Collection = fh.dirtyPages.collection
entry.Attributes.Replication = fh.dirtyPages.replication
entry.Attr.Mime = fh.contentType
if entry.Attr.Uid == 0 {
entry.Attr.Uid = header.Uid
}
if entry.Attr.Gid == 0 {
entry.Attr.Gid = header.Gid
}
if entry.Attr.Crtime.IsZero() {
entry.Attr.Crtime = time.Now()
} }
entry.Attr.Mtime = time.Now()
entry.Attr.Mode = entry.Attr.Mode &^ fh.f.wfs.option.Umask
entry.Attr.Collection = fh.dirtyPages.collection
entry.Attr.Replication = fh.dirtyPages.replication
request := &filer_pb.CreateEntryRequest{ request := &filer_pb.CreateEntryRequest{
Directory: fh.f.dir.FullPath(), Directory: fh.f.dir.FullPath(),
Entry: entry,
Entry: entry.ToProtoEntry(),
Signatures: []int32{fh.f.wfs.signature}, Signatures: []int32{fh.f.wfs.signature},
} }

2
weed/filesys/fscache.go

@ -126,7 +126,7 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
f.Name = target.name f.Name = target.name
entry := f.getEntry() entry := f.getEntry()
if entry != nil { if entry != nil {
entry.Name = f.Name
entry.FullPath = newPath
} }
} }
parent.disconnectChild(target) parent.disconnectChild(target)

2
weed/filesys/wfs.go

@ -131,7 +131,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}) })
entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath)) entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: filer.FromPbEntry(wfs.option.FilerMountRootPath,entry)}
wfs.fsNodeCache = newFsCache(wfs.root) wfs.fsNodeCache = newFsCache(wfs.root)
if wfs.option.ConcurrentWriters > 0 { if wfs.option.ConcurrentWriters > 0 {

13
weed/filesys/xattr.go

@ -2,6 +2,7 @@ package filesys
import ( import (
"context" "context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse"
@ -10,7 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
func getxattr(entry *filer.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
if entry == nil { if entry == nil {
return fuse.ErrNoXattr return fuse.ErrNoXattr
@ -38,7 +39,7 @@ func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.Getxa
} }
func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error {
func setxattr(entry *filer.Entry, req *fuse.SetxattrRequest) error {
if entry == nil { if entry == nil {
return fuse.EIO return fuse.EIO
@ -61,7 +62,7 @@ func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error {
} }
func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error {
func removexattr(entry *filer.Entry, req *fuse.RemovexattrRequest) error {
if entry == nil { if entry == nil {
return fuse.ErrNoXattr return fuse.ErrNoXattr
@ -83,7 +84,7 @@ func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error {
} }
func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
func listxattr(entry *filer.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
if entry == nil { if entry == nil {
return fuse.EIO return fuse.EIO
@ -108,7 +109,7 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis
} }
func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer.Entry, err error) {
fullpath := util.NewFullPath(dir, name) fullpath := util.NewFullPath(dir, name)
// glog.V(3).Infof("read entry cache miss %s", fullpath) // glog.V(3).Infof("read entry cache miss %s", fullpath)
@ -119,5 +120,5 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err
if cacheErr == filer_pb.ErrNotFound { if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT return nil, fuse.ENOENT
} }
return cachedEntry.ToProtoEntry(), cacheErr
return cachedEntry, cacheErr
} }
Loading…
Cancel
Save