Browse Source
Add FUSE integration tests for POSIX file locking (#8752)
Add FUSE integration tests for POSIX file locking (#8752)
* Add FUSE integration tests for POSIX file locking Test flock() and fcntl() advisory locks over the FUSE mount: - Exclusive and shared flock with conflict detection - flock upgrade (shared to exclusive) and release on close - fcntl F_SETLK write lock conflicts and shared read locks - fcntl F_GETLK conflict reporting on overlapping byte ranges - Non-overlapping byte-range locks held independently - F_SETLKW blocking until conflicting lock is released - Lock release on file descriptor close - Concurrent lock contention with multiple workers * Fix review feedback in POSIX lock integration tests - Assert specific EAGAIN error on fcntl lock conflicts instead of generic Error - Use O_APPEND in concurrent contention test so workers append rather than overwrite - Verify exact line count (numWorkers * writesPerWorker) after concurrent test - Check unlock error in F_SETLKW blocking test goroutine * Refactor fcntl tests to use subprocesses for inter-process semantics POSIX fcntl locks use the process's files_struct as lock owner, so all fds in the same process share the same owner and never conflict. This caused the fcntl tests to silently pass without exercising lock conflicts. Changes: - Add TestFcntlLockHelper subprocess entry point with hold/try/getlk actions - Add lockHolder with channel-based coordination (no scanner race) - Rewrite all fcntl tests to run contenders in separate subprocesses - Fix F_UNLCK int16 cast in GetLk assertion for type-safe comparison - Fix concurrent test: use non-blocking flock with retry to avoid exhausting go-fuse server reader goroutines (blocking FUSE SETLKW can starve unlock request processing, causing deadlock) flock tests remain same-process since flock uses per-struct-file owners. * Fix misleading comment and error handling in lock test subprocess - Fix comment: tryLockInSubprocess tests a subprocess, not the test process - Distinguish EAGAIN/EACCES from unexpected errors in subprocess try mode so real failures aren't silently masked as lock conflicts * Fix CI race in FcntlReleaseOnClose and increase flock retry budget - FcntlReleaseOnClose: retry lock acquisition after subprocess exits since the FUSE server may not process Release immediately - ConcurrentLockContention: increase retry limit from 500 to 3000 (5s → 30s budget) to handle CI load * separating flock and fcntl in the in-memory lock table and cleaning them up through the right release path: PID for POSIX locks, lock owner for flock * ReleasePosixOwner * weed/mount: flush before releasing posix close owner * weed/mount: keep woken lock waiters from losing inode state * test/fuse: make blocking fcntl helper state explicit * test/fuse: assert flock contention never overlaps * test/fuse: stabilize concurrent lock contention check * test/fuse: make concurrent contention writes deterministic * weed/mount: retry synchronous metadata flushespull/8763/head
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1090 additions and 62 deletions
-
741test/fuse_integration/posix_file_lock_test.go
-
28weed/mount/metadata_flush_retry.go
-
81weed/mount/metadata_flush_retry_test.go
-
96weed/mount/posix_file_lock.go
-
117weed/mount/posix_file_lock_test.go
-
31weed/mount/weedfs_async_flush.go
-
4weed/mount/weedfs_file_io.go
-
33weed/mount/weedfs_file_lock.go
-
21weed/mount/weedfs_file_sync.go
@ -0,0 +1,741 @@ |
|||
package fuse_test |
|||
|
|||
import ( |
|||
"bufio" |
|||
"bytes" |
|||
"errors" |
|||
"fmt" |
|||
"io" |
|||
"os" |
|||
"os/exec" |
|||
"path/filepath" |
|||
"strconv" |
|||
"strings" |
|||
"sync" |
|||
"sync/atomic" |
|||
"syscall" |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/stretchr/testify/assert" |
|||
"github.com/stretchr/testify/require" |
|||
) |
|||
|
|||
// --- fcntl helpers (used by both test process and subprocess) ---
|
|||
|
|||
func fcntlSetLk(f *os.File, typ int16, start, len int64) error { |
|||
flk := syscall.Flock_t{ |
|||
Type: typ, |
|||
Whence: 0, // SEEK_SET
|
|||
Start: start, |
|||
Len: len, // 0 means to EOF
|
|||
} |
|||
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLK, &flk) |
|||
} |
|||
|
|||
func fcntlSetLkw(f *os.File, typ int16, start, len int64) error { |
|||
flk := syscall.Flock_t{ |
|||
Type: typ, |
|||
Whence: 0, |
|||
Start: start, |
|||
Len: len, |
|||
} |
|||
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLKW, &flk) |
|||
} |
|||
|
|||
func fcntlGetLk(f *os.File, typ int16, start, len int64) (syscall.Flock_t, error) { |
|||
flk := syscall.Flock_t{ |
|||
Type: typ, |
|||
Whence: 0, |
|||
Start: start, |
|||
Len: len, |
|||
} |
|||
err := syscall.FcntlFlock(f.Fd(), syscall.F_GETLK, &flk) |
|||
return flk, err |
|||
} |
|||
|
|||
// --- Subprocess entry point for fcntl lock tests ---
|
|||
//
|
|||
// POSIX fcntl locks are per-process (the kernel uses the process's files_struct
|
|||
// as lock owner). To test inter-process lock conflicts over FUSE, contending
|
|||
// locks must come from separate processes.
|
|||
//
|
|||
// Actions:
|
|||
// hold — acquire F_SETLK, print "LOCKED\n", wait for stdin close, exit
|
|||
// hold-blocking — print "WAITING\n", acquire F_SETLKW, print "LOCKED\n", wait for stdin close, exit
|
|||
// try — try F_SETLK, print "OK\n" or "EAGAIN\n", exit
|
|||
// getlk — do F_GETLK, print lock type as integer, exit
|
|||
|
|||
func TestFcntlLockHelper(t *testing.T) { |
|||
action := os.Getenv("LOCK_ACTION") |
|||
if action == "" { |
|||
t.Skip("subprocess helper — not invoked directly") |
|||
} |
|||
|
|||
filePath := os.Getenv("LOCK_FILE") |
|||
lockType, err := parseLockType(os.Getenv("LOCK_TYPE")) |
|||
if err != nil { |
|||
fmt.Fprintf(os.Stderr, "lock type: %v\n", err) |
|||
os.Exit(1) |
|||
} |
|||
start, _ := strconv.ParseInt(os.Getenv("LOCK_START"), 10, 64) |
|||
length, _ := strconv.ParseInt(os.Getenv("LOCK_LEN"), 10, 64) |
|||
|
|||
f, err := os.OpenFile(filePath, os.O_RDWR, 0) |
|||
if err != nil { |
|||
fmt.Fprintf(os.Stderr, "open: %v\n", err) |
|||
os.Exit(1) |
|||
} |
|||
defer f.Close() |
|||
|
|||
switch action { |
|||
case "hold": |
|||
if err := fcntlSetLk(f, lockType, start, length); err != nil { |
|||
fmt.Fprintf(os.Stderr, "setlk: %v\n", err) |
|||
os.Exit(1) |
|||
} |
|||
fmt.Println("LOCKED") |
|||
os.Stdout.Sync() |
|||
io.ReadAll(os.Stdin) // block until parent closes pipe
|
|||
|
|||
case "hold-blocking": |
|||
fmt.Println("WAITING") |
|||
os.Stdout.Sync() |
|||
if err := fcntlSetLkw(f, lockType, start, length); err != nil { |
|||
fmt.Fprintf(os.Stderr, "setlkw: %v\n", err) |
|||
os.Exit(1) |
|||
} |
|||
fmt.Println("LOCKED") |
|||
os.Stdout.Sync() |
|||
io.ReadAll(os.Stdin) |
|||
|
|||
case "try": |
|||
if err := fcntlSetLk(f, lockType, start, length); err != nil { |
|||
if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EACCES) { |
|||
fmt.Println("EAGAIN") |
|||
} else { |
|||
fmt.Fprintf(os.Stderr, "unexpected setlk error: %v\n", err) |
|||
os.Exit(1) |
|||
} |
|||
} else { |
|||
fmt.Println("OK") |
|||
} |
|||
os.Stdout.Sync() |
|||
|
|||
case "getlk": |
|||
flk := syscall.Flock_t{Type: lockType, Whence: 0, Start: start, Len: length} |
|||
if err := syscall.FcntlFlock(f.Fd(), syscall.F_GETLK, &flk); err != nil { |
|||
fmt.Fprintf(os.Stderr, "getlk: %v\n", err) |
|||
os.Exit(1) |
|||
} |
|||
fmt.Println(flk.Type) |
|||
os.Stdout.Sync() |
|||
|
|||
default: |
|||
fmt.Fprintf(os.Stderr, "unknown action: %s\n", action) |
|||
os.Exit(1) |
|||
} |
|||
|
|||
os.Exit(0) |
|||
} |
|||
|
|||
func parseLockType(s string) (int16, error) { |
|||
switch s { |
|||
case "F_RDLCK": |
|||
return syscall.F_RDLCK, nil |
|||
case "F_WRLCK": |
|||
return syscall.F_WRLCK, nil |
|||
default: |
|||
return 0, fmt.Errorf("unknown lock type %q", s) |
|||
} |
|||
} |
|||
|
|||
// --- Subprocess coordination helpers ---
|
|||
|
|||
// lockHolder is a subprocess holding an fcntl lock. A background goroutine
|
|||
// reads stdout and signals the waiting/locked channels as the subprocess
|
|||
// progresses through the lock lifecycle.
|
|||
type lockHolder struct { |
|||
cmd *exec.Cmd |
|||
stdin io.WriteCloser |
|||
waiting chan struct{} // closed when subprocess prints "WAITING"
|
|||
locked chan struct{} // closed when subprocess prints "LOCKED"
|
|||
} |
|||
|
|||
// startLockHolder launches a subprocess that acquires an fcntl lock and holds
|
|||
// it until Release is called.
|
|||
func startLockHolder(t *testing.T, path string, lockType string, start, length int64) *lockHolder { |
|||
t.Helper() |
|||
return startLockProcess(t, "hold", path, lockType, start, length) |
|||
} |
|||
|
|||
// startBlockingLockHolder launches a subprocess that tries F_SETLKW.
|
|||
// It blocks until the lock is available, then holds it until Release is called.
|
|||
func startBlockingLockHolder(t *testing.T, path string, lockType string, start, length int64) *lockHolder { |
|||
t.Helper() |
|||
return startLockProcess(t, "hold-blocking", path, lockType, start, length) |
|||
} |
|||
|
|||
func startLockProcess(t *testing.T, action, path, lockType string, start, length int64) *lockHolder { |
|||
t.Helper() |
|||
|
|||
cmd := exec.Command(os.Args[0], "-test.run=^TestFcntlLockHelper$") |
|||
cmd.Env = append(os.Environ(), |
|||
"LOCK_ACTION="+action, |
|||
"LOCK_FILE="+path, |
|||
"LOCK_TYPE="+lockType, |
|||
fmt.Sprintf("LOCK_START=%d", start), |
|||
fmt.Sprintf("LOCK_LEN=%d", length), |
|||
) |
|||
|
|||
stdin, err := cmd.StdinPipe() |
|||
require.NoError(t, err) |
|||
|
|||
stdoutPipe, err := cmd.StdoutPipe() |
|||
require.NoError(t, err) |
|||
|
|||
cmd.Stderr = os.Stderr |
|||
require.NoError(t, cmd.Start()) |
|||
|
|||
waiting := make(chan struct{}) |
|||
locked := make(chan struct{}) |
|||
go func() { |
|||
scanner := bufio.NewScanner(stdoutPipe) |
|||
for scanner.Scan() { |
|||
switch strings.TrimSpace(scanner.Text()) { |
|||
case "WAITING": |
|||
select { |
|||
case <-waiting: |
|||
default: |
|||
close(waiting) |
|||
} |
|||
case "LOCKED": |
|||
select { |
|||
case <-locked: |
|||
default: |
|||
close(locked) |
|||
} |
|||
} |
|||
} |
|||
}() |
|||
|
|||
return &lockHolder{cmd: cmd, stdin: stdin, waiting: waiting, locked: locked} |
|||
} |
|||
|
|||
// WaitWaiting waits for the subprocess to print "WAITING" before trying to
|
|||
// assert that a blocking lock request is still blocked.
|
|||
func (h *lockHolder) WaitWaiting(t *testing.T, timeout time.Duration) { |
|||
t.Helper() |
|||
select { |
|||
case <-h.waiting: |
|||
// OK
|
|||
case <-time.After(timeout): |
|||
t.Fatal("timed out waiting for subprocess to start blocking lock attempt") |
|||
} |
|||
} |
|||
|
|||
// WaitLocked waits for the subprocess to acquire the lock and signal "LOCKED".
|
|||
func (h *lockHolder) WaitLocked(t *testing.T, timeout time.Duration) { |
|||
t.Helper() |
|||
select { |
|||
case <-h.locked: |
|||
// OK
|
|||
case <-time.After(timeout): |
|||
t.Fatal("timed out waiting for subprocess to acquire lock") |
|||
} |
|||
} |
|||
|
|||
// IsLocked checks whether the subprocess has signaled "LOCKED" within 200ms.
|
|||
func (h *lockHolder) IsLocked() bool { |
|||
select { |
|||
case <-h.waiting: |
|||
// The subprocess has reached the blocking lock attempt.
|
|||
case <-h.locked: |
|||
return true |
|||
case <-time.After(200 * time.Millisecond): |
|||
return false |
|||
} |
|||
|
|||
select { |
|||
case <-h.locked: |
|||
return true |
|||
case <-time.After(200 * time.Millisecond): |
|||
return false |
|||
} |
|||
} |
|||
|
|||
// Release signals the subprocess to exit and waits for it.
|
|||
func (h *lockHolder) Release(t *testing.T) { |
|||
t.Helper() |
|||
h.stdin.Close() |
|||
_ = h.cmd.Wait() |
|||
} |
|||
|
|||
// tryLockInSubprocess tries a non-blocking fcntl lock in a subprocess.
|
|||
// Returns nil if the lock was acquired, syscall.EAGAIN if it was denied.
|
|||
func tryLockInSubprocess(t *testing.T, path, lockType string, start, length int64) error { |
|||
t.Helper() |
|||
|
|||
cmd := exec.Command(os.Args[0], "-test.run=^TestFcntlLockHelper$") |
|||
cmd.Env = append(os.Environ(), |
|||
"LOCK_ACTION=try", |
|||
"LOCK_FILE="+path, |
|||
"LOCK_TYPE="+lockType, |
|||
fmt.Sprintf("LOCK_START=%d", start), |
|||
fmt.Sprintf("LOCK_LEN=%d", length), |
|||
) |
|||
cmd.Stderr = os.Stderr |
|||
|
|||
out, err := cmd.Output() |
|||
require.NoError(t, err, "subprocess failed to run") |
|||
|
|||
result := strings.TrimSpace(string(out)) |
|||
if result == "EAGAIN" { |
|||
return syscall.EAGAIN |
|||
} |
|||
require.Equal(t, "OK", result, "unexpected subprocess output") |
|||
return nil |
|||
} |
|||
|
|||
// getLkInSubprocess does F_GETLK in a subprocess and returns the lock type.
|
|||
func getLkInSubprocess(t *testing.T, path, lockType string, start, length int64) int16 { |
|||
t.Helper() |
|||
|
|||
cmd := exec.Command(os.Args[0], "-test.run=^TestFcntlLockHelper$") |
|||
cmd.Env = append(os.Environ(), |
|||
"LOCK_ACTION=getlk", |
|||
"LOCK_FILE="+path, |
|||
"LOCK_TYPE="+lockType, |
|||
fmt.Sprintf("LOCK_START=%d", start), |
|||
fmt.Sprintf("LOCK_LEN=%d", length), |
|||
) |
|||
cmd.Stderr = os.Stderr |
|||
|
|||
out, err := cmd.Output() |
|||
require.NoError(t, err, "subprocess failed to run") |
|||
|
|||
val, err := strconv.ParseInt(strings.TrimSpace(string(out)), 10, 16) |
|||
require.NoError(t, err, "failed to parse lock type from subprocess") |
|||
return int16(val) |
|||
} |
|||
|
|||
// --- Test runner ---
|
|||
|
|||
func TestPosixFileLocking(t *testing.T) { |
|||
framework := NewFuseTestFramework(t, DefaultTestConfig()) |
|||
defer framework.Cleanup() |
|||
|
|||
require.NoError(t, framework.Setup(DefaultTestConfig())) |
|||
|
|||
t.Run("FlockExclusive", func(t *testing.T) { |
|||
testFlockExclusive(t, framework) |
|||
}) |
|||
t.Run("FlockShared", func(t *testing.T) { |
|||
testFlockShared(t, framework) |
|||
}) |
|||
t.Run("FlockUpgrade", func(t *testing.T) { |
|||
testFlockUpgrade(t, framework) |
|||
}) |
|||
t.Run("FlockReleaseOnClose", func(t *testing.T) { |
|||
testFlockReleaseOnClose(t, framework) |
|||
}) |
|||
t.Run("FcntlWriteLockConflict", func(t *testing.T) { |
|||
testFcntlWriteLockConflict(t, framework) |
|||
}) |
|||
t.Run("FcntlReadLocksShared", func(t *testing.T) { |
|||
testFcntlReadLocksShared(t, framework) |
|||
}) |
|||
t.Run("FcntlGetLk", func(t *testing.T) { |
|||
testFcntlGetLk(t, framework) |
|||
}) |
|||
t.Run("FcntlByteRangePartial", func(t *testing.T) { |
|||
testFcntlByteRangePartial(t, framework) |
|||
}) |
|||
t.Run("FcntlSetLkwBlocks", func(t *testing.T) { |
|||
testFcntlSetLkwBlocks(t, framework) |
|||
}) |
|||
t.Run("FcntlReleaseOnClose", func(t *testing.T) { |
|||
testFcntlReleaseOnClose(t, framework) |
|||
}) |
|||
t.Run("ConcurrentLockContention", func(t *testing.T) { |
|||
testConcurrentLockContention(t, framework) |
|||
}) |
|||
} |
|||
|
|||
// --- flock() tests (same-process is fine — flock uses per-fd owners) ---
|
|||
|
|||
func testFlockExclusive(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "flock_exclusive.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("data"), 0644)) |
|||
|
|||
f1, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
defer f1.Close() |
|||
|
|||
f2, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
defer f2.Close() |
|||
|
|||
require.NoError(t, syscall.Flock(int(f1.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)) |
|||
|
|||
err = syscall.Flock(int(f2.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) |
|||
assert.ErrorIs(t, err, syscall.EWOULDBLOCK, "second exclusive flock should fail") |
|||
|
|||
require.NoError(t, syscall.Flock(int(f1.Fd()), syscall.LOCK_UN)) |
|||
require.NoError(t, syscall.Flock(int(f2.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)) |
|||
require.NoError(t, syscall.Flock(int(f2.Fd()), syscall.LOCK_UN)) |
|||
} |
|||
|
|||
func testFlockShared(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "flock_shared.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("data"), 0644)) |
|||
|
|||
f1, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
defer f1.Close() |
|||
|
|||
f2, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
defer f2.Close() |
|||
|
|||
require.NoError(t, syscall.Flock(int(f1.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)) |
|||
require.NoError(t, syscall.Flock(int(f2.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)) |
|||
|
|||
f3, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
defer f3.Close() |
|||
err = syscall.Flock(int(f3.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) |
|||
assert.ErrorIs(t, err, syscall.EWOULDBLOCK) |
|||
|
|||
require.NoError(t, syscall.Flock(int(f1.Fd()), syscall.LOCK_UN)) |
|||
require.NoError(t, syscall.Flock(int(f2.Fd()), syscall.LOCK_UN)) |
|||
} |
|||
|
|||
func testFlockUpgrade(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "flock_upgrade.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("data"), 0644)) |
|||
|
|||
f1, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
defer f1.Close() |
|||
|
|||
require.NoError(t, syscall.Flock(int(f1.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)) |
|||
require.NoError(t, syscall.Flock(int(f1.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)) |
|||
|
|||
f2, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
defer f2.Close() |
|||
err = syscall.Flock(int(f2.Fd()), syscall.LOCK_SH|syscall.LOCK_NB) |
|||
assert.ErrorIs(t, err, syscall.EWOULDBLOCK) |
|||
|
|||
require.NoError(t, syscall.Flock(int(f1.Fd()), syscall.LOCK_UN)) |
|||
} |
|||
|
|||
func testFlockReleaseOnClose(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "flock_close.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("data"), 0644)) |
|||
|
|||
f1, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
|
|||
require.NoError(t, syscall.Flock(int(f1.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)) |
|||
f1.Close() |
|||
|
|||
f2, err := os.Open(path) |
|||
require.NoError(t, err) |
|||
defer f2.Close() |
|||
require.NoError(t, syscall.Flock(int(f2.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)) |
|||
require.NoError(t, syscall.Flock(int(f2.Fd()), syscall.LOCK_UN)) |
|||
} |
|||
|
|||
// --- fcntl() POSIX lock tests (use subprocesses for inter-process semantics) ---
|
|||
|
|||
// testFcntlWriteLockConflict: test process holds a write lock, subprocess
|
|||
// contender is denied, then succeeds after release.
|
|||
func testFcntlWriteLockConflict(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "fcntl_write.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("0123456789"), 0644)) |
|||
|
|||
f1, err := os.OpenFile(path, os.O_RDWR, 0) |
|||
require.NoError(t, err) |
|||
defer f1.Close() |
|||
|
|||
// Test process locks [0, 10).
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_WRLCK, 0, 10)) |
|||
|
|||
// Subprocess: overlapping write lock should fail with EAGAIN.
|
|||
err = tryLockInSubprocess(t, path, "F_WRLCK", 5, 10) |
|||
assert.ErrorIs(t, err, syscall.EAGAIN, "overlapping write lock should fail with EAGAIN") |
|||
|
|||
// Unlock, then subprocess should succeed.
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_UNLCK, 0, 10)) |
|||
|
|||
err = tryLockInSubprocess(t, path, "F_WRLCK", 5, 10) |
|||
assert.NoError(t, err, "lock should succeed after release") |
|||
} |
|||
|
|||
// testFcntlReadLocksShared: test process holds a read lock, subprocess read
|
|||
// lock succeeds (shared), subprocess write lock is denied.
|
|||
func testFcntlReadLocksShared(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "fcntl_shared.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("0123456789"), 0644)) |
|||
|
|||
f1, err := os.OpenFile(path, os.O_RDWR, 0) |
|||
require.NoError(t, err) |
|||
defer f1.Close() |
|||
|
|||
// Test process holds a read lock.
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_RDLCK, 0, 10)) |
|||
|
|||
// Subprocess: read lock should succeed (shared).
|
|||
err = tryLockInSubprocess(t, path, "F_RDLCK", 0, 10) |
|||
assert.NoError(t, err, "shared read lock should succeed") |
|||
|
|||
// Subprocess: write lock should fail.
|
|||
err = tryLockInSubprocess(t, path, "F_WRLCK", 0, 10) |
|||
assert.ErrorIs(t, err, syscall.EAGAIN, "write lock should fail with EAGAIN while read lock is held") |
|||
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_UNLCK, 0, 10)) |
|||
} |
|||
|
|||
// testFcntlGetLk: test process holds a write lock, subprocess queries via
|
|||
// F_GETLK and sees the conflict.
|
|||
func testFcntlGetLk(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "fcntl_getlk.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("0123456789"), 0644)) |
|||
|
|||
f1, err := os.OpenFile(path, os.O_RDWR, 0) |
|||
require.NoError(t, err) |
|||
defer f1.Close() |
|||
|
|||
// Test process takes a write lock on [0, 5).
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_WRLCK, 0, 5)) |
|||
|
|||
// Subprocess: F_GETLK on overlapping range should report conflict.
|
|||
typ := getLkInSubprocess(t, path, "F_WRLCK", 0, 10) |
|||
assert.NotEqual(t, int16(syscall.F_UNLCK), typ, "should report a conflict") |
|||
|
|||
// Subprocess: F_GETLK on non-overlapping range should report no conflict.
|
|||
typ = getLkInSubprocess(t, path, "F_WRLCK", 5, 5) |
|||
assert.Equal(t, int16(syscall.F_UNLCK), typ, "no conflict expected on non-overlapping range") |
|||
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_UNLCK, 0, 5)) |
|||
} |
|||
|
|||
// testFcntlByteRangePartial: test process and subprocess lock non-overlapping
|
|||
// ranges independently; extending into the other's range is denied.
|
|||
func testFcntlByteRangePartial(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "fcntl_range.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("0123456789ABCDEF"), 0644)) |
|||
|
|||
f1, err := os.OpenFile(path, os.O_RDWR, 0) |
|||
require.NoError(t, err) |
|||
defer f1.Close() |
|||
|
|||
// Test process locks [0, 8).
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_WRLCK, 0, 8)) |
|||
|
|||
// Subprocess locks [8, 16) — non-overlapping, should succeed.
|
|||
holder := startLockHolder(t, path, "F_WRLCK", 8, 8) |
|||
holder.WaitLocked(t, 5*time.Second) |
|||
|
|||
// Subprocess: extending into test process's range should fail.
|
|||
err = tryLockInSubprocess(t, path, "F_WRLCK", 4, 8) |
|||
assert.ErrorIs(t, err, syscall.EAGAIN, "extending into another process's locked range should fail with EAGAIN") |
|||
|
|||
holder.Release(t) |
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_UNLCK, 0, 8)) |
|||
} |
|||
|
|||
// testFcntlSetLkwBlocks: test process holds exclusive lock, subprocess blocks
|
|||
// on F_SETLKW, then unblocks when lock is released.
|
|||
func testFcntlSetLkwBlocks(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "fcntl_setlkw.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("data"), 0644)) |
|||
|
|||
f1, err := os.OpenFile(path, os.O_RDWR, 0) |
|||
require.NoError(t, err) |
|||
defer f1.Close() |
|||
|
|||
// Test process takes exclusive lock.
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_WRLCK, 0, 0)) |
|||
|
|||
// Launch subprocess that will block on F_SETLKW.
|
|||
blocker := startBlockingLockHolder(t, path, "F_WRLCK", 0, 0) |
|||
|
|||
// Verify it has reached the blocking lock attempt but has not acquired it yet.
|
|||
blocker.WaitWaiting(t, 5*time.Second) |
|||
assert.False(t, blocker.IsLocked(), "F_SETLKW should be blocking") |
|||
|
|||
// Release our lock to unblock the subprocess.
|
|||
require.NoError(t, fcntlSetLk(f1, syscall.F_UNLCK, 0, 0)) |
|||
|
|||
// Subprocess should now acquire and signal "LOCKED".
|
|||
blocker.WaitLocked(t, 5*time.Second) |
|||
blocker.Release(t) |
|||
} |
|||
|
|||
// testFcntlReleaseOnClose: subprocess holds a lock then exits (fd closes),
|
|||
// test process verifies it can now acquire the lock.
|
|||
func testFcntlReleaseOnClose(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "fcntl_close.txt") |
|||
require.NoError(t, os.WriteFile(path, []byte("data"), 0644)) |
|||
|
|||
// Subprocess acquires exclusive lock and holds it.
|
|||
holder := startLockHolder(t, path, "F_WRLCK", 0, 0) |
|||
holder.WaitLocked(t, 5*time.Second) |
|||
|
|||
// Another subprocess trying to lock should be denied while holder is active.
|
|||
err := tryLockInSubprocess(t, path, "F_WRLCK", 0, 0) |
|||
assert.ErrorIs(t, err, syscall.EAGAIN, "lock should be held by subprocess") |
|||
|
|||
// Release subprocess → fd closes → lock released.
|
|||
// The FUSE server may not process the Release immediately, so retry.
|
|||
holder.Release(t) |
|||
|
|||
f2, err := os.OpenFile(path, os.O_RDWR, 0) |
|||
require.NoError(t, err) |
|||
defer f2.Close() |
|||
var lockErr error |
|||
for attempt := 0; attempt < 50; attempt++ { |
|||
lockErr = fcntlSetLk(f2, syscall.F_WRLCK, 0, 0) |
|||
if lockErr == nil { |
|||
break |
|||
} |
|||
time.Sleep(50 * time.Millisecond) |
|||
} |
|||
require.NoError(t, lockErr, "lock should succeed after subprocess exits") |
|||
require.NoError(t, fcntlSetLk(f2, syscall.F_UNLCK, 0, 0)) |
|||
} |
|||
|
|||
// --- Concurrency test (uses flock which has per-fd owners) ---
|
|||
|
|||
// testConcurrentLockContention has multiple goroutines competing for an
|
|||
// exclusive flock, each appending to the file while holding it.
|
|||
// Uses non-blocking flock with retry to avoid exhausting the go-fuse server's
|
|||
// reader goroutines (blocking FUSE SETLKW can starve unlock processing).
|
|||
func testConcurrentLockContention(t *testing.T, fw *FuseTestFramework) { |
|||
path := filepath.Join(fw.GetMountPoint(), "lock_contention.txt") |
|||
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) |
|||
require.NoError(t, err) |
|||
require.NoError(t, f.Close()) |
|||
|
|||
numWorkers := 8 |
|||
writesPerWorker := 5 |
|||
var wg sync.WaitGroup |
|||
var mu sync.Mutex |
|||
var errs []error |
|||
var activeHolders atomic.Int32 |
|||
var nextSlot atomic.Int64 |
|||
const recordFormat = "worker %02d write %02d\n" |
|||
recordSize := len(fmt.Sprintf(recordFormat, 0, 0)) |
|||
|
|||
addError := func(err error) { |
|||
mu.Lock() |
|||
defer mu.Unlock() |
|||
errs = append(errs, err) |
|||
} |
|||
|
|||
openWithRetry := func(flags int) (*os.File, error) { |
|||
var openErr error |
|||
for attempt := 0; attempt < 50; attempt++ { |
|||
file, err := os.OpenFile(path, flags, 0) |
|||
if err == nil { |
|||
return file, nil |
|||
} |
|||
openErr = err |
|||
if !errors.Is(err, os.ErrNotExist) && !errors.Is(err, syscall.ENOENT) { |
|||
return nil, err |
|||
} |
|||
time.Sleep(10 * time.Millisecond) |
|||
} |
|||
return nil, openErr |
|||
} |
|||
|
|||
for i := 0; i < numWorkers; i++ { |
|||
wg.Add(1) |
|||
go func(id int) { |
|||
defer wg.Done() |
|||
f, err := openWithRetry(os.O_RDWR) |
|||
if err != nil { |
|||
addError(fmt.Errorf("worker %d open: %v", id, err)) |
|||
return |
|||
} |
|||
defer f.Close() |
|||
|
|||
for j := 0; j < writesPerWorker; j++ { |
|||
// Non-blocking flock with retry to avoid FUSE server thread starvation.
|
|||
locked := false |
|||
for attempt := 0; attempt < 3000; attempt++ { |
|||
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err == nil { |
|||
locked = true |
|||
break |
|||
} |
|||
time.Sleep(10 * time.Millisecond) |
|||
} |
|||
if !locked { |
|||
addError(fmt.Errorf("worker %d: failed to acquire flock after retries", id)) |
|||
return |
|||
} |
|||
|
|||
if holders := activeHolders.Add(1); holders != 1 { |
|||
activeHolders.Add(-1) |
|||
syscall.Flock(int(f.Fd()), syscall.LOCK_UN) |
|||
addError(fmt.Errorf("worker %d: flock overlap detected with %d holders", id, holders)) |
|||
return |
|||
} |
|||
time.Sleep(5 * time.Millisecond) |
|||
|
|||
msg := fmt.Sprintf(recordFormat, id, j) |
|||
if len(msg) != recordSize { |
|||
activeHolders.Add(-1) |
|||
syscall.Flock(int(f.Fd()), syscall.LOCK_UN) |
|||
addError(fmt.Errorf("worker %d: unexpected record size %d", id, len(msg))) |
|||
return |
|||
} |
|||
|
|||
slot := nextSlot.Add(1) - 1 |
|||
offset := slot * int64(recordSize) |
|||
n, err := f.WriteAt([]byte(msg), offset) |
|||
if err != nil { |
|||
activeHolders.Add(-1) |
|||
syscall.Flock(int(f.Fd()), syscall.LOCK_UN) |
|||
addError(fmt.Errorf("worker %d write: %v", id, err)) |
|||
return |
|||
} |
|||
if n != len(msg) { |
|||
activeHolders.Add(-1) |
|||
syscall.Flock(int(f.Fd()), syscall.LOCK_UN) |
|||
addError(fmt.Errorf("worker %d short write: wrote %d of %d bytes", id, n, len(msg))) |
|||
return |
|||
} |
|||
|
|||
activeHolders.Add(-1) |
|||
syscall.Flock(int(f.Fd()), syscall.LOCK_UN) |
|||
} |
|||
}(i) |
|||
} |
|||
|
|||
wg.Wait() |
|||
require.Empty(t, errs, "concurrent lock contention errors: %v", errs) |
|||
|
|||
expectedLines := numWorkers * writesPerWorker |
|||
expectedBytes := expectedLines * recordSize |
|||
var data []byte |
|||
require.Eventually(t, func() bool { |
|||
verify, err := openWithRetry(os.O_RDONLY) |
|||
if err != nil { |
|||
return false |
|||
} |
|||
defer verify.Close() |
|||
|
|||
data, err = io.ReadAll(verify) |
|||
if err != nil { |
|||
return false |
|||
} |
|||
return len(data) == expectedBytes |
|||
}, 5*time.Second, 50*time.Millisecond, "file should eventually contain exactly %d records from all workers", expectedLines) |
|||
actualLines := bytes.Count(data, []byte("\n")) |
|||
assert.Equal(t, expectedLines, actualLines, |
|||
"file should contain exactly %d lines from all workers", expectedLines) |
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
package mount |
|||
|
|||
import "time" |
|||
|
|||
const metadataFlushRetries = 3 |
|||
|
|||
var metadataFlushSleep = time.Sleep |
|||
|
|||
func retryMetadataFlush(flush func() error, onRetry func(nextAttempt, totalAttempts int, backoff time.Duration, err error)) error { |
|||
totalAttempts := metadataFlushRetries + 1 |
|||
var err error |
|||
for attempt := 1; attempt <= totalAttempts; attempt++ { |
|||
err = flush() |
|||
if err == nil { |
|||
return nil |
|||
} |
|||
if attempt == totalAttempts { |
|||
return err |
|||
} |
|||
|
|||
backoff := time.Duration(1<<uint(attempt-1)) * time.Second |
|||
if onRetry != nil { |
|||
onRetry(attempt+1, totalAttempts, backoff, err) |
|||
} |
|||
metadataFlushSleep(backoff) |
|||
} |
|||
return err |
|||
} |
|||
@ -0,0 +1,81 @@ |
|||
package mount |
|||
|
|||
import ( |
|||
"errors" |
|||
"testing" |
|||
"time" |
|||
) |
|||
|
|||
func TestRetryMetadataFlushEventuallySucceeds(t *testing.T) { |
|||
originalSleep := metadataFlushSleep |
|||
t.Cleanup(func() { |
|||
metadataFlushSleep = originalSleep |
|||
}) |
|||
|
|||
var sleeps []time.Duration |
|||
metadataFlushSleep = func(d time.Duration) { |
|||
sleeps = append(sleeps, d) |
|||
} |
|||
|
|||
attempts := 0 |
|||
err := retryMetadataFlush(func() error { |
|||
attempts++ |
|||
if attempts < 3 { |
|||
return errors.New("temporary failure") |
|||
} |
|||
return nil |
|||
}, nil) |
|||
if err != nil { |
|||
t.Fatalf("retryMetadataFlush returned error: %v", err) |
|||
} |
|||
|
|||
if attempts != 3 { |
|||
t.Fatalf("attempts = %d, want 3", attempts) |
|||
} |
|||
|
|||
wantSleeps := []time.Duration{time.Second, 2 * time.Second} |
|||
if len(sleeps) != len(wantSleeps) { |
|||
t.Fatalf("sleep count = %d, want %d", len(sleeps), len(wantSleeps)) |
|||
} |
|||
for i, want := range wantSleeps { |
|||
if sleeps[i] != want { |
|||
t.Fatalf("sleep[%d] = %v, want %v", i, sleeps[i], want) |
|||
} |
|||
} |
|||
} |
|||
|
|||
func TestRetryMetadataFlushReturnsLastError(t *testing.T) { |
|||
originalSleep := metadataFlushSleep |
|||
t.Cleanup(func() { |
|||
metadataFlushSleep = originalSleep |
|||
}) |
|||
|
|||
var sleeps []time.Duration |
|||
metadataFlushSleep = func(d time.Duration) { |
|||
sleeps = append(sleeps, d) |
|||
} |
|||
|
|||
expectedErr := errors.New("permanent failure") |
|||
attempts := 0 |
|||
err := retryMetadataFlush(func() error { |
|||
attempts++ |
|||
return expectedErr |
|||
}, nil) |
|||
if !errors.Is(err, expectedErr) { |
|||
t.Fatalf("retryMetadataFlush error = %v, want %v", err, expectedErr) |
|||
} |
|||
|
|||
if attempts != metadataFlushRetries+1 { |
|||
t.Fatalf("attempts = %d, want %d", attempts, metadataFlushRetries+1) |
|||
} |
|||
|
|||
wantSleeps := []time.Duration{time.Second, 2 * time.Second, 4 * time.Second} |
|||
if len(sleeps) != len(wantSleeps) { |
|||
t.Fatalf("sleep count = %d, want %d", len(sleeps), len(wantSleeps)) |
|||
} |
|||
for i, want := range wantSleeps { |
|||
if sleeps[i] != want { |
|||
t.Fatalf("sleep[%d] = %v, want %v", i, sleeps[i], want) |
|||
} |
|||
} |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue