From e457676c8deaae15ee561cca86d8b808139e17d5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 29 Nov 2025 00:29:10 -0800 Subject: [PATCH] mount: improve read throughput with parallel chunk fetching This addresses issue #7504 where a single weed mount FUSE instance does not fully utilize node network bandwidth when reading large files. Changes: - Add -concurrentReaders mount option (default: 16) to control the maximum number of parallel chunk fetches during read operations - Implement parallel section reading in ChunkGroup.ReadDataAt() using errgroup for better throughput when reading across multiple sections - Enhance ReaderCache with MaybeCacheMany() to prefetch multiple chunks ahead in parallel during sequential reads (now prefetches 4 chunks) - Increase ReaderCache limit dynamically based on concurrentReaders to support higher read parallelism The bottleneck was that chunks were being read sequentially even when they reside on different volume servers. By introducing parallel chunk fetching, a single mount instance can now better saturate available network bandwidth. Fixes: #7504 --- weed/command/fuse_std.go | 7 ++ weed/command/mount.go | 2 + weed/command/mount_std.go | 1 + weed/filer/filechunk_group.go | 137 ++++++++++++++++++++++++++++++++-- weed/filer/reader_at.go | 4 +- weed/filer/reader_cache.go | 15 +++- weed/mount/filehandle.go | 2 +- weed/mount/weedfs.go | 1 + 8 files changed, 158 insertions(+), 11 deletions(-) diff --git a/weed/command/fuse_std.go b/weed/command/fuse_std.go index b2839aaf8..2cc6fa8ab 100644 --- a/weed/command/fuse_std.go +++ b/weed/command/fuse_std.go @@ -155,6 +155,13 @@ func runFuse(cmd *Command, args []string) bool { } else { panic(fmt.Errorf("concurrentWriters: %s", err)) } + case "concurrentReaders": + if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil { + intValue := int(parsed) + mountOptions.concurrentReaders = &intValue + } else { + panic(fmt.Errorf("concurrentReaders: %s", err)) + } case "cacheDir": mountOptions.cacheDirForRead = ¶meter.value case "cacheCapacityMB": diff --git a/weed/command/mount.go b/weed/command/mount.go index 98f139c6f..618bbd3ae 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -17,6 +17,7 @@ type MountOptions struct { ttlSec *int chunkSizeLimitMB *int concurrentWriters *int + concurrentReaders *int cacheMetaTtlSec *int cacheDirForRead *string cacheDirForWrite *string @@ -65,6 +66,7 @@ func init() { mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers") + mountOptions.concurrentReaders = cmdMount.Flag.Int("concurrentReaders", 16, "limit concurrent chunk fetches for read operations") mountOptions.cacheDirForRead = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data") mountOptions.cacheSizeMBForRead = cmdMount.Flag.Int64("cacheCapacityMB", 128, "file chunk read cache capacity in MB") mountOptions.cacheDirForWrite = cmdMount.Flag.String("cacheDirWrite", "", "buffer writes mostly for large files") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 53b09589d..d1593454e 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -236,6 +236,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { DiskType: types.ToDiskType(*option.diskType), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, ConcurrentWriters: *option.concurrentWriters, + ConcurrentReaders: *option.concurrentReaders, CacheDirForRead: *option.cacheDirForRead, CacheSizeMBForRead: *option.cacheSizeMBForRead, CacheDirForWrite: cacheDirForWrite, diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go index 0f449735a..61bdf5905 100644 --- a/weed/filer/filechunk_group.go +++ b/weed/filer/filechunk_group.go @@ -5,23 +5,46 @@ import ( "io" "sync" + "golang.org/x/sync/errgroup" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/wdclient" ) type ChunkGroup struct { - lookupFn wdclient.LookupFileIdFunctionType - sections map[SectionIndex]*FileChunkSection - sectionsLock sync.RWMutex - readerCache *ReaderCache + lookupFn wdclient.LookupFileIdFunctionType + sections map[SectionIndex]*FileChunkSection + sectionsLock sync.RWMutex + readerCache *ReaderCache + concurrentReaders int } +// NewChunkGroup creates a ChunkGroup with default concurrency settings. +// For better read performance, use NewChunkGroupWithConcurrency instead. func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { + return NewChunkGroupWithConcurrency(lookupFn, chunkCache, chunks, 16) +} + +// NewChunkGroupWithConcurrency creates a ChunkGroup with configurable concurrency. +// concurrentReaders controls: +// - Maximum parallel chunk fetches during read operations +// - Read-ahead prefetch parallelism +// - Number of concurrent section reads for large files +func NewChunkGroupWithConcurrency(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) { + if concurrentReaders <= 0 { + concurrentReaders = 16 + } + // ReaderCache limit should be at least concurrentReaders to allow parallel prefetching + readerCacheLimit := concurrentReaders * 2 + if readerCacheLimit < 32 { + readerCacheLimit = 32 + } group := &ChunkGroup{ - lookupFn: lookupFn, - sections: make(map[SectionIndex]*FileChunkSection), - readerCache: NewReaderCache(32, chunkCache, lookupFn), + lookupFn: lookupFn, + sections: make(map[SectionIndex]*FileChunkSection), + readerCache: NewReaderCache(readerCacheLimit, chunkCache, lookupFn), + concurrentReaders: concurrentReaders, } err := group.SetChunks(chunks) @@ -54,6 +77,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff [] defer group.sectionsLock.RUnlock() sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize) + numSections := int(sectionIndexStop - sectionIndexStart + 1) + + // For single section or when concurrency is disabled, use sequential reading + if numSections <= 1 || group.concurrentReaders <= 1 { + return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop) + } + + // For multiple sections, use parallel reading + return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop) +} + +// readDataAtSequential reads sections sequentially (original behavior) +func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) { for si := sectionIndexStart; si < sectionIndexStop+1; si++ { section, found := group.sections[si] rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) @@ -78,6 +114,93 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff [] return } +// sectionReadResult holds the result of a section read operation +type sectionReadResult struct { + sectionIndex SectionIndex + n int + tsNs int64 + err error +} + +// readDataAtParallel reads multiple sections in parallel for better throughput +func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) { + numSections := int(sectionIndexStop - sectionIndexStart + 1) + + // Limit concurrency + maxConcurrent := group.concurrentReaders + if maxConcurrent > numSections { + maxConcurrent = numSections + } + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(maxConcurrent) + + results := make([]sectionReadResult, numSections) + + for i := 0; i < numSections; i++ { + si := sectionIndexStart + SectionIndex(i) + idx := i + + section, found := group.sections[si] + rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) + if rangeStart >= rangeStop { + continue + } + + if !found { + // Zero-fill missing sections synchronously + rangeStop = min(rangeStop, fileSize) + for j := rangeStart; j < rangeStop; j++ { + buff[j-offset] = 0 + } + results[idx] = sectionReadResult{ + sectionIndex: si, + n: int(rangeStop - rangeStart), + tsNs: 0, + err: nil, + } + continue + } + + // Capture variables for closure + sectionCopy := section + buffSlice := buff[rangeStart-offset : rangeStop-offset] + rangeStartCopy := rangeStart + + g.Go(func() error { + xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy) + results[idx] = sectionReadResult{ + sectionIndex: si, + n: xn, + tsNs: xTsNs, + err: xErr, + } + if xErr != nil && xErr != io.EOF { + return xErr + } + return nil + }) + } + + // Wait for all goroutines to complete + groupErr := g.Wait() + + // Aggregate results + for _, result := range results { + if result.err != nil && err == nil { + err = result.err + } + n += result.n + tsNs = max(tsNs, result.tsNs) + } + + if groupErr != nil && err == nil { + err = groupErr + } + + return +} + func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { group.sectionsLock.RLock() defer group.sectionsLock.RUnlock() diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index aeac9b34a..baf63e630 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -247,7 +247,9 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk c.readerCache.UnCache(c.lastChunkFid) } if nextChunkViews != nil { - c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning + // Prefetch multiple chunks ahead for better sequential read throughput + // This keeps the network pipeline full with parallel chunk fetches + c.readerCache.MaybeCacheMany(nextChunkViews, 4) } } } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 11382bed3..86906d042 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -47,9 +47,19 @@ func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn } func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { + rc.MaybeCacheMany(chunkViews, 1) +} + +// MaybeCacheMany prefetches up to 'count' chunks ahead in parallel. +// This improves read throughput for sequential reads by keeping the +// network pipeline full with parallel chunk fetches. +func (rc *ReaderCache) MaybeCacheMany(chunkViews *Interval[*ChunkView], count int) { if rc.lookupFileIdFn == nil { return } + if count <= 0 { + count = 1 + } rc.Lock() defer rc.Unlock() @@ -58,7 +68,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { return } - for x := chunkViews; x != nil; x = x.Next { + cached := 0 + for x := chunkViews; x != nil && cached < count; x = x.Next { chunkView := x.Value if _, found := rc.downloaders[chunkView.FileId]; found { continue @@ -80,7 +91,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { go cacher.startCaching() <-cacher.cacheStartedCh rc.downloaders[chunkView.FileId] = cacher - + cached++ } return diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index c20f9eca8..a19b901f7 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -81,7 +81,7 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { fileSize := filer.FileSize(entry) entry.Attributes.FileSize = fileSize var resolveManifestErr error - fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks) + fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroupWithConcurrency(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks, fh.wfs.option.ConcurrentReaders) if resolveManifestErr != nil { glog.Warningf("failed to resolve manifest chunks in %+v", entry) } diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 21c54841a..80e062c60 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -42,6 +42,7 @@ type Option struct { DiskType types.DiskType ChunkSizeLimit int64 ConcurrentWriters int + ConcurrentReaders int CacheDirForRead string CacheSizeMBForRead int64 CacheDirForWrite string