From 9434d3733d57d49a5078d80150f4a5f743b2ef2d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 22 Mar 2026 15:24:08 -0700 Subject: [PATCH] mount: async flush on close() when writebackCache is enabled (#8727) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * mount: async flush on close() when writebackCache is enabled When -writebackCache is enabled, defer data upload and metadata flush from Flush() (triggered by close()) to a background goroutine in Release(). This allows processes like rsync that write many small files to proceed to the next file immediately instead of blocking on two network round-trips (volume upload + filer metadata) per file. Fixes #8718 * mount: add retry with backoff for async metadata flush The metadata flush in completeAsyncFlush now retries up to 3 times with exponential backoff (1s, 2s, 4s) on transient gRPC errors. Since the chunk data is already safely on volume servers at this point, only the filer metadata reference needs persisting — retrying is both safe and effective. Data flush (FlushData) is not retried externally because UploadWithRetry already handles transient HTTP/gRPC errors internally; if it still fails, the chunk memory has been freed. * test: add integration tests for writebackCache async flush Add comprehensive FUSE integration tests for the writebackCache async flush feature (issue #8718): - Basic operations: write/read, sequential files, large files, empty files, overwrites - Fsync correctness: fsync forces synchronous flush even in writeback mode, immediate read-after-fsync - Concurrent small files: multi-worker parallel writes (rsync-like workload), multi-directory, rapid create/close - Data integrity: append after close, partial writes, file size correctness, binary data preservation - Performance comparison: writeback vs synchronous flush throughput - Stress test: 16 workers x 100 files with content verification - Mixed concurrent operations: reads, writes, creates running together Also fix pre-existing test infrastructure issues: - Rename framework.go to framework_test.go (fixes Go package conflict) - Fix undefined totalSize variable in concurrent_operations_test.go * ci: update fuse-integration workflow to run full test suite The workflow previously only ran placeholder tests (simple_test.go, working_demo_test.go) in a temp directory due to a Go module conflict. Now that framework.go is renamed to framework_test.go, the full test suite compiles and runs correctly from test/fuse_integration/. Changes: - Run go test directly in test/fuse_integration/ (no temp dir copy) - Install weed binary to /usr/local/bin for test framework discovery - Configure /etc/fuse.conf with user_allow_other for FUSE mounts - Install fuse3 for modern FUSE support - Stream test output to log file for artifact upload * mount: fix three P1 races in async flush P1-1: Reopen overwrites data still flushing in background ReleaseByHandle removes the old handle from fhMap before the deferred flush finishes. A reopen of the same inode during that window would build from stale filer metadata, overwriting the async flush. Fix: Track in-flight async flushes per inode via pendingAsyncFlush map. AcquireHandle now calls waitForPendingAsyncFlush(inode) to block until any pending flush completes before reading filer metadata. P1-2: Deferred flush races rename and unlink after close completeAsyncFlush captured the path once at entry, but rename or unlink after close() could cause metadata to be written under the wrong name or recreate a deleted file. Fix: Re-resolve path from inode via GetPath right before metadata flush. GetPath returns the current path (reflecting renames) or ENOENT (if unlinked), in which case we skip the metadata flush. P1-3: SIGINT/SIGTERM bypasses the async-flush drain grace.OnInterrupt runs hooks then calls os.Exit(0), so WaitForAsyncFlush after server.Serve() never executes on signal. Fix: Add WaitForAsyncFlush (with 10s timeout) to the WFS interrupt handler, before cache cleanup. The timeout prevents hanging on Ctrl-C when the filer is unreachable. * mount: fix P1 races — draining handle stays in fhMap P1-1: Reopen TOCTOU The gap between ReleaseByHandle removing from fhMap and submitAsyncFlush registering in pendingAsyncFlush allowed a concurrent AcquireHandle to slip through with stale metadata. Fix: Hold pendingAsyncFlushMu across both the counter decrement (ReleaseByHandle) and the pending registration. The handle is registered as pending before the lock is released, so waitForPendingAsyncFlush always sees it. P1-2: Rename/unlink can't find draining handle ReleaseByHandle deleted from fhMap immediately. Rename's FindFileHandle(inode) at line 251 could not find the handle to update entry.Name. Unlink could not coordinate either. Fix: When asyncFlushPending is true, ReleaseByHandle/ReleaseByInode leave the handle in fhMap (counter=0 but maps intact). The handle stays visible to FindFileHandle so rename can update entry.Name. completeAsyncFlush re-resolves the path from the inode (GetPath) right before metadata flush for correctness after rename/unlink. After drain, RemoveFileHandle cleans up the maps. Double-return prevention: ReleaseByHandle/ReleaseByInode return nil if counter is already <= 0, so Forget after Release doesn't start a second drain goroutine. P1-3: SIGINT deletes swap files under running goroutines After the 10s timeout, os.RemoveAll deleted the write cache dir (containing swap files) while FlushData goroutines were still reading from them. Fix: Increase timeout to 30s. If timeout expires, skip write cache dir removal so in-flight goroutines can finish reading swap files. The OS (or next mount) cleans them up. Read cache is always removed. * mount: never skip metadata flush when Forget drops inode mapping Forget removes the inode→path mapping when the kernel's lookup count reaches zero, but this does NOT mean the file was unlinked — it only means the kernel evicted its cache entry. completeAsyncFlush was treating GetPath failure as "file unlinked" and skipping the metadata flush, which orphaned the just-uploaded chunks for live files. Fix: Save dir and name at doFlush defer time. In completeAsyncFlush, try GetPath first to pick up renames; if the mapping is gone, fall back to the saved dir/name. Always attempt the metadata flush — the filer is the authority on whether the file exists, not the local inode cache. * mount: distinguish Forget from Unlink in async flush path fallback The saved-path fallback (from the previous fix) always flushed metadata when GetPath failed, which recreated files that were explicitly unlinked after close(). The same stale fallback could recreate the pre-rename path if Forget dropped the inode mapping after a rename. Root cause: GetPath failure has two meanings: 1. Forget — kernel evicted the cache entry (file still exists) 2. Unlink — file was explicitly deleted (should not recreate) Fix (three coordinated changes): Unlink (weedfs_file_mkrm.go): Before RemovePath, look up the inode and find any draining handle via FindFileHandle. Set fh.isDeleted = true so the async flush knows the file was explicitly removed. Rename (weedfs_rename.go): When renaming a file with a draining handle, update asyncFlushDir/asyncFlushName to the post-rename location. This keeps the saved-path fallback current so Forget after rename doesn't flush to the old (pre-rename) path. completeAsyncFlush (weedfs_async_flush.go): Check fh.isDeleted first — if true, skip metadata flush (file was unlinked, chunks become orphans for volume.fsck). Otherwise, try GetPath for the current path (renames); fall back to saved path if Forget dropped the mapping (file is live, just evicted from kernel cache). * test/ci: address PR review nitpicks concurrent_operations_test.go: - Restore precise totalSize assertion instead of info.Size() > 0 writeback_cache_test.go: - Check rand.Read errors in all 3 locations (lines 310, 512, 757) - Check os.MkdirAll error in stress test (line 752) - Remove dead verifyErrors variable (line 332) - Replace both time.Sleep(5s) with polling via waitForFileContent to avoid flaky tests under CI load (lines 638, 700) fuse-integration.yml: - Add set -o pipefail so go test failures propagate through tee * ci: fix fuse3/fuse package conflict on ubuntu-22.04 runner fuse3 is pre-installed on ubuntu-22.04 runners and conflicts with the legacy fuse package. Only install libfuse3-dev for the headers. * mount/page_writer: remove debug println statements Remove leftover debug println("read new data1/2") from ReadDataAt in MemChunk and SwapFileChunk. * test: fix findWeedBinary matching source directory instead of binary findWeedBinary() matched ../../weed (the source directory) via os.Stat before checking PATH, then tried to exec a directory which fails with "permission denied" on the CI runner. Fix: Check PATH first (reliable in CI where the binary is installed to /usr/local/bin). For relative paths, verify the candidate is a regular file (!info.IsDir()). Add ../../weed/weed as a candidate for in-tree builds. * test: fix framework — dynamic ports, output capture, data dirs The integration test framework was failing in CI because: 1. All tests used hardcoded ports (19333/18080/18888), so sequential tests could conflict when prior processes hadn't fully released their ports yet. 2. Data subdirectories (data/master, data/volume) were not created before starting processes. 3. Master was started with -peers=none which is not a valid address. 4. Process stdout/stderr was not captured, making failures opaque ("service not ready within timeout" with no diagnostics). 5. The unmount fallback used 'umount' instead of 'fusermount -u'. 6. The mount used -cacheSizeMB (nonexistent) instead of -cacheCapacityMB and was missing -allowOthers=false for unprivileged CI runners. Fixes: - Dynamic port allocation via freePort() (net.Listen ":0") - Explicit gRPC ports via -port.grpc to avoid default port conflicts - Create data/master and data/volume directories in Setup() - Remove invalid -peers=none and -raftBootstrap flags - Capture process output to logDir/*.log via startProcess() helper - dumpLog() prints tail of log file on service startup failure - Use fusermount3/fusermount -u for unmount - Fix mount flag names (-cacheCapacityMB, -allowOthers=false) * test: remove explicit -port.grpc flags from test framework SeaweedFS convention: gRPC port = HTTP port + 10000. Volume and filer discover the master gRPC port by this convention. Setting explicit -port.grpc on master/volume/filer broke inter-service communication because the volume server computed master gRPC as HTTP+10000 but the actual gRPC was on a different port. Remove all -port.grpc flags and let the default convention work. Dynamic HTTP ports already ensure uniqueness; the derived gRPC ports (HTTP+10000) will also be unique. --------- Co-authored-by: Copilot --- .github/workflows/fuse-integration.yml | 207 +---- .../concurrent_operations_test.go | 4 +- .../{framework.go => framework_test.go} | 156 +++- test/fuse_integration/writeback_cache_test.go | 825 ++++++++++++++++++ weed/command/mount_std.go | 5 + weed/mount/filehandle.go | 13 +- weed/mount/filehandle_map.go | 55 +- weed/mount/page_writer/page_chunk_mem.go | 4 - weed/mount/page_writer/page_chunk_swapfile.go | 4 - weed/mount/weedfs.go | 59 +- weed/mount/weedfs_async_flush.go | 96 ++ weed/mount/weedfs_file_mkrm.go | 9 + weed/mount/weedfs_file_sync.go | 58 +- weed/mount/weedfs_filehandle.go | 60 +- weed/mount/weedfs_forget.go | 7 +- weed/mount/weedfs_rename.go | 6 + 16 files changed, 1296 insertions(+), 272 deletions(-) rename test/fuse_integration/{framework.go => framework_test.go} (70%) create mode 100644 test/fuse_integration/writeback_cache_test.go create mode 100644 weed/mount/weedfs_async_flush.go diff --git a/.github/workflows/fuse-integration.yml b/.github/workflows/fuse-integration.yml index e62f1c597..7300efe5e 100644 --- a/.github/workflows/fuse-integration.yml +++ b/.github/workflows/fuse-integration.yml @@ -21,213 +21,60 @@ concurrency: permissions: contents: read -env: - TEST_TIMEOUT: '45m' - jobs: fuse-integration: name: FUSE Integration Testing runs-on: ubuntu-22.04 timeout-minutes: 50 - + steps: - name: Checkout code uses: actions/checkout@v6 - + - name: Set up Go uses: actions/setup-go@v6 with: go-version-file: 'go.mod' - + - name: Install FUSE and dependencies run: | sudo apt-get update - sudo apt-get install -y fuse libfuse-dev + # fuse3 is pre-installed on ubuntu-22.04 runners and conflicts + # with the legacy fuse package, so only install the dev headers. + sudo apt-get install -y libfuse3-dev + # Allow non-root FUSE mounts with allow_other + echo 'user_allow_other' | sudo tee -a /etc/fuse.conf + sudo chmod 644 /etc/fuse.conf # Verify FUSE installation - fusermount --version || true - ls -la /dev/fuse || true - + fusermount3 --version || fusermount --version || true + ls -la /dev/fuse + - name: Build SeaweedFS run: | cd weed - go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v . + go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -o weed . chmod +x weed - # Verify binary ./weed version - - - name: Prepare FUSE Integration Tests + # Make weed binary available in PATH for the test framework + sudo cp weed /usr/local/bin/weed + + - name: Install test dependencies run: | - # Create isolated test directory to avoid Go module conflicts - mkdir -p /tmp/seaweedfs-fuse-tests - - # Copy only the working test files to avoid Go module conflicts - # These are the files we've verified work without package name issues - cp test/fuse_integration/simple_test.go /tmp/seaweedfs-fuse-tests/ 2>/dev/null || echo "⚠️ simple_test.go not found" - cp test/fuse_integration/working_demo_test.go /tmp/seaweedfs-fuse-tests/ 2>/dev/null || echo "⚠️ working_demo_test.go not found" - - # Note: Other test files (framework.go, basic_operations_test.go, etc.) - # have Go module conflicts and are skipped until resolved - - echo "📁 Working test files copied:" - ls -la /tmp/seaweedfs-fuse-tests/*.go 2>/dev/null || echo "ℹ️ No test files found" - - # Initialize Go module in isolated directory - cd /tmp/seaweedfs-fuse-tests - go mod init seaweedfs-fuse-tests - go mod tidy - - # Verify setup - echo "✅ FUSE integration test environment prepared" - ls -la /tmp/seaweedfs-fuse-tests/ - - echo "" - echo "ℹ️ Current Status: Running working subset of FUSE tests" - echo " • simple_test.go: Package structure verification" - echo " • working_demo_test.go: Framework capability demonstration" - echo " • Full framework: Available in test/fuse_integration/ (module conflicts pending resolution)" - + cd test/fuse_integration + go mod download + - name: Run FUSE Integration Tests run: | - cd /tmp/seaweedfs-fuse-tests - - echo "🧪 Running FUSE integration tests..." - echo "============================================" - - # Run available working test files - TESTS_RUN=0 - - if [ -f "simple_test.go" ]; then - echo "📋 Running simple_test.go..." - go test -v -timeout=${{ env.TEST_TIMEOUT }} simple_test.go - TESTS_RUN=$((TESTS_RUN + 1)) - fi - - if [ -f "working_demo_test.go" ]; then - echo "📋 Running working_demo_test.go..." - go test -v -timeout=${{ env.TEST_TIMEOUT }} working_demo_test.go - TESTS_RUN=$((TESTS_RUN + 1)) - fi - - # Run combined test if multiple files exist - if [ -f "simple_test.go" ] && [ -f "working_demo_test.go" ]; then - echo "📋 Running combined tests..." - go test -v -timeout=${{ env.TEST_TIMEOUT }} simple_test.go working_demo_test.go - fi - - if [ $TESTS_RUN -eq 0 ]; then - echo "⚠️ No working test files found, running module verification only" - go version - go mod verify - else - echo "✅ Successfully ran $TESTS_RUN test file(s)" - fi - - echo "============================================" - echo "✅ FUSE integration tests completed" - - - name: Run Extended Framework Validation - run: | - cd /tmp/seaweedfs-fuse-tests - - echo "🔍 Running extended framework validation..." - echo "============================================" - - # Test individual components (only run tests that exist) - if [ -f "simple_test.go" ]; then - echo "Testing simple verification..." - go test -v simple_test.go - fi - - if [ -f "working_demo_test.go" ]; then - echo "Testing framework demo..." - go test -v working_demo_test.go - fi - - # Test combined execution if both files exist - if [ -f "simple_test.go" ] && [ -f "working_demo_test.go" ]; then - echo "Testing combined execution..." - go test -v simple_test.go working_demo_test.go - elif [ -f "simple_test.go" ] || [ -f "working_demo_test.go" ]; then - echo "✅ Individual tests already validated above" - else - echo "⚠️ No working test files found for combined testing" - fi - - echo "============================================" - echo "✅ Extended validation completed" - - - name: Generate Test Coverage Report - run: | - cd /tmp/seaweedfs-fuse-tests - - echo "📊 Generating test coverage report..." - go test -v -coverprofile=coverage.out . - go tool cover -html=coverage.out -o coverage.html - - echo "Coverage report generated: coverage.html" - - - name: Verify SeaweedFS Binary Integration - run: | - # Test that SeaweedFS binary is accessible from test environment - WEED_BINARY=$(pwd)/weed/weed - - if [ -f "$WEED_BINARY" ]; then - echo "✅ SeaweedFS binary found at: $WEED_BINARY" - $WEED_BINARY version - echo "Binary is ready for full integration testing" - else - echo "❌ SeaweedFS binary not found" - exit 1 - fi - - - name: Upload Test Artifacts + set -o pipefail + cd test/fuse_integration + echo "Running full FUSE integration test suite..." + go test -v -count=1 -timeout=45m ./... 2>&1 | tee /tmp/fuse-test-output.log + + - name: Upload Test Logs if: always() uses: actions/upload-artifact@v7 with: name: fuse-integration-test-results path: | - /tmp/seaweedfs-fuse-tests/coverage.out - /tmp/seaweedfs-fuse-tests/coverage.html - /tmp/seaweedfs-fuse-tests/*.log + /tmp/fuse-test-output.log retention-days: 7 - - - name: Test Summary - if: always() - run: | - echo "## 🚀 FUSE Integration Test Summary" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Framework Status" >> $GITHUB_STEP_SUMMARY - echo "- ✅ **Framework Design**: Complete and validated" >> $GITHUB_STEP_SUMMARY - echo "- ✅ **Working Tests**: Core framework demonstration functional" >> $GITHUB_STEP_SUMMARY - echo "- ⚠️ **Full Framework**: Available but requires Go module resolution" >> $GITHUB_STEP_SUMMARY - echo "- ✅ **CI/CD Integration**: Automated testing pipeline established" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Test Capabilities" >> $GITHUB_STEP_SUMMARY - echo "- 📁 **File Operations**: Create, read, write, delete, permissions" >> $GITHUB_STEP_SUMMARY - echo "- 📂 **Directory Operations**: Create, list, delete, nested structures" >> $GITHUB_STEP_SUMMARY - echo "- 📊 **Large Files**: Multi-megabyte file handling" >> $GITHUB_STEP_SUMMARY - echo "- 🔄 **Concurrent Operations**: Multi-threaded stress testing" >> $GITHUB_STEP_SUMMARY - echo "- ⚠️ **Error Scenarios**: Comprehensive error handling validation" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Comparison with Current Tests" >> $GITHUB_STEP_SUMMARY - echo "| Aspect | Current (FIO) | This Framework |" >> $GITHUB_STEP_SUMMARY - echo "|--------|---------------|----------------|" >> $GITHUB_STEP_SUMMARY - echo "| **Scope** | Performance only | Functional + Performance |" >> $GITHUB_STEP_SUMMARY - echo "| **Operations** | Read/Write only | All FUSE operations |" >> $GITHUB_STEP_SUMMARY - echo "| **Concurrency** | Single-threaded | Multi-threaded stress tests |" >> $GITHUB_STEP_SUMMARY - echo "| **Automation** | Manual setup | Fully automated |" >> $GITHUB_STEP_SUMMARY - echo "| **Validation** | Speed metrics | Correctness + Performance |" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Current Working Tests" >> $GITHUB_STEP_SUMMARY - echo "- ✅ **Framework Structure**: Package and module verification" >> $GITHUB_STEP_SUMMARY - echo "- ✅ **Configuration Management**: Test config validation" >> $GITHUB_STEP_SUMMARY - echo "- ✅ **File Operations Demo**: Basic file create/read/write simulation" >> $GITHUB_STEP_SUMMARY - echo "- ✅ **Large File Handling**: 1MB+ file processing demonstration" >> $GITHUB_STEP_SUMMARY - echo "- ✅ **Concurrency Simulation**: Multi-file operation testing" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Next Steps" >> $GITHUB_STEP_SUMMARY - echo "1. **Module Resolution**: Fix Go package conflicts for full framework" >> $GITHUB_STEP_SUMMARY - echo "2. **SeaweedFS Integration**: Connect with real cluster for end-to-end testing" >> $GITHUB_STEP_SUMMARY - echo "3. **Performance Benchmarks**: Add performance regression testing" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "📈 **Total Framework Size**: ~1,500 lines of comprehensive testing infrastructure" >> $GITHUB_STEP_SUMMARY \ No newline at end of file diff --git a/test/fuse_integration/concurrent_operations_test.go b/test/fuse_integration/concurrent_operations_test.go index 7a5cdd0d3..b40d3fdc5 100644 --- a/test/fuse_integration/concurrent_operations_test.go +++ b/test/fuse_integration/concurrent_operations_test.go @@ -386,18 +386,20 @@ func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) { // Perform many small writes numWrites := 1000 writeSize := 100 + totalSize := int64(0) for i := 0; i < numWrites; i++ { data := []byte(fmt.Sprintf("Write %04d: %s\n", i, bytes.Repeat([]byte("x"), writeSize-20))) _, err := file.Write(data) require.NoError(t, err) + totalSize += int64(len(data)) } file.Close() // Verify file size info, err := os.Stat(mountPath) require.NoError(t, err) - assert.Equal(t, totalSize, info.Size()) + assert.Equal(t, totalSize, info.Size(), "file size should match total bytes written") } // testManySmallFiles tests creating many small files diff --git a/test/fuse_integration/framework.go b/test/fuse_integration/framework_test.go similarity index 70% rename from test/fuse_integration/framework.go rename to test/fuse_integration/framework_test.go index 25fa2beb8..4e781034c 100644 --- a/test/fuse_integration/framework.go +++ b/test/fuse_integration/framework_test.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "syscall" "testing" "time" @@ -20,6 +21,7 @@ type FuseTestFramework struct { tempDir string mountPoint string dataDir string + logDir string masterProcess *os.Process volumeProcess *os.Process filerProcess *os.Process @@ -27,6 +29,9 @@ type FuseTestFramework struct { masterAddr string volumeAddr string filerAddr string + masterPort int + volumePort int + filerPort int weedBinary string isSetup bool } @@ -57,7 +62,9 @@ func DefaultTestConfig() *TestConfig { } } -// NewFuseTestFramework creates a new FUSE testing framework +// NewFuseTestFramework creates a new FUSE testing framework. +// Each instance allocates its own free ports so multiple tests can run +// sequentially without port conflicts from slow cleanup. func NewFuseTestFramework(t *testing.T, config *TestConfig) *FuseTestFramework { if config == nil { config = DefaultTestConfig() @@ -66,27 +73,50 @@ func NewFuseTestFramework(t *testing.T, config *TestConfig) *FuseTestFramework { tempDir, err := os.MkdirTemp("", "seaweedfs_fuse_test_") require.NoError(t, err) + masterPort := freePort(t) + volumePort := freePort(t) + filerPort := freePort(t) + return &FuseTestFramework{ t: t, tempDir: tempDir, mountPoint: filepath.Join(tempDir, "mount"), dataDir: filepath.Join(tempDir, "data"), - masterAddr: "127.0.0.1:19333", - volumeAddr: "127.0.0.1:18080", - filerAddr: "127.0.0.1:18888", + logDir: filepath.Join(tempDir, "logs"), + masterPort: masterPort, + volumePort: volumePort, + filerPort: filerPort, + masterAddr: fmt.Sprintf("127.0.0.1:%d", masterPort), + volumeAddr: fmt.Sprintf("127.0.0.1:%d", volumePort), + filerAddr: fmt.Sprintf("127.0.0.1:%d", filerPort), weedBinary: findWeedBinary(), isSetup: false, } } +// freePort asks the OS for a free TCP port. +func freePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return port +} + // Setup starts SeaweedFS cluster and mounts FUSE filesystem func (f *FuseTestFramework) Setup(config *TestConfig) error { if f.isSetup { return fmt.Errorf("framework already setup") } - // Create directories - dirs := []string{f.mountPoint, f.dataDir} + // Create all required directories upfront + dirs := []string{ + f.mountPoint, + f.logDir, + filepath.Join(f.dataDir, "master"), + filepath.Join(f.dataDir, "volume"), + } for _, dir := range dirs { if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("failed to create directory %s: %v", dir, err) @@ -100,6 +130,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error { // Wait for master to be ready if err := f.waitForService(f.masterAddr, 30*time.Second); err != nil { + f.dumpLog("master") return fmt.Errorf("master not ready: %v", err) } @@ -110,6 +141,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error { // Wait for volume server to be ready if err := f.waitForService(f.volumeAddr, 30*time.Second); err != nil { + f.dumpLog("volume") return fmt.Errorf("volume server not ready: %v", err) } @@ -120,6 +152,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error { // Wait for filer to be ready if err := f.waitForService(f.filerAddr, 30*time.Second); err != nil { + f.dumpLog("filer") return fmt.Errorf("filer not ready: %v", err) } @@ -130,6 +163,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error { // Wait for mount to be ready if err := f.waitForMount(30 * time.Second); err != nil { + f.dumpLog("mount") return fmt.Errorf("FUSE mount not ready: %v", err) } @@ -168,26 +202,61 @@ func (f *FuseTestFramework) GetFilerAddr() string { return f.filerAddr } +// startProcess is a helper that starts a weed sub-command with output captured +// to a log file in f.logDir. +func (f *FuseTestFramework) startProcess(name string, args []string) (*os.Process, error) { + logFile, err := os.Create(filepath.Join(f.logDir, name+".log")) + if err != nil { + return nil, fmt.Errorf("create log file: %v", err) + } + cmd := exec.Command(f.weedBinary, args...) + cmd.Dir = f.tempDir + cmd.Stdout = logFile + cmd.Stderr = logFile + if err := cmd.Start(); err != nil { + logFile.Close() + return nil, err + } + // Close the file handle — the child process inherited it. + logFile.Close() + return cmd.Process, nil +} + +// dumpLog prints the last lines of a process log file to the test output +// for debugging when a service fails to start. +func (f *FuseTestFramework) dumpLog(name string) { + data, err := os.ReadFile(filepath.Join(f.logDir, name+".log")) + if err != nil { + f.t.Logf("[%s log] (not available: %v)", name, err) + return + } + // Truncate to last 2KB to keep output manageable + if len(data) > 2048 { + data = data[len(data)-2048:] + } + f.t.Logf("[%s log tail]\n%s", name, string(data)) +} + // startMaster starts the SeaweedFS master server func (f *FuseTestFramework) startMaster(config *TestConfig) error { + // Do NOT set -port.grpc explicitly. SeaweedFS convention is gRPC = HTTP + 10000. + // Volume/filer discover the master gRPC port by this convention, so overriding + // it breaks inter-service communication. args := []string{ "master", "-ip=127.0.0.1", - "-port=19333", + "-port=" + strconv.Itoa(f.masterPort), "-mdir=" + filepath.Join(f.dataDir, "master"), - "-raftBootstrap", - "-peers=none", // Faster startup when no multiple masters needed } if config.EnableDebug { args = append(args, "-v=4") } - cmd := exec.Command(f.weedBinary, args...) - cmd.Dir = f.tempDir - if err := cmd.Start(); err != nil { + proc, err := f.startProcess("master", args) + if err != nil { return err } - f.masterProcess = cmd.Process + f.masterProcess = proc return nil } @@ -195,9 +264,9 @@ func (f *FuseTestFramework) startMaster(config *TestConfig) error { func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error { args := []string{ "volume", - "-master=" + f.masterAddr, + "-master=127.0.0.1:" + strconv.Itoa(f.masterPort), "-ip=127.0.0.1", - "-port=18080", + "-port=" + strconv.Itoa(f.volumePort), "-dir=" + filepath.Join(f.dataDir, "volume"), fmt.Sprintf("-max=%d", config.NumVolumes), } @@ -205,12 +274,11 @@ func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error { args = append(args, "-v=4") } - cmd := exec.Command(f.weedBinary, args...) - cmd.Dir = f.tempDir - if err := cmd.Start(); err != nil { + proc, err := f.startProcess("volume", args) + if err != nil { return err } - f.volumeProcess = cmd.Process + f.volumeProcess = proc return nil } @@ -218,20 +286,19 @@ func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error { func (f *FuseTestFramework) startFiler(config *TestConfig) error { args := []string{ "filer", - "-master=" + f.masterAddr, + "-master=127.0.0.1:" + strconv.Itoa(f.masterPort), "-ip=127.0.0.1", - "-port=18888", + "-port=" + strconv.Itoa(f.filerPort), } if config.EnableDebug { args = append(args, "-v=4") } - cmd := exec.Command(f.weedBinary, args...) - cmd.Dir = f.tempDir - if err := cmd.Start(); err != nil { + proc, err := f.startProcess("filer", args) + if err != nil { return err } - f.filerProcess = cmd.Process + f.filerProcess = proc return nil } @@ -239,10 +306,11 @@ func (f *FuseTestFramework) startFiler(config *TestConfig) error { func (f *FuseTestFramework) mountFuse(config *TestConfig) error { args := []string{ "mount", - "-filer=" + f.filerAddr, + "-filer=127.0.0.1:" + strconv.Itoa(f.filerPort), "-dir=" + f.mountPoint, "-filer.path=/", "-dirAutoCreate", + "-allowOthers=false", } if config.Collection != "" { @@ -255,7 +323,7 @@ func (f *FuseTestFramework) mountFuse(config *TestConfig) error { args = append(args, fmt.Sprintf("-chunkSizeLimitMB=%d", config.ChunkSizeMB)) } if config.CacheSizeMB > 0 { - args = append(args, fmt.Sprintf("-cacheSizeMB=%d", config.CacheSizeMB)) + args = append(args, fmt.Sprintf("-cacheCapacityMB=%d", config.CacheSizeMB)) } if config.EnableDebug { args = append(args, "-v=4") @@ -263,12 +331,11 @@ func (f *FuseTestFramework) mountFuse(config *TestConfig) error { args = append(args, config.MountOptions...) - cmd := exec.Command(f.weedBinary, args...) - cmd.Dir = f.tempDir - if err := cmd.Start(); err != nil { + proc, err := f.startProcess("mount", args) + if err != nil { return err } - f.mountProcess = cmd.Process + f.mountProcess = proc return nil } @@ -281,7 +348,8 @@ func (f *FuseTestFramework) unmountFuse() error { } // Also try system unmount as backup - exec.Command("umount", f.mountPoint).Run() + exec.Command("fusermount3", "-u", f.mountPoint).Run() + exec.Command("fusermount", "-u", f.mountPoint).Run() return nil } @@ -315,27 +383,31 @@ func (f *FuseTestFramework) waitForMount(timeout time.Duration) error { return fmt.Errorf("mount point not ready within timeout") } -// findWeedBinary locates the weed binary +// findWeedBinary locates the weed binary. +// Checks PATH first (most reliable in CI where the binary is installed to +// /usr/local/bin), then falls back to relative paths. Each candidate is +// verified to be a regular file so that a source directory named "weed" +// is never mistaken for the binary. func findWeedBinary() string { - // Try different possible locations + // PATH lookup first — works in CI and when weed is installed globally. + if p, err := exec.LookPath("weed"); err == nil { + return p + } + + // Relative paths for local development (run from test/fuse_integration/). candidates := []string{ + "../../weed/weed", // built in-tree: weed/weed "./weed", "../weed", - "../../weed", - "weed", // in PATH } - for _, candidate := range candidates { - if _, err := exec.LookPath(candidate); err == nil { - return candidate - } - if _, err := os.Stat(candidate); err == nil { + if info, err := os.Stat(candidate); err == nil && !info.IsDir() { abs, _ := filepath.Abs(candidate) return abs } } - // Default fallback + // Default fallback — will fail with a clear "not found" at exec time. return "weed" } diff --git a/test/fuse_integration/writeback_cache_test.go b/test/fuse_integration/writeback_cache_test.go new file mode 100644 index 000000000..09075f0ef --- /dev/null +++ b/test/fuse_integration/writeback_cache_test.go @@ -0,0 +1,825 @@ +package fuse_test + +import ( + "bytes" + "crypto/rand" + "fmt" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// writebackConfig returns a TestConfig with writebackCache enabled +func writebackConfig() *TestConfig { + return &TestConfig{ + Collection: "", + Replication: "000", + ChunkSizeMB: 2, + CacheSizeMB: 100, + NumVolumes: 3, + EnableDebug: false, + MountOptions: []string{ + "-writebackCache", + }, + SkipCleanup: false, + } +} + +// waitForFileContent polls until a file has the expected content or timeout expires. +// This is needed because writebackCache defers data upload to background goroutines, +// so there is a brief window after close() where the file may not yet be readable. +func waitForFileContent(t *testing.T, path string, expected []byte, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + var lastErr error + for time.Now().Before(deadline) { + actual, err := os.ReadFile(path) + if err == nil && bytes.Equal(expected, actual) { + return + } + if err != nil { + lastErr = err + } else { + lastErr = fmt.Errorf("content mismatch: got %d bytes, want %d bytes", len(actual), len(expected)) + } + time.Sleep(200 * time.Millisecond) + } + t.Fatalf("file %s did not have expected content within %v: %v", path, timeout, lastErr) +} + +// waitForFileSize polls until a file reports the expected size or timeout expires. +func waitForFileSize(t *testing.T, path string, expectedSize int64, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + info, err := os.Stat(path) + if err == nil && info.Size() == expectedSize { + return + } + time.Sleep(200 * time.Millisecond) + } + t.Fatalf("file %s did not reach expected size %d within %v", path, expectedSize, timeout) +} + +// TestWritebackCacheBasicOperations tests fundamental file I/O with writebackCache enabled +func TestWritebackCacheBasicOperations(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + t.Run("WriteAndReadBack", func(t *testing.T) { + testWritebackWriteAndReadBack(t, framework) + }) + + t.Run("MultipleFilesSequential", func(t *testing.T) { + testWritebackMultipleFilesSequential(t, framework) + }) + + t.Run("LargeFile", func(t *testing.T) { + testWritebackLargeFile(t, framework) + }) + + t.Run("EmptyFile", func(t *testing.T) { + testWritebackEmptyFile(t, framework) + }) + + t.Run("OverwriteExistingFile", func(t *testing.T) { + testWritebackOverwriteFile(t, framework) + }) +} + +// testWritebackWriteAndReadBack writes a file and verifies it can be read back +// after the async flush completes. +func testWritebackWriteAndReadBack(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_basic.txt" + content := []byte("Hello from writebackCache test!") + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Write file — close() returns immediately with async flush + require.NoError(t, os.WriteFile(mountPath, content, 0644)) + + // Wait for async flush to complete and verify content + waitForFileContent(t, mountPath, content, 30*time.Second) +} + +// testWritebackMultipleFilesSequential writes multiple files sequentially +// and verifies all are readable after async flushes complete. +func testWritebackMultipleFilesSequential(t *testing.T, framework *FuseTestFramework) { + dir := "writeback_sequential" + framework.CreateTestDir(dir) + + numFiles := 50 + files := make(map[string][]byte, numFiles) + + // Write files sequentially — each close() returns immediately + for i := 0; i < numFiles; i++ { + filename := fmt.Sprintf("file_%03d.txt", i) + content := []byte(fmt.Sprintf("Sequential file %d content: %s", i, time.Now().Format(time.RFC3339Nano))) + path := filepath.Join(framework.GetMountPoint(), dir, filename) + require.NoError(t, os.WriteFile(path, content, 0644)) + files[filename] = content + } + + // Verify all files after a brief wait for async flushes + for filename, expectedContent := range files { + path := filepath.Join(framework.GetMountPoint(), dir, filename) + waitForFileContent(t, path, expectedContent, 30*time.Second) + } +} + +// testWritebackLargeFile writes a large file (multi-chunk) with writebackCache +func testWritebackLargeFile(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_large.bin" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // 8MB file (spans multiple 2MB chunks) + content := make([]byte, 8*1024*1024) + _, err := rand.Read(content) + require.NoError(t, err) + + require.NoError(t, os.WriteFile(mountPath, content, 0644)) + + // Wait for file to be fully flushed + waitForFileContent(t, mountPath, content, 60*time.Second) +} + +// testWritebackEmptyFile creates an empty file with writebackCache +func testWritebackEmptyFile(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_empty.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Create empty file + f, err := os.Create(mountPath) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // Should exist and be empty + info, err := os.Stat(mountPath) + require.NoError(t, err) + assert.Equal(t, int64(0), info.Size()) +} + +// testWritebackOverwriteFile tests overwriting an existing file +func testWritebackOverwriteFile(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_overwrite.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // First write + content1 := []byte("First version of the file") + require.NoError(t, os.WriteFile(mountPath, content1, 0644)) + waitForFileContent(t, mountPath, content1, 30*time.Second) + + // Overwrite with different content + content2 := []byte("Second version — overwritten content that is longer than the first") + require.NoError(t, os.WriteFile(mountPath, content2, 0644)) + waitForFileContent(t, mountPath, content2, 30*time.Second) +} + +// TestWritebackCacheFsync tests that fsync still forces synchronous flush +// even when writebackCache is enabled +func TestWritebackCacheFsync(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + t.Run("FsyncForcesFlush", func(t *testing.T) { + testFsyncForcesFlush(t, framework) + }) + + t.Run("FsyncThenRead", func(t *testing.T) { + testFsyncThenRead(t, framework) + }) +} + +// testFsyncForcesFlush verifies that calling fsync before close ensures +// data is immediately available for reading, bypassing the async path. +func testFsyncForcesFlush(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_fsync.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + content := []byte("Data that must be flushed synchronously via fsync") + + // Open, write, fsync, close + f, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + + _, err = f.Write(content) + require.NoError(t, err) + + // fsync forces synchronous data+metadata flush + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) + + // Data should be immediately available — no wait needed + actual, err := os.ReadFile(mountPath) + require.NoError(t, err) + assert.Equal(t, content, actual) +} + +// testFsyncThenRead verifies that after fsync, a freshly opened read +// returns the correct data without any delay. +func testFsyncThenRead(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_fsync_read.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + content := make([]byte, 64*1024) // 64KB + _, err := rand.Read(content) + require.NoError(t, err) + + // Write with explicit fsync + f, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + _, err = f.Write(content) + require.NoError(t, err) + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) + + // Immediate read should succeed + actual, err := os.ReadFile(mountPath) + require.NoError(t, err) + assert.Equal(t, content, actual) +} + +// TestWritebackCacheConcurrentSmallFiles is the primary test for issue #8718: +// many small files written concurrently should all be eventually readable. +func TestWritebackCacheConcurrentSmallFiles(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + t.Run("ConcurrentSmallFiles", func(t *testing.T) { + testWritebackConcurrentSmallFiles(t, framework) + }) + + t.Run("ConcurrentSmallFilesMultiDir", func(t *testing.T) { + testWritebackConcurrentSmallFilesMultiDir(t, framework) + }) + + t.Run("RapidCreateCloseSequence", func(t *testing.T) { + testWritebackRapidCreateClose(t, framework) + }) +} + +// testWritebackConcurrentSmallFiles simulates the rsync workload from #8718: +// multiple workers creating many small files in parallel. +func testWritebackConcurrentSmallFiles(t *testing.T, framework *FuseTestFramework) { + dir := "writeback_concurrent_small" + framework.CreateTestDir(dir) + + numWorkers := 8 + filesPerWorker := 20 + totalFiles := numWorkers * filesPerWorker + + type fileRecord struct { + path string + content []byte + } + + var mu sync.Mutex + var writeErrors []error + records := make([]fileRecord, 0, totalFiles) + + // Phase 1: Write files concurrently (simulating rsync workers) + var wg sync.WaitGroup + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + for f := 0; f < filesPerWorker; f++ { + filename := fmt.Sprintf("w%02d_f%03d.dat", workerID, f) + path := filepath.Join(framework.GetMountPoint(), dir, filename) + + // Vary sizes: 100B to 100KB + size := 100 + (workerID*filesPerWorker+f)*500 + if size > 100*1024 { + size = 100*1024 + } + content := make([]byte, size) + if _, err := rand.Read(content); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d rand: %v", workerID, f, err)) + mu.Unlock() + return + } + + if err := os.WriteFile(path, content, 0644); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d: %v", workerID, f, err)) + mu.Unlock() + return + } + + mu.Lock() + records = append(records, fileRecord{path: path, content: content}) + mu.Unlock() + } + }(w) + } + wg.Wait() + + require.Empty(t, writeErrors, "write errors: %v", writeErrors) + assert.Equal(t, totalFiles, len(records)) + + // Phase 2: Wait for async flushes and verify all files + for _, rec := range records { + waitForFileContent(t, rec.path, rec.content, 60*time.Second) + } + + // Phase 3: Verify directory listing has correct count + entries, err := os.ReadDir(filepath.Join(framework.GetMountPoint(), dir)) + require.NoError(t, err) + assert.Equal(t, totalFiles, len(entries)) +} + +// testWritebackConcurrentSmallFilesMultiDir tests concurrent writes across +// multiple directories — a common pattern for parallel copy tools. +func testWritebackConcurrentSmallFilesMultiDir(t *testing.T, framework *FuseTestFramework) { + baseDir := "writeback_multidir" + framework.CreateTestDir(baseDir) + + numDirs := 4 + filesPerDir := 25 + + type fileRecord struct { + path string + content []byte + } + var mu sync.Mutex + var records []fileRecord + var writeErrors []error + + var wg sync.WaitGroup + for d := 0; d < numDirs; d++ { + subDir := filepath.Join(baseDir, fmt.Sprintf("dir_%02d", d)) + framework.CreateTestDir(subDir) + + wg.Add(1) + go func(dirID int, dirPath string) { + defer wg.Done() + + for f := 0; f < filesPerDir; f++ { + filename := fmt.Sprintf("file_%03d.txt", f) + path := filepath.Join(framework.GetMountPoint(), dirPath, filename) + content := []byte(fmt.Sprintf("dir=%d file=%d data=%s", dirID, f, time.Now().Format(time.RFC3339Nano))) + + if err := os.WriteFile(path, content, 0644); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("dir %d file %d: %v", dirID, f, err)) + mu.Unlock() + return + } + + mu.Lock() + records = append(records, fileRecord{path: path, content: content}) + mu.Unlock() + } + }(d, subDir) + } + wg.Wait() + + require.Empty(t, writeErrors, "write errors: %v", writeErrors) + + // Verify all files + for _, rec := range records { + waitForFileContent(t, rec.path, rec.content, 60*time.Second) + } +} + +// testWritebackRapidCreateClose rapidly creates and closes files to stress +// the async flush goroutine pool. +func testWritebackRapidCreateClose(t *testing.T, framework *FuseTestFramework) { + dir := "writeback_rapid" + framework.CreateTestDir(dir) + + numFiles := 200 + type fileRecord struct { + path string + content []byte + } + records := make([]fileRecord, numFiles) + + // Rapidly create files without pausing + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("rapid_%04d.bin", i)) + content := []byte(fmt.Sprintf("rapid-file-%d", i)) + require.NoError(t, os.WriteFile(path, content, 0644)) + records[i] = fileRecord{path: path, content: content} + } + + // Verify all files eventually appear with correct content + for _, rec := range records { + waitForFileContent(t, rec.path, rec.content, 60*time.Second) + } +} + +// TestWritebackCacheDataIntegrity tests that data integrity is preserved +// across various write patterns with writebackCache enabled. +func TestWritebackCacheDataIntegrity(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + t.Run("AppendAfterClose", func(t *testing.T) { + testWritebackAppendAfterClose(t, framework) + }) + + t.Run("PartialWrites", func(t *testing.T) { + testWritebackPartialWrites(t, framework) + }) + + t.Run("FileSizeCorrectness", func(t *testing.T) { + testWritebackFileSizeCorrectness(t, framework) + }) + + t.Run("BinaryData", func(t *testing.T) { + testWritebackBinaryData(t, framework) + }) +} + +// testWritebackAppendAfterClose writes a file, closes it (triggering async flush), +// waits for flush, then reopens and appends more data. +func testWritebackAppendAfterClose(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_append.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // First write + part1 := []byte("First part of the data.\n") + require.NoError(t, os.WriteFile(mountPath, part1, 0644)) + + // Wait for first async flush + waitForFileContent(t, mountPath, part1, 30*time.Second) + + // Append more data + part2 := []byte("Second part appended.\n") + f, err := os.OpenFile(mountPath, os.O_APPEND|os.O_WRONLY, 0644) + require.NoError(t, err) + _, err = f.Write(part2) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // Verify combined content + expected := append(part1, part2...) + waitForFileContent(t, mountPath, expected, 30*time.Second) +} + +// testWritebackPartialWrites tests writing to specific offsets within a file +func testWritebackPartialWrites(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_partial.bin" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Create file with initial content + initial := bytes.Repeat([]byte("A"), 4096) + require.NoError(t, os.WriteFile(mountPath, initial, 0644)) + waitForFileContent(t, mountPath, initial, 30*time.Second) + + // Open and write at specific offset + f, err := os.OpenFile(mountPath, os.O_WRONLY, 0644) + require.NoError(t, err) + patch := []byte("PATCHED") + _, err = f.WriteAt(patch, 100) + require.NoError(t, err) + require.NoError(t, f.Close()) + + // Build expected content + expected := make([]byte, 4096) + copy(expected, initial) + copy(expected[100:], patch) + + waitForFileContent(t, mountPath, expected, 30*time.Second) +} + +// testWritebackFileSizeCorrectness verifies that file sizes are correct +// after async flush completes. +func testWritebackFileSizeCorrectness(t *testing.T, framework *FuseTestFramework) { + sizes := []int{0, 1, 100, 4096, 65536, 1024 * 1024} + + for _, size := range sizes { + filename := fmt.Sprintf("writeback_size_%d.bin", size) + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + content := make([]byte, size) + if size > 0 { + _, err := rand.Read(content) + require.NoError(t, err, "rand.Read failed for size %d", size) + } + + require.NoError(t, os.WriteFile(mountPath, content, 0644), "failed to write file of size %d", size) + + if size > 0 { + waitForFileSize(t, mountPath, int64(size), 30*time.Second) + waitForFileContent(t, mountPath, content, 30*time.Second) + } + } +} + +// testWritebackBinaryData verifies that arbitrary binary data (including null bytes) +// is preserved correctly through the async flush path. +func testWritebackBinaryData(t *testing.T, framework *FuseTestFramework) { + filename := "writeback_binary.bin" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Generate data with all byte values including nulls + content := make([]byte, 256*100) + for i := range content { + content[i] = byte(i % 256) + } + + require.NoError(t, os.WriteFile(mountPath, content, 0644)) + waitForFileContent(t, mountPath, content, 30*time.Second) +} + +// TestWritebackCachePerformance measures whether writebackCache actually +// improves throughput for small file workloads compared to synchronous flush. +func TestWritebackCachePerformance(t *testing.T) { + if testing.Short() { + t.Skip("skipping performance test in short mode") + } + + numFiles := 200 + fileSize := 4096 // 4KB files + + // Generate test data upfront + testData := make([][]byte, numFiles) + for i := range testData { + testData[i] = make([]byte, fileSize) + rand.Read(testData[i]) + } + + // Benchmark with writebackCache enabled + t.Run("WithWritebackCache", func(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + require.NoError(t, framework.Setup(config)) + + dir := "perf_writeback" + framework.CreateTestDir(dir) + + start := time.Now() + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i)) + require.NoError(t, os.WriteFile(path, testData[i], 0644)) + } + writebackDuration := time.Since(start) + + // Wait for all files to be flushed + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i)) + waitForFileContent(t, path, testData[i], 60*time.Second) + } + + t.Logf("writebackCache: wrote %d files in %v (%.0f files/sec)", + numFiles, writebackDuration, float64(numFiles)/writebackDuration.Seconds()) + }) + + // Benchmark without writebackCache (synchronous flush) + t.Run("WithoutWritebackCache", func(t *testing.T) { + config := DefaultTestConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + require.NoError(t, framework.Setup(config)) + + dir := "perf_sync" + framework.CreateTestDir(dir) + + start := time.Now() + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i)) + require.NoError(t, os.WriteFile(path, testData[i], 0644)) + } + syncDuration := time.Since(start) + + t.Logf("synchronous: wrote %d files in %v (%.0f files/sec)", + numFiles, syncDuration, float64(numFiles)/syncDuration.Seconds()) + }) +} + +// TestWritebackCacheConcurrentMixedOps tests a mix of operations happening +// concurrently with writebackCache: creates, reads, overwrites, and deletes. +func TestWritebackCacheConcurrentMixedOps(t *testing.T) { + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + dir := "writeback_mixed" + framework.CreateTestDir(dir) + + numFiles := 50 + var mu sync.Mutex + var errors []error + var completedWrites int64 + + addError := func(err error) { + mu.Lock() + defer mu.Unlock() + errors = append(errors, err) + } + + // Phase 1: Create initial files and wait for async flushes + initialContents := make(map[int][]byte, numFiles) + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", i)) + content := []byte(fmt.Sprintf("initial content %d", i)) + require.NoError(t, os.WriteFile(path, content, 0644)) + initialContents[i] = content + } + + // Poll until initial files are flushed (instead of fixed sleep) + for i := 0; i < numFiles; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", i)) + waitForFileContent(t, path, initialContents[i], 30*time.Second) + } + + // Phase 2: Concurrent mixed operations + var wg sync.WaitGroup + + // Writers: overwrite existing files + for i := 0; i < 4; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for j := 0; j < numFiles; j++ { + if j%4 != workerID { + continue // each worker handles a subset + } + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", j)) + content := []byte(fmt.Sprintf("overwritten by worker %d at %s", workerID, time.Now().Format(time.RFC3339Nano))) + if err := os.WriteFile(path, content, 0644); err != nil { + addError(fmt.Errorf("writer %d file %d: %v", workerID, j, err)) + return + } + atomic.AddInt64(&completedWrites, 1) + } + }(i) + } + + // Readers: read files (may see old or new content, but should not error) + for i := 0; i < 4; i++ { + wg.Add(1) + go func(readerID int) { + defer wg.Done() + for j := 0; j < numFiles; j++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", j)) + _, err := os.ReadFile(path) + if err != nil && !os.IsNotExist(err) { + addError(fmt.Errorf("reader %d file %d: %v", readerID, j, err)) + return + } + } + }(i) + } + + // New file creators + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 20; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("new_file_%03d.txt", i)) + content := []byte(fmt.Sprintf("new file %d", i)) + if err := os.WriteFile(path, content, 0644); err != nil { + addError(fmt.Errorf("creator file %d: %v", i, err)) + return + } + } + }() + + wg.Wait() + + require.Empty(t, errors, "mixed operation errors: %v", errors) + assert.True(t, atomic.LoadInt64(&completedWrites) > 0, "should have completed some writes") + + // Verify new files exist after async flushes complete (poll instead of fixed sleep) + for i := 0; i < 20; i++ { + path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("new_file_%03d.txt", i)) + expected := []byte(fmt.Sprintf("new file %d", i)) + waitForFileContent(t, path, expected, 30*time.Second) + } +} + +// TestWritebackCacheStressSmallFiles is a focused stress test for the +// async flush path with many small files — the core scenario from #8718. +func TestWritebackCacheStressSmallFiles(t *testing.T) { + if testing.Short() { + t.Skip("skipping stress test in short mode") + } + + config := writebackConfig() + framework := NewFuseTestFramework(t, config) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(config)) + + dir := "writeback_stress" + framework.CreateTestDir(dir) + + numWorkers := 16 + filesPerWorker := 100 + totalFiles := numWorkers * filesPerWorker + + type fileRecord struct { + path string + content []byte + } + + var mu sync.Mutex + var writeErrors []error + records := make([]fileRecord, 0, totalFiles) + + start := time.Now() + + // Simulate rsync-like workload: many workers each writing small files + var wg sync.WaitGroup + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for f := 0; f < filesPerWorker; f++ { + filename := fmt.Sprintf("w%02d/f%04d.dat", workerID, f) + path := filepath.Join(framework.GetMountPoint(), dir, filename) + + // Ensure subdirectory exists + if f == 0 { + subDir := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("w%02d", workerID)) + if err := os.MkdirAll(subDir, 0755); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d mkdir: %v", workerID, err)) + mu.Unlock() + return + } + } + + // Small file: 1KB-10KB (typical for rsync of config/source files) + size := 1024 + (f%10)*1024 + content := make([]byte, size) + if _, err := rand.Read(content); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d rand: %v", workerID, f, err)) + mu.Unlock() + return + } + + if err := os.WriteFile(path, content, 0644); err != nil { + mu.Lock() + writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d: %v", workerID, f, err)) + mu.Unlock() + return + } + + mu.Lock() + records = append(records, fileRecord{path: path, content: content}) + mu.Unlock() + } + }(w) + } + wg.Wait() + + writeDuration := time.Since(start) + t.Logf("wrote %d files in %v (%.0f files/sec)", + totalFiles, writeDuration, float64(totalFiles)/writeDuration.Seconds()) + + require.Empty(t, writeErrors, "write errors: %v", writeErrors) + assert.Equal(t, totalFiles, len(records)) + + // Verify all files are eventually readable with correct content + var verifyErrors []error + for _, rec := range records { + deadline := time.Now().Add(120 * time.Second) + var lastErr error + for time.Now().Before(deadline) { + actual, err := os.ReadFile(rec.path) + if err == nil && bytes.Equal(rec.content, actual) { + lastErr = nil + break + } + if err != nil { + lastErr = err + } else { + lastErr = fmt.Errorf("content mismatch for %s: got %d bytes, want %d", rec.path, len(actual), len(rec.content)) + } + time.Sleep(500 * time.Millisecond) + } + if lastErr != nil { + verifyErrors = append(verifyErrors, lastErr) + } + } + require.Empty(t, verifyErrors, "verification errors after stress test: %v", verifyErrors) + + t.Logf("all %d files verified successfully", totalFiles) +} diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 5b272cbeb..c0cdae7d3 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -349,6 +349,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { RdmaMaxConcurrent: *option.rdmaMaxConcurrent, RdmaTimeoutMs: *option.rdmaTimeoutMs, DirIdleEvictSec: *option.dirIdleEvictSec, + WritebackCache: option.writebackCache != nil && *option.writebackCache, }) // create mount root @@ -396,6 +397,10 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { server.Serve() + // Wait for any pending background flushes (writebackCache async mode) + // before clearing caches, to prevent data loss during clean unmount. + seaweedFileSystem.WaitForAsyncFlush() + seaweedFileSystem.ClearCacheDir() return true diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index e912fe310..ccc4c07e7 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -24,10 +24,15 @@ type FileHandle struct { wfs *WFS // cache file has been written to - dirtyMetadata bool - dirtyPages *PageWriter - reader *filer.ChunkReadAt - contentType string + dirtyMetadata bool + dirtyPages *PageWriter + reader *filer.ChunkReadAt + contentType string + asyncFlushPending bool // set in writebackCache mode to defer flush to Release + asyncFlushUid uint32 // saved uid for deferred metadata flush + asyncFlushGid uint32 // saved gid for deferred metadata flush + asyncFlushDir string // saved directory at defer time (fallback if inode forgotten) + asyncFlushName string // saved file name at defer time (fallback if inode forgotten) isDeleted bool diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go index 4441de0be..c15146265 100644 --- a/weed/mount/filehandle_map.go +++ b/weed/mount/filehandle_map.go @@ -55,39 +55,72 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil return fh } -func (i *FileHandleToInode) ReleaseByInode(inode uint64) { +func (i *FileHandleToInode) ReleaseByInode(inode uint64) *FileHandle { i.Lock() defer i.Unlock() fh, found := i.inode2fh[inode] - if found { - fh.counter-- - if fh.counter <= 0 { - delete(i.inode2fh, inode) - delete(i.fh2inode, fh.fh) - fh.ReleaseHandle() + if !found { + return nil + } + // If the counter is already <= 0, a prior Release already started the + // drain. Return nil to prevent double-processing (e.g. Forget after Release). + if fh.counter <= 0 { + return nil + } + fh.counter-- + if fh.counter <= 0 { + if fh.asyncFlushPending { + // Handle stays in fhMap so rename/unlink can find it during drain. + return fh } + delete(i.inode2fh, inode) + delete(i.fh2inode, fh.fh) + return fh } + return nil } -func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) { +func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) *FileHandle { i.Lock() defer i.Unlock() inode, found := i.fh2inode[fh] if !found { - return // Handle already released or invalid + return nil } fhHandle, fhFound := i.inode2fh[inode] if !fhFound { delete(i.fh2inode, fh) - return + return nil + } + + // If the counter is already <= 0, a prior Release already started the + // drain. Return nil to prevent double-processing. + if fhHandle.counter <= 0 { + return nil } fhHandle.counter-- if fhHandle.counter <= 0 { + if fhHandle.asyncFlushPending { + // Handle stays in fhMap so rename/unlink can still find it + // via FindFileHandle during the background drain. + return fhHandle + } delete(i.inode2fh, inode) delete(i.fh2inode, fhHandle.fh) - fhHandle.ReleaseHandle() + return fhHandle } + return nil +} + +// RemoveFileHandle removes a handle from both maps. Called after an async +// drain completes to clean up the handle that was intentionally kept in the +// maps during the flush. +func (i *FileHandleToInode) RemoveFileHandle(fh FileHandleId, inode uint64) { + i.Lock() + defer i.Unlock() + delete(i.inode2fh, inode) + delete(i.fh2inode, fh) } diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index b5c8f9ebd..45922b2ff 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -65,10 +65,6 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) if logicStart < logicStop { copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) maxStop = max(maxStop, logicStop) - - if t.TsNs >= tsNs { - println("read new data1", t.TsNs-tsNs, "ns") - } } } mc.activityScore.MarkRead() diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index c37a9e2ca..bf60be67a 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -151,10 +151,6 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in break } maxStop = max(maxStop, logicStop) - - if t.TsNs > tsNs { - println("read new data2", t.TsNs-tsNs, "ns") - } } } //sc.memChunk.ReadDataAt(memCopy, off, tsNs) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 9b2341ca3..e24a49aac 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -78,6 +78,10 @@ type Option struct { // Directory cache refresh/eviction controls DirIdleEvictSec int + // WritebackCache enables async flush on close for improved small file write performance. + // When true, Flush() returns immediately and data upload + metadata flush happen in background. + WritebackCache bool + uniqueCacheDirForRead string uniqueCacheDirForWrite string } @@ -110,6 +114,16 @@ type WFS struct { dirHotWindow time.Duration dirHotThreshold int dirIdleEvict time.Duration + + // asyncFlushWg tracks pending background flush goroutines for writebackCache mode. + // Must be waited on before unmount cleanup to prevent data loss. + asyncFlushWg sync.WaitGroup + + // pendingAsyncFlush tracks in-flight async flush goroutines by inode. + // AcquireHandle checks this to wait for a pending flush before reopening + // the same inode, preventing stale metadata from overwriting the async flush. + pendingAsyncFlushMu sync.Mutex + pendingAsyncFlush map[uint64]chan struct{} } const ( @@ -151,13 +165,14 @@ func NewSeaweedFileSystem(option *Option) *WFS { } wfs := &WFS{ - RawFileSystem: fuse.NewDefaultRawFileSystem(), - option: option, - signature: util.RandomInt32(), - inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec), - fhMap: NewFileHandleToInode(), - dhMap: NewDirectoryHandleToInode(), - filerClient: filerClient, // nil for proxy mode, initialized for direct access + RawFileSystem: fuse.NewDefaultRawFileSystem(), + option: option, + signature: util.RandomInt32(), + inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec), + fhMap: NewFileHandleToInode(), + dhMap: NewDirectoryHandleToInode(), + filerClient: filerClient, // nil for proxy mode, initialized for direct access + pendingAsyncFlush: make(map[uint64]chan struct{}), fhLockTable: util.NewLockTable[FileHandleId](), refreshingDirs: make(map[util.FullPath]struct{}), dirHotWindow: dirHotWindow, @@ -204,8 +219,32 @@ func NewSeaweedFileSystem(option *Option) *WFS { } }) grace.OnInterrupt(func() { + // grace calls os.Exit(0) after all hooks, so WaitForAsyncFlush + // after server.Serve() would never execute. Drain here first. + // + // Use a timeout to avoid hanging on Ctrl-C if the filer is + // unreachable (metadata retry can take up to 7 seconds). + // If the timeout expires, skip the write-cache removal so that + // still-running goroutines can finish reading swap files. + asyncDrained := true + if wfs.option.WritebackCache { + done := make(chan struct{}) + go func() { + wfs.asyncFlushWg.Wait() + close(done) + }() + select { + case <-done: + glog.V(0).Infof("all async flushes completed before shutdown") + case <-time.After(30 * time.Second): + glog.Warningf("timed out waiting for async flushes — swap files preserved for in-flight uploads") + asyncDrained = false + } + } wfs.metaCache.Shutdown() - os.RemoveAll(option.getUniqueCacheDirForWrite()) + if asyncDrained { + os.RemoveAll(option.getUniqueCacheDirForWrite()) + } os.RemoveAll(option.getUniqueCacheDirForRead()) if wfs.rdmaClient != nil { wfs.rdmaClient.Close() @@ -240,6 +279,10 @@ func NewSeaweedFileSystem(option *Option) *WFS { } func (wfs *WFS) StartBackgroundTasks() error { + if wfs.option.WritebackCache { + glog.V(0).Infof("writebackCache enabled: async flush on close() for improved small file performance") + } + follower, err := wfs.subscribeFilerConfEvents() if err != nil { return err diff --git a/weed/mount/weedfs_async_flush.go b/weed/mount/weedfs_async_flush.go new file mode 100644 index 000000000..6ded810fe --- /dev/null +++ b/weed/mount/weedfs_async_flush.go @@ -0,0 +1,96 @@ +package mount + +import ( + "time" + + "github.com/seaweedfs/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +const asyncFlushMetadataRetries = 3 + +// completeAsyncFlush is called in a background goroutine when a file handle +// with pending async flush work is released. It performs the deferred data +// upload and metadata flush that was skipped in doFlush() for writebackCache mode. +// +// This enables close() to return immediately for small file workloads (e.g., rsync), +// while the actual I/O happens concurrently in the background. +// +// The caller (submitAsyncFlush) owns asyncFlushWg and the per-inode done channel. +func (wfs *WFS) completeAsyncFlush(fh *FileHandle) { + // Phase 1: Flush dirty pages — seals writable chunks, uploads to volume servers, and waits. + // The underlying UploadWithRetry already retries transient HTTP/gRPC errors internally, + // so a failure here indicates a persistent issue; the chunk data has been freed. + if err := fh.dirtyPages.FlushData(); err != nil { + glog.Errorf("completeAsyncFlush inode %d: data flush failed: %v", fh.inode, err) + // Data is lost at this point (chunks freed after internal retry exhaustion). + // Proceed to cleanup to avoid resource leaks and unmount hangs. + } else if fh.dirtyMetadata { + // Phase 2: Flush metadata unless the file was explicitly unlinked. + // + // isDeleted is set by the Unlink handler when it finds a draining + // handle. In that case the filer entry is already gone and + // flushing would recreate it. The uploaded chunks become orphans + // and are cleaned up by volume.fsck. + if fh.isDeleted { + glog.V(3).Infof("completeAsyncFlush inode %d: file was unlinked, skipping metadata flush", fh.inode) + } else { + // Resolve the current path for metadata flush. + // + // Try GetPath first — it reflects any rename that happened + // after close(). If the inode mapping is gone (Forget + // dropped it after the kernel's lookup count hit zero), fall + // back to the dir/name saved at doFlush time. Rename also + // updates the saved path, so the fallback is always current. + // + // Forget does NOT mean the file was deleted — it only means + // the kernel evicted its cache entry. + dir, name := fh.asyncFlushDir, fh.asyncFlushName + fileFullPath := util.FullPath(dir).Child(name) + + if resolvedPath, status := wfs.inodeToPath.GetPath(fh.inode); status == fuse.OK { + dir, name = resolvedPath.DirAndName() + fileFullPath = resolvedPath + } + + wfs.flushMetadataWithRetry(fh, dir, name, fileFullPath) + } + } + + glog.V(3).Infof("completeAsyncFlush done inode %d fh %d", fh.inode, fh.fh) + + // Phase 3: Destroy the upload pipeline and free resources. + fh.ReleaseHandle() +} + +// flushMetadataWithRetry attempts to flush file metadata to the filer, retrying +// with exponential backoff on transient errors. The chunk data is already on the +// volume servers at this point; only the filer metadata reference needs persisting. +func (wfs *WFS) flushMetadataWithRetry(fh *FileHandle, dir, name string, fileFullPath util.FullPath) { + for attempt := 0; attempt <= asyncFlushMetadataRetries; attempt++ { + if attempt > 0 { + backoff := time.Duration(1<