From c114e8701f3020832dc71d055c29b7b22dfc7695 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 29 Nov 2025 00:39:43 -0800 Subject: [PATCH] Address review comments: make prefetch configurable, improve error handling Changes: 1. Add DefaultPrefetchCount constant (4) to reader_at.go 2. Add GetPrefetchCount() method to ChunkGroup that derives prefetch count from concurrentReaders (1/4 ratio, min 1, max 8) 3. Pass prefetch count through NewChunkReaderAtFromClient 4. Fix error handling in readDataAtParallel to prioritize errgroup error 5. Update all callers to use DefaultPrefetchCount constant For mount operations, prefetch scales with -concurrentReaders: - concurrentReaders=16 (default) -> prefetch=4 - concurrentReaders=32 -> prefetch=8 (capped) - concurrentReaders=4 -> prefetch=1 For non-mount paths (WebDAV, query engine, MQ), uses DefaultPrefetchCount. --- weed/filer/filechunk_group.go | 28 ++++++++++++++++----- weed/filer/filechunk_section.go | 2 +- weed/filer/reader_at.go | 14 ++++++++--- weed/mq/logstore/read_parquet_to_log.go | 2 +- weed/query/engine/hybrid_message_scanner.go | 2 +- weed/query/engine/parquet_scanner.go | 2 +- weed/server/webdav_server.go | 2 +- 7 files changed, 38 insertions(+), 14 deletions(-) diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go index 61bdf5905..9b86c5e47 100644 --- a/weed/filer/filechunk_group.go +++ b/weed/filer/filechunk_group.go @@ -51,6 +51,20 @@ func NewChunkGroupWithConcurrency(lookupFn wdclient.LookupFileIdFunctionType, ch return group, err } +// GetPrefetchCount returns the number of chunks to prefetch ahead during sequential reads. +// This is derived from concurrentReaders to keep the network pipeline full. +func (group *ChunkGroup) GetPrefetchCount() int { + // Prefetch at least 1, and scale with concurrency (roughly 1/4 of concurrent readers) + prefetch := group.concurrentReaders / 4 + if prefetch < 1 { + prefetch = 1 + } + if prefetch > 8 { + prefetch = 8 // Cap at 8 to avoid excessive memory usage + } + return prefetch +} + func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error { group.sectionsLock.Lock() @@ -126,9 +140,9 @@ type sectionReadResult struct { func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) { numSections := int(sectionIndexStop - sectionIndexStart + 1) - // Limit concurrency + // Limit concurrency to the smaller of concurrentReaders and numSections maxConcurrent := group.concurrentReaders - if maxConcurrent > numSections { + if numSections < maxConcurrent { maxConcurrent = numSections } @@ -187,14 +201,16 @@ func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, // Aggregate results for _, result := range results { - if result.err != nil && err == nil { - err = result.err - } n += result.n tsNs = max(tsNs, result.tsNs) + // Collect first non-EOF error from results as fallback + if result.err != nil && result.err != io.EOF && err == nil { + err = result.err + } } - if groupErr != nil && err == nil { + // Prioritize errgroup error (first error that cancelled context) + if groupErr != nil { err = groupErr } diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go index 76eb84c23..dd7c2ea48 100644 --- a/weed/filer/filechunk_section.go +++ b/weed/filer/filechunk_section.go @@ -85,7 +85,7 @@ func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkG } if section.reader == nil { - section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize)) + section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize), group.GetPrefetchCount()) } section.isPrepared = true diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 2d2a58597..16287d7e4 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -13,6 +13,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/wdclient" ) +// DefaultPrefetchCount is the default number of chunks to prefetch ahead during +// sequential reads. This value is used when prefetch count is not explicitly +// configured (e.g., WebDAV, query engine, message queue). For mount operations, +// the prefetch count is derived from the -concurrentReaders option. +const DefaultPrefetchCount = 4 + type ChunkReadAt struct { masterClient *wdclient.MasterClient chunkViews *IntervalList[*ChunkView] @@ -20,6 +26,7 @@ type ChunkReadAt struct { readerCache *ReaderCache readerPattern *ReaderPattern lastChunkFid string + prefetchCount int // Number of chunks to prefetch ahead during sequential reads ctx context.Context // Context used for cancellation during chunk read operations } @@ -124,13 +131,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } } -func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt { +func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64, prefetchCount int) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, fileSize: fileSize, readerCache: readerCache, readerPattern: NewReaderPattern(), + prefetchCount: prefetchCount, ctx: ctx, } } @@ -247,10 +255,10 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk if c.lastChunkFid != "" { c.readerCache.UnCache(c.lastChunkFid) } - if nextChunkViews != nil { + if nextChunkViews != nil && c.prefetchCount > 0 { // Prefetch multiple chunks ahead for better sequential read throughput // This keeps the network pipeline full with parallel chunk fetches - c.readerCache.MaybeCacheMany(nextChunkViews, 4) + c.readerCache.MaybeCacheMany(nextChunkViews, c.prefetchCount) } } } diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go index 01191eaad..7214965f5 100644 --- a/weed/mq/logstore/read_parquet_to_log.go +++ b/weed/mq/logstore/read_parquet_to_log.go @@ -101,7 +101,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) - readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize)) + readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount) // create parquet reader parquetReader := parquet.NewReader(readerAt) diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index c09ce2f54..8fa9f4381 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -1286,7 +1286,7 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) - readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize)) + readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount) // Create parquet reader - this only reads metadata, not data parquetReader := parquet.NewReader(readerAt) diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go index e4b5252c7..7a470817b 100644 --- a/weed/query/engine/parquet_scanner.go +++ b/weed/query/engine/parquet_scanner.go @@ -182,7 +182,7 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn) - readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize)) + readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount) // Create Parquet reader parquetReader := parquet.NewReader(readerAt) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index aa43189f5..3e0e23148 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -566,7 +566,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { } if f.reader == nil { chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize) - f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize) + f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize, filer.DefaultPrefetchCount) } readSize, err = f.reader.ReadAt(p, f.off)