From e653de54b4fc00fc6a608266a2ea29b271dc2833 Mon Sep 17 00:00:00 2001
From: Gerry Hernandez
Date: Fri, 13 Jun 2025 12:27:39 -0700
Subject: [PATCH] FUSE Mount: Fix buffer allocation during copy (#6863)

Fix buffer allocation during FUSE copy
---
 weed/mount/weedfs.go                 | 36 ++++++-----
 weed/mount/weedfs_file_copy_range.go | 91 ++++++++++++++++++++++------
 2 files changed, 94 insertions(+), 33 deletions(-)

diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 2127e4a2b..849b3ad0c 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -3,11 +3,11 @@ package mount
 import (
 	"context"
 	"errors"
-	"github.com/seaweedfs/seaweedfs/weed/util/version"
 	"math/rand"
 	"os"
 	"path"
 	"path/filepath"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -23,6 +23,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
 	"github.com/seaweedfs/seaweedfs/weed/util/grace"
+	"github.com/seaweedfs/seaweedfs/weed/util/version"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 
 	"github.com/hanwen/go-fuse/v2/fs"
@@ -71,19 +72,21 @@ type WFS struct {
 	fuse.RawFileSystem
 	mount_pb.UnimplementedSeaweedMountServer
 	fs.Inode
-	option            *Option
-	metaCache         *meta_cache.MetaCache
-	stats             statsCache
-	chunkCache        *chunk_cache.TieredChunkCache
-	signature         int32
-	concurrentWriters *util.LimitedConcurrentExecutor
-	inodeToPath       *InodeToPath
-	fhMap             *FileHandleToInode
-	dhMap             *DirectoryHandleToInode
-	fuseServer        *fuse.Server
-	IsOverQuota       bool
-	fhLockTable       *util.LockTable[FileHandleId]
-	FilerConf         *filer.FilerConf
+	option               *Option
+	metaCache            *meta_cache.MetaCache
+	stats                statsCache
+	chunkCache           *chunk_cache.TieredChunkCache
+	signature            int32
+	concurrentWriters    *util.LimitedConcurrentExecutor
+	copyBufferPool       sync.Pool
+	concurrentCopiersSem chan struct{}
+	inodeToPath          *InodeToPath
+	fhMap                *FileHandleToInode
+	dhMap                *DirectoryHandleToInode
+	fuseServer           *fuse.Server
+	IsOverQuota          bool
+	fhLockTable          *util.LockTable[FileHandleId]
+	FilerConf            *filer.FilerConf
 }
 
 func NewSeaweedFileSystem(option *Option) *WFS {
@@ -139,6 +142,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 
 	if wfs.option.ConcurrentWriters > 0 {
 		wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+		wfs.concurrentCopiersSem = make(chan struct{}, wfs.option.ConcurrentWriters)
+	}
+	wfs.copyBufferPool.New = func() any {
+		return make([]byte, option.ChunkSizeLimit)
 	}
 	return wfs
 }
@@ -183,7 +190,6 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
 }
 
 func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
-
 	// glog.V(3).Infof("read entry cache miss %s", fullpath)
 	dir, name := fullpath.DirAndName()
 
diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go
index 43ec289ab..bcf5ae03a 100644
--- a/weed/mount/weedfs_file_copy_range.go
+++ b/weed/mount/weedfs_file_copy_range.go
@@ -1,13 +1,13 @@
 package mount
 
 import (
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"net/http"
 	"time"
 
 	"github.com/hanwen/go-fuse/v2/fuse"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 // CopyFileRange copies data from one file to another from and to specified offsets.
@@ -70,30 +70,85 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
 		in.OffOut, in.OffOut+in.Len,
 	)
 
-	data := make([]byte, in.Len)
-	totalRead, err := readDataByFileHandle(data, fhIn, int64(in.OffIn))
-	if err != nil {
-		glog.Warningf("file handle read %s %d: %v", fhIn.FullPath(), totalRead, err)
-		return 0, fuse.EIO
+	// Concurrent copy operations could allocate too much memory, so we want to
+	// throttle our concurrency, scaling with the number of writers the mount
+	// was configured with.
+	if wfs.concurrentCopiersSem != nil {
+		wfs.concurrentCopiersSem <- struct{}{}
+		defer func() { <-wfs.concurrentCopiersSem }()
 	}
-	data = data[:totalRead]
-	if totalRead == 0 {
+	// We want to stream the copy operation to avoid allocating massive buffers.
+	nowUnixNano := time.Now().UnixNano()
+	totalCopied := int64(0)
+	buff := wfs.copyBufferPool.Get().([]byte)
+	defer wfs.copyBufferPool.Put(buff)
+	for {
+		// Comply with cancellation as best as we can, given that the underlying
+		// IO functions aren't cancellation-aware.
+		select {
+		case <-cancel:
+			glog.Warningf("canceled CopyFileRange for %s (copied %d)",
+				fhIn.FullPath(), totalCopied)
+			return uint32(totalCopied), fuse.EINTR
+		default: // keep going
+		}
+
+		// We can save one IO by breaking early if we already know the next read
+		// will result in zero bytes.
+		remaining := int64(in.Len) - totalCopied
+		readLen := min(remaining, int64(len(buff)))
+		if readLen == 0 {
+			break
+		}
+
+		// Perform the read
+		offsetIn := totalCopied + int64(in.OffIn)
+		numBytesRead, err := readDataByFileHandle(
+			buff[:readLen], fhIn, offsetIn)
+		if err != nil {
+			glog.Warningf("file handle read %s %d (total %d): %v",
+				fhIn.FullPath(), numBytesRead, totalCopied, err)
+			return 0, fuse.EIO
+		}
+
+		// Break if we're done copying (no more bytes to read)
+		if numBytesRead == 0 {
+			break
+		}
+
+		offsetOut := int64(in.OffOut) + totalCopied
+
+		// Detect mime type only during the beginning of our stream, since
+		// DetectContentType is expecting some of the first 512 bytes of the
+		// file. See [http.DetectContentType] for details.
+		if offsetOut <= 512 {
+			fhOut.contentType = http.DetectContentType(buff[:numBytesRead])
+		}
+
+		// Perform the write
+		fhOut.dirtyPages.writerPattern.MonitorWriteAt(offsetOut, int(numBytesRead))
+		fhOut.dirtyPages.AddPage(
+			offsetOut,
+			buff[:numBytesRead],
+			fhOut.dirtyPages.writerPattern.IsSequentialMode(),
+			nowUnixNano)
+
+		// Accumulate for the next loop iteration
+		totalCopied += numBytesRead
+	}
+
+	if totalCopied == 0 {
 		return 0, fuse.OK
 	}
 
-	// put data at the specified offset in target file
-	fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len))
+	fhOut.entry.Attributes.FileSize = uint64(max(
+		totalCopied+int64(in.OffOut),
+		int64(fhOut.entry.Attributes.FileSize),
+	))
 	fhOut.entry.Content = nil
-	fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano())
-	fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize)))
 	fhOut.dirtyMetadata = true
 
-	written = uint32(totalRead)
-
-	// detect mime type
-	if written > 0 && in.OffOut <= 512 {
-		fhOut.contentType = http.DetectContentType(data)
-	}
+	written = uint32(totalCopied)
 
 	return written, fuse.OK
 }
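
Reviewer note, not part of the patch: the fix combines two stock Go patterns. A sync.Pool hands each copy a reusable ChunkSizeLimit-sized buffer instead of a fresh make([]byte, in.Len) per request, and a buffered channel serves as a counting semaphore capping in-flight copies at the mount's ConcurrentWriters setting. A minimal standalone sketch of that pattern follows; chunkSize, maxCopiers, and copyRange are illustrative names, not identifiers from the SeaweedFS codebase.

package main

import (
	"fmt"
	"sync"
)

const (
	chunkSize  = 4 * 1024 * 1024 // stand-in for option.ChunkSizeLimit
	maxCopiers = 4               // stand-in for option.ConcurrentWriters
)

var (
	// One fixed-size buffer per concurrent copier, reused across calls.
	copyBufferPool = sync.Pool{
		New: func() any { return make([]byte, chunkSize) },
	}
	// Buffered channel used as a counting semaphore to bound concurrency.
	copiersSem = make(chan struct{}, maxCopiers)
)

func copyRange(id int) {
	copiersSem <- struct{}{}        // acquire a slot; blocks when saturated
	defer func() { <-copiersSem }() // release the slot

	buff := copyBufferPool.Get().([]byte)
	defer copyBufferPool.Put(buff)

	fmt.Printf("copy %d holds a %d-byte pooled buffer\n", id, len(buff))
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			copyRange(i)
		}(i)
	}
	wg.Wait()
}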
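
The CopyFileRange loop itself is a bounded positional-read/positional-write copy: read at in.OffIn plus the running total, write the same bytes at in.OffOut plus the running total, and stop once in.Len bytes have moved or a read returns zero bytes. Below is a self-contained sketch of that loop over io.ReaderAt/io.WriterAt; the real code reads through readDataByFileHandle and writes through fhOut.dirtyPages.AddPage, and memWriterAt here is a hypothetical in-memory sink used only for this demo. Like the patch, the sketch relies on the built-in min, so it assumes Go 1.21+.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// memWriterAt is a toy io.WriterAt sink standing in for the dirty-page writer.
type memWriterAt struct{ b []byte }

func (m *memWriterAt) WriteAt(p []byte, off int64) (int, error) {
	if need := off + int64(len(p)); need > int64(len(m.b)) {
		m.b = append(m.b, make([]byte, need-int64(len(m.b)))...)
	}
	copy(m.b[off:], p)
	return len(p), nil
}

// copyFileRange streams up to reqLen bytes from src at offIn to dst at offOut,
// reusing one fixed-size buffer, mirroring the loop structure in the patch.
func copyFileRange(src io.ReaderAt, dst io.WriterAt, offIn, offOut, reqLen int64, buff []byte) (int64, error) {
	var copied int64
	for {
		// Break before issuing a read that we already know returns zero bytes.
		readLen := min(reqLen-copied, int64(len(buff)))
		if readLen == 0 {
			break
		}

		n, err := src.ReadAt(buff[:readLen], offIn+copied)
		if n == 0 {
			if err == nil || err == io.EOF {
				break // source exhausted
			}
			return copied, err
		}

		// Write exactly the bytes read, at the mirrored output offset.
		if _, werr := dst.WriteAt(buff[:n], offOut+copied); werr != nil {
			return copied, werr
		}
		copied += int64(n)
	}
	return copied, nil
}

func main() {
	src := bytes.NewReader([]byte("hello, seaweedfs copy_file_range"))
	dst := &memWriterAt{}
	buff := make([]byte, 8) // deliberately small to force several iterations
	n, err := copyFileRange(src, dst, 7, 0, 16, buff)
	fmt.Println(n, err, string(dst.b)) // 16 <nil> seaweedfs copy_f
}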