Browse Source

mount: ensure ordered file handle lock and unlock

pull/3645/head
chrislu 2 years ago
parent
commit
22064c3425
  1. 13
      weed/mount/filehandle.go
  2. 9
      weed/mount/weedfs_file_copy_range.go
  3. 5
      weed/mount/weedfs_file_lseek.go
  4. 5
      weed/mount/weedfs_file_read.go
  5. 8
      weed/mount/weedfs_file_sync.go
  6. 5
      weed/mount/weedfs_file_write.go

13
weed/mount/filehandle.go

@ -1,6 +1,8 @@
package mount package mount
import ( import (
"golang.org/x/sync/semaphore"
"math"
"sync" "sync"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
@ -28,17 +30,18 @@ type FileHandle struct {
reader *filer.ChunkReadAt reader *filer.ChunkReadAt
contentType string contentType string
handle uint64 handle uint64
sync.Mutex
orderedMutex *semaphore.Weighted
isDeleted bool isDeleted bool
} }
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle { func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
fh := &FileHandle{ fh := &FileHandle{
fh: handleId,
counter: 1,
inode: inode,
wfs: wfs,
fh: handleId,
counter: 1,
inode: inode,
wfs: wfs,
orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
} }
// dirtyPages: newContinuousDirtyPages(file, writeOnly), // dirtyPages: newContinuousDirtyPages(file, writeOnly),
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit) fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)

9
weed/mount/weedfs_file_copy_range.go

@ -1,6 +1,7 @@
package mount package mount
import ( import (
"context"
"net/http" "net/http"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
@ -43,8 +44,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
} }
// lock source and target file handles // lock source and target file handles
fhOut.Lock()
defer fhOut.Unlock()
fhOut.orderedMutex.Acquire(context.Background(), 1)
defer fhOut.orderedMutex.Release(1)
fhOut.entryLock.Lock() fhOut.entryLock.Lock()
defer fhOut.entryLock.Unlock() defer fhOut.entryLock.Unlock()
@ -53,8 +54,8 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
} }
if fhIn.fh != fhOut.fh { if fhIn.fh != fhOut.fh {
fhIn.Lock()
defer fhIn.Unlock()
fhIn.orderedMutex.Acquire(context.Background(), 1)
defer fhIn.orderedMutex.Release(1)
fhIn.entryLock.Lock() fhIn.entryLock.Lock()
defer fhIn.entryLock.Unlock() defer fhIn.entryLock.Unlock()
} }

5
weed/mount/weedfs_file_lseek.go

@ -1,6 +1,7 @@
package mount package mount
import ( import (
"context"
"syscall" "syscall"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
@ -35,8 +36,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
} }
// lock the file until the proper offset was calculated // lock the file until the proper offset was calculated
fh.Lock()
defer fh.Unlock()
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
fh.entryLock.Lock() fh.entryLock.Lock()
defer fh.entryLock.Unlock() defer fh.entryLock.Unlock()

5
weed/mount/weedfs_file_read.go

@ -1,6 +1,7 @@
package mount package mount
import ( import (
"context"
"io" "io"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
@ -39,8 +40,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT return nil, fuse.ENOENT
} }
fh.Lock()
defer fh.Unlock()
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
offset := int64(in.Offset) offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset) totalRead, err := readDataByFileHandle(buff, fh, offset)

8
weed/mount/weedfs_file_sync.go

@ -55,8 +55,8 @@ func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status {
return fuse.ENOENT return fuse.ENOENT
} }
fh.Lock()
defer fh.Unlock()
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
return wfs.doFlush(fh, in.Uid, in.Gid) return wfs.doFlush(fh, in.Uid, in.Gid)
} }
@ -87,8 +87,8 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
return fuse.ENOENT return fuse.ENOENT
} }
fh.Lock()
defer fh.Unlock()
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
return wfs.doFlush(fh, in.Uid, in.Gid) return wfs.doFlush(fh, in.Uid, in.Gid)

5
weed/mount/weedfs_file_write.go

@ -1,6 +1,7 @@
package mount package mount
import ( import (
"context"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
"net/http" "net/http"
"syscall" "syscall"
@ -45,8 +46,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size)) fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size))
fh.Lock()
defer fh.Unlock()
fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1)
entry := fh.entry entry := fh.entry
if entry == nil { if entry == nil {

Loading…
Cancel
Save