diff --git a/weed/command/fuse_std.go b/weed/command/fuse_std.go index b2839aaf8..2cc6fa8ab 100644 --- a/weed/command/fuse_std.go +++ b/weed/command/fuse_std.go @@ -155,6 +155,13 @@ func runFuse(cmd *Command, args []string) bool { } else { panic(fmt.Errorf("concurrentWriters: %s", err)) } + case "concurrentReaders": + if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil { + intValue := int(parsed) + mountOptions.concurrentReaders = &intValue + } else { + panic(fmt.Errorf("concurrentReaders: %s", err)) + } case "cacheDir": mountOptions.cacheDirForRead = ¶meter.value case "cacheCapacityMB": diff --git a/weed/command/mount.go b/weed/command/mount.go index 98f139c6f..618bbd3ae 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -17,6 +17,7 @@ type MountOptions struct { ttlSec *int chunkSizeLimitMB *int concurrentWriters *int + concurrentReaders *int cacheMetaTtlSec *int cacheDirForRead *string cacheDirForWrite *string @@ -65,6 +66,7 @@ func init() { mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers") + mountOptions.concurrentReaders = cmdMount.Flag.Int("concurrentReaders", 16, "limit concurrent chunk fetches for read operations") mountOptions.cacheDirForRead = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data") mountOptions.cacheSizeMBForRead = cmdMount.Flag.Int64("cacheCapacityMB", 128, "file chunk read cache capacity in MB") mountOptions.cacheDirForWrite = cmdMount.Flag.String("cacheDirWrite", "", "buffer writes mostly for large files") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 53b09589d..d1593454e 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -236,6 +236,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { DiskType: types.ToDiskType(*option.diskType), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, ConcurrentWriters: *option.concurrentWriters, + ConcurrentReaders: *option.concurrentReaders, CacheDirForRead: *option.cacheDirForRead, CacheSizeMBForRead: *option.cacheSizeMBForRead, CacheDirForWrite: cacheDirForWrite, diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go index 0f449735a..ed92e78a9 100644 --- a/weed/filer/filechunk_group.go +++ b/weed/filer/filechunk_group.go @@ -5,29 +5,64 @@ import ( "io" "sync" + "golang.org/x/sync/errgroup" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/wdclient" ) type ChunkGroup struct { - lookupFn wdclient.LookupFileIdFunctionType - sections map[SectionIndex]*FileChunkSection - sectionsLock sync.RWMutex - readerCache *ReaderCache + lookupFn wdclient.LookupFileIdFunctionType + sections map[SectionIndex]*FileChunkSection + sectionsLock sync.RWMutex + readerCache *ReaderCache + concurrentReaders int } -func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { +// NewChunkGroup creates a ChunkGroup with configurable concurrency. +// concurrentReaders controls: +// - Maximum parallel chunk fetches during read operations +// - Read-ahead prefetch parallelism +// - Number of concurrent section reads for large files +// If concurrentReaders <= 0, defaults to 16. +func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) { + if concurrentReaders <= 0 { + concurrentReaders = 16 + } + if concurrentReaders > 128 { + concurrentReaders = 128 // Cap to prevent excessive goroutine fan-out + } + // ReaderCache limit should be at least concurrentReaders to allow parallel prefetching + readerCacheLimit := concurrentReaders * 2 + if readerCacheLimit < 32 { + readerCacheLimit = 32 + } group := &ChunkGroup{ - lookupFn: lookupFn, - sections: make(map[SectionIndex]*FileChunkSection), - readerCache: NewReaderCache(32, chunkCache, lookupFn), + lookupFn: lookupFn, + sections: make(map[SectionIndex]*FileChunkSection), + readerCache: NewReaderCache(readerCacheLimit, chunkCache, lookupFn), + concurrentReaders: concurrentReaders, } err := group.SetChunks(chunks) return group, err } +// GetPrefetchCount returns the number of chunks to prefetch ahead during sequential reads. +// This is derived from concurrentReaders to keep the network pipeline full. +func (group *ChunkGroup) GetPrefetchCount() int { + // Prefetch at least 1, and scale with concurrency (roughly 1/4 of concurrent readers) + prefetch := group.concurrentReaders / 4 + if prefetch < 1 { + prefetch = 1 + } + if prefetch > 8 { + prefetch = 8 // Cap at 8 to avoid excessive memory usage + } + return prefetch +} + func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error { group.sectionsLock.Lock() @@ -54,6 +89,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff [] defer group.sectionsLock.RUnlock() sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize) + numSections := int(sectionIndexStop - sectionIndexStart + 1) + + // For single section or when concurrency is disabled, use sequential reading + if numSections <= 1 || group.concurrentReaders <= 1 { + return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop) + } + + // For multiple sections, use parallel reading + return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop) +} + +// readDataAtSequential reads sections sequentially (original behavior) +func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) { for si := sectionIndexStart; si < sectionIndexStop+1; si++ { section, found := group.sections[si] rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) @@ -78,9 +126,98 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff [] return } +// sectionReadResult holds the result of a section read operation +type sectionReadResult struct { + sectionIndex SectionIndex + n int + tsNs int64 + err error +} + +// readDataAtParallel reads multiple sections in parallel for better throughput +func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) { + numSections := int(sectionIndexStop - sectionIndexStart + 1) + + // Limit concurrency to the smaller of concurrentReaders and numSections + maxConcurrent := group.concurrentReaders + if numSections < maxConcurrent { + maxConcurrent = numSections + } + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(maxConcurrent) + + results := make([]sectionReadResult, numSections) + + for i := 0; i < numSections; i++ { + si := sectionIndexStart + SectionIndex(i) + idx := i + + section, found := group.sections[si] + rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) + if rangeStart >= rangeStop { + continue + } + + if !found { + // Zero-fill missing sections synchronously + rangeStop = min(rangeStop, fileSize) + for j := rangeStart; j < rangeStop; j++ { + buff[j-offset] = 0 + } + results[idx] = sectionReadResult{ + sectionIndex: si, + n: int(rangeStop - rangeStart), + tsNs: 0, + err: nil, + } + continue + } + + // Capture variables for closure + sectionCopy := section + buffSlice := buff[rangeStart-offset : rangeStop-offset] + rangeStartCopy := rangeStart + + g.Go(func() error { + xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy) + results[idx] = sectionReadResult{ + sectionIndex: si, + n: xn, + tsNs: xTsNs, + err: xErr, + } + if xErr != nil && xErr != io.EOF { + return xErr + } + return nil + }) + } + + // Wait for all goroutines to complete + groupErr := g.Wait() + + // Aggregate results + for _, result := range results { + n += result.n + tsNs = max(tsNs, result.tsNs) + // Collect first non-EOF error from results as fallback + if result.err != nil && result.err != io.EOF && err == nil { + err = result.err + } + } + + // Prioritize errgroup error (first error that cancelled context) + if groupErr != nil { + err = groupErr + } + + return +} + func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { - group.sectionsLock.RLock() - defer group.sectionsLock.RUnlock() + group.sectionsLock.Lock() + defer group.sectionsLock.Unlock() var dataChunks []*filer_pb.FileChunk for _, chunk := range chunks { diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go index 76eb84c23..dd7c2ea48 100644 --- a/weed/filer/filechunk_section.go +++ b/weed/filer/filechunk_section.go @@ -85,7 +85,7 @@ func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkG } if section.reader == nil { - section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize)) + section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize), group.GetPrefetchCount()) } section.isPrepared = true diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index aeac9b34a..93fa76a2e 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -13,6 +13,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/wdclient" ) +// DefaultPrefetchCount is the default number of chunks to prefetch ahead during +// sequential reads. This value is used when prefetch count is not explicitly +// configured (e.g., WebDAV, query engine, message queue). For mount operations, +// the prefetch count is derived from the -concurrentReaders option. +const DefaultPrefetchCount = 4 + type ChunkReadAt struct { masterClient *wdclient.MasterClient chunkViews *IntervalList[*ChunkView] @@ -20,6 +26,7 @@ type ChunkReadAt struct { readerCache *ReaderCache readerPattern *ReaderPattern lastChunkFid string + prefetchCount int // Number of chunks to prefetch ahead during sequential reads ctx context.Context // Context used for cancellation during chunk read operations } @@ -35,8 +42,9 @@ var _ = io.Closer(&ChunkReadAt{}) // - No high availability (single filer address, no automatic failover) // // For NEW code, especially mount operations, use wdclient.FilerClient instead: -// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts) -// lookupFn := filerClient.GetLookupFileIdFunction() +// +// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts) +// lookupFn := filerClient.GetLookupFileIdFunction() // // This provides: // - Bounded cache with configurable size @@ -56,7 +64,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp var vidCacheLock sync.RWMutex cacheSize := 0 const maxCacheSize = 10000 // Simple bound to prevent unbounded growth - + return func(ctx context.Context, fileId string) (targetUrls []string, err error) { vid := VolumeId(fileId) vidCacheLock.RLock() @@ -123,13 +131,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } } -func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt { +func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64, prefetchCount int) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, fileSize: fileSize, readerCache: readerCache, readerPattern: NewReaderPattern(), + prefetchCount: prefetchCount, ctx: ctx, } } @@ -246,8 +255,10 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk if c.lastChunkFid != "" { c.readerCache.UnCache(c.lastChunkFid) } - if nextChunkViews != nil { - c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning + if nextChunkViews != nil && c.prefetchCount > 0 { + // Prefetch multiple chunks ahead for better sequential read throughput + // This keeps the network pipeline full with parallel chunk fetches + c.readerCache.MaybeCache(nextChunkViews, c.prefetchCount) } } } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 11382bed3..605be5e73 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -46,10 +46,16 @@ func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn } } -func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { +// MaybeCache prefetches up to 'count' chunks ahead in parallel. +// This improves read throughput for sequential reads by keeping the +// network pipeline full with parallel chunk fetches. +func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) { if rc.lookupFileIdFn == nil { return } + if count <= 0 { + count = 1 + } rc.Lock() defer rc.Unlock() @@ -58,7 +64,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { return } - for x := chunkViews; x != nil; x = x.Next { + cached := 0 + for x := chunkViews; x != nil && cached < count; x = x.Next { chunkView := x.Value if _, found := rc.downloaders[chunkView.FileId]; found { continue @@ -80,7 +87,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { go cacher.startCaching() <-cacher.cacheStartedCh rc.downloaders[chunkView.FileId] = cacher - + cached++ } return diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index c20f9eca8..e912fe310 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -81,7 +81,7 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { fileSize := filer.FileSize(entry) entry.Attributes.FileSize = fileSize var resolveManifestErr error - fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks) + fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks, fh.wfs.option.ConcurrentReaders) if resolveManifestErr != nil { glog.Warningf("failed to resolve manifest chunks in %+v", entry) } diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 21c54841a..80e062c60 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -42,6 +42,7 @@ type Option struct { DiskType types.DiskType ChunkSizeLimit int64 ConcurrentWriters int + ConcurrentReaders int CacheDirForRead string CacheSizeMBForRead int64 CacheDirForWrite string diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go index 01191eaad..7214965f5 100644 --- a/weed/mq/logstore/read_parquet_to_log.go +++ b/weed/mq/logstore/read_parquet_to_log.go @@ -101,7 +101,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) - readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize)) + readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount) // create parquet reader parquetReader := parquet.NewReader(readerAt) diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index c09ce2f54..8fa9f4381 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -1286,7 +1286,7 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) - readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize)) + readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount) // Create parquet reader - this only reads metadata, not data parquetReader := parquet.NewReader(readerAt) diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go index e4b5252c7..7a470817b 100644 --- a/weed/query/engine/parquet_scanner.go +++ b/weed/query/engine/parquet_scanner.go @@ -182,7 +182,7 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn) - readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize)) + readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount) // Create Parquet reader parquetReader := parquet.NewReader(readerAt) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index aa43189f5..3e0e23148 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -566,7 +566,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { } if f.reader == nil { chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize) - f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize) + f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize, filer.DefaultPrefetchCount) } readSize, err = f.reader.ReadAt(p, f.off)