Browse Source

mount switch to ordered lock requests

pull/4859/head
chrislu 1 year ago
parent
commit
31fc165715
  1. 6
      weed/mount/filehandle.go
  2. 2
      weed/mount/weedfs.go
  3. 9
      weed/mount/weedfs_file_copy_range.go
  4. 5
      weed/mount/weedfs_file_lseek.go
  5. 5
      weed/mount/weedfs_file_read.go
  6. 6
      weed/mount/weedfs_file_sync.go
  7. 5
      weed/mount/weedfs_file_write.go

6
weed/mount/filehandle.go

@ -27,7 +27,6 @@ type FileHandle struct {
dirtyPages *PageWriter
reader *filer.ChunkReadAt
contentType string
sync.RWMutex
isDeleted bool
@ -102,8 +101,9 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
}
func (fh *FileHandle) ReleaseHandle() {
fh.Lock()
defer fh.Unlock()
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.Lock()
defer fh.entryLock.Unlock()

2
weed/mount/weedfs.go

@ -78,6 +78,7 @@ type WFS struct {
dhmap *DirectoryHandleToInode
fuseServer *fuse.Server
IsOverQuota bool
fhLockTable *util.LockTable[FileHandleId]
}
func NewSeaweedFileSystem(option *Option) *WFS {
@ -88,6 +89,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
fhmap: NewFileHandleToInode(),
dhmap: NewDirectoryHandleToInode(),
fhLockTable: util.NewLockTable[FileHandleId](),
}
wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))

9
weed/mount/weedfs_file_copy_range.go

@ -1,6 +1,7 @@
package mount
import (
"github.com/seaweedfs/seaweedfs/weed/util"
"net/http"
"time"
@ -44,16 +45,16 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
}
// lock source and target file handles
fhOut.Lock()
defer fhOut.Unlock()
fhOutActiveLock := fhOut.wfs.fhLockTable.AcquireLock("CopyFileRange", fhOut.fh, util.ExclusiveLock)
defer fhOut.wfs.fhLockTable.ReleaseLock(fhOut.fh, fhOutActiveLock)
if fhOut.entry == nil {
return 0, fuse.ENOENT
}
if fhIn.fh != fhOut.fh {
fhIn.RLock()
defer fhIn.RUnlock()
fhInActiveLock := fhIn.wfs.fhLockTable.AcquireLock("CopyFileRange", fhIn.fh, util.ExclusiveLock)
defer fhIn.wfs.fhLockTable.ReleaseLock(fhIn.fh, fhInActiveLock)
}
// directories are not supported

5
weed/mount/weedfs_file_lseek.go

@ -1,6 +1,7 @@
package mount
import (
"github.com/seaweedfs/seaweedfs/weed/util"
"syscall"
"github.com/hanwen/go-fuse/v2/fuse"
@ -35,8 +36,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
}
// lock the file until the proper offset was calculated
fh.RLock()
defer fh.RUnlock()
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Lseek", fh.fh, util.SharedLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.RLock()
defer fh.entryLock.RUnlock()

5
weed/mount/weedfs_file_read.go

@ -3,6 +3,7 @@ package mount
import (
"bytes"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"github.com/hanwen/go-fuse/v2/fuse"
@ -41,8 +42,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
fh.RLock()
defer fh.RUnlock()
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)

6
weed/mount/weedfs_file_sync.go

@ -7,6 +7,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"syscall"
"time"
)
@ -89,8 +90,6 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
}
func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
fh.Lock()
defer fh.Unlock()
// flush works at fh level
fileFullPath := fh.FullPath()
@ -105,6 +104,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
}
}
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("doFlush", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
if !fh.dirtyMetadata {
return fuse.OK
}

5
weed/mount/weedfs_file_write.go

@ -2,6 +2,7 @@ package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/util"
"net/http"
"syscall"
"time"
@ -48,8 +49,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
tsNs := time.Now().UnixNano()
fh.Lock()
defer fh.Unlock()
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Write", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
entry := fh.GetEntry()
if entry == nil {

Loading…
Cancel
Save