diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 5a10053d1..331820ef4 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -252,7 +252,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { SingleThreaded: false, DisableXAttrs: *option.disableXAttr, Debug: *option.debug, - EnableLocks: false, + EnableLocks: true, ExplicitDataCacheControl: false, DirectMount: true, DirectMountFlags: 0, diff --git a/weed/mount/posix_file_lock.go b/weed/mount/posix_file_lock.go new file mode 100644 index 000000000..057fb20a5 --- /dev/null +++ b/weed/mount/posix_file_lock.go @@ -0,0 +1,311 @@ +package mount + +import ( + "math" + "sort" + "sync" + "syscall" + + "github.com/seaweedfs/go-fuse/v2/fuse" +) + +// lockRange represents a single held POSIX byte-range lock. +type lockRange struct { + Start uint64 // inclusive byte offset + End uint64 // inclusive; math.MaxUint64 means "to EOF" + Typ uint32 // syscall.F_RDLCK or syscall.F_WRLCK + Owner uint64 // FUSE lock owner (from LkIn.Owner) + Pid uint32 // PID of lock holder (for GetLk reporting) +} + +// inodeLocks holds all locks for one inode plus a waiter queue for SetLkw. +type inodeLocks struct { + mu sync.Mutex + locks []lockRange // currently held locks, sorted by Start + waiters []*lockWaiter // blocked SetLkw callers +} + +// lockWaiter represents a blocked SetLkw caller. +type lockWaiter struct { + requested lockRange // the lock this waiter is trying to acquire + ch chan struct{} // closed when the waiter should re-check +} + +// PosixLockTable is the per-mount POSIX lock manager. +type PosixLockTable struct { + mu sync.Mutex + inodes map[uint64]*inodeLocks +} + +func NewPosixLockTable() *PosixLockTable { + return &PosixLockTable{ + inodes: make(map[uint64]*inodeLocks), + } +} + +// getOrCreateInodeLocks returns the lock state for an inode, creating it if needed. +func (plt *PosixLockTable) getOrCreateInodeLocks(inode uint64) *inodeLocks { + plt.mu.Lock() + defer plt.mu.Unlock() + il, ok := plt.inodes[inode] + if !ok { + il = &inodeLocks{} + plt.inodes[inode] = il + } + return il +} + +// getInodeLocks returns the lock state for an inode, or nil if none exists. +func (plt *PosixLockTable) getInodeLocks(inode uint64) *inodeLocks { + plt.mu.Lock() + defer plt.mu.Unlock() + return plt.inodes[inode] +} + +// maybeCleanupInode removes the inodeLocks entry if it has no locks and no waiters. +func (plt *PosixLockTable) maybeCleanupInode(inode uint64, il *inodeLocks) { + // Caller must NOT hold il.mu. We acquire both locks in the correct order. + plt.mu.Lock() + defer plt.mu.Unlock() + il.mu.Lock() + defer il.mu.Unlock() + if len(il.locks) == 0 && len(il.waiters) == 0 { + delete(plt.inodes, inode) + } +} + +// rangesOverlap returns true if two inclusive ranges overlap. +func rangesOverlap(aStart, aEnd, bStart, bEnd uint64) bool { + return aStart <= bEnd && bStart <= aEnd +} + +// findConflict returns the first lock that conflicts with the proposed lock. +// A conflict exists when ranges overlap, at least one is a write lock, and the owners differ. +func findConflict(locks []lockRange, proposed lockRange) (lockRange, bool) { + for _, h := range locks { + if h.Owner == proposed.Owner { + continue + } + if !rangesOverlap(h.Start, h.End, proposed.Start, proposed.End) { + continue + } + if h.Typ == syscall.F_RDLCK && proposed.Typ == syscall.F_RDLCK { + continue + } + return h, true + } + return lockRange{}, false +} + +// insertAndCoalesce inserts a lock for the given owner, replacing/splitting any +// existing same-owner locks that overlap. Adjacent same-type locks are merged. +// Caller must hold il.mu. +func insertAndCoalesce(il *inodeLocks, lk lockRange) { + owner := lk.Owner + + var kept []lockRange + for _, h := range il.locks { + if h.Owner != owner { + kept = append(kept, h) + continue + } + if !rangesOverlap(h.Start, h.End, lk.Start, lk.End) { + // Check for adjacency with same type for merging. + if h.Typ == lk.Typ && ((h.End < ^uint64(0) && h.End+1 == lk.Start) || (lk.End < ^uint64(0) && lk.End+1 == h.Start)) { + // Merge adjacent same-type lock into lk. + if h.Start < lk.Start { + lk.Start = h.Start + } + if h.End > lk.End { + lk.End = h.End + } + continue + } + kept = append(kept, h) + continue + } + // Overlapping same-owner lock. + if h.Typ == lk.Typ { + // Same type: absorb into lk (expand range). + if h.Start < lk.Start { + lk.Start = h.Start + } + if h.End > lk.End { + lk.End = h.End + } + continue + } + // Different type: truncate or split the existing lock. + if h.Start < lk.Start { + // Left portion survives. + left := h + left.End = lk.Start - 1 + kept = append(kept, left) + } + if h.End > lk.End { + // Right portion survives. + right := h + right.Start = lk.End + 1 + kept = append(kept, right) + } + } + + kept = append(kept, lk) + sort.Slice(kept, func(i, j int) bool { + return kept[i].Start < kept[j].Start + }) + il.locks = kept +} + +// removeLocks removes or splits locks owned by the given owner in the given range. +// Caller must hold il.mu. +func removeLocks(il *inodeLocks, owner uint64, start, end uint64) { + var kept []lockRange + for _, h := range il.locks { + if h.Owner != owner || !rangesOverlap(h.Start, h.End, start, end) { + kept = append(kept, h) + continue + } + // h overlaps the unlock range. + if h.Start < start { + // Left portion survives. + left := h + left.End = start - 1 + kept = append(kept, left) + } + if h.End > end { + // Right portion survives. + right := h + right.Start = end + 1 + kept = append(kept, right) + } + // If fully contained, it's simply dropped. + } + il.locks = kept +} + +// wakeEligibleWaiters selectively wakes blocked SetLkw callers that can now +// succeed given the current lock state. Waiters whose requests still conflict +// with held locks remain in the queue, avoiding a thundering herd. +// Caller must hold il.mu. +func wakeEligibleWaiters(il *inodeLocks) { + remaining := il.waiters[:0] + for _, w := range il.waiters { + if _, conflicted := findConflict(il.locks, w.requested); !conflicted { + close(w.ch) + } else { + remaining = append(remaining, w) + } + } + il.waiters = remaining +} + +// removeWaiter removes a specific waiter from the list. +// Caller must hold il.mu. +func removeWaiter(il *inodeLocks, w *lockWaiter) { + for i, existing := range il.waiters { + if existing == w { + il.waiters = append(il.waiters[:i], il.waiters[i+1:]...) + return + } + } +} + +// GetLk checks for a conflicting lock. If found, it populates out with the +// conflict details. If no conflict, out.Typ is set to F_UNLCK. +func (plt *PosixLockTable) GetLk(inode uint64, proposed lockRange, out *fuse.LkOut) { + il := plt.getInodeLocks(inode) + if il == nil { + out.Lk.Typ = syscall.F_UNLCK + return + } + il.mu.Lock() + conflict, found := findConflict(il.locks, proposed) + il.mu.Unlock() + + if found { + out.Lk.Start = conflict.Start + out.Lk.End = conflict.End + out.Lk.Typ = conflict.Typ + out.Lk.Pid = conflict.Pid + } else { + out.Lk.Typ = syscall.F_UNLCK + } +} + +// SetLk attempts a non-blocking lock or unlock. +// For unlock (F_UNLCK): removes locks in the given range for the owner. +// For lock: returns fuse.EAGAIN if a conflict exists, fuse.OK on success. +func (plt *PosixLockTable) SetLk(inode uint64, lk lockRange) fuse.Status { + if lk.Typ == syscall.F_UNLCK { + il := plt.getInodeLocks(inode) + if il == nil { + return fuse.OK + } + il.mu.Lock() + removeLocks(il, lk.Owner, lk.Start, lk.End) + wakeEligibleWaiters(il) + il.mu.Unlock() + plt.maybeCleanupInode(inode, il) + return fuse.OK + } + + il := plt.getOrCreateInodeLocks(inode) + il.mu.Lock() + if _, found := findConflict(il.locks, lk); found { + il.mu.Unlock() + return fuse.EAGAIN + } + insertAndCoalesce(il, lk) + il.mu.Unlock() + return fuse.OK +} + +// SetLkw attempts a blocking lock. It waits until the lock can be acquired +// or the cancel channel is closed. +func (plt *PosixLockTable) SetLkw(inode uint64, lk lockRange, cancel <-chan struct{}) fuse.Status { + if lk.Typ == syscall.F_UNLCK { + return plt.SetLk(inode, lk) + } + + il := plt.getOrCreateInodeLocks(inode) + for { + il.mu.Lock() + if _, found := findConflict(il.locks, lk); !found { + insertAndCoalesce(il, lk) + il.mu.Unlock() + return fuse.OK + } + // Register waiter with the requested lock details for selective waking. + waiter := &lockWaiter{requested: lk, ch: make(chan struct{})} + il.waiters = append(il.waiters, waiter) + il.mu.Unlock() + + // Block until woken or cancelled. + select { + case <-waiter.ch: + // Woken — retry. + continue + case <-cancel: + // Request cancelled. + il.mu.Lock() + removeWaiter(il, waiter) + il.mu.Unlock() + return fuse.EINTR + } + } +} + +// ReleaseOwner removes all locks held by the given owner on the given inode. +// Called from FUSE Release to clean up when a file description is closed. +func (plt *PosixLockTable) ReleaseOwner(inode uint64, owner uint64) { + il := plt.getInodeLocks(inode) + if il == nil { + return + } + il.mu.Lock() + removeLocks(il, owner, 0, math.MaxUint64) + wakeEligibleWaiters(il) + il.mu.Unlock() + plt.maybeCleanupInode(inode, il) +} diff --git a/weed/mount/posix_file_lock_test.go b/weed/mount/posix_file_lock_test.go new file mode 100644 index 000000000..97f1bcd2e --- /dev/null +++ b/weed/mount/posix_file_lock_test.go @@ -0,0 +1,500 @@ +package mount + +import ( + "math" + "syscall" + "testing" + "time" + + "github.com/seaweedfs/go-fuse/v2/fuse" +) + +func TestNonOverlappingLocksFromDifferentOwners(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + s1 := plt.SetLk(inode, lockRange{Start: 0, End: 49, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + if s1 != fuse.OK { + t.Fatalf("expected OK, got %v", s1) + } + s2 := plt.SetLk(inode, lockRange{Start: 50, End: 99, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}) + if s2 != fuse.OK { + t.Fatalf("expected OK, got %v", s2) + } +} + +func TestOverlappingReadLocksFromDifferentOwners(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + s1 := plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_RDLCK, Owner: 1, Pid: 10}) + if s1 != fuse.OK { + t.Fatalf("expected OK, got %v", s1) + } + s2 := plt.SetLk(inode, lockRange{Start: 50, End: 149, Typ: syscall.F_RDLCK, Owner: 2, Pid: 20}) + if s2 != fuse.OK { + t.Fatalf("expected OK, got %v", s2) + } +} + +func TestOverlappingWriteReadConflict(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + s := plt.SetLk(inode, lockRange{Start: 50, End: 149, Typ: syscall.F_RDLCK, Owner: 2, Pid: 20}) + if s != fuse.EAGAIN { + t.Fatalf("expected EAGAIN, got %v", s) + } +} + +func TestOverlappingWriteWriteConflict(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + s := plt.SetLk(inode, lockRange{Start: 50, End: 149, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}) + if s != fuse.EAGAIN { + t.Fatalf("expected EAGAIN, got %v", s) + } +} + +func TestSameOwnerUpgradeReadToWrite(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_RDLCK, Owner: 1, Pid: 10}) + s := plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + if s != fuse.OK { + t.Fatalf("expected OK for same-owner upgrade, got %v", s) + } + + // Verify the lock is now a write lock. + var out fuse.LkOut + plt.GetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}, &out) + if out.Lk.Typ != syscall.F_WRLCK { + t.Fatalf("expected conflicting write lock, got type %d", out.Lk.Typ) + } +} + +func TestSameOwnerDowngradeWriteToRead(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + s := plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_RDLCK, Owner: 1, Pid: 10}) + if s != fuse.OK { + t.Fatalf("expected OK for same-owner downgrade, got %v", s) + } + + // Another owner should now be able to get a read lock. + s2 := plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_RDLCK, Owner: 2, Pid: 20}) + if s2 != fuse.OK { + t.Fatalf("expected OK for shared read lock, got %v", s2) + } +} + +func TestLockCoalescing(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 9, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + plt.SetLk(inode, lockRange{Start: 10, End: 19, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + + il := plt.getInodeLocks(inode) + il.mu.Lock() + ownerLocks := 0 + for _, lk := range il.locks { + if lk.Owner == 1 { + ownerLocks++ + if lk.Start != 0 || lk.End != 19 { + t.Errorf("expected coalesced lock [0,19], got [%d,%d]", lk.Start, lk.End) + } + } + } + il.mu.Unlock() + if ownerLocks != 1 { + t.Fatalf("expected 1 coalesced lock, got %d", ownerLocks) + } +} + +func TestLockSplitting(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + // Unlock the middle portion. + plt.SetLk(inode, lockRange{Start: 40, End: 59, Typ: syscall.F_UNLCK, Owner: 1, Pid: 10}) + + il := plt.getInodeLocks(inode) + il.mu.Lock() + ownerLocks := 0 + for _, lk := range il.locks { + if lk.Owner == 1 { + ownerLocks++ + } + } + if ownerLocks != 2 { + il.mu.Unlock() + t.Fatalf("expected 2 locks after split, got %d", ownerLocks) + } + // Check the ranges. + if il.locks[0].Start != 0 || il.locks[0].End != 39 { + t.Errorf("expected left lock [0,39], got [%d,%d]", il.locks[0].Start, il.locks[0].End) + } + if il.locks[1].Start != 60 || il.locks[1].End != 99 { + t.Errorf("expected right lock [60,99], got [%d,%d]", il.locks[1].Start, il.locks[1].End) + } + il.mu.Unlock() +} + +func TestGetLkConflict(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 10, End: 50, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + + var out fuse.LkOut + plt.GetLk(inode, lockRange{Start: 30, End: 70, Typ: syscall.F_RDLCK, Owner: 2, Pid: 20}, &out) + if out.Lk.Typ != syscall.F_WRLCK { + t.Fatalf("expected conflicting write lock, got type %d", out.Lk.Typ) + } + if out.Lk.Pid != 10 { + t.Fatalf("expected holder PID 10, got %d", out.Lk.Pid) + } + if out.Lk.Start != 10 || out.Lk.End != 50 { + t.Fatalf("expected conflict [10,50], got [%d,%d]", out.Lk.Start, out.Lk.End) + } +} + +func TestGetLkNoConflict(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 10, End: 50, Typ: syscall.F_RDLCK, Owner: 1, Pid: 10}) + + var out fuse.LkOut + plt.GetLk(inode, lockRange{Start: 30, End: 70, Typ: syscall.F_RDLCK, Owner: 2, Pid: 20}, &out) + if out.Lk.Typ != syscall.F_UNLCK { + t.Fatalf("expected F_UNLCK (no conflict), got type %d", out.Lk.Typ) + } +} + +func TestGetLkSameOwnerNoConflict(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + + var out fuse.LkOut + plt.GetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}, &out) + if out.Lk.Typ != syscall.F_UNLCK { + t.Fatalf("same owner should not conflict with itself, got type %d", out.Lk.Typ) + } +} + +func TestReleaseOwner(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 49, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + plt.SetLk(inode, lockRange{Start: 50, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + plt.SetLk(inode, lockRange{Start: 200, End: 299, Typ: syscall.F_RDLCK, Owner: 2, Pid: 20}) + + plt.ReleaseOwner(inode, 1) + + // Owner 1's locks should be gone. + var out fuse.LkOut + plt.GetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 3, Pid: 30}, &out) + if out.Lk.Typ != syscall.F_UNLCK { + t.Fatalf("expected no conflict after ReleaseOwner, got type %d", out.Lk.Typ) + } + + // Owner 2's lock should still exist. + plt.GetLk(inode, lockRange{Start: 200, End: 299, Typ: syscall.F_WRLCK, Owner: 3, Pid: 30}, &out) + if out.Lk.Typ != syscall.F_RDLCK { + t.Fatalf("expected owner 2's read lock to remain, got type %d", out.Lk.Typ) + } +} + +func TestReleaseOwnerWakesWaiters(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + + done := make(chan fuse.Status, 1) + go func() { + cancel := make(chan struct{}) + s := plt.SetLkw(inode, lockRange{Start: 50, End: 60, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}, cancel) + done <- s + }() + + // Give the goroutine time to block. + time.Sleep(50 * time.Millisecond) + + plt.ReleaseOwner(inode, 1) + + select { + case s := <-done: + if s != fuse.OK { + t.Fatalf("expected OK after ReleaseOwner woke waiter, got %v", s) + } + case <-time.After(2 * time.Second): + t.Fatal("SetLkw did not unblock after ReleaseOwner") + } +} + +func TestSetLkwBlocksAndSucceeds(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + + done := make(chan fuse.Status, 1) + go func() { + cancel := make(chan struct{}) + s := plt.SetLkw(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}, cancel) + done <- s + }() + + // Give the goroutine time to block. + time.Sleep(50 * time.Millisecond) + + // Release the conflicting lock. + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_UNLCK, Owner: 1, Pid: 10}) + + select { + case s := <-done: + if s != fuse.OK { + t.Fatalf("expected OK, got %v", s) + } + case <-time.After(2 * time.Second): + t.Fatal("SetLkw did not unblock after conflicting lock was released") + } +} + +func TestSetLkwCancellation(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + + cancel := make(chan struct{}) + done := make(chan fuse.Status, 1) + go func() { + s := plt.SetLkw(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}, cancel) + done <- s + }() + + // Give the goroutine time to block. + time.Sleep(50 * time.Millisecond) + + close(cancel) + + select { + case s := <-done: + if s != fuse.EINTR { + t.Fatalf("expected EINTR on cancel, got %v", s) + } + case <-time.After(2 * time.Second): + t.Fatal("SetLkw did not unblock after cancel") + } +} + +func TestWholeFileLock(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + // Simulate flock() — whole-file exclusive lock. + s1 := plt.SetLk(inode, lockRange{Start: 0, End: math.MaxUint64, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + if s1 != fuse.OK { + t.Fatalf("expected OK, got %v", s1) + } + + // Second owner should be blocked. + s2 := plt.SetLk(inode, lockRange{Start: 0, End: math.MaxUint64, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}) + if s2 != fuse.EAGAIN { + t.Fatalf("expected EAGAIN, got %v", s2) + } + + // Even a partial overlap should fail. + s3 := plt.SetLk(inode, lockRange{Start: 100, End: 200, Typ: syscall.F_RDLCK, Owner: 2, Pid: 20}) + if s3 != fuse.EAGAIN { + t.Fatalf("expected EAGAIN for partial overlap with whole-file lock, got %v", s3) + } +} + +func TestUnlockNoExistingLocks(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + // Unlock on an inode with no locks should succeed silently. + s := plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_UNLCK, Owner: 1, Pid: 10}) + if s != fuse.OK { + t.Fatalf("expected OK for unlock with no existing locks, got %v", s) + } +} + +func TestMultipleInodesIndependent(t *testing.T) { + plt := NewPosixLockTable() + + // Write lock on inode 1 should not affect inode 2. + plt.SetLk(1, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + s := plt.SetLk(2, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}) + if s != fuse.OK { + t.Fatalf("locks on different inodes should be independent, got %v", s) + } +} + +func TestMemoryCleanup(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + plt.ReleaseOwner(inode, 1) + + plt.mu.Lock() + _, exists := plt.inodes[inode] + plt.mu.Unlock() + if exists { + t.Fatal("expected inode entry to be cleaned up after all locks released") + } +} + +func TestSelectiveWaking(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + // Owner 1 holds write lock on [0, 99], owner 2 holds write lock on [200, 299]. + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + plt.SetLk(inode, lockRange{Start: 200, End: 299, Typ: syscall.F_WRLCK, Owner: 2, Pid: 20}) + + // Owner 3 waits for [50, 60] (blocked by owner 1). + done3 := make(chan fuse.Status, 1) + go func() { + cancel := make(chan struct{}) + s := plt.SetLkw(inode, lockRange{Start: 50, End: 60, Typ: syscall.F_WRLCK, Owner: 3, Pid: 30}, cancel) + done3 <- s + }() + // Owner 4 waits for [250, 260] (blocked by owner 2). + done4 := make(chan fuse.Status, 1) + go func() { + cancel := make(chan struct{}) + s := plt.SetLkw(inode, lockRange{Start: 250, End: 260, Typ: syscall.F_WRLCK, Owner: 4, Pid: 40}, cancel) + done4 <- s + }() + + time.Sleep(50 * time.Millisecond) + + // Release owner 1's lock. Only owner 3 should be woken; owner 4 is still blocked. + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_UNLCK, Owner: 1, Pid: 10}) + + select { + case s := <-done3: + if s != fuse.OK { + t.Fatalf("expected OK for owner 3, got %v", s) + } + case <-time.After(2 * time.Second): + t.Fatal("owner 3 was not woken after owner 1 released") + } + + // Owner 4 should still be blocked. + select { + case s := <-done4: + t.Fatalf("owner 4 should still be blocked, but got %v", s) + case <-time.After(100 * time.Millisecond): + // Expected — still blocked. + } + + // Now release owner 2's lock. Owner 4 should wake. + plt.SetLk(inode, lockRange{Start: 200, End: 299, Typ: syscall.F_UNLCK, Owner: 2, Pid: 20}) + + select { + case s := <-done4: + if s != fuse.OK { + t.Fatalf("expected OK for owner 4, got %v", s) + } + case <-time.After(2 * time.Second): + t.Fatal("owner 4 was not woken after owner 2 released") + } +} + +func TestSameOwnerReplaceDifferentType(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + // Lock [0, 99] as write. + plt.SetLk(inode, lockRange{Start: 0, End: 99, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + // Replace middle portion [30, 60] with read lock. + plt.SetLk(inode, lockRange{Start: 30, End: 60, Typ: syscall.F_RDLCK, Owner: 1, Pid: 10}) + + il := plt.getInodeLocks(inode) + il.mu.Lock() + defer il.mu.Unlock() + + // Should have 3 locks: write [0,29], read [30,60], write [61,99]. + if len(il.locks) != 3 { + t.Fatalf("expected 3 locks after partial type change, got %d", len(il.locks)) + } + if il.locks[0].Typ != syscall.F_WRLCK || il.locks[0].Start != 0 || il.locks[0].End != 29 { + t.Errorf("expected write [0,29], got type=%d [%d,%d]", il.locks[0].Typ, il.locks[0].Start, il.locks[0].End) + } + if il.locks[1].Typ != syscall.F_RDLCK || il.locks[1].Start != 30 || il.locks[1].End != 60 { + t.Errorf("expected read [30,60], got type=%d [%d,%d]", il.locks[1].Typ, il.locks[1].Start, il.locks[1].End) + } + if il.locks[2].Typ != syscall.F_WRLCK || il.locks[2].Start != 61 || il.locks[2].End != 99 { + t.Errorf("expected write [61,99], got type=%d [%d,%d]", il.locks[2].Typ, il.locks[2].Start, il.locks[2].End) + } +} + +func TestNonAdjacentRangesNotCoalesced(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + // Lock [5, MaxUint64] then [0, 2] — gap at [3,4] must prevent coalescing. + plt.SetLk(inode, lockRange{Start: 5, End: math.MaxUint64, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + s := plt.SetLk(inode, lockRange{Start: 0, End: 2, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + if s != fuse.OK { + t.Fatalf("expected OK, got %v", s) + } + + il := plt.getInodeLocks(inode) + il.mu.Lock() + defer il.mu.Unlock() + + if len(il.locks) != 2 { + t.Fatalf("expected 2 separate locks (gap [3,4] prevents coalescing), got %d", len(il.locks)) + } + if il.locks[0].Start != 0 || il.locks[0].End != 2 { + t.Errorf("expected first lock [0,2], got [%d,%d]", il.locks[0].Start, il.locks[0].End) + } + if il.locks[1].Start != 5 || il.locks[1].End != math.MaxUint64 { + t.Errorf("expected second lock [5,MaxUint64], got [%d,%d]", il.locks[1].Start, il.locks[1].End) + } +} + +func TestAdjacencyNoOverflowAtMaxUint64(t *testing.T) { + plt := NewPosixLockTable() + inode := uint64(1) + + // Lock to EOF (End = MaxUint64), then lock [0, 0] same type. + // Without the overflow guard, MaxUint64+1 wraps to 0, falsely merging. + plt.SetLk(inode, lockRange{Start: 100, End: math.MaxUint64, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + plt.SetLk(inode, lockRange{Start: 0, End: 0, Typ: syscall.F_WRLCK, Owner: 1, Pid: 10}) + + il := plt.getInodeLocks(inode) + il.mu.Lock() + defer il.mu.Unlock() + + // Should remain 2 separate locks, not merged. + ownerLocks := 0 + for _, lk := range il.locks { + if lk.Owner == 1 { + ownerLocks++ + } + } + if ownerLocks != 2 { + t.Fatalf("expected 2 separate locks (no overflow merge), got %d", ownerLocks) + } +} diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 9145b28fe..82d570c7d 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -109,6 +109,7 @@ type WFS struct { fuseServer *fuse.Server IsOverQuota bool fhLockTable *util.LockTable[FileHandleId] + posixLocks *PosixLockTable rdmaClient *RDMAMountClient FilerConf *filer.FilerConf filerClient *wdclient.FilerClient // Cached volume location client @@ -177,6 +178,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { filerClient: filerClient, // nil for proxy mode, initialized for direct access pendingAsyncFlush: make(map[uint64]chan struct{}), fhLockTable: util.NewLockTable[FileHandleId](), + posixLocks: NewPosixLockTable(), refreshingDirs: make(map[util.FullPath]struct{}), dirHotWindow: dirHotWindow, dirHotThreshold: dirHotThreshold, diff --git a/weed/mount/weedfs_file_io.go b/weed/mount/weedfs_file_io.go index 120e10427..13587cf9a 100644 --- a/weed/mount/weedfs_file_io.go +++ b/weed/mount/weedfs_file_io.go @@ -106,5 +106,6 @@ func (wfs *WFS) Open(cancel <-chan struct{}, in *fuse.OpenIn, out *fuse.OpenOut) * @param fi file information */ func (wfs *WFS) Release(cancel <-chan struct{}, in *fuse.ReleaseIn) { + wfs.posixLocks.ReleaseOwner(in.NodeId, in.LockOwner) wfs.ReleaseHandle(FileHandleId(in.Fh)) } diff --git a/weed/mount/weedfs_file_lock.go b/weed/mount/weedfs_file_lock.go new file mode 100644 index 000000000..c6f5c066a --- /dev/null +++ b/weed/mount/weedfs_file_lock.go @@ -0,0 +1,51 @@ +package mount + +import ( + "syscall" + + "github.com/seaweedfs/go-fuse/v2/fuse" +) + +// GetLk queries for a conflicting lock on the file. +// If a conflict exists, the conflicting lock is returned in out. +// If no conflict, out.Lk.Typ is set to F_UNLCK. +func (wfs *WFS) GetLk(cancel <-chan struct{}, in *fuse.LkIn, out *fuse.LkOut) fuse.Status { + proposed := lockRange{ + Start: in.Lk.Start, + End: in.Lk.End, + Typ: in.Lk.Typ, + Owner: in.Owner, + Pid: in.Lk.Pid, + } + wfs.posixLocks.GetLk(in.NodeId, proposed, out) + return fuse.OK +} + +// SetLk sets or clears a POSIX lock (non-blocking). +// Returns EAGAIN if the lock conflicts with an existing lock from another owner. +func (wfs *WFS) SetLk(cancel <-chan struct{}, in *fuse.LkIn) fuse.Status { + lk := lockRange{ + Start: in.Lk.Start, + End: in.Lk.End, + Typ: in.Lk.Typ, + Owner: in.Owner, + Pid: in.Lk.Pid, + } + return wfs.posixLocks.SetLk(in.NodeId, lk) +} + +// SetLkw sets a POSIX lock (blocking). +// Waits until the lock can be acquired or the request is cancelled. +func (wfs *WFS) SetLkw(cancel <-chan struct{}, in *fuse.LkIn) fuse.Status { + lk := lockRange{ + Start: in.Lk.Start, + End: in.Lk.End, + Typ: in.Lk.Typ, + Owner: in.Owner, + Pid: in.Lk.Pid, + } + if lk.Typ == syscall.F_UNLCK { + return wfs.posixLocks.SetLk(in.NodeId, lk) + } + return wfs.posixLocks.SetLkw(in.NodeId, lk, cancel) +} diff --git a/weed/mount/weedfs_unsupported.go b/weed/mount/weedfs_unsupported.go index a20ab2bc3..eb9afbf04 100644 --- a/weed/mount/weedfs_unsupported.go +++ b/weed/mount/weedfs_unsupported.go @@ -16,18 +16,6 @@ func (wfs *WFS) Fallocate(cancel <-chan struct{}, in *fuse.FallocateIn) (code fu return fuse.ENOSYS } -func (wfs *WFS) GetLk(cancel <-chan struct{}, in *fuse.LkIn, out *fuse.LkOut) (code fuse.Status) { - return fuse.ENOSYS -} - -func (wfs *WFS) SetLk(cancel <-chan struct{}, in *fuse.LkIn) (code fuse.Status) { - return fuse.ENOSYS -} - -func (wfs *WFS) SetLkw(cancel <-chan struct{}, in *fuse.LkIn) (code fuse.Status) { - return fuse.ENOSYS -} - /** * Check file access permissions *