Browse Source

refactor: moved to locked entry

pull/4028/head
chrislu 2 years ago
parent
commit
94bc9afd9d
  1. 40
      weed/mount/filehandle.go
  2. 2
      weed/mount/filehandle_map.go
  3. 2
      weed/mount/filehandle_read.go
  4. 42
      weed/mount/locked_entry.go
  5. 6
      weed/mount/weedfs_attr.go
  6. 8
      weed/mount/weedfs_dir_lookup.go
  7. 2
      weed/mount/weedfs_dir_read.go
  8. 4
      weed/mount/weedfs_file_copy_range.go
  9. 4
      weed/mount/weedfs_file_lseek.go
  10. 5
      weed/mount/weedfs_file_sync.go
  11. 2
      weed/mount/weedfs_file_write.go
  12. 7
      weed/mount/weedfs_rename.go
  13. 12
      weed/mount/weedfs_xattr.go

40
weed/mount/filehandle.go

@ -1,27 +1,23 @@
package mount package mount
import ( import (
"golang.org/x/sync/semaphore"
"math"
"sync"
"golang.org/x/exp/slices"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/exp/slices"
"golang.org/x/sync/semaphore"
"math"
) )
type FileHandleId uint64 type FileHandleId uint64
type FileHandle struct { type FileHandle struct {
fh FileHandleId
counter int64
entry *filer_pb.Entry
entryLock sync.Mutex
inode uint64
wfs *WFS
fh FileHandleId
counter int64
entry *LockedEntry
inode uint64
wfs *WFS
// cache file has been written to // cache file has been written to
dirtyMetadata bool dirtyMetadata bool
@ -48,6 +44,9 @@ func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_p
if entry != nil { if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry) entry.Attributes.FileSize = filer.FileSize(entry)
} }
fh.entry = &LockedEntry{
Entry: entry,
}
return fh return fh
} }
@ -58,27 +57,18 @@ func (fh *FileHandle) FullPath() util.FullPath {
} }
func (fh *FileHandle) GetEntry() *filer_pb.Entry { func (fh *FileHandle) GetEntry() *filer_pb.Entry {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
return fh.entry
return fh.entry.GetEntry()
} }
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fh.entry = entry
fh.entry.SetEntry(entry)
} }
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry { func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fn(fh.entry)
return fh.entry
return fh.entry.UpdateEntry(fn)
} }
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
if fh.entry == nil { if fh.entry == nil {
return return
@ -107,7 +97,7 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks)) glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks))
fh.entry.Chunks = append(fh.entry.GetChunks(), newChunks...)
fh.entry.AppendChunks(newChunks)
fh.entryViewCache = nil fh.entryViewCache = nil
} }

2
weed/mount/filehandle_map.go

@ -50,7 +50,7 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil
} else { } else {
fh.counter++ fh.counter++
} }
if fh.entry != entry {
if fh.GetEntry() != entry {
fh.SetEntry(entry) fh.SetEntry(entry)
} }
return fh return fh

2
weed/mount/filehandle_read.go

@ -26,7 +26,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fileFullPath := fh.FullPath() fileFullPath := fh.FullPath()
entry := fh.entry
entry := fh.GetEntry()
if entry == nil { if entry == nil {
return 0, io.EOF return 0, io.EOF
} }

42
weed/mount/locked_entry.go

@ -0,0 +1,42 @@
package mount
import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"sync"
)
type LockedEntry struct {
*filer_pb.Entry
sync.RWMutex
}
func (le *LockedEntry) GetEntry() *filer_pb.Entry {
le.RLock()
defer le.RUnlock()
return le.Entry
}
func (le *LockedEntry) SetEntry(entry *filer_pb.Entry) {
le.Lock()
defer le.Unlock()
le.Entry = entry
}
func (le *LockedEntry) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
le.Lock()
defer le.Unlock()
fn(le.Entry)
return le.Entry
}
func (le *LockedEntry) GetChunks() []*filer_pb.FileChunk {
le.RLock()
defer le.RUnlock()
return le.Entry.Chunks
}
func (le *LockedEntry) AppendChunks(newChunks []*filer_pb.FileChunk) {
le.Lock()
defer le.Unlock()
le.Entry.Chunks = append(le.Entry.Chunks, newChunks...)
}

6
weed/mount/weedfs_attr.go

@ -25,7 +25,7 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
} else { } else {
if fh, found := wfs.fhmap.FindFileHandle(inode); found { if fh, found := wfs.fhmap.FindFileHandle(inode); found {
out.AttrValid = 1 out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry)
wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry())
out.Nlink = 0 out.Nlink = 0
return fuse.OK return fuse.OK
} }
@ -44,10 +44,6 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
if status != fuse.OK { if status != fuse.OK {
return status return status
} }
if fh != nil {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
}
if size, ok := input.GetSize(); ok && entry != nil { if size, ok := input.GetSize(); ok && entry != nil {
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.GetChunks())) glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.GetChunks()))

8
weed/mount/weedfs_dir_lookup.go

@ -58,12 +58,10 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true) inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
if fh, found := wfs.fhmap.FindFileHandle(inode); found { if fh, found := wfs.fhmap.FindFileHandle(inode); found {
fh.entryLock.Lock()
if fh.entry != nil {
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry))
localEntry = filer.FromPbEntry(string(dirPath), fh.entry)
if entry := fh.GetEntry(); entry != nil {
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry))
localEntry = filer.FromPbEntry(string(dirPath), entry)
} }
fh.entryLock.Unlock()
} }
wfs.outputFilerEntry(out, inode, localEntry) wfs.outputFilerEntry(out, inode, localEntry)

2
weed/mount/weedfs_dir_read.go

@ -173,7 +173,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
} }
if fh, found := wfs.fhmap.FindFileHandle(inode); found { if fh, found := wfs.fhmap.FindFileHandle(inode); found {
glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name)) glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name))
entry = filer.FromPbEntry(string(dirPath), fh.entry)
entry = filer.FromPbEntry(string(dirPath), fh.GetEntry())
} }
wfs.outputFilerEntry(entryOut, inode, entry) wfs.outputFilerEntry(entryOut, inode, entry)
} }

4
weed/mount/weedfs_file_copy_range.go

@ -46,8 +46,6 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
// lock source and target file handles // lock source and target file handles
fhOut.orderedMutex.Acquire(context.Background(), 1) fhOut.orderedMutex.Acquire(context.Background(), 1)
defer fhOut.orderedMutex.Release(1) defer fhOut.orderedMutex.Release(1)
fhOut.entryLock.Lock()
defer fhOut.entryLock.Unlock()
if fhOut.entry == nil { if fhOut.entry == nil {
return 0, fuse.ENOENT return 0, fuse.ENOENT
@ -56,8 +54,6 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
if fhIn.fh != fhOut.fh { if fhIn.fh != fhOut.fh {
fhIn.orderedMutex.Acquire(context.Background(), 1) fhIn.orderedMutex.Acquire(context.Background(), 1)
defer fhIn.orderedMutex.Release(1) defer fhIn.orderedMutex.Release(1)
fhIn.entryLock.Lock()
defer fhIn.entryLock.Unlock()
} }
// directories are not supported // directories are not supported

4
weed/mount/weedfs_file_lseek.go

@ -38,10 +38,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
// lock the file until the proper offset was calculated // lock the file until the proper offset was calculated
fh.orderedMutex.Acquire(context.Background(), 1) fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1) defer fh.orderedMutex.Release(1)
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fileSize := int64(filer.FileSize(fh.entry))
fileSize := int64(filer.FileSize(fh.GetEntry()))
offset := max(int64(in.Offset), 0) offset := max(int64(in.Offset), 0)
glog.V(4).Infof( glog.V(4).Infof(

5
weed/mount/weedfs_file_sync.go

@ -118,10 +118,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
entry := fh.entry
entry := fh.GetEntry()
if entry == nil { if entry == nil {
return nil return nil
} }

2
weed/mount/weedfs_file_write.go

@ -49,7 +49,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.orderedMutex.Acquire(context.Background(), 1) fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1) defer fh.orderedMutex.Release(1)
entry := fh.entry
entry := fh.GetEntry()
if entry == nil { if entry == nil {
return 0, fuse.OK return 0, fuse.OK
} }

7
weed/mount/weedfs_rename.go

@ -235,8 +235,11 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
sourceInode, targetInode := wfs.inodeToPath.MovePath(oldPath, newPath) sourceInode, targetInode := wfs.inodeToPath.MovePath(oldPath, newPath)
if sourceInode != 0 { if sourceInode != 0 {
if fh, foundFh := wfs.fhmap.FindFileHandle(sourceInode); foundFh && fh.entry != nil {
fh.entry.Name = newName
fh, foundFh := wfs.fhmap.FindFileHandle(sourceInode)
if foundFh {
if entry := fh.GetEntry(); entry != nil {
entry.Name = newName
}
} }
// invalidate attr and data // invalidate attr and data
// wfs.fuseServer.InodeNotify(sourceInode, 0, -1) // wfs.fuseServer.InodeNotify(sourceInode, 0, -1)

12
weed/mount/weedfs_xattr.go

@ -103,17 +103,13 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
} }
} }
path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
path, _, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK { if status != fuse.OK {
return status return status
} }
if entry == nil { if entry == nil {
return fuse.ENOENT return fuse.ENOENT
} }
if fh != nil {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
}
if entry.Extended == nil { if entry.Extended == nil {
entry.Extended = make(map[string][]byte) entry.Extended = make(map[string][]byte)
@ -181,17 +177,13 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr
if len(attr) == 0 { if len(attr) == 0 {
return fuse.EINVAL return fuse.EINVAL
} }
path, fh, entry, status := wfs.maybeReadEntry(header.NodeId)
path, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK { if status != fuse.OK {
return status return status
} }
if entry == nil { if entry == nil {
return fuse.OK return fuse.OK
} }
if fh != nil {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
}
if entry.Extended == nil { if entry.Extended == nil {
return fuse.ENOATTR return fuse.ENOATTR

Loading…
Cancel
Save