diff --git a/.github/workflows/fuse-integration.yml b/.github/workflows/fuse-integration.yml index e62f1c597..7300efe5e 100644 --- a/.github/workflows/fuse-integration.yml +++ b/.github/workflows/fuse-integration.yml @@ -21,213 +21,60 @@ concurrency: permissions: contents: read -env: - TEST_TIMEOUT: '45m' - jobs: fuse-integration: name: FUSE Integration Testing runs-on: ubuntu-22.04 timeout-minutes: 50 - + steps: - name: Checkout code uses: actions/checkout@v6 - + - name: Set up Go uses: actions/setup-go@v6 with: go-version-file: 'go.mod' - + - name: Install FUSE and dependencies run: | sudo apt-get update - sudo apt-get install -y fuse libfuse-dev + # fuse3 is pre-installed on ubuntu-22.04 runners and conflicts + # with the legacy fuse package, so only install the dev headers. + sudo apt-get install -y libfuse3-dev + # Allow non-root FUSE mounts with allow_other + echo 'user_allow_other' | sudo tee -a /etc/fuse.conf + sudo chmod 644 /etc/fuse.conf # Verify FUSE installation - fusermount --version || true - ls -la /dev/fuse || true - + fusermount3 --version || fusermount --version || true + ls -la /dev/fuse + - name: Build SeaweedFS run: | cd weed - go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v . + go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -o weed . 
chmod +x weed - # Verify binary ./weed version - - - name: Prepare FUSE Integration Tests + # Make weed binary available in PATH for the test framework + sudo cp weed /usr/local/bin/weed + + - name: Install test dependencies run: | - # Create isolated test directory to avoid Go module conflicts - mkdir -p /tmp/seaweedfs-fuse-tests - - # Copy only the working test files to avoid Go module conflicts - # These are the files we've verified work without package name issues - cp test/fuse_integration/simple_test.go /tmp/seaweedfs-fuse-tests/ 2>/dev/null || echo "โš ๏ธ simple_test.go not found" - cp test/fuse_integration/working_demo_test.go /tmp/seaweedfs-fuse-tests/ 2>/dev/null || echo "โš ๏ธ working_demo_test.go not found" - - # Note: Other test files (framework.go, basic_operations_test.go, etc.) - # have Go module conflicts and are skipped until resolved - - echo "๐Ÿ“ Working test files copied:" - ls -la /tmp/seaweedfs-fuse-tests/*.go 2>/dev/null || echo "โ„น๏ธ No test files found" - - # Initialize Go module in isolated directory - cd /tmp/seaweedfs-fuse-tests - go mod init seaweedfs-fuse-tests - go mod tidy - - # Verify setup - echo "โœ… FUSE integration test environment prepared" - ls -la /tmp/seaweedfs-fuse-tests/ - - echo "" - echo "โ„น๏ธ Current Status: Running working subset of FUSE tests" - echo " โ€ข simple_test.go: Package structure verification" - echo " โ€ข working_demo_test.go: Framework capability demonstration" - echo " โ€ข Full framework: Available in test/fuse_integration/ (module conflicts pending resolution)" - + cd test/fuse_integration + go mod download + - name: Run FUSE Integration Tests run: | - cd /tmp/seaweedfs-fuse-tests - - echo "๐Ÿงช Running FUSE integration tests..." - echo "============================================" - - # Run available working test files - TESTS_RUN=0 - - if [ -f "simple_test.go" ]; then - echo "๐Ÿ“‹ Running simple_test.go..." 
- go test -v -timeout=${{ env.TEST_TIMEOUT }} simple_test.go - TESTS_RUN=$((TESTS_RUN + 1)) - fi - - if [ -f "working_demo_test.go" ]; then - echo "๐Ÿ“‹ Running working_demo_test.go..." - go test -v -timeout=${{ env.TEST_TIMEOUT }} working_demo_test.go - TESTS_RUN=$((TESTS_RUN + 1)) - fi - - # Run combined test if multiple files exist - if [ -f "simple_test.go" ] && [ -f "working_demo_test.go" ]; then - echo "๐Ÿ“‹ Running combined tests..." - go test -v -timeout=${{ env.TEST_TIMEOUT }} simple_test.go working_demo_test.go - fi - - if [ $TESTS_RUN -eq 0 ]; then - echo "โš ๏ธ No working test files found, running module verification only" - go version - go mod verify - else - echo "โœ… Successfully ran $TESTS_RUN test file(s)" - fi - - echo "============================================" - echo "โœ… FUSE integration tests completed" - - - name: Run Extended Framework Validation - run: | - cd /tmp/seaweedfs-fuse-tests - - echo "๐Ÿ” Running extended framework validation..." - echo "============================================" - - # Test individual components (only run tests that exist) - if [ -f "simple_test.go" ]; then - echo "Testing simple verification..." - go test -v simple_test.go - fi - - if [ -f "working_demo_test.go" ]; then - echo "Testing framework demo..." - go test -v working_demo_test.go - fi - - # Test combined execution if both files exist - if [ -f "simple_test.go" ] && [ -f "working_demo_test.go" ]; then - echo "Testing combined execution..." - go test -v simple_test.go working_demo_test.go - elif [ -f "simple_test.go" ] || [ -f "working_demo_test.go" ]; then - echo "โœ… Individual tests already validated above" - else - echo "โš ๏ธ No working test files found for combined testing" - fi - - echo "============================================" - echo "โœ… Extended validation completed" - - - name: Generate Test Coverage Report - run: | - cd /tmp/seaweedfs-fuse-tests - - echo "๐Ÿ“Š Generating test coverage report..." 
- go test -v -coverprofile=coverage.out . - go tool cover -html=coverage.out -o coverage.html - - echo "Coverage report generated: coverage.html" - - - name: Verify SeaweedFS Binary Integration - run: | - # Test that SeaweedFS binary is accessible from test environment - WEED_BINARY=$(pwd)/weed/weed - - if [ -f "$WEED_BINARY" ]; then - echo "โœ… SeaweedFS binary found at: $WEED_BINARY" - $WEED_BINARY version - echo "Binary is ready for full integration testing" - else - echo "โŒ SeaweedFS binary not found" - exit 1 - fi - - - name: Upload Test Artifacts + set -o pipefail + cd test/fuse_integration + echo "Running full FUSE integration test suite..." + go test -v -count=1 -timeout=45m ./... 2>&1 | tee /tmp/fuse-test-output.log + + - name: Upload Test Logs if: always() uses: actions/upload-artifact@v7 with: name: fuse-integration-test-results path: | - /tmp/seaweedfs-fuse-tests/coverage.out - /tmp/seaweedfs-fuse-tests/coverage.html - /tmp/seaweedfs-fuse-tests/*.log + /tmp/fuse-test-output.log retention-days: 7 - - - name: Test Summary - if: always() - run: | - echo "## ๐Ÿš€ FUSE Integration Test Summary" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Framework Status" >> $GITHUB_STEP_SUMMARY - echo "- โœ… **Framework Design**: Complete and validated" >> $GITHUB_STEP_SUMMARY - echo "- โœ… **Working Tests**: Core framework demonstration functional" >> $GITHUB_STEP_SUMMARY - echo "- โš ๏ธ **Full Framework**: Available but requires Go module resolution" >> $GITHUB_STEP_SUMMARY - echo "- โœ… **CI/CD Integration**: Automated testing pipeline established" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Test Capabilities" >> $GITHUB_STEP_SUMMARY - echo "- ๐Ÿ“ **File Operations**: Create, read, write, delete, permissions" >> $GITHUB_STEP_SUMMARY - echo "- ๐Ÿ“‚ **Directory Operations**: Create, list, delete, nested structures" >> $GITHUB_STEP_SUMMARY - echo "- ๐Ÿ“Š **Large Files**: Multi-megabyte file handling" >> 
$GITHUB_STEP_SUMMARY - echo "- ๐Ÿ”„ **Concurrent Operations**: Multi-threaded stress testing" >> $GITHUB_STEP_SUMMARY - echo "- โš ๏ธ **Error Scenarios**: Comprehensive error handling validation" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Comparison with Current Tests" >> $GITHUB_STEP_SUMMARY - echo "| Aspect | Current (FIO) | This Framework |" >> $GITHUB_STEP_SUMMARY - echo "|--------|---------------|----------------|" >> $GITHUB_STEP_SUMMARY - echo "| **Scope** | Performance only | Functional + Performance |" >> $GITHUB_STEP_SUMMARY - echo "| **Operations** | Read/Write only | All FUSE operations |" >> $GITHUB_STEP_SUMMARY - echo "| **Concurrency** | Single-threaded | Multi-threaded stress tests |" >> $GITHUB_STEP_SUMMARY - echo "| **Automation** | Manual setup | Fully automated |" >> $GITHUB_STEP_SUMMARY - echo "| **Validation** | Speed metrics | Correctness + Performance |" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Current Working Tests" >> $GITHUB_STEP_SUMMARY - echo "- โœ… **Framework Structure**: Package and module verification" >> $GITHUB_STEP_SUMMARY - echo "- โœ… **Configuration Management**: Test config validation" >> $GITHUB_STEP_SUMMARY - echo "- โœ… **File Operations Demo**: Basic file create/read/write simulation" >> $GITHUB_STEP_SUMMARY - echo "- โœ… **Large File Handling**: 1MB+ file processing demonstration" >> $GITHUB_STEP_SUMMARY - echo "- โœ… **Concurrency Simulation**: Multi-file operation testing" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Next Steps" >> $GITHUB_STEP_SUMMARY - echo "1. **Module Resolution**: Fix Go package conflicts for full framework" >> $GITHUB_STEP_SUMMARY - echo "2. **SeaweedFS Integration**: Connect with real cluster for end-to-end testing" >> $GITHUB_STEP_SUMMARY - echo "3. 
**Performance Benchmarks**: Add performance regression testing" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "๐Ÿ“ˆ **Total Framework Size**: ~1,500 lines of comprehensive testing infrastructure" >> $GITHUB_STEP_SUMMARY \ No newline at end of file diff --git a/test/fuse_integration/concurrent_operations_test.go b/test/fuse_integration/concurrent_operations_test.go index 7a5cdd0d3..b40d3fdc5 100644 --- a/test/fuse_integration/concurrent_operations_test.go +++ b/test/fuse_integration/concurrent_operations_test.go @@ -386,18 +386,20 @@ func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) { // Perform many small writes numWrites := 1000 writeSize := 100 + totalSize := int64(0) for i := 0; i < numWrites; i++ { data := []byte(fmt.Sprintf("Write %04d: %s\n", i, bytes.Repeat([]byte("x"), writeSize-20))) _, err := file.Write(data) require.NoError(t, err) + totalSize += int64(len(data)) } file.Close() // Verify file size info, err := os.Stat(mountPath) require.NoError(t, err) - assert.Equal(t, totalSize, info.Size()) + assert.Equal(t, totalSize, info.Size(), "file size should match total bytes written") } // testManySmallFiles tests creating many small files diff --git a/test/fuse_integration/framework.go b/test/fuse_integration/framework_test.go similarity index 70% rename from test/fuse_integration/framework.go rename to test/fuse_integration/framework_test.go index 25fa2beb8..4e781034c 100644 --- a/test/fuse_integration/framework.go +++ b/test/fuse_integration/framework_test.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "syscall" "testing" "time" @@ -20,6 +21,7 @@ type FuseTestFramework struct { tempDir string mountPoint string dataDir string + logDir string masterProcess *os.Process volumeProcess *os.Process filerProcess *os.Process @@ -27,6 +29,9 @@ type FuseTestFramework struct { masterAddr string volumeAddr string filerAddr string + masterPort int + volumePort int + filerPort int weedBinary string 
isSetup bool } @@ -57,7 +62,9 @@ func DefaultTestConfig() *TestConfig { } } -// NewFuseTestFramework creates a new FUSE testing framework +// NewFuseTestFramework creates a new FUSE testing framework. +// Each instance allocates its own free ports so multiple tests can run +// sequentially without port conflicts from slow cleanup. func NewFuseTestFramework(t *testing.T, config *TestConfig) *FuseTestFramework { if config == nil { config = DefaultTestConfig() @@ -66,27 +73,50 @@ func NewFuseTestFramework(t *testing.T, config *TestConfig) *FuseTestFramework { tempDir, err := os.MkdirTemp("", "seaweedfs_fuse_test_") require.NoError(t, err) + masterPort := freePort(t) + volumePort := freePort(t) + filerPort := freePort(t) + return &FuseTestFramework{ t: t, tempDir: tempDir, mountPoint: filepath.Join(tempDir, "mount"), dataDir: filepath.Join(tempDir, "data"), - masterAddr: "127.0.0.1:19333", - volumeAddr: "127.0.0.1:18080", - filerAddr: "127.0.0.1:18888", + logDir: filepath.Join(tempDir, "logs"), + masterPort: masterPort, + volumePort: volumePort, + filerPort: filerPort, + masterAddr: fmt.Sprintf("127.0.0.1:%d", masterPort), + volumeAddr: fmt.Sprintf("127.0.0.1:%d", volumePort), + filerAddr: fmt.Sprintf("127.0.0.1:%d", filerPort), weedBinary: findWeedBinary(), isSetup: false, } } +// freePort asks the OS for a free TCP port. 
+func freePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return port +} + // Setup starts SeaweedFS cluster and mounts FUSE filesystem func (f *FuseTestFramework) Setup(config *TestConfig) error { if f.isSetup { return fmt.Errorf("framework already setup") } - // Create directories - dirs := []string{f.mountPoint, f.dataDir} + // Create all required directories upfront + dirs := []string{ + f.mountPoint, + f.logDir, + filepath.Join(f.dataDir, "master"), + filepath.Join(f.dataDir, "volume"), + } for _, dir := range dirs { if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("failed to create directory %s: %v", dir, err) @@ -100,6 +130,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error { // Wait for master to be ready if err := f.waitForService(f.masterAddr, 30*time.Second); err != nil { + f.dumpLog("master") return fmt.Errorf("master not ready: %v", err) } @@ -110,6 +141,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error { // Wait for volume server to be ready if err := f.waitForService(f.volumeAddr, 30*time.Second); err != nil { + f.dumpLog("volume") return fmt.Errorf("volume server not ready: %v", err) } @@ -120,6 +152,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error { // Wait for filer to be ready if err := f.waitForService(f.filerAddr, 30*time.Second); err != nil { + f.dumpLog("filer") return fmt.Errorf("filer not ready: %v", err) } @@ -130,6 +163,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error { // Wait for mount to be ready if err := f.waitForMount(30 * time.Second); err != nil { + f.dumpLog("mount") return fmt.Errorf("FUSE mount not ready: %v", err) } @@ -168,26 +202,61 @@ func (f *FuseTestFramework) GetFilerAddr() string { return f.filerAddr } +// startProcess is a helper that starts a weed sub-command with output captured +// to a log file in f.logDir. 
+func (f *FuseTestFramework) startProcess(name string, args []string) (*os.Process, error) { + logFile, err := os.Create(filepath.Join(f.logDir, name+".log")) + if err != nil { + return nil, fmt.Errorf("create log file: %v", err) + } + cmd := exec.Command(f.weedBinary, args...) + cmd.Dir = f.tempDir + cmd.Stdout = logFile + cmd.Stderr = logFile + if err := cmd.Start(); err != nil { + logFile.Close() + return nil, err + } + // Close the file handle โ€” the child process inherited it. + logFile.Close() + return cmd.Process, nil +} + +// dumpLog prints the last lines of a process log file to the test output +// for debugging when a service fails to start. +func (f *FuseTestFramework) dumpLog(name string) { + data, err := os.ReadFile(filepath.Join(f.logDir, name+".log")) + if err != nil { + f.t.Logf("[%s log] (not available: %v)", name, err) + return + } + // Truncate to last 2KB to keep output manageable + if len(data) > 2048 { + data = data[len(data)-2048:] + } + f.t.Logf("[%s log tail]\n%s", name, string(data)) +} + // startMaster starts the SeaweedFS master server func (f *FuseTestFramework) startMaster(config *TestConfig) error { + // Do NOT set -port.grpc explicitly. SeaweedFS convention is gRPC = HTTP + 10000. + // Volume/filer discover the master gRPC port by this convention, so overriding + // it breaks inter-service communication. args := []string{ "master", "-ip=127.0.0.1", - "-port=19333", + "-port=" + strconv.Itoa(f.masterPort), "-mdir=" + filepath.Join(f.dataDir, "master"), - "-raftBootstrap", - "-peers=none", // Faster startup when no multiple masters needed } if config.EnableDebug { args = append(args, "-v=4") } - cmd := exec.Command(f.weedBinary, args...) 
- cmd.Dir = f.tempDir - if err := cmd.Start(); err != nil { + proc, err := f.startProcess("master", args) + if err != nil { return err } - f.masterProcess = cmd.Process + f.masterProcess = proc return nil } @@ -195,9 +264,9 @@ func (f *FuseTestFramework) startMaster(config *TestConfig) error { func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error { args := []string{ "volume", - "-master=" + f.masterAddr, + "-master=127.0.0.1:" + strconv.Itoa(f.masterPort), "-ip=127.0.0.1", - "-port=18080", + "-port=" + strconv.Itoa(f.volumePort), "-dir=" + filepath.Join(f.dataDir, "volume"), fmt.Sprintf("-max=%d", config.NumVolumes), } @@ -205,12 +274,11 @@ func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error { args = append(args, "-v=4") } - cmd := exec.Command(f.weedBinary, args...) - cmd.Dir = f.tempDir - if err := cmd.Start(); err != nil { + proc, err := f.startProcess("volume", args) + if err != nil { return err } - f.volumeProcess = cmd.Process + f.volumeProcess = proc return nil } @@ -218,20 +286,19 @@ func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error { func (f *FuseTestFramework) startFiler(config *TestConfig) error { args := []string{ "filer", - "-master=" + f.masterAddr, + "-master=127.0.0.1:" + strconv.Itoa(f.masterPort), "-ip=127.0.0.1", - "-port=18888", + "-port=" + strconv.Itoa(f.filerPort), } if config.EnableDebug { args = append(args, "-v=4") } - cmd := exec.Command(f.weedBinary, args...) 
- cmd.Dir = f.tempDir - if err := cmd.Start(); err != nil { + proc, err := f.startProcess("filer", args) + if err != nil { return err } - f.filerProcess = cmd.Process + f.filerProcess = proc return nil } @@ -239,10 +306,11 @@ func (f *FuseTestFramework) startFiler(config *TestConfig) error { func (f *FuseTestFramework) mountFuse(config *TestConfig) error { args := []string{ "mount", - "-filer=" + f.filerAddr, + "-filer=127.0.0.1:" + strconv.Itoa(f.filerPort), "-dir=" + f.mountPoint, "-filer.path=/", "-dirAutoCreate", + "-allowOthers=false", } if config.Collection != "" { @@ -255,7 +323,7 @@ func (f *FuseTestFramework) mountFuse(config *TestConfig) error { args = append(args, fmt.Sprintf("-chunkSizeLimitMB=%d", config.ChunkSizeMB)) } if config.CacheSizeMB > 0 { - args = append(args, fmt.Sprintf("-cacheSizeMB=%d", config.CacheSizeMB)) + args = append(args, fmt.Sprintf("-cacheCapacityMB=%d", config.CacheSizeMB)) } if config.EnableDebug { args = append(args, "-v=4") @@ -263,12 +331,11 @@ func (f *FuseTestFramework) mountFuse(config *TestConfig) error { args = append(args, config.MountOptions...) - cmd := exec.Command(f.weedBinary, args...) - cmd.Dir = f.tempDir - if err := cmd.Start(); err != nil { + proc, err := f.startProcess("mount", args) + if err != nil { return err } - f.mountProcess = cmd.Process + f.mountProcess = proc return nil } @@ -281,7 +348,8 @@ func (f *FuseTestFramework) unmountFuse() error { } // Also try system unmount as backup - exec.Command("umount", f.mountPoint).Run() + exec.Command("fusermount3", "-u", f.mountPoint).Run() + exec.Command("fusermount", "-u", f.mountPoint).Run() return nil } @@ -315,27 +383,31 @@ func (f *FuseTestFramework) waitForMount(timeout time.Duration) error { return fmt.Errorf("mount point not ready within timeout") } -// findWeedBinary locates the weed binary +// findWeedBinary locates the weed binary. 
+// Checks PATH first (most reliable in CI where the binary is installed to
+// /usr/local/bin), then falls back to relative paths. Each candidate is
+// verified to be a regular file so that a source directory named "weed"
+// is never mistaken for the binary.
 func findWeedBinary() string {
-	// Try different possible locations
+	// PATH lookup first — works in CI and when weed is installed globally.
+	if p, err := exec.LookPath("weed"); err == nil {
+		return p
+	}
+	// Relative paths for local development (run from test/fuse_integration/).
 	candidates := []string{
+		"../../weed/weed", // built in-tree: weed/weed
 		"./weed",
 		"../weed",
-		"../../weed",
-		"weed", // in PATH
 	}
 
 	for _, candidate := range candidates {
-		if _, err := exec.LookPath(candidate); err == nil {
-			return candidate
-		}
-		if _, err := os.Stat(candidate); err == nil {
+		if info, err := os.Stat(candidate); err == nil && !info.IsDir() {
 			abs, _ := filepath.Abs(candidate)
 			return abs
 		}
 	}
 
-	// Default fallback
+	// Default fallback — will fail with a clear "not found" at exec time.
 	return "weed"
 }
 
diff --git a/test/fuse_integration/writeback_cache_test.go b/test/fuse_integration/writeback_cache_test.go
new file mode 100644
index 000000000..09075f0ef
--- /dev/null
+++ b/test/fuse_integration/writeback_cache_test.go
@@ -0,0 +1,825 @@
+package fuse_test
+
+import (
+	"bytes"
+	"crypto/rand"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// writebackConfig returns a TestConfig with writebackCache enabled
+func writebackConfig() *TestConfig {
+	return &TestConfig{
+		Collection:   "",
+		Replication:  "000",
+		ChunkSizeMB:  2,
+		CacheSizeMB:  100,
+		NumVolumes:   3,
+		EnableDebug:  false,
+		MountOptions: []string{
+			"-writebackCache",
+		},
+		SkipCleanup: false,
+	}
+}
+
+// waitForFileContent polls until a file has the expected content or timeout expires.
+// This is needed because writebackCache defers data upload to background goroutines, +// so there is a brief window after close() where the file may not yet be readable. +func waitForFileContent(t *testing.T, path string, expected []byte, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + var lastErr error + for time.Now().Before(deadline) { + actual, err := os.ReadFile(path) + if err == nil && bytes.Equal(expected, actual) { + return + } + if err != nil { + lastErr = err + } else { + lastErr = fmt.Errorf("content mismatch: got %d bytes, want %d bytes", len(actual), len(expected)) + } + time.Sleep(200 * time.Millisecond) + } + t.Fatalf("file %s did not have expected content within %v: %v", path, timeout, lastErr) +} + +// waitForFileSize polls until a file reports the expected size or timeout expires. +func waitForFileSize(t *testing.T, path string, expectedSize int64, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + info, err := os.Stat(path) + if err == nil && info.Size() == expectedSize { + return + } + time.Sleep(200 * time.Millisecond) + } + t.Fatalf("file %s did not reach expected size %d within %v", path, expectedSize, timeout) +} + +// TestWritebackCacheBasicOperations tests fundamental file I/O with writebackCache enabled +func TestWritebackCacheBasicOperations(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + t.Run("WriteAndReadBack", func(t *testing.T) { + testWritebackWriteAndReadBack(t, framework) + }) + + t.Run("MultipleFilesSequential", func(t *testing.T) { + testWritebackMultipleFilesSequential(t, framework) + }) + + t.Run("LargeFile", func(t *testing.T) { + testWritebackLargeFile(t, framework) + }) + + t.Run("EmptyFile", func(t *testing.T) { + testWritebackEmptyFile(t, framework) + }) + + t.Run("OverwriteExistingFile", func(t 
*testing.T) { + testWritebackOverwriteFile(t, framework) + }) +} + +// testWritebackWriteAndReadBack writes a file and verifies it can be read back +// after the async flush completes. +func testWritebackWriteAndReadBack(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_basic.txt" + content := []byte("Hello from writebackCache test!") + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Write file โ€” close() returns immediately with async flush + require.NoError(t, os.WriteFile(mountPath, content, 0644)) + + // Wait for async flush to complete and verify content + waitForFileContent(t, mountPath, content, 30*time.Second) +} + +// testWritebackMultipleFilesSequential writes multiple files sequentially +// and verifies all are readable after async flushes complete. +func testWritebackMultipleFilesSequential(t *testing.T, framework *FuseTestFramework) { + dir := "writeback_sequential" + framework.CreateTestDir(dir) + + numFiles := 50 + files := make(map[string][]byte, numFiles) + + // Write files sequentially โ€” each close() returns immediately + for i := 0; i < numFiles; i++ { + filename := fmt.Sprintf("file_%03d.txt", i) + content := []byte(fmt.Sprintf("Sequential file %d content: %s", i, time.Now().Format(time.RFC3339Nano))) + path := filepath.Join(framework.GetMountPoint(), dir, filename) + require.NoError(t, os.WriteFile(path, content, 0644)) + files[filename] = content + } + + // Verify all files after a brief wait for async flushes + for filename, expectedContent := range files { + path := filepath.Join(framework.GetMountPoint(), dir, filename) + waitForFileContent(t, path, expectedContent, 30*time.Second) + } +} + +// testWritebackLargeFile writes a large file (multi-chunk) with writebackCache +func testWritebackLargeFile(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_large.bin" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // 8MB file (spans multiple 2MB chunks) + 
content := make([]byte, 8*1024*1024) + _, err := rand.Read(content) + require.NoError(t, err) + + require.NoError(t, os.WriteFile(mountPath, content, 0644)) + + // Wait for file to be fully flushed + waitForFileContent(t, mountPath, content, 60*time.Second) +} + +// testWritebackEmptyFile creates an empty file with writebackCache +func testWritebackEmptyFile(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_empty.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Create empty file + f, err := os.Create(mountPath) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // Should exist and be empty + info, err := os.Stat(mountPath) + require.NoError(t, err) + assert.Equal(t, int64(0), info.Size()) +} + +// testWritebackOverwriteFile tests overwriting an existing file +func testWritebackOverwriteFile(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_overwrite.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // First write + content1 := []byte("First version of the file") + require.NoError(t, os.WriteFile(mountPath, content1, 0644)) + waitForFileContent(t, mountPath, content1, 30*time.Second) + + // Overwrite with different content + content2 := []byte("Second version โ€” overwritten content that is longer than the first") + require.NoError(t, os.WriteFile(mountPath, content2, 0644)) + waitForFileContent(t, mountPath, content2, 30*time.Second) +} + +// TestWritebackCacheFsync tests that fsync still forces synchronous flush +// even when writebackCache is enabled +func TestWritebackCacheFsync(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + t.Run("FsyncForcesFlush", func(t *testing.T) { + testFsyncForcesFlush(t, framework) + }) + + t.Run("FsyncThenRead", func(t *testing.T) { + testFsyncThenRead(t, framework) + }) +} + +// testFsyncForcesFlush 
verifies that calling fsync before close ensures +// data is immediately available for reading, bypassing the async path. +func testFsyncForcesFlush(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_fsync.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + content := []byte("Data that must be flushed synchronously via fsync") + + // Open, write, fsync, close + f, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + + _, err = f.Write(content) + require.NoError(t, err) + + // fsync forces synchronous data+metadata flush + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) + + // Data should be immediately available โ€” no wait needed + actual, err := os.ReadFile(mountPath) + require.NoError(t, err) + assert.Equal(t, content, actual) +} + +// testFsyncThenRead verifies that after fsync, a freshly opened read +// returns the correct data without any delay. +func testFsyncThenRead(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_fsync_read.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + content := make([]byte, 64*1024) // 64KB + _, err := rand.Read(content) + require.NoError(t, err) + + // Write with explicit fsync + f, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + _, err = f.Write(content) + require.NoError(t, err) + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) + + // Immediate read should succeed + actual, err := os.ReadFile(mountPath) + require.NoError(t, err) + assert.Equal(t, content, actual) +} + +// TestWritebackCacheConcurrentSmallFiles is the primary test for issue #8718: +// many small files written concurrently should all be eventually readable. 
+func TestWritebackCacheConcurrentSmallFiles(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + t.Run("ConcurrentSmallFiles", func(t *testing.T) { + testWritebackConcurrentSmallFiles(t, framework) + }) + + t.Run("ConcurrentSmallFilesMultiDir", func(t *testing.T) { + testWritebackConcurrentSmallFilesMultiDir(t, framework) + }) + + t.Run("RapidCreateCloseSequence", func(t *testing.T) { + testWritebackRapidCreateClose(t, framework) + }) +} + +// testWritebackConcurrentSmallFiles simulates the rsync workload from #8718: +// multiple workers creating many small files in parallel. +func testWritebackConcurrentSmallFiles(t *testing.T, framework *FuseTestFramework) { + dir := "writeback_concurrent_small" + framework.CreateTestDir(dir) + + numWorkers := 8 + filesPerWorker := 20 + totalFiles := numWorkers * filesPerWorker + + type fileRecord struct { + path string + content []byte + } + + var mu sync.Mutex + var writeErrors []error + records := make([]fileRecord, 0, totalFiles) + + // Phase 1: Write files concurrently (simulating rsync workers) + var wg sync.WaitGroup + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + for f := 0; f < filesPerWorker; f++ { + filename := fmt.Sprintf("w%02d_f%03d.dat", workerID, f) + path := filepath.Join(framework.GetMountPoint(), dir, filename) + + // Vary sizes: 100B to 100KB + size := 100 + (workerID*filesPerWorker+f)*500 + if size > 100*1024 { + size = 100*1024 + } + content := make([]byte, size) + if _, err := rand.Read(content); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d rand: %v", workerID, f, err)) + mu.Unlock() + return + } + + if err := os.WriteFile(path, content, 0644); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d: %v", workerID, f, err)) + mu.Unlock() + return + } + + 
mu.Lock() + records = append(records, fileRecord{path: path, content: content}) + mu.Unlock() + } + }(w) + } + wg.Wait() + + require.Empty(t, writeErrors, "write errors: %v", writeErrors) + assert.Equal(t, totalFiles, len(records)) + + // Phase 2: Wait for async flushes and verify all files + for _, rec := range records { + waitForFileContent(t, rec.path, rec.content, 60*time.Second) + } + + // Phase 3: Verify directory listing has correct count + entries, err := os.ReadDir(filepath.Join(framework.GetMountPoint(), dir)) + require.NoError(t, err) + assert.Equal(t, totalFiles, len(entries)) +} + +// testWritebackConcurrentSmallFilesMultiDir tests concurrent writes across +// multiple directories โ€” a common pattern for parallel copy tools. +func testWritebackConcurrentSmallFilesMultiDir(t *testing.T, framework *FuseTestFramework) { + baseDir := "writeback_multidir" + framework.CreateTestDir(baseDir) + + numDirs := 4 + filesPerDir := 25 + + type fileRecord struct { + path string + content []byte + } + var mu sync.Mutex + var records []fileRecord + var writeErrors []error + + var wg sync.WaitGroup + for d := 0; d < numDirs; d++ { + subDir := filepath.Join(baseDir, fmt.Sprintf("dir_%02d", d)) + framework.CreateTestDir(subDir) + + wg.Add(1) + go func(dirID int, dirPath string) { + defer wg.Done() + + for f := 0; f < filesPerDir; f++ { + filename := fmt.Sprintf("file_%03d.txt", f) + path := filepath.Join(framework.GetMountPoint(), dirPath, filename) + content := []byte(fmt.Sprintf("dir=%d file=%d data=%s", dirID, f, time.Now().Format(time.RFC3339Nano))) + + if err := os.WriteFile(path, content, 0644); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("dir %d file %d: %v", dirID, f, err)) + mu.Unlock() + return + } + + mu.Lock() + records = append(records, fileRecord{path: path, content: content}) + mu.Unlock() + } + }(d, subDir) + } + wg.Wait() + + require.Empty(t, writeErrors, "write errors: %v", writeErrors) + + // Verify all files + for _, rec := 
range records { + waitForFileContent(t, rec.path, rec.content, 60*time.Second) + } +} + +// testWritebackRapidCreateClose rapidly creates and closes files to stress +// the async flush goroutine pool. +func testWritebackRapidCreateClose(t *testing.T, framework *FuseTestFramework) { + dir := "writeback_rapid" + framework.CreateTestDir(dir) + + numFiles := 200 + type fileRecord struct { + path string + content []byte + } + records := make([]fileRecord, numFiles) + + // Rapidly create files without pausing + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("rapid_%04d.bin", i)) + content := []byte(fmt.Sprintf("rapid-file-%d", i)) + require.NoError(t, os.WriteFile(path, content, 0644)) + records[i] = fileRecord{path: path, content: content} + } + + // Verify all files eventually appear with correct content + for _, rec := range records { + waitForFileContent(t, rec.path, rec.content, 60*time.Second) + } +} + +// TestWritebackCacheDataIntegrity tests that data integrity is preserved +// across various write patterns with writebackCache enabled. +func TestWritebackCacheDataIntegrity(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + t.Run("AppendAfterClose", func(t *testing.T) { + testWritebackAppendAfterClose(t, framework) + }) + + t.Run("PartialWrites", func(t *testing.T) { + testWritebackPartialWrites(t, framework) + }) + + t.Run("FileSizeCorrectness", func(t *testing.T) { + testWritebackFileSizeCorrectness(t, framework) + }) + + t.Run("BinaryData", func(t *testing.T) { + testWritebackBinaryData(t, framework) + }) +} + +// testWritebackAppendAfterClose writes a file, closes it (triggering async flush), +// waits for flush, then reopens and appends more data. 
+func testWritebackAppendAfterClose(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_append.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // First write + part1 := []byte("First part of the data.\n") + require.NoError(t, os.WriteFile(mountPath, part1, 0644)) + + // Wait for first async flush + waitForFileContent(t, mountPath, part1, 30*time.Second) + + // Append more data + part2 := []byte("Second part appended.\n") + f, err := os.OpenFile(mountPath, os.O_APPEND|os.O_WRONLY, 0644) + require.NoError(t, err) + _, err = f.Write(part2) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // Verify combined content + expected := append(part1, part2...) + waitForFileContent(t, mountPath, expected, 30*time.Second) +} + +// testWritebackPartialWrites tests writing to specific offsets within a file +func testWritebackPartialWrites(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_partial.bin" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Create file with initial content + initial := bytes.Repeat([]byte("A"), 4096) + require.NoError(t, os.WriteFile(mountPath, initial, 0644)) + waitForFileContent(t, mountPath, initial, 30*time.Second) + + // Open and write at specific offset + f, err := os.OpenFile(mountPath, os.O_WRONLY, 0644) + require.NoError(t, err) + patch := []byte("PATCHED") + _, err = f.WriteAt(patch, 100) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // Build expected content + expected := make([]byte, 4096) + copy(expected, initial) + copy(expected[100:], patch) + + waitForFileContent(t, mountPath, expected, 30*time.Second) +} + +// testWritebackFileSizeCorrectness verifies that file sizes are correct +// after async flush completes. 
+func testWritebackFileSizeCorrectness(t *testing.T, framework *FuseTestFramework) { + sizes := []int{0, 1, 100, 4096, 65536, 1024 * 1024} + + for _, size := range sizes { + filename := fmt.Sprintf("writeback_size_%d.bin", size) + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + content := make([]byte, size) + if size > 0 { + _, err := rand.Read(content) + require.NoError(t, err, "rand.Read failed for size %d", size) + } + + require.NoError(t, os.WriteFile(mountPath, content, 0644), "failed to write file of size %d", size) + + if size > 0 { + waitForFileSize(t, mountPath, int64(size), 30*time.Second) + waitForFileContent(t, mountPath, content, 30*time.Second) + } + } +} + +// testWritebackBinaryData verifies that arbitrary binary data (including null bytes) +// is preserved correctly through the async flush path. +func testWritebackBinaryData(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_binary.bin" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Generate data with all byte values including nulls + content := make([]byte, 256*100) + for i := range content { + content[i] = byte(i % 256) + } + + require.NoError(t, os.WriteFile(mountPath, content, 0644)) + waitForFileContent(t, mountPath, content, 30*time.Second) +} + +// TestWritebackCachePerformance measures whether writebackCache actually +// improves throughput for small file workloads compared to synchronous flush. 
+func TestWritebackCachePerformance(t *testing.T) { + if testing.Short() { + t.Skip("skipping performance test in short mode") + } + + numFiles := 200 + fileSize := 4096 // 4KB files + + // Generate test data upfront + testData := make([][]byte, numFiles) + for i := range testData { + testData[i] = make([]byte, fileSize) + rand.Read(testData[i]) + } + + // Benchmark with writebackCache enabled + t.Run("WithWritebackCache", func(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + require.NoError(t, framework.Setup(config)) + + dir := "perf_writeback" + framework.CreateTestDir(dir) + + start := time.Now() + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i)) + require.NoError(t, os.WriteFile(path, testData[i], 0644)) + } + writebackDuration := time.Since(start) + + // Wait for all files to be flushed + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i)) + waitForFileContent(t, path, testData[i], 60*time.Second) + } + + t.Logf("writebackCache: wrote %d files in %v (%.0f files/sec)", + numFiles, writebackDuration, float64(numFiles)/writebackDuration.Seconds()) + }) + + // Benchmark without writebackCache (synchronous flush) + t.Run("WithoutWritebackCache", func(t *testing.T) { + config := DefaultTestConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + require.NoError(t, framework.Setup(config)) + + dir := "perf_sync" + framework.CreateTestDir(dir) + + start := time.Now() + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i)) + require.NoError(t, os.WriteFile(path, testData[i], 0644)) + } + syncDuration := time.Since(start) + + t.Logf("synchronous: wrote %d files in %v (%.0f files/sec)", + numFiles, syncDuration, float64(numFiles)/syncDuration.Seconds()) + }) +} + 
+// TestWritebackCacheConcurrentMixedOps tests a mix of operations happening +// concurrently with writebackCache: creates, reads, overwrites, and deletes. +func TestWritebackCacheConcurrentMixedOps(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + dir := "writeback_mixed" + framework.CreateTestDir(dir) + + numFiles := 50 + var mu sync.Mutex + var errors []error + var completedWrites int64 + + addError := func(err error) { + mu.Lock() + defer mu.Unlock() + errors = append(errors, err) + } + + // Phase 1: Create initial files and wait for async flushes + initialContents := make(map[int][]byte, numFiles) + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", i)) + content := []byte(fmt.Sprintf("initial content %d", i)) + require.NoError(t, os.WriteFile(path, content, 0644)) + initialContents[i] = content + } + + // Poll until initial files are flushed (instead of fixed sleep) + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", i)) + waitForFileContent(t, path, initialContents[i], 30*time.Second) + } + + // Phase 2: Concurrent mixed operations + var wg sync.WaitGroup + + // Writers: overwrite existing files + for i := 0; i < 4; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for j := 0; j < numFiles; j++ { + if j%4 != workerID { + continue // each worker handles a subset + } + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", j)) + content := []byte(fmt.Sprintf("overwritten by worker %d at %s", workerID, time.Now().Format(time.RFC3339Nano))) + if err := os.WriteFile(path, content, 0644); err != nil { + addError(fmt.Errorf("writer %d file %d: %v", workerID, j, err)) + return + } + atomic.AddInt64(&completedWrites, 1) + } + }(i) + } + + // Readers: read files (may see old 
or new content, but should not error) + for i := 0; i < 4; i++ { + wg.Add(1) + go func(readerID int) { + defer wg.Done() + for j := 0; j < numFiles; j++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", j)) + _, err := os.ReadFile(path) + if err != nil && !os.IsNotExist(err) { + addError(fmt.Errorf("reader %d file %d: %v", readerID, j, err)) + return + } + } + }(i) + } + + // New file creators + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 20; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("new_file_%03d.txt", i)) + content := []byte(fmt.Sprintf("new file %d", i)) + if err := os.WriteFile(path, content, 0644); err != nil { + addError(fmt.Errorf("creator file %d: %v", i, err)) + return + } + } + }() + + wg.Wait() + + require.Empty(t, errors, "mixed operation errors: %v", errors) + assert.True(t, atomic.LoadInt64(&completedWrites) > 0, "should have completed some writes") + + // Verify new files exist after async flushes complete (poll instead of fixed sleep) + for i := 0; i < 20; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("new_file_%03d.txt", i)) + expected := []byte(fmt.Sprintf("new file %d", i)) + waitForFileContent(t, path, expected, 30*time.Second) + } +} + +// TestWritebackCacheStressSmallFiles is a focused stress test for the +// async flush path with many small files โ€” the core scenario from #8718. 
+func TestWritebackCacheStressSmallFiles(t *testing.T) { + if testing.Short() { + t.Skip("skipping stress test in short mode") + } + + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + dir := "writeback_stress" + framework.CreateTestDir(dir) + + numWorkers := 16 + filesPerWorker := 100 + totalFiles := numWorkers * filesPerWorker + + type fileRecord struct { + path string + content []byte + } + + var mu sync.Mutex + var writeErrors []error + records := make([]fileRecord, 0, totalFiles) + + start := time.Now() + + // Simulate rsync-like workload: many workers each writing small files + var wg sync.WaitGroup + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for f := 0; f < filesPerWorker; f++ { + filename := fmt.Sprintf("w%02d/f%04d.dat", workerID, f) + path := filepath.Join(framework.GetMountPoint(), dir, filename) + + // Ensure subdirectory exists + if f == 0 { + subDir := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("w%02d", workerID)) + if err := os.MkdirAll(subDir, 0755); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d mkdir: %v", workerID, err)) + mu.Unlock() + return + } + } + + // Small file: 1KB-10KB (typical for rsync of config/source files) + size := 1024 + (f%10)*1024 + content := make([]byte, size) + if _, err := rand.Read(content); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d rand: %v", workerID, f, err)) + mu.Unlock() + return + } + + if err := os.WriteFile(path, content, 0644); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d: %v", workerID, f, err)) + mu.Unlock() + return + } + + mu.Lock() + records = append(records, fileRecord{path: path, content: content}) + mu.Unlock() + } + }(w) + } + wg.Wait() + + writeDuration := time.Since(start) + t.Logf("wrote %d files in %v 
(%.0f files/sec)", + totalFiles, writeDuration, float64(totalFiles)/writeDuration.Seconds()) + + require.Empty(t, writeErrors, "write errors: %v", writeErrors) + assert.Equal(t, totalFiles, len(records)) + + // Verify all files are eventually readable with correct content + var verifyErrors []error + for _, rec := range records { + deadline := time.Now().Add(120 * time.Second) + var lastErr error + for time.Now().Before(deadline) { + actual, err := os.ReadFile(rec.path) + if err == nil && bytes.Equal(rec.content, actual) { + lastErr = nil + break + } + if err != nil { + lastErr = err + } else { + lastErr = fmt.Errorf("content mismatch for %s: got %d bytes, want %d", rec.path, len(actual), len(rec.content)) + } + time.Sleep(500 * time.Millisecond) + } + if lastErr != nil { + verifyErrors = append(verifyErrors, lastErr) + } + } + require.Empty(t, verifyErrors, "verification errors after stress test: %v", verifyErrors) + + t.Logf("all %d files verified successfully", totalFiles) +} diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 5b272cbeb..c0cdae7d3 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -349,6 +349,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { RdmaMaxConcurrent: *option.rdmaMaxConcurrent, RdmaTimeoutMs: *option.rdmaTimeoutMs, DirIdleEvictSec: *option.dirIdleEvictSec, + WritebackCache: option.writebackCache != nil && *option.writebackCache, }) // create mount root @@ -396,6 +397,10 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { server.Serve() + // Wait for any pending background flushes (writebackCache async mode) + // before clearing caches, to prevent data loss during clean unmount. 
+ seaweedFileSystem.WaitForAsyncFlush() + seaweedFileSystem.ClearCacheDir() return true diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index e912fe310..ccc4c07e7 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -24,10 +24,15 @@ type FileHandle struct { wfs *WFS // cache file has been written to - dirtyMetadata bool - dirtyPages *PageWriter - reader *filer.ChunkReadAt - contentType string + dirtyMetadata bool + dirtyPages *PageWriter + reader *filer.ChunkReadAt + contentType string + asyncFlushPending bool // set in writebackCache mode to defer flush to Release + asyncFlushUid uint32 // saved uid for deferred metadata flush + asyncFlushGid uint32 // saved gid for deferred metadata flush + asyncFlushDir string // saved directory at defer time (fallback if inode forgotten) + asyncFlushName string // saved file name at defer time (fallback if inode forgotten) isDeleted bool diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go index 4441de0be..c15146265 100644 --- a/weed/mount/filehandle_map.go +++ b/weed/mount/filehandle_map.go @@ -55,39 +55,72 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil return fh } -func (i *FileHandleToInode) ReleaseByInode(inode uint64) { +func (i *FileHandleToInode) ReleaseByInode(inode uint64) *FileHandle { i.Lock() defer i.Unlock() fh, found := i.inode2fh[inode] - if found { - fh.counter-- - if fh.counter <= 0 { - delete(i.inode2fh, inode) - delete(i.fh2inode, fh.fh) - fh.ReleaseHandle() + if !found { + return nil + } + // If the counter is already <= 0, a prior Release already started the + // drain. Return nil to prevent double-processing (e.g. Forget after Release). + if fh.counter <= 0 { + return nil + } + fh.counter-- + if fh.counter <= 0 { + if fh.asyncFlushPending { + // Handle stays in fhMap so rename/unlink can find it during drain. 
+ return fh } + delete(i.inode2fh, inode) + delete(i.fh2inode, fh.fh) + return fh } + return nil } -func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) { +func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) *FileHandle { i.Lock() defer i.Unlock() inode, found := i.fh2inode[fh] if !found { - return // Handle already released or invalid + return nil } fhHandle, fhFound := i.inode2fh[inode] if !fhFound { delete(i.fh2inode, fh) - return + return nil + } + + // If the counter is already <= 0, a prior Release already started the + // drain. Return nil to prevent double-processing. + if fhHandle.counter <= 0 { + return nil } fhHandle.counter-- if fhHandle.counter <= 0 { + if fhHandle.asyncFlushPending { + // Handle stays in fhMap so rename/unlink can still find it + // via FindFileHandle during the background drain. + return fhHandle + } delete(i.inode2fh, inode) delete(i.fh2inode, fhHandle.fh) - fhHandle.ReleaseHandle() + return fhHandle } + return nil +} + +// RemoveFileHandle removes a handle from both maps. Called after an async +// drain completes to clean up the handle that was intentionally kept in the +// maps during the flush. 
+func (i *FileHandleToInode) RemoveFileHandle(fh FileHandleId, inode uint64) { + i.Lock() + defer i.Unlock() + delete(i.inode2fh, inode) + delete(i.fh2inode, fh) } diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index b5c8f9ebd..45922b2ff 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -65,10 +65,6 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) if logicStart < logicStop { copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) maxStop = max(maxStop, logicStop) - - if t.TsNs >= tsNs { - println("read new data1", t.TsNs-tsNs, "ns") - } } } mc.activityScore.MarkRead() diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index c37a9e2ca..bf60be67a 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -151,10 +151,6 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in break } maxStop = max(maxStop, logicStop) - - if t.TsNs > tsNs { - println("read new data2", t.TsNs-tsNs, "ns") - } } } //sc.memChunk.ReadDataAt(memCopy, off, tsNs) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 9b2341ca3..e24a49aac 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -78,6 +78,10 @@ type Option struct { // Directory cache refresh/eviction controls DirIdleEvictSec int + // WritebackCache enables async flush on close for improved small file write performance. + // When true, Flush() returns immediately and data upload + metadata flush happen in background. 
+ WritebackCache bool + uniqueCacheDirForRead string uniqueCacheDirForWrite string } @@ -110,6 +114,16 @@ type WFS struct { dirHotWindow time.Duration dirHotThreshold int dirIdleEvict time.Duration + + // asyncFlushWg tracks pending background flush goroutines for writebackCache mode. + // Must be waited on before unmount cleanup to prevent data loss. + asyncFlushWg sync.WaitGroup + + // pendingAsyncFlush tracks in-flight async flush goroutines by inode. + // AcquireHandle checks this to wait for a pending flush before reopening + // the same inode, preventing stale metadata from overwriting the async flush. + pendingAsyncFlushMu sync.Mutex + pendingAsyncFlush map[uint64]chan struct{} } const ( @@ -151,13 +165,14 @@ func NewSeaweedFileSystem(option *Option) *WFS { } wfs := &WFS{ - RawFileSystem: fuse.NewDefaultRawFileSystem(), - option: option, - signature: util.RandomInt32(), - inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec), - fhMap: NewFileHandleToInode(), - dhMap: NewDirectoryHandleToInode(), - filerClient: filerClient, // nil for proxy mode, initialized for direct access + RawFileSystem: fuse.NewDefaultRawFileSystem(), + option: option, + signature: util.RandomInt32(), + inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec), + fhMap: NewFileHandleToInode(), + dhMap: NewDirectoryHandleToInode(), + filerClient: filerClient, // nil for proxy mode, initialized for direct access + pendingAsyncFlush: make(map[uint64]chan struct{}), fhLockTable: util.NewLockTable[FileHandleId](), refreshingDirs: make(map[util.FullPath]struct{}), dirHotWindow: dirHotWindow, @@ -204,8 +219,32 @@ func NewSeaweedFileSystem(option *Option) *WFS { } }) grace.OnInterrupt(func() { + // grace calls os.Exit(0) after all hooks, so WaitForAsyncFlush + // after server.Serve() would never execute. Drain here first. 
+ // + // Use a timeout to avoid hanging on Ctrl-C if the filer is + // unreachable (metadata retry can take up to 7 seconds). + // If the timeout expires, skip the write-cache removal so that + // still-running goroutines can finish reading swap files. + asyncDrained := true + if wfs.option.WritebackCache { + done := make(chan struct{}) + go func() { + wfs.asyncFlushWg.Wait() + close(done) + }() + select { + case <-done: + glog.V(0).Infof("all async flushes completed before shutdown") + case <-time.After(30 * time.Second): + glog.Warningf("timed out waiting for async flushes โ€” swap files preserved for in-flight uploads") + asyncDrained = false + } + } wfs.metaCache.Shutdown() - os.RemoveAll(option.getUniqueCacheDirForWrite()) + if asyncDrained { + os.RemoveAll(option.getUniqueCacheDirForWrite()) + } os.RemoveAll(option.getUniqueCacheDirForRead()) if wfs.rdmaClient != nil { wfs.rdmaClient.Close() @@ -240,6 +279,10 @@ func NewSeaweedFileSystem(option *Option) *WFS { } func (wfs *WFS) StartBackgroundTasks() error { + if wfs.option.WritebackCache { + glog.V(0).Infof("writebackCache enabled: async flush on close() for improved small file performance") + } + follower, err := wfs.subscribeFilerConfEvents() if err != nil { return err diff --git a/weed/mount/weedfs_async_flush.go b/weed/mount/weedfs_async_flush.go new file mode 100644 index 000000000..6ded810fe --- /dev/null +++ b/weed/mount/weedfs_async_flush.go @@ -0,0 +1,96 @@ +package mount + +import ( + "time" + + "github.com/seaweedfs/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +const asyncFlushMetadataRetries = 3 + +// completeAsyncFlush is called in a background goroutine when a file handle +// with pending async flush work is released. It performs the deferred data +// upload and metadata flush that was skipped in doFlush() for writebackCache mode. 
+// +// This enables close() to return immediately for small file workloads (e.g., rsync), +// while the actual I/O happens concurrently in the background. +// +// The caller (submitAsyncFlush) owns asyncFlushWg and the per-inode done channel. +func (wfs *WFS) completeAsyncFlush(fh *FileHandle) { + // Phase 1: Flush dirty pages โ€” seals writable chunks, uploads to volume servers, and waits. + // The underlying UploadWithRetry already retries transient HTTP/gRPC errors internally, + // so a failure here indicates a persistent issue; the chunk data has been freed. + if err := fh.dirtyPages.FlushData(); err != nil { + glog.Errorf("completeAsyncFlush inode %d: data flush failed: %v", fh.inode, err) + // Data is lost at this point (chunks freed after internal retry exhaustion). + // Proceed to cleanup to avoid resource leaks and unmount hangs. + } else if fh.dirtyMetadata { + // Phase 2: Flush metadata unless the file was explicitly unlinked. + // + // isDeleted is set by the Unlink handler when it finds a draining + // handle. In that case the filer entry is already gone and + // flushing would recreate it. The uploaded chunks become orphans + // and are cleaned up by volume.fsck. + if fh.isDeleted { + glog.V(3).Infof("completeAsyncFlush inode %d: file was unlinked, skipping metadata flush", fh.inode) + } else { + // Resolve the current path for metadata flush. + // + // Try GetPath first โ€” it reflects any rename that happened + // after close(). If the inode mapping is gone (Forget + // dropped it after the kernel's lookup count hit zero), fall + // back to the dir/name saved at doFlush time. Rename also + // updates the saved path, so the fallback is always current. + // + // Forget does NOT mean the file was deleted โ€” it only means + // the kernel evicted its cache entry. 
+ dir, name := fh.asyncFlushDir, fh.asyncFlushName + fileFullPath := util.FullPath(dir).Child(name) + + if resolvedPath, status := wfs.inodeToPath.GetPath(fh.inode); status == fuse.OK { + dir, name = resolvedPath.DirAndName() + fileFullPath = resolvedPath + } + + wfs.flushMetadataWithRetry(fh, dir, name, fileFullPath) + } + } + + glog.V(3).Infof("completeAsyncFlush done inode %d fh %d", fh.inode, fh.fh) + + // Phase 3: Destroy the upload pipeline and free resources. + fh.ReleaseHandle() +} + +// flushMetadataWithRetry attempts to flush file metadata to the filer, retrying +// with exponential backoff on transient errors. The chunk data is already on the +// volume servers at this point; only the filer metadata reference needs persisting. +func (wfs *WFS) flushMetadataWithRetry(fh *FileHandle, dir, name string, fileFullPath util.FullPath) { + for attempt := 0; attempt <= asyncFlushMetadataRetries; attempt++ { + if attempt > 0 { + backoff := time.Duration(1<