diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 61bdf5905..9b86c5e47 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -51,6 +51,20 @@ func NewChunkGroupWithConcurrency(lookupFn wdclient.LookupFileIdFunctionType, ch
 	return group, err
 }
 
+// GetPrefetchCount returns the number of chunks to prefetch ahead during sequential reads.
+// This is derived from concurrentReaders to keep the network pipeline full.
+func (group *ChunkGroup) GetPrefetchCount() int {
+	// Prefetch at least 1, and scale with concurrency (roughly 1/4 of concurrent readers)
+	prefetch := group.concurrentReaders / 4
+	if prefetch < 1 {
+		prefetch = 1
+	}
+	if prefetch > 8 {
+		prefetch = 8 // Cap at 8 to avoid excessive memory usage
+	}
+	return prefetch
+}
+
 func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
 
 	group.sectionsLock.Lock()
@@ -126,9 +140,9 @@ type sectionReadResult struct {
 func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
 	numSections := int(sectionIndexStop - sectionIndexStart + 1)
 
-	// Limit concurrency
+	// Limit concurrency to the smaller of concurrentReaders and numSections
 	maxConcurrent := group.concurrentReaders
-	if maxConcurrent > numSections {
+	if numSections < maxConcurrent {
 		maxConcurrent = numSections
 	}
 
@@ -187,14 +201,16 @@ func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64,
 
 	// Aggregate results
 	for _, result := range results {
-		if result.err != nil && err == nil {
-			err = result.err
-		}
 		n += result.n
 		tsNs = max(tsNs, result.tsNs)
+		// Collect first non-EOF error from results as fallback
+		if result.err != nil && result.err != io.EOF && err == nil {
+			err = result.err
+		}
 	}
 
-	if groupErr != nil && err == nil {
+	// Prioritize errgroup error (first error that cancelled context)
+	if groupErr != nil {
 		err = groupErr
 	}
 
diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go
index 76eb84c23..dd7c2ea48 100644
--- a/weed/filer/filechunk_section.go
+++ b/weed/filer/filechunk_section.go
@@ -85,7 +85,7 @@ func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkG
 	}
 
 	if section.reader == nil {
-		section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
+		section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize), group.GetPrefetchCount())
 	}
 	section.isPrepared = true
 
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 2d2a58597..16287d7e4 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -13,6 +13,12 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
+// DefaultPrefetchCount is the default number of chunks to prefetch ahead during
+// sequential reads. This value is used when prefetch count is not explicitly
+// configured (e.g., WebDAV, query engine, message queue). For mount operations,
+// the prefetch count is derived from the -concurrentReaders option.
+const DefaultPrefetchCount = 4
+
 type ChunkReadAt struct {
 	masterClient  *wdclient.MasterClient
 	chunkViews    *IntervalList[*ChunkView]
@@ -20,6 +26,7 @@ type ChunkReadAt struct {
 	readerCache   *ReaderCache
 	readerPattern *ReaderPattern
 	lastChunkFid  string
+	prefetchCount int // Number of chunks to prefetch ahead during sequential reads
 	ctx           context.Context // Context used for cancellation during chunk read operations
 }
 
@@ -124,13 +131,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
 	}
 }
 
-func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64, prefetchCount int) *ChunkReadAt {
 	return &ChunkReadAt{
 		chunkViews:    chunkViews,
 		fileSize:      fileSize,
 		readerCache:   readerCache,
 		readerPattern: NewReaderPattern(),
+		prefetchCount: prefetchCount,
 		ctx:           ctx,
 	}
 }
@@ -247,10 +255,10 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
 		if c.lastChunkFid != "" {
 			c.readerCache.UnCache(c.lastChunkFid)
 		}
-		if nextChunkViews != nil {
+		if nextChunkViews != nil && c.prefetchCount > 0 {
 			// Prefetch multiple chunks ahead for better sequential read throughput
 			// This keeps the network pipeline full with parallel chunk fetches
-			c.readerCache.MaybeCacheMany(nextChunkViews, 4)
+			c.readerCache.MaybeCacheMany(nextChunkViews, c.prefetchCount)
 		}
 	}
 }
diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go
index 01191eaad..7214965f5 100644
--- a/weed/mq/logstore/read_parquet_to_log.go
+++ b/weed/mq/logstore/read_parquet_to_log.go
@@ -101,7 +101,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
 		visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
 		chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
 		readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
-		readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
+		readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
 
 		// create parquet reader
 		parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index c09ce2f54..8fa9f4381 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -1286,7 +1286,7 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
 	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
 	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
 	readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
-	readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
+	readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
 
 	// Create parquet reader - this only reads metadata, not data
 	parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go
index e4b5252c7..7a470817b 100644
--- a/weed/query/engine/parquet_scanner.go
+++ b/weed/query/engine/parquet_scanner.go
@@ -182,7 +182,7 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
 	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
 	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
 	readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn)
-	readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize))
+	readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
 
 	// Create Parquet reader
 	parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index aa43189f5..3e0e23148 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -566,7 +566,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
 	}
 	if f.reader == nil {
 		chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
-		f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize)
+		f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize, filer.DefaultPrefetchCount)
 	}
 
 	readSize, err = f.reader.ReadAt(p, f.off)
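
As a quick sanity check on the clamping in GetPrefetchCount (at least 1, roughly concurrentReaders/4, capped at 8), a minimal table-driven test could look like the sketch below. It is illustrative only and not part of this patch; the file name is hypothetical, and it assumes the test sits in package filer next to filechunk_group.go so it can set the unexported concurrentReaders field directly.

// filechunk_group_prefetch_test.go (illustrative sketch, not part of the patch)
package filer

import "testing"

// TestGetPrefetchCount checks the expected clamping behavior:
// never below 1, roughly concurrentReaders/4, capped at 8.
func TestGetPrefetchCount(t *testing.T) {
	cases := []struct {
		concurrentReaders int
		want              int
	}{
		{1, 1},  // floor: never less than 1
		{4, 1},  // 4/4 = 1
		{16, 4}, // 16/4 = 4
		{32, 8}, // 32/4 = 8, exactly at the cap
		{64, 8}, // capped at 8 to bound memory usage
	}
	for _, c := range cases {
		group := &ChunkGroup{concurrentReaders: c.concurrentReaders}
		if got := group.GetPrefetchCount(); got != c.want {
			t.Errorf("concurrentReaders=%d: got %d, want %d", c.concurrentReaders, got, c.want)
		}
	}
}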