From 6b48d246a5d7943527d1948e352c9906e9d01a17 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 22 Jan 2020 13:42:03 -0800 Subject: [PATCH] mount: read data that is just written able read on data not flushed multiple file open shares the same file handle fix https://github.com/chrislusf/seaweedfs/issues/1182 on linux --- weed/filesys/dir.go | 3 +- weed/filesys/dirty_page.go | 23 +++++++++++++++ weed/filesys/file.go | 12 ++++---- weed/filesys/filehandle.go | 57 ++++++++++++++++++++++++++++---------- weed/filesys/wfs.go | 11 ++++++++ 5 files changed, 84 insertions(+), 22 deletions(-) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 3c1672911..91e42fc0a 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -146,9 +146,8 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, } file := node.(*File) - file.isOpen = true + file.isOpen++ fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) - fh.dirtyMetadata = true return file, fh, nil } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index f83944678..7a466b506 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -222,3 +222,26 @@ func max(x, y int64) int64 { } return y } +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} + +func (pages *ContinuousDirtyPages) ReadDirtyData(ctx context.Context, data []byte, startOffset int64) (offset int64, size int, err error) { + bufSize := int64(len(data)) + if startOffset+bufSize < pages.Offset { + return + } + if startOffset >= pages.Offset+pages.Size { + return + } + + offset = max(pages.Offset, startOffset) + stopOffset := min(pages.Offset+pages.Size, startOffset+bufSize) + size = int(stopOffset - offset) + copy(data[offset-startOffset:], pages.Data[offset-pages.Offset:stopOffset-pages.Offset]) + + return +} diff --git a/weed/filesys/file.go b/weed/filesys/file.go index d811cb179..e15d55b5b 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -31,7 +31,7 @@ type File struct { wfs *WFS entry *filer_pb.Entry entryViewCache []filer2.VisibleInterval - isOpen bool + isOpen int } func (file *File) fullpath() filer2.FullPath { @@ -42,7 +42,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr) - if !file.isOpen { + if file.isOpen <=0 { if err := file.maybeLoadEntry(ctx); err != nil { return err } @@ -52,7 +52,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Valid = time.Second attr.Mode = os.FileMode(file.entry.Attributes.FileMode) attr.Size = filer2.TotalSize(file.entry.Chunks) - if file.isOpen { + if file.isOpen > 0 { attr.Size = file.entry.Attributes.FileSize } attr.Crtime = time.Unix(file.entry.Attributes.Crtime, 0) @@ -81,7 +81,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op glog.V(4).Infof("file %v open %+v", file.fullpath(), req) - file.isOpen = true + file.isOpen++ handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid) @@ -140,7 +140,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f file.entry.Attributes.Mtime = req.Mtime.Unix() } - if file.isOpen { + if file.isOpen > 0 { return nil } @@ -218,7 +218,7 @@ func (file *File) Forget() { } func (file *File) maybeLoadEntry(ctx context.Context) error { - if file.entry == nil || !file.isOpen { + if file.entry == nil || file.isOpen <= 0{ entry, err := file.wfs.maybeLoadEntry(ctx, file.dir.Path, file.Name) if err != nil { return err diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 101f5c056..981de7ea2 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -7,10 +7,11 @@ import ( "path" "time" + "github.com/gabriel-vasile/mimetype" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/gabriel-vasile/mimetype" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) @@ -50,29 +51,50 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size)) + buff := make([]byte, req.Size) + + totalRead, err := fh.readFromChunks(ctx, buff, req.Offset) + if err == nil { + dirtyOffset, dirtySize, dirtyReadErr := fh.readFromDirtyPages(ctx, buff, req.Offset) + if dirtyReadErr == nil && totalRead+req.Offset < dirtyOffset+int64(dirtySize) { + totalRead = dirtyOffset + int64(dirtySize) - req.Offset + } + } + + resp.Data = buff[:totalRead] + + if err != nil { + glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) + } + + return err +} + +func (fh *FileHandle) readFromDirtyPages(ctx context.Context, buff []byte, startOffset int64) (offset int64, size int, err error) { + return fh.dirtyPages.ReadDirtyData(ctx, buff, startOffset) +} + +func (fh *FileHandle) readFromChunks(ctx context.Context, buff []byte, offset int64) (int64, error) { + // this value should come from the filer instead of the old f if len(fh.f.entry.Chunks) == 0 { glog.V(1).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name) - return nil + return 0, nil } - buff := make([]byte, req.Size) - if fh.f.entryViewCache == nil { fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) } - chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size) - - totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, req.Offset) + chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff)) - resp.Data = buff[:totalRead] + totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset) if err != nil { glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } - return err + return totalRead, err } // Write to the file handle @@ -115,11 +137,12 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle) - fh.dirtyPages.releaseResource() + fh.f.isOpen-- - fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) - - fh.f.isOpen = false + if fh.f.isOpen <= 0 { + fh.dirtyPages.releaseResource() + fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) + } return nil } @@ -141,7 +164,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil } - return fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType @@ -178,4 +201,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil }) + + if err == nil { + fh.dirtyMetadata = false + } + + return err } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index a2e5a9073..4cfab811b 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -49,6 +49,7 @@ type WFS struct { // contains all open handles, protected by handlesLock handlesLock sync.Mutex handles []*FileHandle + pathToHandleIndex map[filer2.FullPath]int bufPool sync.Pool @@ -68,6 +69,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), + pathToHandleIndex: make(map[filer2.FullPath]int), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) @@ -102,11 +104,18 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand wfs.handlesLock.Lock() defer wfs.handlesLock.Unlock() + index, found := wfs.pathToHandleIndex[fullpath] + if found && wfs.handles[index] != nil { + glog.V(2).Infoln(fullpath, "found fileHandle id", index) + return wfs.handles[index] + } + fileHandle = newFileHandle(file, uid, gid) for i, h := range wfs.handles { if h == nil { wfs.handles[i] = fileHandle fileHandle.handle = uint64(i) + wfs.pathToHandleIndex[fullpath] = i glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle) return } @@ -114,6 +123,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand wfs.handles = append(wfs.handles, fileHandle) fileHandle.handle = uint64(len(wfs.handles) - 1) + wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle) return @@ -124,6 +134,7 @@ func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) defer wfs.handlesLock.Unlock() glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) + delete(wfs.pathToHandleIndex, fullpath) if int(handleId) < len(wfs.handles) { wfs.handles[int(handleId)] = nil }