From e47054a7e7fb599a17414a4e242356dace4cd826 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 24 Mar 2026 20:31:53 -0700 Subject: [PATCH] mount: improve small file write performance (#8769) * mount: defer file creation gRPC to flush time for faster small file writes When creating a file via FUSE Create(), skip the synchronous gRPC CreateEntry call to the filer. Instead, allocate the inode and build the entry locally, deferring the filer create to the Flush/Release path where flushMetadataToFiler already sends a CreateEntry with chunk data. This eliminates one synchronous gRPC round-trip per file during creation. For workloads with many small files (e.g. 30K files), this reduces the per-file overhead from ~2 gRPC calls to ~1. Mknod retains synchronous filer creation since it has no file handle and thus no flush path. * mount: use bounded worker pool for async flush operations Replace unbounded goroutine spawning in writebackCache async flush with a fixed-size worker pool backed by a channel. When many files are closed rapidly (e.g., cp -r of 30K files), the previous approach spawned one goroutine per file, leading to resource contention on gRPC/HTTP connections and high goroutine overhead. The worker pool size matches ConcurrentWriters (default 128), which provides good parallelism while bounding resource usage. Work items are queued into a buffered channel and processed by persistent worker goroutines. * mount: fix deferred create cache visibility and async flush race Three fixes for the deferred create and async flush changes: 1. Insert a local placeholder entry into the metadata cache during deferred file creation so that maybeLoadEntry() can find the file for duplicate-create checks, stat, and readdir. Uses InsertEntry directly (not applyLocalMetadataEvent) to avoid triggering the directory hot-threshold eviction that would wipe the entry. 2. Fix race in ReleaseHandle where asyncFlushWg.Add(1) and the channel send happened after pendingAsyncFlushMu was unlocked. A concurrent WaitForAsyncFlush could observe a zero counter, close the channel, and cause a send-on-closed panic. Move Add(1) before the unlock; keep the send after unlock to avoid deadlock with workers that acquire the same mutex during cleanup. 3. Update TestCreateCreatesAndOpensFile to flush the file handle before verifying the CreateEntry gRPC call, since file creation is now deferred to flush time. --- weed/mount/weedfs.go | 14 ++++++++- weed/mount/weedfs_async_flush.go | 49 +++++++++++++++++++++++++---- weed/mount/weedfs_file_mkrm.go | 34 +++++++++++++++----- weed/mount/weedfs_file_mkrm_test.go | 12 +++++++ weed/mount/weedfs_filehandle.go | 20 ++++-------- 5 files changed, 101 insertions(+), 28 deletions(-) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 82d570c7d..ff7ea52de 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -119,10 +119,15 @@ type WFS struct { dirHotThreshold int dirIdleEvict time.Duration - // asyncFlushWg tracks pending background flush goroutines for writebackCache mode. + // asyncFlushWg tracks pending background flush work items for writebackCache mode. // Must be waited on before unmount cleanup to prevent data loss. asyncFlushWg sync.WaitGroup + // asyncFlushCh is a bounded work queue for background flush operations. + // A fixed pool of worker goroutines processes items from this channel, + // preventing resource exhaustion from unbounded goroutine creation. + asyncFlushCh chan *asyncFlushItem + // pendingAsyncFlush tracks in-flight async flush goroutines by inode. // AcquireHandle checks this to wait for a pending flush before reopening // the same inode, preventing stale metadata from overwriting the async flush. @@ -277,6 +282,13 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters) wfs.concurrentCopiersSem = make(chan struct{}, wfs.option.ConcurrentWriters) } + if wfs.option.WritebackCache { + numWorkers := wfs.option.ConcurrentWriters + if numWorkers <= 0 { + numWorkers = 128 + } + wfs.startAsyncFlushWorkers(numWorkers) + } wfs.copyBufferPool.New = func() any { return make([]byte, option.ChunkSizeLimit) } diff --git a/weed/mount/weedfs_async_flush.go b/weed/mount/weedfs_async_flush.go index 9446d278f..5ace5e341 100644 --- a/weed/mount/weedfs_async_flush.go +++ b/weed/mount/weedfs_async_flush.go @@ -8,14 +8,48 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) -// completeAsyncFlush is called in a background goroutine when a file handle -// with pending async flush work is released. It performs the deferred data -// upload and metadata flush that was skipped in doFlush() for writebackCache mode. +// asyncFlushItem holds the data needed for a background flush work item. +type asyncFlushItem struct { + fh *FileHandle + done chan struct{} +} + +// startAsyncFlushWorkers launches a fixed pool of goroutines that process +// background flush work items from asyncFlushCh. This bounds the number of +// concurrent flush operations to prevent resource exhaustion (connections, +// goroutines) when many files are closed rapidly (e.g., cp -r with writebackCache). +func (wfs *WFS) startAsyncFlushWorkers(numWorkers int) { + wfs.asyncFlushCh = make(chan *asyncFlushItem, numWorkers*4) + for i := 0; i < numWorkers; i++ { + go wfs.asyncFlushWorker() + } +} + +func (wfs *WFS) asyncFlushWorker() { + for item := range wfs.asyncFlushCh { + wfs.processAsyncFlushItem(item) + } +} + +func (wfs *WFS) processAsyncFlushItem(item *asyncFlushItem) { + defer wfs.asyncFlushWg.Done() + defer func() { + // Remove from fhMap first (so AcquireFileHandle creates a fresh handle). + wfs.fhMap.RemoveFileHandle(item.fh.fh, item.fh.inode) + // Then signal completion (unblocks waitForPendingAsyncFlush). + close(item.done) + wfs.pendingAsyncFlushMu.Lock() + delete(wfs.pendingAsyncFlush, item.fh.inode) + wfs.pendingAsyncFlushMu.Unlock() + }() + wfs.completeAsyncFlush(item.fh) +} + +// completeAsyncFlush performs the deferred data upload and metadata flush +// that was skipped in doFlush() for writebackCache mode. // // This enables close() to return immediately for small file workloads (e.g., rsync), // while the actual I/O happens concurrently in the background. -// -// The caller (submitAsyncFlush) owns asyncFlushWg and the per-inode done channel. func (wfs *WFS) completeAsyncFlush(fh *FileHandle) { // Phase 1: Flush dirty pages — seals writable chunks, uploads to volume servers, and waits. // The underlying UploadWithRetry already retries transient HTTP/gRPC errors internally, @@ -80,8 +114,11 @@ func (wfs *WFS) flushMetadataWithRetry(fh *FileHandle, dir, name string, fileFul } } -// WaitForAsyncFlush waits for all pending background flush goroutines to complete. +// WaitForAsyncFlush waits for all pending background flush work items to complete. // Called before unmount cleanup to ensure no data is lost. func (wfs *WFS) WaitForAsyncFlush() { wfs.asyncFlushWg.Wait() + if wfs.asyncFlushCh != nil { + close(wfs.asyncFlushCh) + } } diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go index 122834573..02fec0b6b 100644 --- a/weed/mount/weedfs_file_mkrm.go +++ b/weed/mount/weedfs_file_mkrm.go @@ -6,6 +6,7 @@ import ( "time" "github.com/seaweedfs/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" @@ -64,7 +65,7 @@ func (wfs *WFS) Create(cancel <-chan struct{}, in *fuse.CreateIn, name string, o return code } - inode, newEntry, code = wfs.createRegularFile(dirFullPath, name, in.Mode, in.Uid, in.Gid, 0) + inode, newEntry, code = wfs.createRegularFile(dirFullPath, name, in.Mode, in.Uid, in.Gid, 0, true) if code == fuse.Status(syscall.EEXIST) && in.Flags&syscall.O_EXCL == 0 { // Race: another process created the file between our check and create. // Reopen the winner's entry. @@ -99,10 +100,14 @@ func (wfs *WFS) Create(cancel <-chan struct{}, in *fuse.CreateIn, name string, o wfs.outputPbEntry(&out.EntryOut, inode, newEntry) - fileHandle, status := wfs.AcquireHandle(inode, in.Flags, in.Uid, in.Gid) - if status != fuse.OK { - return status - } + // For deferred creates, bypass AcquireHandle (which calls maybeReadEntry + // and would fail since the entry is not yet on the filer or in the meta cache). + // We already have the entry from createRegularFile, so create the handle directly. + fileHandle := wfs.fhMap.AcquireFileHandle(wfs, inode, newEntry) + fileHandle.RememberPath(entryFullPath) + // Mark dirty so the deferred filer create happens on Flush, + // even if the file is closed without any writes. + fileHandle.dirtyMetadata = true out.Fh = uint64(fileHandle.fh) out.OpenFlags = 0 @@ -126,7 +131,7 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out return } - inode, newEntry, code := wfs.createRegularFile(dirFullPath, name, in.Mode, in.Uid, in.Gid, in.Rdev) + inode, newEntry, code := wfs.createRegularFile(dirFullPath, name, in.Mode, in.Uid, in.Gid, in.Rdev, false) if code != fuse.OK { return code } @@ -202,7 +207,7 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin } -func (wfs *WFS) createRegularFile(dirFullPath util.FullPath, name string, mode uint32, uid, gid, rdev uint32) (inode uint64, newEntry *filer_pb.Entry, code fuse.Status) { +func (wfs *WFS) createRegularFile(dirFullPath util.FullPath, name string, mode uint32, uid, gid, rdev uint32, deferFilerCreate bool) (inode uint64, newEntry *filer_pb.Entry, code fuse.Status) { if wfs.IsOverQuotaWithUncommitted() { return 0, nil, fuse.Status(syscall.ENOSPC) } @@ -244,6 +249,21 @@ func (wfs *WFS) createRegularFile(dirFullPath util.FullPath, name string, mode u }, } + if deferFilerCreate { + // Defer the filer gRPC call to flush time. The caller (Create) will + // build a file handle directly from newEntry, bypassing AcquireHandle. + // Insert a local placeholder into the metadata cache so that + // maybeLoadEntry() can find the file (e.g., duplicate-create checks, + // stat, readdir). The actual filer entry is created by flushMetadataToFiler. + // We use InsertEntry directly instead of applyLocalMetadataEvent to avoid + // triggering directory hot-threshold eviction that would wipe the entry. + if insertErr := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(string(dirFullPath), newEntry)); insertErr != nil { + glog.Warningf("createFile %s: insert local entry: %v", entryFullPath, insertErr) + } + glog.V(3).Infof("createFile %s: deferred to flush", entryFullPath) + return inode, newEntry, fuse.OK + } + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { wfs.mapPbIdFromLocalToFiler(newEntry) defer wfs.mapPbIdFromFilerToLocal(newEntry) diff --git a/weed/mount/weedfs_file_mkrm_test.go b/weed/mount/weedfs_file_mkrm_test.go index 36a7cdf0a..03506b48f 100644 --- a/weed/mount/weedfs_file_mkrm_test.go +++ b/weed/mount/weedfs_file_mkrm_test.go @@ -175,6 +175,18 @@ func TestCreateCreatesAndOpensFile(t *testing.T) { t.Fatalf("FullPath = %q, want %q", got, "/hello.txt") } + // File creation is deferred to flush time. Trigger a synchronous flush + // so the CreateEntry gRPC call is sent to the test server. + if flushStatus := wfs.Flush(make(chan struct{}), &fuse.FlushIn{ + InHeader: fuse.InHeader{ + NodeId: out.NodeId, + Caller: fuse.Caller{Owner: fuse.Owner{Uid: 123, Gid: 456}}, + }, + Fh: out.Fh, + }); flushStatus != fuse.OK { + t.Fatalf("Flush status = %v, want OK", flushStatus) + } + snapshot := testServer.snapshot() if snapshot.directory != "/" { t.Fatalf("CreateEntry directory = %q, want %q", snapshot.directory, "/") diff --git a/weed/mount/weedfs_filehandle.go b/weed/mount/weedfs_filehandle.go index 155a97cd8..669b3190c 100644 --- a/weed/mount/weedfs_filehandle.go +++ b/weed/mount/weedfs_filehandle.go @@ -50,22 +50,14 @@ func (wfs *WFS) ReleaseHandle(handleId FileHandleId) { if fhToRelease != nil && fhToRelease.asyncFlushPending { done := make(chan struct{}) wfs.pendingAsyncFlush[fhToRelease.inode] = done + // Add(1) while holding the mutex so WaitForAsyncFlush cannot + // observe a zero counter and close the channel before we send. + wfs.asyncFlushWg.Add(1) wfs.pendingAsyncFlushMu.Unlock() - wfs.asyncFlushWg.Add(1) - go func() { - defer wfs.asyncFlushWg.Done() - defer func() { - // Remove from fhMap first (so AcquireFileHandle creates a fresh handle). - wfs.fhMap.RemoveFileHandle(fhToRelease.fh, fhToRelease.inode) - // Then signal completion (unblocks waitForPendingAsyncFlush). - close(done) - wfs.pendingAsyncFlushMu.Lock() - delete(wfs.pendingAsyncFlush, fhToRelease.inode) - wfs.pendingAsyncFlushMu.Unlock() - }() - wfs.completeAsyncFlush(fhToRelease) - }() + // Send after unlock to avoid deadlock — workers acquire the + // same mutex during cleanup. + wfs.asyncFlushCh <- &asyncFlushItem{fh: fhToRelease, done: done} return } wfs.pendingAsyncFlushMu.Unlock()