
mount: async flush on close() when writebackCache is enabled (#8727)

* mount: async flush on close() when writebackCache is enabled

When -writebackCache is enabled, defer data upload and metadata flush
from Flush() (triggered by close()) to a background goroutine in
Release(). This allows processes like rsync that write many small files
to proceed to the next file immediately instead of blocking on two
network round-trips (volume upload + filer metadata) per file.
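The shape of the change can be sketched as follows (illustrative Go, not the actual mount code; `wfs`, `releaseAsync`, and `WaitForAsyncFlush` are hypothetical names): close() returns immediately while a WaitGroup-tracked goroutine performs the flush, and unmount drains the group.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fileHandle stands in for the real mount file handle.
type fileHandle struct {
	name string
}

// wfs tracks in-flight background flushes so unmount can drain them.
type wfs struct {
	asyncFlushWG sync.WaitGroup
}

// releaseAsync defers the expensive flush (volume upload + filer
// metadata round-trips) to a goroutine so close() does not block.
func (w *wfs) releaseAsync(fh *fileHandle, flush func(*fileHandle) error) {
	w.asyncFlushWG.Add(1)
	go func() {
		defer w.asyncFlushWG.Done()
		if err := flush(fh); err != nil {
			fmt.Printf("async flush of %s failed: %v\n", fh.name, err)
		}
	}()
}

// WaitForAsyncFlush drains all in-flight flushes, e.g. at unmount.
func (w *wfs) WaitForAsyncFlush() {
	w.asyncFlushWG.Wait()
}

func main() {
	var w wfs
	var mu sync.Mutex
	flushed := 0
	for i := 0; i < 3; i++ {
		fh := &fileHandle{name: fmt.Sprintf("file%d", i)}
		w.releaseAsync(fh, func(fh *fileHandle) error {
			time.Sleep(10 * time.Millisecond) // simulated network round-trips
			mu.Lock()
			flushed++
			mu.Unlock()
			return nil
		})
	}
	w.WaitForAsyncFlush()
	fmt.Println("flushed", flushed, "files")
}
```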

Fixes #8718

* mount: add retry with backoff for async metadata flush

The metadata flush in completeAsyncFlush now retries up to 3 times
with exponential backoff (1s, 2s, 4s) on transient gRPC errors.
Since the chunk data is already safely on volume servers at this point,
only the filer metadata reference needs persisting — retrying is both
safe and effective.

Data flush (FlushData) is not retried externally because
UploadWithRetry already handles transient HTTP/gRPC errors internally;
if it still fails, the chunk memory has been freed.

* test: add integration tests for writebackCache async flush

Add comprehensive FUSE integration tests for the writebackCache
async flush feature (issue #8718):

- Basic operations: write/read, sequential files, large files, empty
  files, overwrites
- Fsync correctness: fsync forces synchronous flush even in writeback
  mode, immediate read-after-fsync
- Concurrent small files: multi-worker parallel writes (rsync-like
  workload), multi-directory, rapid create/close
- Data integrity: append after close, partial writes, file size
  correctness, binary data preservation
- Performance comparison: writeback vs synchronous flush throughput
- Stress test: 16 workers x 100 files with content verification
- Mixed concurrent operations: reads, writes, creates running together

Also fix pre-existing test infrastructure issues:
- Rename framework.go to framework_test.go (fixes Go package conflict)
- Fix undefined totalSize variable in concurrent_operations_test.go

* ci: update fuse-integration workflow to run full test suite

The workflow previously only ran placeholder tests (simple_test.go,
working_demo_test.go) in a temp directory due to a Go module conflict.
Now that framework.go is renamed to framework_test.go, the full test
suite compiles and runs correctly from test/fuse_integration/.

Changes:
- Run go test directly in test/fuse_integration/ (no temp dir copy)
- Install weed binary to /usr/local/bin for test framework discovery
- Configure /etc/fuse.conf with user_allow_other for FUSE mounts
- Install fuse3 for modern FUSE support
- Stream test output to log file for artifact upload

* mount: fix three P1 races in async flush

P1-1: Reopen overwrites data still flushing in background
ReleaseByHandle removes the old handle from fhMap before the deferred
flush finishes. A reopen of the same inode during that window would
build from stale filer metadata, overwriting the async flush.

Fix: Track in-flight async flushes per inode via pendingAsyncFlush map.
AcquireHandle now calls waitForPendingAsyncFlush(inode) to block until
any pending flush completes before reading filer metadata.

P1-2: Deferred flush races rename and unlink after close
completeAsyncFlush captured the path once at entry, but rename or
unlink after close() could cause metadata to be written under the
wrong name or recreate a deleted file.

Fix: Re-resolve path from inode via GetPath right before metadata
flush. GetPath returns the current path (reflecting renames) or
ENOENT (if unlinked), in which case we skip the metadata flush.

P1-3: SIGINT/SIGTERM bypasses the async-flush drain
grace.OnInterrupt runs hooks then calls os.Exit(0), so
WaitForAsyncFlush after server.Serve() never executes on signal.

Fix: Add WaitForAsyncFlush (with 10s timeout) to the WFS interrupt
handler, before cache cleanup. The timeout prevents hanging on Ctrl-C
when the filer is unreachable.

* mount: fix P1 races — draining handle stays in fhMap

P1-1: Reopen TOCTOU
The gap between ReleaseByHandle removing from fhMap and
submitAsyncFlush registering in pendingAsyncFlush allowed a
concurrent AcquireHandle to slip through with stale metadata.

Fix: Hold pendingAsyncFlushMu across both the counter decrement
(ReleaseByHandle) and the pending registration. The handle is
registered as pending before the lock is released, so
waitForPendingAsyncFlush always sees it.

P1-2: Rename/unlink can't find draining handle
ReleaseByHandle deleted from fhMap immediately. Rename's
FindFileHandle(inode) at line 251 could not find the handle to
update entry.Name. Unlink could not coordinate either.

Fix: When asyncFlushPending is true, ReleaseByHandle/ReleaseByInode
leave the handle in fhMap (counter=0 but maps intact). The handle
stays visible to FindFileHandle so rename can update entry.Name.
completeAsyncFlush re-resolves the path from the inode (GetPath)
right before metadata flush for correctness after rename/unlink.
After drain, RemoveFileHandle cleans up the maps.

Double-return prevention: ReleaseByHandle/ReleaseByInode return nil
if counter is already <= 0, so Forget after Release doesn't start a
second drain goroutine.
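Both properties fall out of doing the counter check and the pending registration under one mutex; a sketch (illustrative, not the real fhMap code):

```go
package main

import (
	"fmt"
	"sync"
)

// handle models the open-count and draining state of a file handle.
type handle struct {
	mu       sync.Mutex
	counter  int
	draining bool
}

// release decrements the open count. On the last close it marks the
// handle as draining before the lock is dropped, so a concurrent open
// can never observe "released but not yet pending". A release when
// the counter is already <= 0 (e.g. Forget after Release) is a no-op
// and must not start a second drain goroutine.
func (h *handle) release() (startDrain bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.counter <= 0 {
		return false // already released: prevent a double drain
	}
	h.counter--
	if h.counter == 0 {
		h.draining = true // registered atomically with the decrement
		return true
	}
	return false
}

func main() {
	h := &handle{counter: 1}
	fmt.Println(h.release()) // last close: start the drain
	fmt.Println(h.release()) // Forget after Release: no-op
}
```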

P1-3: SIGINT deletes swap files under running goroutines
After the 10s timeout, os.RemoveAll deleted the write cache dir
(containing swap files) while FlushData goroutines were still
reading from them.

Fix: Increase timeout to 30s. If timeout expires, skip write cache
dir removal so in-flight goroutines can finish reading swap files.
The OS (or next mount) cleans them up. Read cache is always removed.

* mount: never skip metadata flush when Forget drops inode mapping

Forget removes the inode→path mapping when the kernel's lookup count
reaches zero, but this does NOT mean the file was unlinked — it only
means the kernel evicted its cache entry.  completeAsyncFlush was
treating GetPath failure as "file unlinked" and skipping the metadata
flush, which orphaned the just-uploaded chunks for live files.

Fix: Save dir and name at doFlush defer time.  In completeAsyncFlush,
try GetPath first to pick up renames; if the mapping is gone, fall
back to the saved dir/name.  Always attempt the metadata flush — the
filer is the authority on whether the file exists, not the local
inode cache.

* mount: distinguish Forget from Unlink in async flush path fallback

The saved-path fallback (from the previous fix) always flushed
metadata when GetPath failed, which recreated files that were
explicitly unlinked after close().  The same stale fallback could
recreate the pre-rename path if Forget dropped the inode mapping
after a rename.

Root cause: GetPath failure has two meanings:
  1. Forget — kernel evicted the cache entry (file still exists)
  2. Unlink — file was explicitly deleted (should not recreate)

Fix (three coordinated changes):

Unlink (weedfs_file_mkrm.go): Before RemovePath, look up the inode
and find any draining handle via FindFileHandle.  Set fh.isDeleted =
true so the async flush knows the file was explicitly removed.

Rename (weedfs_rename.go): When renaming a file with a draining
handle, update asyncFlushDir/asyncFlushName to the post-rename
location.  This keeps the saved-path fallback current so Forget
after rename doesn't flush to the old (pre-rename) path.

completeAsyncFlush (weedfs_async_flush.go): Check fh.isDeleted
first — if true, skip metadata flush (file was unlinked, chunks
become orphans for volume.fsck).  Otherwise, try GetPath for the
current path (renames); fall back to saved path if Forget dropped
the mapping (file is live, just evicted from kernel cache).
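The decision order reduces to a small function (a sketch with illustrative names; `resolveFlushPath` and `asyncFlushState` are not the actual identifiers): isDeleted wins, then the live inode mapping, then the saved path.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("inode mapping gone")

// asyncFlushState captures what the drain goroutine knows at flush time.
type asyncFlushState struct {
	isDeleted           bool   // set by Unlink on the draining handle
	savedDir, savedName string // saved at close; updated by Rename
}

// resolveFlushPath decides whether to flush metadata and to which path.
func resolveFlushPath(st asyncFlushState, getPath func() (string, error)) (path string, flush bool) {
	if st.isDeleted {
		return "", false // explicitly unlinked: don't recreate the file
	}
	if p, err := getPath(); err == nil {
		return p, true // live mapping: reflects any rename
	}
	// Forget evicted the mapping, but the file is still live on the
	// filer, so fall back to the saved (possibly rename-updated) path.
	return st.savedDir + "/" + st.savedName, true
}

func main() {
	// Unlinked after close: skip the metadata flush.
	_, flush := resolveFlushPath(asyncFlushState{isDeleted: true}, nil)
	fmt.Println(flush)
	// Renamed: the current mapping wins over the saved path.
	p, _ := resolveFlushPath(asyncFlushState{savedDir: "/old", savedName: "a"},
		func() (string, error) { return "/new/b", nil })
	fmt.Println(p)
	// Forgotten (mapping evicted): saved-path fallback.
	p, _ = resolveFlushPath(asyncFlushState{savedDir: "/dir", savedName: "a"},
		func() (string, error) { return "", errNotFound })
	fmt.Println(p)
}
```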

* test/ci: address PR review nitpicks

concurrent_operations_test.go:
- Restore precise totalSize assertion instead of info.Size() > 0

writeback_cache_test.go:
- Check rand.Read errors in all 3 locations (lines 310, 512, 757)
- Check os.MkdirAll error in stress test (line 752)
- Remove dead verifyErrors variable (line 332)
- Replace both time.Sleep(5s) with polling via waitForFileContent
  to avoid flaky tests under CI load (lines 638, 700)

fuse-integration.yml:
- Add set -o pipefail so go test failures propagate through tee

* ci: fix fuse3/fuse package conflict on ubuntu-22.04 runner

fuse3 is pre-installed on ubuntu-22.04 runners and conflicts with
the legacy fuse package. Only install libfuse3-dev for the headers.

* mount/page_writer: remove debug println statements

Remove leftover debug println("read new data1/2") from
ReadDataAt in MemChunk and SwapFileChunk.

* test: fix findWeedBinary matching source directory instead of binary

findWeedBinary() matched ../../weed (the source directory) via
os.Stat before checking PATH, then tried to exec a directory
which fails with "permission denied" on the CI runner.

Fix: Check PATH first (reliable in CI where the binary is installed
to /usr/local/bin). For relative paths, verify the candidate is a
regular file (!info.IsDir()). Add ../../weed/weed as a candidate
for in-tree builds.

* test: fix framework — dynamic ports, output capture, data dirs

The integration test framework was failing in CI because:

1. All tests used hardcoded ports (19333/18080/18888), so sequential
   tests could conflict when prior processes hadn't fully released
   their ports yet.

2. Data subdirectories (data/master, data/volume) were not created
   before starting processes.

3. Master was started with -peers=none which is not a valid address.

4. Process stdout/stderr was not captured, making failures opaque
   ("service not ready within timeout" with no diagnostics).

5. The unmount fallback used 'umount' instead of 'fusermount -u'.

6. The mount used -cacheSizeMB (nonexistent) instead of
   -cacheCapacityMB and was missing -allowOthers=false for
   unprivileged CI runners.

Fixes:
- Dynamic port allocation via freePort() (net.Listen ":0")
- Explicit gRPC ports via -port.grpc to avoid default port conflicts
- Create data/master and data/volume directories in Setup()
- Remove invalid -peers=none and -raftBootstrap flags
- Capture process output to logDir/*.log via startProcess() helper
- dumpLog() prints tail of log file on service startup failure
- Use fusermount3/fusermount -u for unmount
- Fix mount flag names (-cacheCapacityMB, -allowOthers=false)

* test: remove explicit -port.grpc flags from test framework

SeaweedFS convention: gRPC port = HTTP port + 10000.  Volume and
filer discover the master gRPC port by this convention.  Setting
explicit -port.grpc on master/volume/filer broke inter-service
communication because the volume server computed master gRPC as
HTTP+10000 but the actual gRPC was on a different port.

Remove all -port.grpc flags and let the default convention work.
Dynamic HTTP ports already ensure uniqueness; the derived gRPC
ports (HTTP+10000) will also be unique.
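The convention relied on here is a pure function of the HTTP port, so allocating only HTTP ports suffices (small illustrative sketch):

```go
package main

import "fmt"

// grpcPort derives a service's gRPC port from its HTTP port using the
// SeaweedFS convention gRPC = HTTP + 10000. Because the test framework
// allocates unique HTTP ports, the derived gRPC ports are unique too.
func grpcPort(httpPort int) int {
	return httpPort + 10000
}

func main() {
	// Example: a dynamically allocated master HTTP port.
	masterHTTP := 34215
	fmt.Println("master gRPC port:", grpcPort(masterHTTP))
}
```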

---------

Co-authored-by: Copilot <copilot@github.com>
pull/8436/merge
Chris Lu, 1 day ago, committed by GitHub
commit 9434d3733d
16 changed files:
  1. 207  .github/workflows/fuse-integration.yml
  2.   4  test/fuse_integration/concurrent_operations_test.go
  3. 156  test/fuse_integration/framework_test.go
  4. 825  test/fuse_integration/writeback_cache_test.go
  5.   5  weed/command/mount_std.go
  6.  13  weed/mount/filehandle.go
  7.  55  weed/mount/filehandle_map.go
  8.   4  weed/mount/page_writer/page_chunk_mem.go
  9.   4  weed/mount/page_writer/page_chunk_swapfile.go
 10.  59  weed/mount/weedfs.go
 11.  96  weed/mount/weedfs_async_flush.go
 12.   9  weed/mount/weedfs_file_mkrm.go
 13.  58  weed/mount/weedfs_file_sync.go
 14.  60  weed/mount/weedfs_filehandle.go
 15.   7  weed/mount/weedfs_forget.go
 16.   6  weed/mount/weedfs_rename.go

207  .github/workflows/fuse-integration.yml

@@ -21,213 +21,60 @@ concurrency:
permissions:
contents: read
env:
TEST_TIMEOUT: '45m'
jobs:
fuse-integration:
name: FUSE Integration Testing
runs-on: ubuntu-22.04
timeout-minutes: 50
steps:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
- name: Install FUSE and dependencies
run: |
sudo apt-get update
sudo apt-get install -y fuse libfuse-dev
# fuse3 is pre-installed on ubuntu-22.04 runners and conflicts
# with the legacy fuse package, so only install the dev headers.
sudo apt-get install -y libfuse3-dev
# Allow non-root FUSE mounts with allow_other
echo 'user_allow_other' | sudo tee -a /etc/fuse.conf
sudo chmod 644 /etc/fuse.conf
# Verify FUSE installation
fusermount --version || true
ls -la /dev/fuse || true
fusermount3 --version || fusermount --version || true
ls -la /dev/fuse
- name: Build SeaweedFS
run: |
cd weed
go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v .
go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -o weed .
chmod +x weed
# Verify binary
./weed version
- name: Prepare FUSE Integration Tests
# Make weed binary available in PATH for the test framework
sudo cp weed /usr/local/bin/weed
- name: Install test dependencies
run: |
# Create isolated test directory to avoid Go module conflicts
mkdir -p /tmp/seaweedfs-fuse-tests
# Copy only the working test files to avoid Go module conflicts
# These are the files we've verified work without package name issues
cp test/fuse_integration/simple_test.go /tmp/seaweedfs-fuse-tests/ 2>/dev/null || echo "⚠️ simple_test.go not found"
cp test/fuse_integration/working_demo_test.go /tmp/seaweedfs-fuse-tests/ 2>/dev/null || echo "⚠️ working_demo_test.go not found"
# Note: Other test files (framework.go, basic_operations_test.go, etc.)
# have Go module conflicts and are skipped until resolved
echo "📁 Working test files copied:"
ls -la /tmp/seaweedfs-fuse-tests/*.go 2>/dev/null || echo "ℹ️ No test files found"
# Initialize Go module in isolated directory
cd /tmp/seaweedfs-fuse-tests
go mod init seaweedfs-fuse-tests
go mod tidy
# Verify setup
echo "✅ FUSE integration test environment prepared"
ls -la /tmp/seaweedfs-fuse-tests/
echo ""
echo "ℹ️ Current Status: Running working subset of FUSE tests"
echo " • simple_test.go: Package structure verification"
echo " • working_demo_test.go: Framework capability demonstration"
echo " • Full framework: Available in test/fuse_integration/ (module conflicts pending resolution)"
cd test/fuse_integration
go mod download
- name: Run FUSE Integration Tests
run: |
cd /tmp/seaweedfs-fuse-tests
echo "🧪 Running FUSE integration tests..."
echo "============================================"
# Run available working test files
TESTS_RUN=0
if [ -f "simple_test.go" ]; then
echo "📋 Running simple_test.go..."
go test -v -timeout=${{ env.TEST_TIMEOUT }} simple_test.go
TESTS_RUN=$((TESTS_RUN + 1))
fi
if [ -f "working_demo_test.go" ]; then
echo "📋 Running working_demo_test.go..."
go test -v -timeout=${{ env.TEST_TIMEOUT }} working_demo_test.go
TESTS_RUN=$((TESTS_RUN + 1))
fi
# Run combined test if multiple files exist
if [ -f "simple_test.go" ] && [ -f "working_demo_test.go" ]; then
echo "📋 Running combined tests..."
go test -v -timeout=${{ env.TEST_TIMEOUT }} simple_test.go working_demo_test.go
fi
if [ $TESTS_RUN -eq 0 ]; then
echo "⚠️ No working test files found, running module verification only"
go version
go mod verify
else
echo "✅ Successfully ran $TESTS_RUN test file(s)"
fi
echo "============================================"
echo "✅ FUSE integration tests completed"
- name: Run Extended Framework Validation
run: |
cd /tmp/seaweedfs-fuse-tests
echo "🔍 Running extended framework validation..."
echo "============================================"
# Test individual components (only run tests that exist)
if [ -f "simple_test.go" ]; then
echo "Testing simple verification..."
go test -v simple_test.go
fi
if [ -f "working_demo_test.go" ]; then
echo "Testing framework demo..."
go test -v working_demo_test.go
fi
# Test combined execution if both files exist
if [ -f "simple_test.go" ] && [ -f "working_demo_test.go" ]; then
echo "Testing combined execution..."
go test -v simple_test.go working_demo_test.go
elif [ -f "simple_test.go" ] || [ -f "working_demo_test.go" ]; then
echo "✅ Individual tests already validated above"
else
echo "⚠️ No working test files found for combined testing"
fi
echo "============================================"
echo "✅ Extended validation completed"
- name: Generate Test Coverage Report
run: |
cd /tmp/seaweedfs-fuse-tests
echo "📊 Generating test coverage report..."
go test -v -coverprofile=coverage.out .
go tool cover -html=coverage.out -o coverage.html
echo "Coverage report generated: coverage.html"
- name: Verify SeaweedFS Binary Integration
run: |
# Test that SeaweedFS binary is accessible from test environment
WEED_BINARY=$(pwd)/weed/weed
if [ -f "$WEED_BINARY" ]; then
echo "✅ SeaweedFS binary found at: $WEED_BINARY"
$WEED_BINARY version
echo "Binary is ready for full integration testing"
else
echo "❌ SeaweedFS binary not found"
exit 1
fi
- name: Upload Test Artifacts
set -o pipefail
cd test/fuse_integration
echo "Running full FUSE integration test suite..."
go test -v -count=1 -timeout=45m ./... 2>&1 | tee /tmp/fuse-test-output.log
- name: Upload Test Logs
if: always()
uses: actions/upload-artifact@v7
with:
name: fuse-integration-test-results
path: |
/tmp/seaweedfs-fuse-tests/coverage.out
/tmp/seaweedfs-fuse-tests/coverage.html
/tmp/seaweedfs-fuse-tests/*.log
/tmp/fuse-test-output.log
retention-days: 7
- name: Test Summary
if: always()
run: |
echo "## 🚀 FUSE Integration Test Summary" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Framework Status" >> $GITHUB_STEP_SUMMARY
echo "- ✅ **Framework Design**: Complete and validated" >> $GITHUB_STEP_SUMMARY
echo "- ✅ **Working Tests**: Core framework demonstration functional" >> $GITHUB_STEP_SUMMARY
echo "- ⚠️ **Full Framework**: Available but requires Go module resolution" >> $GITHUB_STEP_SUMMARY
echo "- ✅ **CI/CD Integration**: Automated testing pipeline established" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Test Capabilities" >> $GITHUB_STEP_SUMMARY
echo "- 📁 **File Operations**: Create, read, write, delete, permissions" >> $GITHUB_STEP_SUMMARY
echo "- 📂 **Directory Operations**: Create, list, delete, nested structures" >> $GITHUB_STEP_SUMMARY
echo "- 📊 **Large Files**: Multi-megabyte file handling" >> $GITHUB_STEP_SUMMARY
echo "- 🔄 **Concurrent Operations**: Multi-threaded stress testing" >> $GITHUB_STEP_SUMMARY
echo "- ⚠️ **Error Scenarios**: Comprehensive error handling validation" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Comparison with Current Tests" >> $GITHUB_STEP_SUMMARY
echo "| Aspect | Current (FIO) | This Framework |" >> $GITHUB_STEP_SUMMARY
echo "|--------|---------------|----------------|" >> $GITHUB_STEP_SUMMARY
echo "| **Scope** | Performance only | Functional + Performance |" >> $GITHUB_STEP_SUMMARY
echo "| **Operations** | Read/Write only | All FUSE operations |" >> $GITHUB_STEP_SUMMARY
echo "| **Concurrency** | Single-threaded | Multi-threaded stress tests |" >> $GITHUB_STEP_SUMMARY
echo "| **Automation** | Manual setup | Fully automated |" >> $GITHUB_STEP_SUMMARY
echo "| **Validation** | Speed metrics | Correctness + Performance |" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Current Working Tests" >> $GITHUB_STEP_SUMMARY
echo "- ✅ **Framework Structure**: Package and module verification" >> $GITHUB_STEP_SUMMARY
echo "- ✅ **Configuration Management**: Test config validation" >> $GITHUB_STEP_SUMMARY
echo "- ✅ **File Operations Demo**: Basic file create/read/write simulation" >> $GITHUB_STEP_SUMMARY
echo "- ✅ **Large File Handling**: 1MB+ file processing demonstration" >> $GITHUB_STEP_SUMMARY
echo "- ✅ **Concurrency Simulation**: Multi-file operation testing" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "### Next Steps" >> $GITHUB_STEP_SUMMARY
echo "1. **Module Resolution**: Fix Go package conflicts for full framework" >> $GITHUB_STEP_SUMMARY
echo "2. **SeaweedFS Integration**: Connect with real cluster for end-to-end testing" >> $GITHUB_STEP_SUMMARY
echo "3. **Performance Benchmarks**: Add performance regression testing" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "📈 **Total Framework Size**: ~1,500 lines of comprehensive testing infrastructure" >> $GITHUB_STEP_SUMMARY

4  test/fuse_integration/concurrent_operations_test.go

@@ -386,18 +386,20 @@ func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) {
// Perform many small writes
numWrites := 1000
writeSize := 100
totalSize := int64(0)
for i := 0; i < numWrites; i++ {
data := []byte(fmt.Sprintf("Write %04d: %s\n", i, bytes.Repeat([]byte("x"), writeSize-20)))
_, err := file.Write(data)
require.NoError(t, err)
totalSize += int64(len(data))
}
file.Close()
// Verify file size
info, err := os.Stat(mountPath)
require.NoError(t, err)
assert.Equal(t, totalSize, info.Size())
assert.Equal(t, totalSize, info.Size(), "file size should match total bytes written")
}
// testManySmallFiles tests creating many small files

156  test/fuse_integration/framework.go → test/fuse_integration/framework_test.go

@@ -7,6 +7,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"syscall"
"testing"
"time"
@@ -20,6 +21,7 @@ type FuseTestFramework struct {
tempDir string
mountPoint string
dataDir string
logDir string
masterProcess *os.Process
volumeProcess *os.Process
filerProcess *os.Process
@@ -27,6 +29,9 @@ type FuseTestFramework struct {
masterAddr string
volumeAddr string
filerAddr string
masterPort int
volumePort int
filerPort int
weedBinary string
isSetup bool
}
@@ -57,7 +62,9 @@ func DefaultTestConfig() *TestConfig {
}
}
// NewFuseTestFramework creates a new FUSE testing framework
// NewFuseTestFramework creates a new FUSE testing framework.
// Each instance allocates its own free ports so multiple tests can run
// sequentially without port conflicts from slow cleanup.
func NewFuseTestFramework(t *testing.T, config *TestConfig) *FuseTestFramework {
if config == nil {
config = DefaultTestConfig()
@@ -66,27 +73,50 @@ func NewFuseTestFramework(t *testing.T, config *TestConfig) *FuseTestFramework {
tempDir, err := os.MkdirTemp("", "seaweedfs_fuse_test_")
require.NoError(t, err)
masterPort := freePort(t)
volumePort := freePort(t)
filerPort := freePort(t)
return &FuseTestFramework{
t: t,
tempDir: tempDir,
mountPoint: filepath.Join(tempDir, "mount"),
dataDir: filepath.Join(tempDir, "data"),
masterAddr: "127.0.0.1:19333",
volumeAddr: "127.0.0.1:18080",
filerAddr: "127.0.0.1:18888",
logDir: filepath.Join(tempDir, "logs"),
masterPort: masterPort,
volumePort: volumePort,
filerPort: filerPort,
masterAddr: fmt.Sprintf("127.0.0.1:%d", masterPort),
volumeAddr: fmt.Sprintf("127.0.0.1:%d", volumePort),
filerAddr: fmt.Sprintf("127.0.0.1:%d", filerPort),
weedBinary: findWeedBinary(),
isSetup: false,
}
}
// freePort asks the OS for a free TCP port.
func freePort(t *testing.T) int {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
port := l.Addr().(*net.TCPAddr).Port
l.Close()
return port
}
// Setup starts SeaweedFS cluster and mounts FUSE filesystem
func (f *FuseTestFramework) Setup(config *TestConfig) error {
if f.isSetup {
return fmt.Errorf("framework already setup")
}
// Create directories
dirs := []string{f.mountPoint, f.dataDir}
// Create all required directories upfront
dirs := []string{
f.mountPoint,
f.logDir,
filepath.Join(f.dataDir, "master"),
filepath.Join(f.dataDir, "volume"),
}
for _, dir := range dirs {
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %v", dir, err)
@@ -100,6 +130,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error {
// Wait for master to be ready
if err := f.waitForService(f.masterAddr, 30*time.Second); err != nil {
f.dumpLog("master")
return fmt.Errorf("master not ready: %v", err)
}
@@ -110,6 +141,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error {
// Wait for volume server to be ready
if err := f.waitForService(f.volumeAddr, 30*time.Second); err != nil {
f.dumpLog("volume")
return fmt.Errorf("volume server not ready: %v", err)
}
@@ -120,6 +152,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error {
// Wait for filer to be ready
if err := f.waitForService(f.filerAddr, 30*time.Second); err != nil {
f.dumpLog("filer")
return fmt.Errorf("filer not ready: %v", err)
}
@@ -130,6 +163,7 @@ func (f *FuseTestFramework) Setup(config *TestConfig) error {
// Wait for mount to be ready
if err := f.waitForMount(30 * time.Second); err != nil {
f.dumpLog("mount")
return fmt.Errorf("FUSE mount not ready: %v", err)
}
@@ -168,26 +202,61 @@ func (f *FuseTestFramework) GetFilerAddr() string {
return f.filerAddr
}
// startProcess is a helper that starts a weed sub-command with output captured
// to a log file in f.logDir.
func (f *FuseTestFramework) startProcess(name string, args []string) (*os.Process, error) {
logFile, err := os.Create(filepath.Join(f.logDir, name+".log"))
if err != nil {
return nil, fmt.Errorf("create log file: %v", err)
}
cmd := exec.Command(f.weedBinary, args...)
cmd.Dir = f.tempDir
cmd.Stdout = logFile
cmd.Stderr = logFile
if err := cmd.Start(); err != nil {
logFile.Close()
return nil, err
}
// Close the file handle — the child process inherited it.
logFile.Close()
return cmd.Process, nil
}
// dumpLog prints the last lines of a process log file to the test output
// for debugging when a service fails to start.
func (f *FuseTestFramework) dumpLog(name string) {
data, err := os.ReadFile(filepath.Join(f.logDir, name+".log"))
if err != nil {
f.t.Logf("[%s log] (not available: %v)", name, err)
return
}
// Truncate to last 2KB to keep output manageable
if len(data) > 2048 {
data = data[len(data)-2048:]
}
f.t.Logf("[%s log tail]\n%s", name, string(data))
}
// startMaster starts the SeaweedFS master server
func (f *FuseTestFramework) startMaster(config *TestConfig) error {
// Do NOT set -port.grpc explicitly. SeaweedFS convention is gRPC = HTTP + 10000.
// Volume/filer discover the master gRPC port by this convention, so overriding
// it breaks inter-service communication.
args := []string{
"master",
"-ip=127.0.0.1",
"-port=19333",
"-port=" + strconv.Itoa(f.masterPort),
"-mdir=" + filepath.Join(f.dataDir, "master"),
"-raftBootstrap",
"-peers=none", // Faster startup when no multiple masters needed
}
if config.EnableDebug {
args = append(args, "-v=4")
}
cmd := exec.Command(f.weedBinary, args...)
cmd.Dir = f.tempDir
if err := cmd.Start(); err != nil {
proc, err := f.startProcess("master", args)
if err != nil {
return err
}
f.masterProcess = cmd.Process
f.masterProcess = proc
return nil
}
@@ -195,9 +264,9 @@ func (f *FuseTestFramework) startMaster(config *TestConfig) error {
func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error {
args := []string{
"volume",
"-master=" + f.masterAddr,
"-master=127.0.0.1:" + strconv.Itoa(f.masterPort),
"-ip=127.0.0.1",
"-port=18080",
"-port=" + strconv.Itoa(f.volumePort),
"-dir=" + filepath.Join(f.dataDir, "volume"),
fmt.Sprintf("-max=%d", config.NumVolumes),
}
@@ -205,12 +274,11 @@ func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error {
args = append(args, "-v=4")
}
cmd := exec.Command(f.weedBinary, args...)
cmd.Dir = f.tempDir
if err := cmd.Start(); err != nil {
proc, err := f.startProcess("volume", args)
if err != nil {
return err
}
f.volumeProcess = cmd.Process
f.volumeProcess = proc
return nil
}
@@ -218,20 +286,19 @@ func (f *FuseTestFramework) startVolumeServers(config *TestConfig) error {
func (f *FuseTestFramework) startFiler(config *TestConfig) error {
args := []string{
"filer",
"-master=" + f.masterAddr,
"-master=127.0.0.1:" + strconv.Itoa(f.masterPort),
"-ip=127.0.0.1",
"-port=18888",
"-port=" + strconv.Itoa(f.filerPort),
}
if config.EnableDebug {
args = append(args, "-v=4")
}
cmd := exec.Command(f.weedBinary, args...)
cmd.Dir = f.tempDir
if err := cmd.Start(); err != nil {
proc, err := f.startProcess("filer", args)
if err != nil {
return err
}
f.filerProcess = cmd.Process
f.filerProcess = proc
return nil
}
@@ -239,10 +306,11 @@ func (f *FuseTestFramework) startFiler(config *TestConfig) error {
func (f *FuseTestFramework) mountFuse(config *TestConfig) error {
args := []string{
"mount",
"-filer=" + f.filerAddr,
"-filer=127.0.0.1:" + strconv.Itoa(f.filerPort),
"-dir=" + f.mountPoint,
"-filer.path=/",
"-dirAutoCreate",
"-allowOthers=false",
}
if config.Collection != "" {
@@ -255,7 +323,7 @@ func (f *FuseTestFramework) mountFuse(config *TestConfig) error {
args = append(args, fmt.Sprintf("-chunkSizeLimitMB=%d", config.ChunkSizeMB))
}
if config.CacheSizeMB > 0 {
args = append(args, fmt.Sprintf("-cacheSizeMB=%d", config.CacheSizeMB))
args = append(args, fmt.Sprintf("-cacheCapacityMB=%d", config.CacheSizeMB))
}
if config.EnableDebug {
args = append(args, "-v=4")
@@ -263,12 +331,11 @@ func (f *FuseTestFramework) mountFuse(config *TestConfig) error {
args = append(args, config.MountOptions...)
cmd := exec.Command(f.weedBinary, args...)
cmd.Dir = f.tempDir
if err := cmd.Start(); err != nil {
proc, err := f.startProcess("mount", args)
if err != nil {
return err
}
f.mountProcess = cmd.Process
f.mountProcess = proc
return nil
}
@@ -281,7 +348,8 @@ func (f *FuseTestFramework) unmountFuse() error {
}
// Also try system unmount as backup
exec.Command("umount", f.mountPoint).Run()
exec.Command("fusermount3", "-u", f.mountPoint).Run()
exec.Command("fusermount", "-u", f.mountPoint).Run()
return nil
}
@@ -315,27 +383,31 @@ func (f *FuseTestFramework) waitForMount(timeout time.Duration) error {
return fmt.Errorf("mount point not ready within timeout")
}
// findWeedBinary locates the weed binary
// findWeedBinary locates the weed binary.
// Checks PATH first (most reliable in CI where the binary is installed to
// /usr/local/bin), then falls back to relative paths. Each candidate is
// verified to be a regular file so that a source directory named "weed"
// is never mistaken for the binary.
func findWeedBinary() string {
// Try different possible locations
// PATH lookup first — works in CI and when weed is installed globally.
if p, err := exec.LookPath("weed"); err == nil {
return p
}
// Relative paths for local development (run from test/fuse_integration/).
candidates := []string{
"../../weed/weed", // built in-tree: weed/weed
"./weed",
"../weed",
"../../weed",
"weed", // in PATH
}
for _, candidate := range candidates {
if _, err := exec.LookPath(candidate); err == nil {
return candidate
}
if _, err := os.Stat(candidate); err == nil {
if info, err := os.Stat(candidate); err == nil && !info.IsDir() {
abs, _ := filepath.Abs(candidate)
return abs
}
}
// Default fallback
// Default fallback — will fail with a clear "not found" at exec time.
return "weed"
}

825  test/fuse_integration/writeback_cache_test.go

@@ -0,0 +1,825 @@
package fuse_test
import (
"bytes"
"crypto/rand"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// writebackConfig returns a TestConfig with writebackCache enabled
func writebackConfig() *TestConfig {
return &TestConfig{
Collection: "",
Replication: "000",
ChunkSizeMB: 2,
CacheSizeMB: 100,
NumVolumes: 3,
EnableDebug: false,
MountOptions: []string{
"-writebackCache",
},
SkipCleanup: false,
}
}
// waitForFileContent polls until a file has the expected content or timeout expires.
// This is needed because writebackCache defers data upload to background goroutines,
// so there is a brief window after close() where the file may not yet be readable.
func waitForFileContent(t *testing.T, path string, expected []byte, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
var lastErr error
for time.Now().Before(deadline) {
actual, err := os.ReadFile(path)
if err == nil && bytes.Equal(expected, actual) {
return
}
if err != nil {
lastErr = err
} else {
lastErr = fmt.Errorf("content mismatch: got %d bytes, want %d bytes", len(actual), len(expected))
}
time.Sleep(200 * time.Millisecond)
}
t.Fatalf("file %s did not have expected content within %v: %v", path, timeout, lastErr)
}
// waitForFileSize polls until a file reports the expected size or timeout expires.
func waitForFileSize(t *testing.T, path string, expectedSize int64, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
info, err := os.Stat(path)
if err == nil && info.Size() == expectedSize {
return
}
time.Sleep(200 * time.Millisecond)
}
t.Fatalf("file %s did not reach expected size %d within %v", path, expectedSize, timeout)
}
// TestWritebackCacheBasicOperations tests fundamental file I/O with writebackCache enabled
func TestWritebackCacheBasicOperations(t *testing.T) {
config := writebackConfig()
framework := NewFuseTestFramework(t, config)
defer framework.Cleanup()
require.NoError(t, framework.Setup(config))
t.Run("WriteAndReadBack", func(t *testing.T) {
testWritebackWriteAndReadBack(t, framework)
})
t.Run("MultipleFilesSequential", func(t *testing.T) {
testWritebackMultipleFilesSequential(t, framework)
})
t.Run("LargeFile", func(t *testing.T) {
testWritebackLargeFile(t, framework)
})
t.Run("EmptyFile", func(t *testing.T) {
testWritebackEmptyFile(t, framework)
})
t.Run("OverwriteExistingFile", func(t *testing.T) {
testWritebackOverwriteFile(t, framework)
})
}
// testWritebackWriteAndReadBack writes a file and verifies it can be read back
// after the async flush completes.
func testWritebackWriteAndReadBack(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_basic.txt"
content := []byte("Hello from writebackCache test!")
mountPath := filepath.Join(framework.GetMountPoint(), filename)
// Write file — close() returns immediately with async flush
require.NoError(t, os.WriteFile(mountPath, content, 0644))
// Wait for async flush to complete and verify content
waitForFileContent(t, mountPath, content, 30*time.Second)
}
// testWritebackMultipleFilesSequential writes multiple files sequentially
// and verifies all are readable after async flushes complete.
func testWritebackMultipleFilesSequential(t *testing.T, framework *FuseTestFramework) {
dir := "writeback_sequential"
framework.CreateTestDir(dir)
numFiles := 50
files := make(map[string][]byte, numFiles)
// Write files sequentially — each close() returns immediately
for i := 0; i < numFiles; i++ {
filename := fmt.Sprintf("file_%03d.txt", i)
content := []byte(fmt.Sprintf("Sequential file %d content: %s", i, time.Now().Format(time.RFC3339Nano)))
path := filepath.Join(framework.GetMountPoint(), dir, filename)
require.NoError(t, os.WriteFile(path, content, 0644))
files[filename] = content
}
// Verify all files after a brief wait for async flushes
for filename, expectedContent := range files {
path := filepath.Join(framework.GetMountPoint(), dir, filename)
waitForFileContent(t, path, expectedContent, 30*time.Second)
}
}
// testWritebackLargeFile writes a large file (multi-chunk) with writebackCache
func testWritebackLargeFile(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_large.bin"
mountPath := filepath.Join(framework.GetMountPoint(), filename)
// 8MB file (spans multiple 2MB chunks)
content := make([]byte, 8*1024*1024)
_, err := rand.Read(content)
require.NoError(t, err)
require.NoError(t, os.WriteFile(mountPath, content, 0644))
// Wait for file to be fully flushed
waitForFileContent(t, mountPath, content, 60*time.Second)
}
// testWritebackEmptyFile creates an empty file with writebackCache
func testWritebackEmptyFile(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_empty.txt"
mountPath := filepath.Join(framework.GetMountPoint(), filename)
// Create empty file
f, err := os.Create(mountPath)
require.NoError(t, err)
require.NoError(t, f.Close())
// Should exist and be empty
info, err := os.Stat(mountPath)
require.NoError(t, err)
assert.Equal(t, int64(0), info.Size())
}
// testWritebackOverwriteFile tests overwriting an existing file
func testWritebackOverwriteFile(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_overwrite.txt"
mountPath := filepath.Join(framework.GetMountPoint(), filename)
// First write
content1 := []byte("First version of the file")
require.NoError(t, os.WriteFile(mountPath, content1, 0644))
waitForFileContent(t, mountPath, content1, 30*time.Second)
// Overwrite with different content
content2 := []byte("Second version — overwritten content that is longer than the first")
require.NoError(t, os.WriteFile(mountPath, content2, 0644))
waitForFileContent(t, mountPath, content2, 30*time.Second)
}
// TestWritebackCacheFsync tests that fsync still forces synchronous flush
// even when writebackCache is enabled
func TestWritebackCacheFsync(t *testing.T) {
config := writebackConfig()
framework := NewFuseTestFramework(t, config)
defer framework.Cleanup()
require.NoError(t, framework.Setup(config))
t.Run("FsyncForcesFlush", func(t *testing.T) {
testFsyncForcesFlush(t, framework)
})
t.Run("FsyncThenRead", func(t *testing.T) {
testFsyncThenRead(t, framework)
})
}
// testFsyncForcesFlush verifies that calling fsync before close ensures
// data is immediately available for reading, bypassing the async path.
func testFsyncForcesFlush(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_fsync.txt"
mountPath := filepath.Join(framework.GetMountPoint(), filename)
content := []byte("Data that must be flushed synchronously via fsync")
// Open, write, fsync, close
f, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644)
require.NoError(t, err)
_, err = f.Write(content)
require.NoError(t, err)
// fsync forces synchronous data+metadata flush
require.NoError(t, f.Sync())
require.NoError(t, f.Close())
// Data should be immediately available — no wait needed
actual, err := os.ReadFile(mountPath)
require.NoError(t, err)
assert.Equal(t, content, actual)
}
// testFsyncThenRead verifies that after fsync, a freshly opened read
// returns the correct data without any delay.
func testFsyncThenRead(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_fsync_read.txt"
mountPath := filepath.Join(framework.GetMountPoint(), filename)
content := make([]byte, 64*1024) // 64KB
_, err := rand.Read(content)
require.NoError(t, err)
// Write with explicit fsync
f, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644)
require.NoError(t, err)
_, err = f.Write(content)
require.NoError(t, err)
require.NoError(t, f.Sync())
require.NoError(t, f.Close())
// Immediate read should succeed
actual, err := os.ReadFile(mountPath)
require.NoError(t, err)
assert.Equal(t, content, actual)
}
// TestWritebackCacheConcurrentSmallFiles is the primary test for issue #8718:
// many small files written concurrently should all be eventually readable.
func TestWritebackCacheConcurrentSmallFiles(t *testing.T) {
config := writebackConfig()
framework := NewFuseTestFramework(t, config)
defer framework.Cleanup()
require.NoError(t, framework.Setup(config))
t.Run("ConcurrentSmallFiles", func(t *testing.T) {
testWritebackConcurrentSmallFiles(t, framework)
})
t.Run("ConcurrentSmallFilesMultiDir", func(t *testing.T) {
testWritebackConcurrentSmallFilesMultiDir(t, framework)
})
t.Run("RapidCreateCloseSequence", func(t *testing.T) {
testWritebackRapidCreateClose(t, framework)
})
}
// testWritebackConcurrentSmallFiles simulates the rsync workload from #8718:
// multiple workers creating many small files in parallel.
func testWritebackConcurrentSmallFiles(t *testing.T, framework *FuseTestFramework) {
dir := "writeback_concurrent_small"
framework.CreateTestDir(dir)
numWorkers := 8
filesPerWorker := 20
totalFiles := numWorkers * filesPerWorker
type fileRecord struct {
path string
content []byte
}
var mu sync.Mutex
var writeErrors []error
records := make([]fileRecord, 0, totalFiles)
// Phase 1: Write files concurrently (simulating rsync workers)
var wg sync.WaitGroup
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for f := 0; f < filesPerWorker; f++ {
filename := fmt.Sprintf("w%02d_f%03d.dat", workerID, f)
path := filepath.Join(framework.GetMountPoint(), dir, filename)
// Vary sizes: 100B to 100KB
size := 100 + (workerID*filesPerWorker+f)*500
if size > 100*1024 {
size = 100*1024
}
content := make([]byte, size)
if _, err := rand.Read(content); err != nil {
mu.Lock()
writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d rand: %v", workerID, f, err))
mu.Unlock()
return
}
if err := os.WriteFile(path, content, 0644); err != nil {
mu.Lock()
writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d: %v", workerID, f, err))
mu.Unlock()
return
}
mu.Lock()
records = append(records, fileRecord{path: path, content: content})
mu.Unlock()
}
}(w)
}
wg.Wait()
require.Empty(t, writeErrors, "write errors: %v", writeErrors)
assert.Equal(t, totalFiles, len(records))
// Phase 2: Wait for async flushes and verify all files
for _, rec := range records {
waitForFileContent(t, rec.path, rec.content, 60*time.Second)
}
// Phase 3: Verify directory listing has correct count
entries, err := os.ReadDir(filepath.Join(framework.GetMountPoint(), dir))
require.NoError(t, err)
assert.Equal(t, totalFiles, len(entries))
}
// testWritebackConcurrentSmallFilesMultiDir tests concurrent writes across
// multiple directories — a common pattern for parallel copy tools.
func testWritebackConcurrentSmallFilesMultiDir(t *testing.T, framework *FuseTestFramework) {
baseDir := "writeback_multidir"
framework.CreateTestDir(baseDir)
numDirs := 4
filesPerDir := 25
type fileRecord struct {
path string
content []byte
}
var mu sync.Mutex
var records []fileRecord
var writeErrors []error
var wg sync.WaitGroup
for d := 0; d < numDirs; d++ {
subDir := filepath.Join(baseDir, fmt.Sprintf("dir_%02d", d))
framework.CreateTestDir(subDir)
wg.Add(1)
go func(dirID int, dirPath string) {
defer wg.Done()
for f := 0; f < filesPerDir; f++ {
filename := fmt.Sprintf("file_%03d.txt", f)
path := filepath.Join(framework.GetMountPoint(), dirPath, filename)
content := []byte(fmt.Sprintf("dir=%d file=%d data=%s", dirID, f, time.Now().Format(time.RFC3339Nano)))
if err := os.WriteFile(path, content, 0644); err != nil {
mu.Lock()
writeErrors = append(writeErrors, fmt.Errorf("dir %d file %d: %v", dirID, f, err))
mu.Unlock()
return
}
mu.Lock()
records = append(records, fileRecord{path: path, content: content})
mu.Unlock()
}
}(d, subDir)
}
wg.Wait()
require.Empty(t, writeErrors, "write errors: %v", writeErrors)
// Verify all files
for _, rec := range records {
waitForFileContent(t, rec.path, rec.content, 60*time.Second)
}
}
// testWritebackRapidCreateClose rapidly creates and closes files to stress
// the async flush goroutine pool.
func testWritebackRapidCreateClose(t *testing.T, framework *FuseTestFramework) {
dir := "writeback_rapid"
framework.CreateTestDir(dir)
numFiles := 200
type fileRecord struct {
path string
content []byte
}
records := make([]fileRecord, numFiles)
// Rapidly create files without pausing
for i := 0; i < numFiles; i++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("rapid_%04d.bin", i))
content := []byte(fmt.Sprintf("rapid-file-%d", i))
require.NoError(t, os.WriteFile(path, content, 0644))
records[i] = fileRecord{path: path, content: content}
}
// Verify all files eventually appear with correct content
for _, rec := range records {
waitForFileContent(t, rec.path, rec.content, 60*time.Second)
}
}
// TestWritebackCacheDataIntegrity tests that data integrity is preserved
// across various write patterns with writebackCache enabled.
func TestWritebackCacheDataIntegrity(t *testing.T) {
config := writebackConfig()
framework := NewFuseTestFramework(t, config)
defer framework.Cleanup()
require.NoError(t, framework.Setup(config))
t.Run("AppendAfterClose", func(t *testing.T) {
testWritebackAppendAfterClose(t, framework)
})
t.Run("PartialWrites", func(t *testing.T) {
testWritebackPartialWrites(t, framework)
})
t.Run("FileSizeCorrectness", func(t *testing.T) {
testWritebackFileSizeCorrectness(t, framework)
})
t.Run("BinaryData", func(t *testing.T) {
testWritebackBinaryData(t, framework)
})
}
// testWritebackAppendAfterClose writes a file, closes it (triggering async flush),
// waits for flush, then reopens and appends more data.
func testWritebackAppendAfterClose(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_append.txt"
mountPath := filepath.Join(framework.GetMountPoint(), filename)
// First write
part1 := []byte("First part of the data.\n")
require.NoError(t, os.WriteFile(mountPath, part1, 0644))
// Wait for first async flush
waitForFileContent(t, mountPath, part1, 30*time.Second)
// Append more data
part2 := []byte("Second part appended.\n")
f, err := os.OpenFile(mountPath, os.O_APPEND|os.O_WRONLY, 0644)
require.NoError(t, err)
_, err = f.Write(part2)
require.NoError(t, err)
require.NoError(t, f.Close())
// Verify combined content (copy part1 first so append cannot alias its backing array)
expected := append(append([]byte{}, part1...), part2...)
waitForFileContent(t, mountPath, expected, 30*time.Second)
}
// testWritebackPartialWrites tests writing to specific offsets within a file
func testWritebackPartialWrites(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_partial.bin"
mountPath := filepath.Join(framework.GetMountPoint(), filename)
// Create file with initial content
initial := bytes.Repeat([]byte("A"), 4096)
require.NoError(t, os.WriteFile(mountPath, initial, 0644))
waitForFileContent(t, mountPath, initial, 30*time.Second)
// Open and write at specific offset
f, err := os.OpenFile(mountPath, os.O_WRONLY, 0644)
require.NoError(t, err)
patch := []byte("PATCHED")
_, err = f.WriteAt(patch, 100)
require.NoError(t, err)
require.NoError(t, f.Close())
// Build expected content
expected := make([]byte, 4096)
copy(expected, initial)
copy(expected[100:], patch)
waitForFileContent(t, mountPath, expected, 30*time.Second)
}
// testWritebackFileSizeCorrectness verifies that file sizes are correct
// after async flush completes.
func testWritebackFileSizeCorrectness(t *testing.T, framework *FuseTestFramework) {
sizes := []int{0, 1, 100, 4096, 65536, 1024 * 1024}
for _, size := range sizes {
filename := fmt.Sprintf("writeback_size_%d.bin", size)
mountPath := filepath.Join(framework.GetMountPoint(), filename)
content := make([]byte, size)
if size > 0 {
_, err := rand.Read(content)
require.NoError(t, err, "rand.Read failed for size %d", size)
}
require.NoError(t, os.WriteFile(mountPath, content, 0644), "failed to write file of size %d", size)
if size > 0 {
waitForFileSize(t, mountPath, int64(size), 30*time.Second)
waitForFileContent(t, mountPath, content, 30*time.Second)
}
}
}
// testWritebackBinaryData verifies that arbitrary binary data (including null bytes)
// is preserved correctly through the async flush path.
func testWritebackBinaryData(t *testing.T, framework *FuseTestFramework) {
filename := "writeback_binary.bin"
mountPath := filepath.Join(framework.GetMountPoint(), filename)
// Generate data with all byte values including nulls
content := make([]byte, 256*100)
for i := range content {
content[i] = byte(i % 256)
}
require.NoError(t, os.WriteFile(mountPath, content, 0644))
waitForFileContent(t, mountPath, content, 30*time.Second)
}
// TestWritebackCachePerformance measures whether writebackCache actually
// improves throughput for small file workloads compared to synchronous flush.
func TestWritebackCachePerformance(t *testing.T) {
if testing.Short() {
t.Skip("skipping performance test in short mode")
}
numFiles := 200
fileSize := 4096 // 4KB files
// Generate test data upfront
testData := make([][]byte, numFiles)
for i := range testData {
testData[i] = make([]byte, fileSize)
_, err := rand.Read(testData[i])
require.NoError(t, err)
}
// Benchmark with writebackCache enabled
t.Run("WithWritebackCache", func(t *testing.T) {
config := writebackConfig()
framework := NewFuseTestFramework(t, config)
defer framework.Cleanup()
require.NoError(t, framework.Setup(config))
dir := "perf_writeback"
framework.CreateTestDir(dir)
start := time.Now()
for i := 0; i < numFiles; i++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i))
require.NoError(t, os.WriteFile(path, testData[i], 0644))
}
writebackDuration := time.Since(start)
// Wait for all files to be flushed
for i := 0; i < numFiles; i++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i))
waitForFileContent(t, path, testData[i], 60*time.Second)
}
t.Logf("writebackCache: wrote %d files in %v (%.0f files/sec)",
numFiles, writebackDuration, float64(numFiles)/writebackDuration.Seconds())
})
// Benchmark without writebackCache (synchronous flush)
t.Run("WithoutWritebackCache", func(t *testing.T) {
config := DefaultTestConfig()
framework := NewFuseTestFramework(t, config)
defer framework.Cleanup()
require.NoError(t, framework.Setup(config))
dir := "perf_sync"
framework.CreateTestDir(dir)
start := time.Now()
for i := 0; i < numFiles; i++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%04d.bin", i))
require.NoError(t, os.WriteFile(path, testData[i], 0644))
}
syncDuration := time.Since(start)
t.Logf("synchronous: wrote %d files in %v (%.0f files/sec)",
numFiles, syncDuration, float64(numFiles)/syncDuration.Seconds())
})
}
// TestWritebackCacheConcurrentMixedOps tests a mix of operations happening
// concurrently with writebackCache: creates, reads, overwrites, and deletes.
func TestWritebackCacheConcurrentMixedOps(t *testing.T) {
config := writebackConfig()
framework := NewFuseTestFramework(t, config)
defer framework.Cleanup()
require.NoError(t, framework.Setup(config))
dir := "writeback_mixed"
framework.CreateTestDir(dir)
numFiles := 50
var mu sync.Mutex
var errors []error
var completedWrites int64
addError := func(err error) {
mu.Lock()
defer mu.Unlock()
errors = append(errors, err)
}
// Phase 1: Create initial files and wait for async flushes
initialContents := make(map[int][]byte, numFiles)
for i := 0; i < numFiles; i++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", i))
content := []byte(fmt.Sprintf("initial content %d", i))
require.NoError(t, os.WriteFile(path, content, 0644))
initialContents[i] = content
}
// Poll until initial files are flushed (instead of fixed sleep)
for i := 0; i < numFiles; i++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", i))
waitForFileContent(t, path, initialContents[i], 30*time.Second)
}
// Phase 2: Concurrent mixed operations
var wg sync.WaitGroup
// Writers: overwrite existing files
for i := 0; i < 4; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for j := 0; j < numFiles; j++ {
if j%4 != workerID {
continue // each worker handles a subset
}
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", j))
content := []byte(fmt.Sprintf("overwritten by worker %d at %s", workerID, time.Now().Format(time.RFC3339Nano)))
if err := os.WriteFile(path, content, 0644); err != nil {
addError(fmt.Errorf("writer %d file %d: %v", workerID, j, err))
return
}
atomic.AddInt64(&completedWrites, 1)
}
}(i)
}
// Readers: read files (may see old or new content, but should not error)
for i := 0; i < 4; i++ {
wg.Add(1)
go func(readerID int) {
defer wg.Done()
for j := 0; j < numFiles; j++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("file_%03d.txt", j))
_, err := os.ReadFile(path)
if err != nil && !os.IsNotExist(err) {
addError(fmt.Errorf("reader %d file %d: %v", readerID, j, err))
return
}
}
}(i)
}
// New file creators
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("new_file_%03d.txt", i))
content := []byte(fmt.Sprintf("new file %d", i))
if err := os.WriteFile(path, content, 0644); err != nil {
addError(fmt.Errorf("creator file %d: %v", i, err))
return
}
}
}()
wg.Wait()
require.Empty(t, errors, "mixed operation errors: %v", errors)
assert.True(t, atomic.LoadInt64(&completedWrites) > 0, "should have completed some writes")
// Verify new files exist after async flushes complete (poll instead of fixed sleep)
for i := 0; i < 20; i++ {
path := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("new_file_%03d.txt", i))
expected := []byte(fmt.Sprintf("new file %d", i))
waitForFileContent(t, path, expected, 30*time.Second)
}
}
// TestWritebackCacheStressSmallFiles is a focused stress test for the
// async flush path with many small files — the core scenario from #8718.
func TestWritebackCacheStressSmallFiles(t *testing.T) {
if testing.Short() {
t.Skip("skipping stress test in short mode")
}
config := writebackConfig()
framework := NewFuseTestFramework(t, config)
defer framework.Cleanup()
require.NoError(t, framework.Setup(config))
dir := "writeback_stress"
framework.CreateTestDir(dir)
numWorkers := 16
filesPerWorker := 100
totalFiles := numWorkers * filesPerWorker
type fileRecord struct {
path string
content []byte
}
var mu sync.Mutex
var writeErrors []error
records := make([]fileRecord, 0, totalFiles)
start := time.Now()
// Simulate rsync-like workload: many workers each writing small files
var wg sync.WaitGroup
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for f := 0; f < filesPerWorker; f++ {
filename := fmt.Sprintf("w%02d/f%04d.dat", workerID, f)
path := filepath.Join(framework.GetMountPoint(), dir, filename)
// Ensure subdirectory exists
if f == 0 {
subDir := filepath.Join(framework.GetMountPoint(), dir, fmt.Sprintf("w%02d", workerID))
if err := os.MkdirAll(subDir, 0755); err != nil {
mu.Lock()
writeErrors = append(writeErrors, fmt.Errorf("worker %d mkdir: %v", workerID, err))
mu.Unlock()
return
}
}
// Small file: 1KB-10KB (typical for rsync of config/source files)
size := 1024 + (f%10)*1024
content := make([]byte, size)
if _, err := rand.Read(content); err != nil {
mu.Lock()
writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d rand: %v", workerID, f, err))
mu.Unlock()
return
}
if err := os.WriteFile(path, content, 0644); err != nil {
mu.Lock()
writeErrors = append(writeErrors, fmt.Errorf("worker %d file %d: %v", workerID, f, err))
mu.Unlock()
return
}
mu.Lock()
records = append(records, fileRecord{path: path, content: content})
mu.Unlock()
}
}(w)
}
wg.Wait()
writeDuration := time.Since(start)
t.Logf("wrote %d files in %v (%.0f files/sec)",
totalFiles, writeDuration, float64(totalFiles)/writeDuration.Seconds())
require.Empty(t, writeErrors, "write errors: %v", writeErrors)
assert.Equal(t, totalFiles, len(records))
// Verify all files are eventually readable with correct content
var verifyErrors []error
for _, rec := range records {
deadline := time.Now().Add(120 * time.Second)
var lastErr error
for time.Now().Before(deadline) {
actual, err := os.ReadFile(rec.path)
if err == nil && bytes.Equal(rec.content, actual) {
lastErr = nil
break
}
if err != nil {
lastErr = err
} else {
lastErr = fmt.Errorf("content mismatch for %s: got %d bytes, want %d", rec.path, len(actual), len(rec.content))
}
time.Sleep(500 * time.Millisecond)
}
if lastErr != nil {
verifyErrors = append(verifyErrors, lastErr)
}
}
require.Empty(t, verifyErrors, "verification errors after stress test: %v", verifyErrors)
t.Logf("all %d files verified successfully", totalFiles)
}

weed/command/mount_std.go

@@ -349,6 +349,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
RdmaTimeoutMs: *option.rdmaTimeoutMs,
DirIdleEvictSec: *option.dirIdleEvictSec,
WritebackCache: option.writebackCache != nil && *option.writebackCache,
})
// create mount root
@@ -396,6 +397,10 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
server.Serve()
// Wait for any pending background flushes (writebackCache async mode)
// before clearing caches, to prevent data loss during clean unmount.
seaweedFileSystem.WaitForAsyncFlush()
seaweedFileSystem.ClearCacheDir()
return true

weed/mount/filehandle.go

@@ -24,10 +24,15 @@ type FileHandle struct {
wfs *WFS
// cache file has been written to
dirtyMetadata bool
dirtyPages *PageWriter
reader *filer.ChunkReadAt
contentType string
asyncFlushPending bool // set in writebackCache mode to defer flush to Release
asyncFlushUid uint32 // saved uid for deferred metadata flush
asyncFlushGid uint32 // saved gid for deferred metadata flush
asyncFlushDir string // saved directory at defer time (fallback if inode forgotten)
asyncFlushName string // saved file name at defer time (fallback if inode forgotten)
isDeleted bool

weed/mount/filehandle_map.go

@@ -55,39 +55,72 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil
return fh
}
func (i *FileHandleToInode) ReleaseByInode(inode uint64) *FileHandle {
i.Lock()
defer i.Unlock()
fh, found := i.inode2fh[inode]
if !found {
return nil
}
// If the counter is already <= 0, a prior Release already started the
// drain. Return nil to prevent double-processing (e.g. Forget after Release).
if fh.counter <= 0 {
return nil
}
fh.counter--
if fh.counter <= 0 {
if fh.asyncFlushPending {
// Handle stays in fhMap so rename/unlink can find it during drain.
return fh
}
delete(i.inode2fh, inode)
delete(i.fh2inode, fh.fh)
return fh
}
return nil
}
func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) *FileHandle {
i.Lock()
defer i.Unlock()
inode, found := i.fh2inode[fh]
if !found {
return nil // Handle already released or invalid
}
fhHandle, fhFound := i.inode2fh[inode]
if !fhFound {
delete(i.fh2inode, fh)
return nil
}
// If the counter is already <= 0, a prior Release already started the
// drain. Return nil to prevent double-processing.
if fhHandle.counter <= 0 {
return nil
}
fhHandle.counter--
if fhHandle.counter <= 0 {
if fhHandle.asyncFlushPending {
// Handle stays in fhMap so rename/unlink can still find it
// via FindFileHandle during the background drain.
return fhHandle
}
delete(i.inode2fh, inode)
delete(i.fh2inode, fhHandle.fh)
fhHandle.ReleaseHandle()
return fhHandle
}
return nil
}
// RemoveFileHandle removes a handle from both maps. Called after an async
// drain completes to clean up the handle that was intentionally kept in the
// maps during the flush.
func (i *FileHandleToInode) RemoveFileHandle(fh FileHandleId, inode uint64) {
i.Lock()
defer i.Unlock()
delete(i.inode2fh, inode)
delete(i.fh2inode, fh)
}

weed/mount/page_writer/page_chunk_mem.go

@@ -65,10 +65,6 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
if logicStart < logicStop {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop)
if t.TsNs >= tsNs {
println("read new data1", t.TsNs-tsNs, "ns")
}
}
}
mc.activityScore.MarkRead()

weed/mount/page_writer/page_chunk_swapfile.go

@@ -151,10 +151,6 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in
break
}
maxStop = max(maxStop, logicStop)
if t.TsNs > tsNs {
println("read new data2", t.TsNs-tsNs, "ns")
}
}
}
//sc.memChunk.ReadDataAt(memCopy, off, tsNs)

weed/mount/weedfs.go

@@ -78,6 +78,10 @@ type Option struct {
// Directory cache refresh/eviction controls
DirIdleEvictSec int
// WritebackCache enables async flush on close for improved small file write performance.
// When true, Flush() returns immediately and data upload + metadata flush happen in background.
WritebackCache bool
uniqueCacheDirForRead string
uniqueCacheDirForWrite string
}
@@ -110,6 +114,16 @@ type WFS struct {
dirHotWindow time.Duration
dirHotThreshold int
dirIdleEvict time.Duration
// asyncFlushWg tracks pending background flush goroutines for writebackCache mode.
// Must be waited on before unmount cleanup to prevent data loss.
asyncFlushWg sync.WaitGroup
// pendingAsyncFlush tracks in-flight async flush goroutines by inode.
// AcquireHandle checks this to wait for a pending flush before reopening
// the same inode, preventing stale metadata from overwriting the async flush.
pendingAsyncFlushMu sync.Mutex
pendingAsyncFlush map[uint64]chan struct{}
}
const (
@@ -151,13 +165,14 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
signature: util.RandomInt32(),
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec),
fhMap: NewFileHandleToInode(),
dhMap: NewDirectoryHandleToInode(),
filerClient: filerClient, // nil for proxy mode, initialized for direct access
pendingAsyncFlush: make(map[uint64]chan struct{}),
fhLockTable: util.NewLockTable[FileHandleId](),
refreshingDirs: make(map[util.FullPath]struct{}),
dirHotWindow: dirHotWindow,
@@ -204,8 +219,32 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}
})
grace.OnInterrupt(func() {
// grace calls os.Exit(0) after all hooks, so WaitForAsyncFlush
// after server.Serve() would never execute. Drain here first.
//
// Use a timeout to avoid hanging on Ctrl-C if the filer is
// unreachable (metadata retry can take up to 7 seconds).
// If the timeout expires, skip the write-cache removal so that
// still-running goroutines can finish reading swap files.
asyncDrained := true
if wfs.option.WritebackCache {
done := make(chan struct{})
go func() {
wfs.asyncFlushWg.Wait()
close(done)
}()
select {
case <-done:
glog.V(0).Infof("all async flushes completed before shutdown")
case <-time.After(30 * time.Second):
glog.Warningf("timed out waiting for async flushes — swap files preserved for in-flight uploads")
asyncDrained = false
}
}
wfs.metaCache.Shutdown()
if asyncDrained {
os.RemoveAll(option.getUniqueCacheDirForWrite())
}
os.RemoveAll(option.getUniqueCacheDirForRead())
if wfs.rdmaClient != nil {
wfs.rdmaClient.Close()
@@ -240,6 +279,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}
func (wfs *WFS) StartBackgroundTasks() error {
if wfs.option.WritebackCache {
glog.V(0).Infof("writebackCache enabled: async flush on close() for improved small file performance")
}
follower, err := wfs.subscribeFilerConfEvents()
if err != nil {
return err

weed/mount/weedfs_async_flush.go

@@ -0,0 +1,96 @@
package mount
import (
"time"
"github.com/seaweedfs/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const asyncFlushMetadataRetries = 3
// completeAsyncFlush is called in a background goroutine when a file handle
// with pending async flush work is released. It performs the deferred data
// upload and metadata flush that was skipped in doFlush() for writebackCache mode.
//
// This enables close() to return immediately for small file workloads (e.g., rsync),
// while the actual I/O happens concurrently in the background.
//
// The caller (submitAsyncFlush) owns asyncFlushWg and the per-inode done channel.
func (wfs *WFS) completeAsyncFlush(fh *FileHandle) {
// Phase 1: Flush dirty pages — seals writable chunks, uploads to volume servers, and waits.
// The underlying UploadWithRetry already retries transient HTTP/gRPC errors internally,
// so a failure here indicates a persistent issue; the chunk data has been freed.
if err := fh.dirtyPages.FlushData(); err != nil {
glog.Errorf("completeAsyncFlush inode %d: data flush failed: %v", fh.inode, err)
// Data is lost at this point (chunks freed after internal retry exhaustion).
// Proceed to cleanup to avoid resource leaks and unmount hangs.
} else if fh.dirtyMetadata {
// Phase 2: Flush metadata unless the file was explicitly unlinked.
//
// isDeleted is set by the Unlink handler when it finds a draining
// handle. In that case the filer entry is already gone and
// flushing would recreate it. The uploaded chunks become orphans
// and are cleaned up by volume.fsck.
if fh.isDeleted {
glog.V(3).Infof("completeAsyncFlush inode %d: file was unlinked, skipping metadata flush", fh.inode)
} else {
// Resolve the current path for metadata flush.
//
// Try GetPath first — it reflects any rename that happened
// after close(). If the inode mapping is gone (Forget
// dropped it after the kernel's lookup count hit zero), fall
// back to the dir/name saved at doFlush time. Rename also
// updates the saved path, so the fallback is always current.
//
// Forget does NOT mean the file was deleted — it only means
// the kernel evicted its cache entry.
dir, name := fh.asyncFlushDir, fh.asyncFlushName
fileFullPath := util.FullPath(dir).Child(name)
if resolvedPath, status := wfs.inodeToPath.GetPath(fh.inode); status == fuse.OK {
dir, name = resolvedPath.DirAndName()
fileFullPath = resolvedPath
}
wfs.flushMetadataWithRetry(fh, dir, name, fileFullPath)
}
}
glog.V(3).Infof("completeAsyncFlush done inode %d fh %d", fh.inode, fh.fh)
// Phase 3: Destroy the upload pipeline and free resources.
fh.ReleaseHandle()
}
// flushMetadataWithRetry attempts to flush file metadata to the filer, retrying
// with exponential backoff on transient errors. The chunk data is already on the
// volume servers at this point; only the filer metadata reference needs persisting.
func (wfs *WFS) flushMetadataWithRetry(fh *FileHandle, dir, name string, fileFullPath util.FullPath) {
for attempt := 0; attempt <= asyncFlushMetadataRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(1<<uint(attempt-1)) * time.Second
glog.Warningf("completeAsyncFlush %s: retrying metadata flush (attempt %d/%d) after %v",
fileFullPath, attempt+1, asyncFlushMetadataRetries+1, backoff)
time.Sleep(backoff)
}
if err := wfs.flushMetadataToFiler(fh, dir, name, fh.asyncFlushUid, fh.asyncFlushGid); err != nil {
if attempt == asyncFlushMetadataRetries {
glog.Errorf("completeAsyncFlush %s: metadata flush failed after %d attempts: %v — "+
"chunks are uploaded but NOT referenced in filer metadata; "+
"they will appear as orphans in volume.fsck",
fileFullPath, asyncFlushMetadataRetries+1, err)
}
continue
}
return // success
}
}
// WaitForAsyncFlush waits for all pending background flush goroutines to complete.
// Called before unmount cleanup to ensure no data is lost.
func (wfs *WFS) WaitForAsyncFlush() {
wfs.asyncFlushWg.Wait()
}
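The hunks in this file reference several FileHandle and WFS fields whose declarations sit outside the diff. A plausible reconstruction of that supporting state, inferred purely from the usage above (field names match the calls shown, but the actual struct layout is an assumption):

```go
package main

import "sync"

// Hypothetical sketch of the async-flush state added by this PR;
// only fields referenced in the hunks above are shown.
type FileHandle struct {
	// ... existing fields ...
	asyncFlushPending bool   // set in doFlush, consumed in ReleaseHandle
	asyncFlushUid     uint32 // identity captured at close() time
	asyncFlushGid     uint32
	asyncFlushDir     string // path fallback if the inode mapping is Forgotten
	asyncFlushName    string
	isDeleted         bool // set by Unlink so the drain skips the metadata flush
}

type WFS struct {
	// ... existing fields ...
	asyncFlushWg        sync.WaitGroup           // drained by WaitForAsyncFlush before unmount
	pendingAsyncFlushMu sync.Mutex               // lock ordering: pendingAsyncFlushMu before fhMap
	pendingAsyncFlush   map[uint64]chan struct{} // inode to done-channel for in-flight drains
}

func main() {}
```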

weed/mount/weedfs_file_mkrm.go (9 lines changed)

@@ -161,6 +161,15 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin
}
wfs.inodeToPath.TouchDirectory(dirFullPath)
// If there is an async-draining handle for this file, mark it as deleted
// so the background flush skips the metadata write instead of recreating
// the just-unlinked entry. The handle is still in fhMap during drain.
if inode, found := wfs.inodeToPath.GetInode(entryFullPath); found {
if fh, fhFound := wfs.fhMap.FindFileHandle(inode); fhFound {
fh.isDeleted = true
}
}
wfs.inodeToPath.RemovePath(entryFullPath)
return fuse.OK

weed/mount/weedfs_file_sync.go (58 lines changed)

@@ -59,7 +59,7 @@ func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status {
return fuse.OK
}
-return wfs.doFlush(fh, in.Uid, in.Gid)
return wfs.doFlush(fh, in.Uid, in.Gid, true)
}
/**
@@ -88,11 +88,12 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
return fuse.ENOENT
}
-return wfs.doFlush(fh, in.Uid, in.Gid)
// Fsync is an explicit sync request — always flush synchronously
return wfs.doFlush(fh, in.Uid, in.Gid, false)
}
-func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32, allowAsync bool) fuse.Status {
// flush works at fh level
fileFullPath := fh.FullPath()
@@ -100,6 +101,26 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
// send the data to the OS
glog.V(4).Infof("doFlush %s fh %d", fileFullPath, fh.fh)
// When writebackCache is enabled and this is a close()-triggered Flush (not fsync),
// defer the expensive data upload + metadata flush to a background goroutine.
// This allows the calling process (e.g., rsync) to proceed to the next file immediately.
// POSIX does not require close() to wait for delayed I/O to complete.
if allowAsync && wfs.option.WritebackCache && fh.dirtyMetadata {
if wfs.IsOverQuotaWithUncommitted() {
return fuse.Status(syscall.ENOSPC)
}
fh.asyncFlushPending = true
fh.asyncFlushUid = uid
fh.asyncFlushGid = gid
fh.asyncFlushDir = dir
fh.asyncFlushName = name
glog.V(3).Infof("doFlush async deferred %s fh %d", fileFullPath, fh.fh)
return fuse.OK
}
// Synchronous flush path (normal mode, fsync, or no dirty data)
fh.asyncFlushPending = false
// Check quota including uncommitted writes for real-time enforcement
isOverQuota := wfs.IsOverQuotaWithUncommitted()
if !isOverQuota {
@@ -117,6 +138,23 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
return fuse.Status(syscall.ENOSPC)
}
if err := wfs.flushMetadataToFiler(fh, dir, name, uid, gid); err != nil {
glog.Errorf("%v fh %d flush: %v", fileFullPath, fh.fh, err)
return fuse.EIO
}
if IsDebugFileReadWrite {
fh.mirrorFile.Sync()
}
return fuse.OK
}
// flushMetadataToFiler sends the file's chunk references and attributes to the filer.
// This is shared between the synchronous doFlush path and the async flush completion.
func (wfs *WFS) flushMetadataToFiler(fh *FileHandle, dir, name string, uid, gid uint32) error {
fileFullPath := fh.FullPath()
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("doFlush", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
@@ -144,9 +182,6 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
}
glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.GetChunks()))
-//for i, chunk := range entry.GetChunks() {
-// glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
-//}
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks())
@@ -183,14 +218,5 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
fh.dirtyMetadata = false
}
-if err != nil {
-glog.Errorf("%v fh %d flush: %v", fileFullPath, fh.fh, err)
-return fuse.EIO
-}
-if IsDebugFileReadWrite {
-fh.mirrorFile.Sync()
-}
-return fuse.OK
return err
}

weed/mount/weedfs_filehandle.go (60 lines changed)

@@ -2,11 +2,18 @@ package mount
import (
"github.com/seaweedfs/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (wfs *WFS) AcquireHandle(inode uint64, flags, uid, gid uint32) (fileHandle *FileHandle, status fuse.Status) {
// If there is an in-flight async flush for this inode, wait for it to
// complete before reopening. Otherwise the new handle would be built
// from pre-close filer metadata and its next flush could overwrite the
// data that was just written asynchronously.
wfs.waitForPendingAsyncFlush(inode)
var entry *filer_pb.Entry
var path util.FullPath
path, _, entry, status = wfs.maybeReadEntry(inode)
@@ -20,8 +27,59 @@ func (wfs *WFS) AcquireHandle(inode uint64, flags, uid, gid uint32) (fileHandle
return
}
// ReleaseHandle is called from FUSE Release. For handles with a pending
// async flush, the map removal and the pendingAsyncFlush registration are
// done under a single lock hold so that a concurrent AcquireHandle cannot
// slip through the gap between the two (P1-1 TOCTOU fix).
//
// The handle intentionally stays in fhMap during the drain so that rename
// and unlink can still find it via FindFileHandle (P1-2 fix). It is
// removed from fhMap only after the drain completes (RemoveFileHandle).
func (wfs *WFS) ReleaseHandle(handleId FileHandleId) {
-wfs.fhMap.ReleaseByHandle(handleId)
// Hold pendingAsyncFlushMu across the counter decrement and the
// pending-flush registration. Lock ordering: pendingAsyncFlushMu → fhMap.
wfs.pendingAsyncFlushMu.Lock()
fhToRelease := wfs.fhMap.ReleaseByHandle(handleId)
if fhToRelease != nil && fhToRelease.asyncFlushPending {
done := make(chan struct{})
wfs.pendingAsyncFlush[fhToRelease.inode] = done
wfs.pendingAsyncFlushMu.Unlock()
wfs.asyncFlushWg.Add(1)
go func() {
defer wfs.asyncFlushWg.Done()
defer func() {
// Remove from fhMap first (so AcquireFileHandle creates a fresh handle).
wfs.fhMap.RemoveFileHandle(fhToRelease.fh, fhToRelease.inode)
// Then signal completion (unblocks waitForPendingAsyncFlush).
close(done)
wfs.pendingAsyncFlushMu.Lock()
delete(wfs.pendingAsyncFlush, fhToRelease.inode)
wfs.pendingAsyncFlushMu.Unlock()
}()
wfs.completeAsyncFlush(fhToRelease)
}()
return
}
wfs.pendingAsyncFlushMu.Unlock()
if fhToRelease != nil {
fhToRelease.ReleaseHandle()
}
}
// waitForPendingAsyncFlush blocks until any in-flight async flush for
// the given inode completes. Called from AcquireHandle before building
// new handle state, so the filer metadata reflects the flushed data.
func (wfs *WFS) waitForPendingAsyncFlush(inode uint64) {
wfs.pendingAsyncFlushMu.Lock()
done, found := wfs.pendingAsyncFlush[inode]
wfs.pendingAsyncFlushMu.Unlock()
if found {
glog.V(3).Infof("waitForPendingAsyncFlush: waiting for inode %d", inode)
<-done
}
}
func (wfs *WFS) GetHandle(handleId FileHandleId) *FileHandle {

weed/mount/weedfs_forget.go (7 lines changed)

@@ -66,5 +66,10 @@ func (wfs *WFS) Forget(nodeid, nlookup uint64) {
wfs.inodeToPath.Forget(nodeid, nlookup, func(dir util.FullPath) {
wfs.metaCache.DeleteFolderChildren(context.Background(), dir)
})
-wfs.fhMap.ReleaseByInode(nodeid)
// ReleaseByInode returns nil if the handle is already draining (counter
// was already <= 0 from a prior Release). Only non-async handles that
// reach counter 0 here need cleanup.
if fhToRelease := wfs.fhMap.ReleaseByInode(nodeid); fhToRelease != nil {
fhToRelease.ReleaseHandle()
}
}

weed/mount/weedfs_rename.go (6 lines changed)

@@ -253,6 +253,12 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
if entry := fh.GetEntry(); entry != nil {
entry.Name = newName
}
// Keep the saved async-flush path current so the fallback
// after Forget uses the post-rename location, not the old one.
if fh.asyncFlushPending {
fh.asyncFlushDir = string(newParent)
fh.asyncFlushName = newName
}
}
// invalidate attr and data
// wfs.fuseServer.InodeNotify(sourceInode, 0, -1)
