From d48e1e16598fea8f774301a66024af3d5928124e Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 29 Nov 2025 10:06:11 -0800
Subject: [PATCH] mount: improve read throughput with parallel chunk fetching (#7569)

* mount: improve read throughput with parallel chunk fetching

This addresses issue #7504 where a single weed mount FUSE instance does not
fully utilize node network bandwidth when reading large files.

Changes:
- Add -concurrentReaders mount option (default: 16) to control the maximum
  number of parallel chunk fetches during read operations
- Implement parallel section reading in ChunkGroup.ReadDataAt() using
  errgroup for better throughput when reading across multiple sections
- Enhance ReaderCache with MaybeCacheMany() to prefetch multiple chunks
  ahead in parallel during sequential reads (now prefetches 4 chunks)
- Increase ReaderCache limit dynamically based on concurrentReaders to
  support higher read parallelism

The bottleneck was that chunks were read sequentially even when they resided
on different volume servers. By introducing parallel chunk fetching, a single
mount instance can now better saturate the available network bandwidth.

Fixes: #7504

* fmt

* Address review comments: make prefetch configurable, improve error handling

Changes:
1. Add DefaultPrefetchCount constant (4) to reader_at.go
2. Add GetPrefetchCount() method to ChunkGroup that derives the prefetch
   count from concurrentReaders (1/4 ratio, min 1, max 8)
3. Pass prefetch count through NewChunkReaderAtFromClient
4. Fix error handling in readDataAtParallel to prioritize the errgroup error
5. Update all callers to use the DefaultPrefetchCount constant

For mount operations, prefetch scales with -concurrentReaders:
- concurrentReaders=16 (default) -> prefetch=4
- concurrentReaders=32 -> prefetch=8 (capped)
- concurrentReaders=4 -> prefetch=1

For non-mount paths (WebDAV, query engine, MQ), the DefaultPrefetchCount
constant is used.

* fmt

* Refactor: use variadic parameter instead of new function name

Use NewChunkGroup with an optional concurrentReaders parameter instead of
creating a separate NewChunkGroupWithConcurrency function. This maintains
backward compatibility - existing callers without the parameter get the
default of 16 concurrent readers.
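As a quick aside on the scaling rule above (an editor's sketch, not part of
the patch; the helper name prefetchCount is invented here and simply mirrors
the GetPrefetchCount logic added in weed/filer/filechunk_group.go):

package main

import "fmt"

// prefetchCount mirrors ChunkGroup.GetPrefetchCount from this patch:
// roughly concurrentReaders/4, clamped to the range [1, 8].
func prefetchCount(concurrentReaders int) int {
    p := concurrentReaders / 4
    if p < 1 {
        p = 1
    }
    if p > 8 {
        p = 8
    }
    return p
}

func main() {
    for _, cr := range []int{4, 16, 32, 128} {
        fmt.Printf("concurrentReaders=%d -> prefetch=%d\n", cr, prefetchCount(cr))
    }
}

Running this prints 1, 4, 8, 8 for the four settings, matching the table above.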
* Use explicit concurrentReaders parameter instead of variadic

* Refactor: use MaybeCache with count parameter instead of new MaybeCacheMany function

* Address nitpick review comments

- Add upper bound (128) on concurrentReaders to prevent excessive goroutine
  fan-out
- Cap readerCacheLimit at 256 accordingly
- Fix SetChunks: use Lock() instead of RLock() since we are writing to
  group.sections
---
 weed/command/fuse_std.go                    |   7 +
 weed/command/mount.go                       |   2 +
 weed/command/mount_std.go                   |   1 +
 weed/filer/filechunk_group.go               | 157 ++++++++++++++++++--
 weed/filer/filechunk_section.go             |   2 +-
 weed/filer/reader_at.go                     |  23 ++-
 weed/filer/reader_cache.go                  |  13 +-
 weed/mount/filehandle.go                    |   2 +-
 weed/mount/weedfs.go                        |   1 +
 weed/mq/logstore/read_parquet_to_log.go     |   2 +-
 weed/query/engine/hybrid_message_scanner.go |   2 +-
 weed/query/engine/parquet_scanner.go        |   2 +-
 weed/server/webdav_server.go                |   2 +-
 13 files changed, 191 insertions(+), 25 deletions(-)

diff --git a/weed/command/fuse_std.go b/weed/command/fuse_std.go
index b2839aaf8..2cc6fa8ab 100644
--- a/weed/command/fuse_std.go
+++ b/weed/command/fuse_std.go
@@ -155,6 +155,13 @@ func runFuse(cmd *Command, args []string) bool {
         } else {
             panic(fmt.Errorf("concurrentWriters: %s", err))
         }
     case "concurrentReaders":
+        if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+            intValue := int(parsed)
+            mountOptions.concurrentReaders = &intValue
+        } else {
+            panic(fmt.Errorf("concurrentReaders: %s", err))
+        }
     case "cacheDir":
         mountOptions.cacheDirForRead = &parameter.value
     case "cacheCapacityMB":
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 98f139c6f..618bbd3ae 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -17,6 +17,7 @@ type MountOptions struct {
     ttlSec            *int
     chunkSizeLimitMB  *int
     concurrentWriters *int
+    concurrentReaders *int
     cacheMetaTtlSec   *int
     cacheDirForRead   *string
     cacheDirForWrite  *string
@@ -65,6 +66,7 @@ func init() {
     mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
     mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
     mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers")
+    mountOptions.concurrentReaders = cmdMount.Flag.Int("concurrentReaders", 16, "limit concurrent chunk fetches for read operations")
     mountOptions.cacheDirForRead = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
     mountOptions.cacheSizeMBForRead = cmdMount.Flag.Int64("cacheCapacityMB", 128, "file chunk read cache capacity in MB")
     mountOptions.cacheDirForWrite = cmdMount.Flag.String("cacheDirWrite", "", "buffer writes mostly for large files")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 53b09589d..d1593454e 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -236,6 +236,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
         DiskType:           types.ToDiskType(*option.diskType),
         ChunkSizeLimit:     int64(chunkSizeLimitMB) * 1024 * 1024,
         ConcurrentWriters:  *option.concurrentWriters,
+        ConcurrentReaders:  *option.concurrentReaders,
         CacheDirForRead:    *option.cacheDirForRead,
         CacheSizeMBForRead: *option.cacheSizeMBForRead,
         CacheDirForWrite:   cacheDirForWrite,
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 0f449735a..ed92e78a9 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -5,29 +5,64 @@ import (
     "io"
     "sync"
 
+    "golang.org/x/sync/errgroup"
+
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
     "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
     "github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
 type ChunkGroup struct {
-    lookupFn     wdclient.LookupFileIdFunctionType
-    sections     map[SectionIndex]*FileChunkSection
-    sectionsLock sync.RWMutex
-    readerCache  *ReaderCache
+    lookupFn          wdclient.LookupFileIdFunctionType
+    sections          map[SectionIndex]*FileChunkSection
+    sectionsLock      sync.RWMutex
+    readerCache       *ReaderCache
+    concurrentReaders int
 }
 
-func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
+// NewChunkGroup creates a ChunkGroup with configurable concurrency.
+// concurrentReaders controls:
+// - Maximum parallel chunk fetches during read operations
+// - Read-ahead prefetch parallelism
+// - Number of concurrent section reads for large files
+// If concurrentReaders <= 0, defaults to 16.
+func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) {
+    if concurrentReaders <= 0 {
+        concurrentReaders = 16
+    }
+    if concurrentReaders > 128 {
+        concurrentReaders = 128 // Cap to prevent excessive goroutine fan-out
+    }
+    // ReaderCache limit should be at least concurrentReaders to allow parallel prefetching
+    readerCacheLimit := concurrentReaders * 2
+    if readerCacheLimit < 32 {
+        readerCacheLimit = 32
+    }
     group := &ChunkGroup{
-        lookupFn:    lookupFn,
-        sections:    make(map[SectionIndex]*FileChunkSection),
-        readerCache: NewReaderCache(32, chunkCache, lookupFn),
+        lookupFn:          lookupFn,
+        sections:          make(map[SectionIndex]*FileChunkSection),
+        readerCache:       NewReaderCache(readerCacheLimit, chunkCache, lookupFn),
+        concurrentReaders: concurrentReaders,
     }
     err := group.SetChunks(chunks)
     return group, err
 }
 
+// GetPrefetchCount returns the number of chunks to prefetch ahead during sequential reads.
+// This is derived from concurrentReaders to keep the network pipeline full.
+func (group *ChunkGroup) GetPrefetchCount() int {
+    // Prefetch at least 1, and scale with concurrency (roughly 1/4 of concurrent readers)
+    prefetch := group.concurrentReaders / 4
+    if prefetch < 1 {
+        prefetch = 1
+    }
+    if prefetch > 8 {
+        prefetch = 8 // Cap at 8 to avoid excessive memory usage
+    }
+    return prefetch
+}
+
 func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
 
     group.sectionsLock.Lock()
@@ -54,6 +89,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
     defer group.sectionsLock.RUnlock()
 
     sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
+    numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+    // For single section or when concurrency is disabled, use sequential reading
+    if numSections <= 1 || group.concurrentReaders <= 1 {
+        return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+    }
+
+    // For multiple sections, use parallel reading
+    return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+}
+
+// readDataAtSequential reads sections sequentially (original behavior)
+func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
     for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
         section, found := group.sections[si]
         rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
@@ -78,9 +126,98 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
     return
 }
 
+// sectionReadResult holds the result of a section read operation
+type sectionReadResult struct {
+    sectionIndex SectionIndex
+    n            int
+    tsNs         int64
+    err          error
+}
+
+// readDataAtParallel reads multiple sections in parallel for better throughput
+func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
+    numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+    // Limit concurrency to the smaller of concurrentReaders and numSections
+    maxConcurrent := group.concurrentReaders
+    if numSections < maxConcurrent {
+        maxConcurrent = numSections
+    }
+
+    g, gCtx := errgroup.WithContext(ctx)
+    g.SetLimit(maxConcurrent)
+
+    results := make([]sectionReadResult, numSections)
+
+    for i := 0; i < numSections; i++ {
+        si := sectionIndexStart + SectionIndex(i)
+        idx := i
+
+        section, found := group.sections[si]
+        rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
+        if rangeStart >= rangeStop {
+            continue
+        }
+
+        if !found {
+            // Zero-fill missing sections synchronously
+            rangeStop = min(rangeStop, fileSize)
+            for j := rangeStart; j < rangeStop; j++ {
+                buff[j-offset] = 0
+            }
+            results[idx] = sectionReadResult{
+                sectionIndex: si,
+                n:            int(rangeStop - rangeStart),
+                tsNs:         0,
+                err:          nil,
+            }
+            continue
+        }
+
+        // Capture variables for closure
+        sectionCopy := section
+        buffSlice := buff[rangeStart-offset : rangeStop-offset]
+        rangeStartCopy := rangeStart
+
+        g.Go(func() error {
+            xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy)
+            results[idx] = sectionReadResult{
+                sectionIndex: si,
+                n:            xn,
+                tsNs:         xTsNs,
+                err:          xErr,
+            }
+            if xErr != nil && xErr != io.EOF {
+                return xErr
+            }
+            return nil
+        })
+    }
+
+    // Wait for all goroutines to complete
+    groupErr := g.Wait()
+
+    // Aggregate results
+    for _, result := range results {
+        n += result.n
+        tsNs = max(tsNs, result.tsNs)
+        // Collect first non-EOF error from results as fallback
+        if result.err != nil && result.err != io.EOF && err == nil {
+            err = result.err
+        }
+    }
+
+    // Prioritize errgroup error (first error that cancelled context)
+    if groupErr != nil {
+        err = groupErr
+    }
+
+    return
+}
+
 func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
-    group.sectionsLock.RLock()
-    defer group.sectionsLock.RUnlock()
+    group.sectionsLock.Lock()
+    defer group.sectionsLock.Unlock()
 
     var dataChunks []*filer_pb.FileChunk
     for _, chunk := range chunks {
diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go
index 76eb84c23..dd7c2ea48 100644
--- a/weed/filer/filechunk_section.go
+++ b/weed/filer/filechunk_section.go
@@ -85,7 +85,7 @@ func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkG
     }
 
     if section.reader == nil {
-        section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
+        section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize), group.GetPrefetchCount())
     }
 
     section.isPrepared = true
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index aeac9b34a..93fa76a2e 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -13,6 +13,12 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
+// DefaultPrefetchCount is the default number of chunks to prefetch ahead during
+// sequential reads. This value is used when prefetch count is not explicitly
+// configured (e.g., WebDAV, query engine, message queue). For mount operations,
+// the prefetch count is derived from the -concurrentReaders option.
+const DefaultPrefetchCount = 4
+
 type ChunkReadAt struct {
     masterClient *wdclient.MasterClient
     chunkViews   *IntervalList[*ChunkView]
@@ -20,6 +26,7 @@ type ChunkReadAt struct {
     readerCache   *ReaderCache
     readerPattern *ReaderPattern
     lastChunkFid  string
+    prefetchCount int // Number of chunks to prefetch ahead during sequential reads
     ctx           context.Context // Context used for cancellation during chunk read operations
 }
 
@@ -35,8 +42,9 @@ var _ = io.Closer(&ChunkReadAt{})
 // - No high availability (single filer address, no automatic failover)
 //
 // For NEW code, especially mount operations, use wdclient.FilerClient instead:
-//	filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
-//	lookupFn := filerClient.GetLookupFileIdFunction()
+//
+//	filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
+//	lookupFn := filerClient.GetLookupFileIdFunction()
 //
 // This provides:
 // - Bounded cache with configurable size
@@ -56,7 +64,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
     var vidCacheLock sync.RWMutex
     cacheSize := 0
     const maxCacheSize = 10000 // Simple bound to prevent unbounded growth
-
+
     return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
         vid := VolumeId(fileId)
         vidCacheLock.RLock()
@@ -123,13 +131,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
     }
 }
 
-func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64, prefetchCount int) *ChunkReadAt {
     return &ChunkReadAt{
         chunkViews:    chunkViews,
         fileSize:      fileSize,
         readerCache:   readerCache,
         readerPattern: NewReaderPattern(),
+        prefetchCount: prefetchCount,
         ctx:           ctx,
     }
 }
 
@@ -246,8 +255,10 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
         if c.lastChunkFid != "" {
             c.readerCache.UnCache(c.lastChunkFid)
         }
-        if nextChunkViews != nil {
-            c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning
+        if nextChunkViews != nil && c.prefetchCount > 0 {
+            // Prefetch multiple chunks ahead for better sequential read throughput
+            // This keeps the network pipeline full with parallel chunk fetches
+            c.readerCache.MaybeCache(nextChunkViews, c.prefetchCount)
         }
     }
 }
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 11382bed3..605be5e73 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -46,10 +46,16 @@ func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
     }
 }
 
-func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
+// MaybeCache prefetches up to 'count' chunks ahead in parallel.
+// This improves read throughput for sequential reads by keeping the
+// network pipeline full with parallel chunk fetches.
+func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
     if rc.lookupFileIdFn == nil {
         return
     }
+    if count <= 0 {
+        count = 1
+    }
     rc.Lock()
     defer rc.Unlock()
 
@@ -58,7 +64,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
         return
     }
-    for x := chunkViews; x != nil; x = x.Next {
+    cached := 0
+    for x := chunkViews; x != nil && cached < count; x = x.Next {
         chunkView := x.Value
         if _, found := rc.downloaders[chunkView.FileId]; found {
             continue
         }
@@ -80,7 +87,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
         go cacher.startCaching()
         <-cacher.cacheStartedCh
         rc.downloaders[chunkView.FileId] = cacher
-
+        cached++
     }
 
     return
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index c20f9eca8..e912fe310 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -81,7 +81,7 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
         fileSize := filer.FileSize(entry)
         entry.Attributes.FileSize = fileSize
         var resolveManifestErr error
-        fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
+        fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks, fh.wfs.option.ConcurrentReaders)
         if resolveManifestErr != nil {
             glog.Warningf("failed to resolve manifest chunks in %+v", entry)
         }
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 21c54841a..80e062c60 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -42,6 +42,7 @@ type Option struct {
     DiskType           types.DiskType
     ChunkSizeLimit     int64
     ConcurrentWriters  int
+    ConcurrentReaders  int
     CacheDirForRead    string
     CacheSizeMBForRead int64
     CacheDirForWrite   string
diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go
index 01191eaad..7214965f5 100644
--- a/weed/mq/logstore/read_parquet_to_log.go
+++ b/weed/mq/logstore/read_parquet_to_log.go
@@ -101,7 +101,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
             visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
             chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
             readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
-            readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
+            readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
 
             // create parquet reader
             parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index c09ce2f54..8fa9f4381 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -1286,7 +1286,7 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
     visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
     chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
     readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
-    readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
+    readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
 
     // Create parquet reader - this only reads metadata, not data
     parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go
index e4b5252c7..7a470817b 100644
--- a/weed/query/engine/parquet_scanner.go
+++ b/weed/query/engine/parquet_scanner.go
@@ -182,7 +182,7 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
     visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
     chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
     readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn)
-    readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize))
+    readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
 
     // Create Parquet reader
     parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index aa43189f5..3e0e23148 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -566,7 +566,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
     }
     if f.reader == nil {
         chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
-        f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize)
+        f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize, filer.DefaultPrefetchCount)
     }
 
     readSize, err = f.reader.ReadAt(p, f.off)
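
For reference, the core pattern that readDataAtParallel applies above is bounded
fan-out with errgroup: split the requested byte range into sections, fetch each
section in its own goroutine, and cap the number of in-flight fetches at
concurrentReaders. The sketch below is an editor's standalone illustration, not
the committed code; readParallel, fetchSection, and the 1 MiB section size are
invented for the example (the real implementation works on FileChunkSection
values under ChunkGroup's section lock), and the min/max builtins require
Go 1.21+.

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

const sectionSize = 1 << 20 // placeholder; SeaweedFS's real SectionSize differs

// fetchSection stands in for a chunk fetch from a volume server.
func fetchSection(ctx context.Context, index int64, dst []byte) (int, error) {
    for i := range dst {
        dst[i] = byte(index)
    }
    return len(dst), nil
}

// readParallel fans out one goroutine per section, bounded by concurrentReaders.
func readParallel(ctx context.Context, buff []byte, offset int64, concurrentReaders int) (int, error) {
    first := offset / sectionSize
    last := (offset + int64(len(buff))) / sectionSize
    numSections := int(last - first + 1)

    g, gCtx := errgroup.WithContext(ctx)
    g.SetLimit(min(concurrentReaders, numSections))

    counts := make([]int, numSections)
    for i := 0; i < numSections; i++ {
        i := i
        si := first + int64(i)
        start := max(offset, si*sectionSize)
        stop := min(offset+int64(len(buff)), (si+1)*sectionSize)
        if start >= stop {
            continue
        }
        slice := buff[start-offset : stop-offset]
        g.Go(func() error {
            n, err := fetchSection(gCtx, si, slice)
            counts[i] = n // safe: each goroutine writes its own slot, read only after Wait
            return err
        })
    }
    err := g.Wait()

    total := 0
    for _, n := range counts {
        total += n
    }
    return total, err
}

func main() {
    buff := make([]byte, 3*sectionSize)
    n, err := readParallel(context.Background(), buff, 0, 16)
    fmt.Println(n, err) // 3145728 <nil>
}

On the mount side, this cap is what -concurrentReaders configures; for example,
mounting with -concurrentReaders=32 allows up to 32 sections to be fetched at
once and, per the commit message above, raises the sequential-read prefetch to
8 chunks via GetPrefetchCount.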