diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 82d570c7d..ff7ea52de 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -119,10 +119,19 @@ type WFS struct {
 	dirHotThreshold int
 	dirIdleEvict    time.Duration
 
-	// asyncFlushWg tracks pending background flush goroutines for writebackCache mode.
+	// asyncFlushWg tracks pending background flush work items for writebackCache mode.
 	// Must be waited on before unmount cleanup to prevent data loss.
 	asyncFlushWg sync.WaitGroup
 
+	// asyncFlushCh is a bounded work queue for background flush operations.
+	// A fixed pool of worker goroutines processes items from this channel,
+	// preventing resource exhaustion from unbounded goroutine creation.
+	asyncFlushCh chan *asyncFlushItem
+
+	// asyncFlushCloseOnce ensures asyncFlushCh is closed at most once, so
+	// WaitForAsyncFlush stays safe to call again after the pool has shut down.
+	asyncFlushCloseOnce sync.Once
+
-	// pendingAsyncFlush tracks in-flight async flush goroutines by inode.
+	// pendingAsyncFlush tracks in-flight async flush work items by inode.
 	// AcquireHandle checks this to wait for a pending flush before reopening
 	// the same inode, preventing stale metadata from overwriting the async flush.
@@ -277,6 +286,13 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 		wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
 		wfs.concurrentCopiersSem = make(chan struct{}, wfs.option.ConcurrentWriters)
 	}
+	if wfs.option.WritebackCache {
+		numWorkers := wfs.option.ConcurrentWriters
+		if numWorkers <= 0 {
+			numWorkers = 128
+		}
+		wfs.startAsyncFlushWorkers(numWorkers)
+	}
 	wfs.copyBufferPool.New = func() any {
 		return make([]byte, option.ChunkSizeLimit)
 	}
diff --git a/weed/mount/weedfs_async_flush.go b/weed/mount/weedfs_async_flush.go
index 9446d278f..5ace5e341 100644
--- a/weed/mount/weedfs_async_flush.go
+++ b/weed/mount/weedfs_async_flush.go
@@ -8,14 +8,54 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
-// completeAsyncFlush is called in a background goroutine when a file handle
-// with pending async flush work is released. It performs the deferred data
-// upload and metadata flush that was skipped in doFlush() for writebackCache mode.
+// asyncFlushItem holds the data needed for a background flush work item.
+type asyncFlushItem struct {
+	fh   *FileHandle
+	done chan struct{}
+}
+
+// startAsyncFlushWorkers launches a fixed pool of goroutines that process
+// background flush work items from asyncFlushCh. This bounds the number of
+// concurrent flush operations to prevent resource exhaustion (connections,
+// goroutines) when many files are closed rapidly (e.g., cp -r with writebackCache).
+func (wfs *WFS) startAsyncFlushWorkers(numWorkers int) {
+	wfs.asyncFlushCh = make(chan *asyncFlushItem, numWorkers*4)
+	for i := 0; i < numWorkers; i++ {
+		go wfs.asyncFlushWorker()
+	}
+}
+
+// asyncFlushWorker processes items from asyncFlushCh one at a time and
+// returns once the channel has been closed and drained.
+func (wfs *WFS) asyncFlushWorker() {
+	for item := range wfs.asyncFlushCh {
+		wfs.processAsyncFlushItem(item)
+	}
+}
+
+// processAsyncFlushItem runs completeAsyncFlush for one queued file handle.
+// Its deferred cleanup then removes the handle from fhMap, closes item.done,
+// deletes the inode from pendingAsyncFlush, and finally calls
+// asyncFlushWg.Done (deferred first, so it runs last).
+func (wfs *WFS) processAsyncFlushItem(item *asyncFlushItem) {
+	defer wfs.asyncFlushWg.Done()
+	defer func() {
+		// Remove from fhMap first (so AcquireFileHandle creates a fresh handle).
+		wfs.fhMap.RemoveFileHandle(item.fh.fh, item.fh.inode)
+		// Then signal completion (unblocks waitForPendingAsyncFlush).
+		close(item.done)
+		wfs.pendingAsyncFlushMu.Lock()
+		delete(wfs.pendingAsyncFlush, item.fh.inode)
+		wfs.pendingAsyncFlushMu.Unlock()
+	}()
+	wfs.completeAsyncFlush(item.fh)
+}
+
+// completeAsyncFlush performs the deferred data upload and metadata flush
+// that was skipped in doFlush() for writebackCache mode.
 //
 // This enables close() to return immediately for small file workloads (e.g., rsync),
 // while the actual I/O happens concurrently in the background.
-//
-// The caller (submitAsyncFlush) owns asyncFlushWg and the per-inode done channel.
 func (wfs *WFS) completeAsyncFlush(fh *FileHandle) {
 	// Phase 1: Flush dirty pages — seals writable chunks, uploads to volume servers, and waits.
 	// The underlying UploadWithRetry already retries transient HTTP/gRPC errors internally,
@@ -80,8 +120,15 @@ func (wfs *WFS) flushMetadataWithRetry(fh *FileHandle, dir, name string, fileFul
 	}
 }
 
-// WaitForAsyncFlush waits for all pending background flush goroutines to complete.
+// WaitForAsyncFlush waits for all pending background flush work items to complete
+// and then closes asyncFlushCh so the worker goroutines exit.
 // Called before unmount cleanup to ensure no data is lost.
+// Safe to call more than once: the channel is closed only on the first call.
 func (wfs *WFS) WaitForAsyncFlush() {
 	wfs.asyncFlushWg.Wait()
+	if wfs.asyncFlushCh != nil {
+		wfs.asyncFlushCloseOnce.Do(func() {
+			close(wfs.asyncFlushCh)
+		})
+	}
 }
diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go
index 122834573..02fec0b6b 100644
--- a/weed/mount/weedfs_file_mkrm.go
+++ b/weed/mount/weedfs_file_mkrm.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/seaweedfs/go-fuse/v2/fuse"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
@@ -64,7 +65,7 @@ func (wfs *WFS) Create(cancel <-chan struct{}, in *fuse.CreateIn, name string, o
 		return code
 	}
 
-	inode, newEntry, code = wfs.createRegularFile(dirFullPath, name, in.Mode, in.Uid, in.Gid, 0)
+	inode, newEntry, code = wfs.createRegularFile(dirFullPath, name, in.Mode, in.Uid, in.Gid, 0, true)
 	if code == fuse.Status(syscall.EEXIST) && in.Flags&syscall.O_EXCL == 0 {
 		// Race: another process created the file between our check and create.
 		// Reopen the winner's entry.
@@ -99,10 +100,14 @@ func (wfs *WFS) Create(cancel <-chan struct{}, in *fuse.CreateIn, name string, o
 
 	wfs.outputPbEntry(&out.EntryOut, inode, newEntry)
 
-	fileHandle, status := wfs.AcquireHandle(inode, in.Flags, in.Uid, in.Gid)
-	if status != fuse.OK {
-		return status
-	}
+	// For deferred creates, bypass AcquireHandle (which calls maybeReadEntry
+	// and would fail since the entry is not yet on the filer or in the meta cache).
+	// We already have the entry from createRegularFile, so create the handle directly.
+	fileHandle := wfs.fhMap.AcquireFileHandle(wfs, inode, newEntry)
+	fileHandle.RememberPath(entryFullPath)
+	// Mark dirty so the deferred filer create happens on Flush,
+	// even if the file is closed without any writes.
+	fileHandle.dirtyMetadata = true
 	out.Fh = uint64(fileHandle.fh)
 	out.OpenFlags = 0
 
@@ -126,7 +131,7 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out
 		return
 	}
 
-	inode, newEntry, code := wfs.createRegularFile(dirFullPath, name, in.Mode, in.Uid, in.Gid, in.Rdev)
+	inode, newEntry, code := wfs.createRegularFile(dirFullPath, name, in.Mode, in.Uid, in.Gid, in.Rdev, false)
 	if code != fuse.OK {
 		return code
 	}
@@ -202,7 +207,7 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin
 
 }
 
-func (wfs *WFS) createRegularFile(dirFullPath util.FullPath, name string, mode uint32, uid, gid, rdev uint32) (inode uint64, newEntry *filer_pb.Entry, code fuse.Status) {
+func (wfs *WFS) createRegularFile(dirFullPath util.FullPath, name string, mode uint32, uid, gid, rdev uint32, deferFilerCreate bool) (inode uint64, newEntry *filer_pb.Entry, code fuse.Status) {
 	if wfs.IsOverQuotaWithUncommitted() {
 		return 0, nil, fuse.Status(syscall.ENOSPC)
 	}
@@ -244,6 +249,21 @@ func (wfs *WFS) createRegularFile(dirFullPath util.FullPath, name string, mode u
 		},
 	}
 
+	if deferFilerCreate {
+		// Defer the filer gRPC call to flush time. The caller (Create) will
+		// build a file handle directly from newEntry, bypassing AcquireHandle.
+		// Insert a local placeholder into the metadata cache so that
+		// maybeLoadEntry() can find the file (e.g., duplicate-create checks,
+		// stat, readdir). The actual filer entry is created by flushMetadataToFiler.
+		// We use InsertEntry directly instead of applyLocalMetadataEvent to avoid
+		// triggering directory hot-threshold eviction that would wipe the entry.
+		if insertErr := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(string(dirFullPath), newEntry)); insertErr != nil {
+			glog.Warningf("createFile %s: insert local entry: %v", entryFullPath, insertErr)
+		}
+		glog.V(3).Infof("createFile %s: deferred to flush", entryFullPath)
+		return inode, newEntry, fuse.OK
+	}
+
 	err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
 		wfs.mapPbIdFromLocalToFiler(newEntry)
 		defer wfs.mapPbIdFromFilerToLocal(newEntry)
diff --git a/weed/mount/weedfs_file_mkrm_test.go b/weed/mount/weedfs_file_mkrm_test.go
index 36a7cdf0a..03506b48f 100644
--- a/weed/mount/weedfs_file_mkrm_test.go
+++ b/weed/mount/weedfs_file_mkrm_test.go
@@ -175,6 +175,18 @@ func TestCreateCreatesAndOpensFile(t *testing.T) {
 		t.Fatalf("FullPath = %q, want %q", got, "/hello.txt")
 	}
 
+	// File creation is deferred to flush time. Trigger a synchronous flush
+	// so the CreateEntry gRPC call is sent to the test server.
+	if flushStatus := wfs.Flush(make(chan struct{}), &fuse.FlushIn{
+		InHeader: fuse.InHeader{
+			NodeId: out.NodeId,
+			Caller: fuse.Caller{Owner: fuse.Owner{Uid: 123, Gid: 456}},
+		},
+		Fh: out.Fh,
+	}); flushStatus != fuse.OK {
+		t.Fatalf("Flush status = %v, want OK", flushStatus)
+	}
+
 	snapshot := testServer.snapshot()
 	if snapshot.directory != "/" {
 		t.Fatalf("CreateEntry directory = %q, want %q", snapshot.directory, "/")
diff --git a/weed/mount/weedfs_filehandle.go b/weed/mount/weedfs_filehandle.go
index 155a97cd8..669b3190c 100644
--- a/weed/mount/weedfs_filehandle.go
+++ b/weed/mount/weedfs_filehandle.go
@@ -50,22 +50,14 @@ func (wfs *WFS) ReleaseHandle(handleId FileHandleId) {
 	if fhToRelease != nil && fhToRelease.asyncFlushPending {
 		done := make(chan struct{})
 		wfs.pendingAsyncFlush[fhToRelease.inode] = done
+		// Add(1) while holding the mutex so WaitForAsyncFlush cannot
+		// observe a zero counter and close the channel before we send.
+		wfs.asyncFlushWg.Add(1)
 		wfs.pendingAsyncFlushMu.Unlock()
 
-		wfs.asyncFlushWg.Add(1)
-		go func() {
-			defer wfs.asyncFlushWg.Done()
-			defer func() {
-				// Remove from fhMap first (so AcquireFileHandle creates a fresh handle).
-				wfs.fhMap.RemoveFileHandle(fhToRelease.fh, fhToRelease.inode)
-				// Then signal completion (unblocks waitForPendingAsyncFlush).
-				close(done)
-				wfs.pendingAsyncFlushMu.Lock()
-				delete(wfs.pendingAsyncFlush, fhToRelease.inode)
-				wfs.pendingAsyncFlushMu.Unlock()
-			}()
-			wfs.completeAsyncFlush(fhToRelease)
-		}()
+		// Send after unlock to avoid deadlock — workers acquire the
+		// same mutex during cleanup.
+		wfs.asyncFlushCh <- &asyncFlushItem{fh: fhToRelease, done: done}
 		return
 	}
 	wfs.pendingAsyncFlushMu.Unlock()