diff --git a/weed/command/fuse_std.go b/weed/command/fuse_std.go index b2839aaf8..2cc6fa8ab 100644 --- a/weed/command/fuse_std.go +++ b/weed/command/fuse_std.go @@ -155,6 +155,13 @@ func runFuse(cmd *Command, args []string) bool { } else { panic(fmt.Errorf("concurrentWriters: %s", err)) } + case "concurrentReaders": + if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil { + intValue := int(parsed) + mountOptions.concurrentReaders = &intValue + } else { + panic(fmt.Errorf("concurrentReaders: %s", err)) + } case "cacheDir": mountOptions.cacheDirForRead = ¶meter.value case "cacheCapacityMB": diff --git a/weed/command/mount.go b/weed/command/mount.go index 98f139c6f..618bbd3ae 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -17,6 +17,7 @@ type MountOptions struct { ttlSec *int chunkSizeLimitMB *int concurrentWriters *int + concurrentReaders *int cacheMetaTtlSec *int cacheDirForRead *string cacheDirForWrite *string @@ -65,6 +66,7 @@ func init() { mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers") + mountOptions.concurrentReaders = cmdMount.Flag.Int("concurrentReaders", 16, "limit concurrent chunk fetches for read operations") mountOptions.cacheDirForRead = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data") mountOptions.cacheSizeMBForRead = cmdMount.Flag.Int64("cacheCapacityMB", 128, "file chunk read cache capacity in MB") mountOptions.cacheDirForWrite = cmdMount.Flag.String("cacheDirWrite", "", "buffer writes mostly for large files") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 53b09589d..d1593454e 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -236,6 +236,7 @@ func 
RunMount(option *MountOptions, umask os.FileMode) bool { DiskType: types.ToDiskType(*option.diskType), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, ConcurrentWriters: *option.concurrentWriters, + ConcurrentReaders: *option.concurrentReaders, CacheDirForRead: *option.cacheDirForRead, CacheSizeMBForRead: *option.cacheSizeMBForRead, CacheDirForWrite: cacheDirForWrite, diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go index 0f449735a..61bdf5905 100644 --- a/weed/filer/filechunk_group.go +++ b/weed/filer/filechunk_group.go @@ -5,23 +5,46 @@ import ( "io" "sync" + "golang.org/x/sync/errgroup" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/wdclient" ) type ChunkGroup struct { - lookupFn wdclient.LookupFileIdFunctionType - sections map[SectionIndex]*FileChunkSection - sectionsLock sync.RWMutex - readerCache *ReaderCache + lookupFn wdclient.LookupFileIdFunctionType + sections map[SectionIndex]*FileChunkSection + sectionsLock sync.RWMutex + readerCache *ReaderCache + concurrentReaders int } +// NewChunkGroup creates a ChunkGroup with default concurrency settings. +// For better read performance, use NewChunkGroupWithConcurrency instead. func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { + return NewChunkGroupWithConcurrency(lookupFn, chunkCache, chunks, 16) +} + +// NewChunkGroupWithConcurrency creates a ChunkGroup with configurable concurrency. 
+// concurrentReaders controls: +// - Maximum parallel chunk fetches during read operations +// - Read-ahead prefetch parallelism +// - Number of concurrent section reads for large files +func NewChunkGroupWithConcurrency(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) { + if concurrentReaders <= 0 { + concurrentReaders = 16 + } + // ReaderCache limit should be at least concurrentReaders to allow parallel prefetching + readerCacheLimit := concurrentReaders * 2 + if readerCacheLimit < 32 { + readerCacheLimit = 32 + } group := &ChunkGroup{ - lookupFn: lookupFn, - sections: make(map[SectionIndex]*FileChunkSection), - readerCache: NewReaderCache(32, chunkCache, lookupFn), + lookupFn: lookupFn, + sections: make(map[SectionIndex]*FileChunkSection), + readerCache: NewReaderCache(readerCacheLimit, chunkCache, lookupFn), + concurrentReaders: concurrentReaders, } err := group.SetChunks(chunks) @@ -54,6 +77,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff [] defer group.sectionsLock.RUnlock() sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize) + numSections := int(sectionIndexStop - sectionIndexStart + 1) + + // For single section or when concurrency is disabled, use sequential reading + if numSections <= 1 || group.concurrentReaders <= 1 { + return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop) + } + + // For multiple sections, use parallel reading + return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop) +} + +// readDataAtSequential reads sections sequentially (original behavior) +func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) { for si 
:= sectionIndexStart; si < sectionIndexStop+1; si++ { section, found := group.sections[si] rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) @@ -78,6 +114,93 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff [] return } +// sectionReadResult holds the result of a section read operation +type sectionReadResult struct { + sectionIndex SectionIndex + n int + tsNs int64 + err error +} + +// readDataAtParallel reads multiple sections in parallel for better throughput +func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) { + numSections := int(sectionIndexStop - sectionIndexStart + 1) + + // Limit concurrency + maxConcurrent := group.concurrentReaders + if maxConcurrent > numSections { + maxConcurrent = numSections + } + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(maxConcurrent) + + results := make([]sectionReadResult, numSections) + + for i := 0; i < numSections; i++ { + si := sectionIndexStart + SectionIndex(i) + idx := i + + section, found := group.sections[si] + rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize)) + if rangeStart >= rangeStop { + continue + } + + if !found { + // Zero-fill missing sections synchronously; clamping to fileSize can make rangeStop < rangeStart for reads past EOF, so the count below must not go negative + rangeStop = min(rangeStop, fileSize) + for j := rangeStart; j < rangeStop; j++ { + buff[j-offset] = 0 + } + results[idx] = sectionReadResult{ + sectionIndex: si, + n: int(max(rangeStop - rangeStart, 0)), + tsNs: 0, + err: nil, + } + continue + } + + // Capture variables for closure + sectionCopy := section + buffSlice := buff[rangeStart-offset : rangeStop-offset] + rangeStartCopy := rangeStart + + g.Go(func() error { + xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy) + results[idx] = sectionReadResult{ + 
sectionIndex: si, + n: xn, + tsNs: xTsNs, + err: xErr, + } + if xErr != nil && xErr != io.EOF { + return xErr + } + return nil + }) + } + + // Wait for all goroutines to complete + groupErr := g.Wait() + + // Aggregate results + for _, result := range results { + if result.err != nil && err == nil { + err = result.err + } + n += result.n + tsNs = max(tsNs, result.tsNs) + } + + if groupErr != nil && err == nil { + err = groupErr + } + + return +} + func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { group.sectionsLock.RLock() defer group.sectionsLock.RUnlock() diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index aeac9b34a..baf63e630 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -247,7 +247,9 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk c.readerCache.UnCache(c.lastChunkFid) } if nextChunkViews != nil { - c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning + // Prefetch multiple chunks ahead for better sequential read throughput + // This keeps the network pipeline full with parallel chunk fetches + c.readerCache.MaybeCacheMany(nextChunkViews, 4) } } } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 11382bed3..86906d042 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -47,9 +47,19 @@ func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn } func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { + rc.MaybeCacheMany(chunkViews, 1) +} + +// MaybeCacheMany prefetches up to 'count' chunks ahead in parallel. +// This improves read throughput for sequential reads by keeping the +// network pipeline full with parallel chunk fetches. 
+func (rc *ReaderCache) MaybeCacheMany(chunkViews *Interval[*ChunkView], count int) { if rc.lookupFileIdFn == nil { return } + if count <= 0 { + count = 1 + } rc.Lock() defer rc.Unlock() @@ -58,7 +68,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { return } - for x := chunkViews; x != nil; x = x.Next { + cached := 0 + for x := chunkViews; x != nil && cached < count; x = x.Next { chunkView := x.Value if _, found := rc.downloaders[chunkView.FileId]; found { continue @@ -80,7 +91,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) { go cacher.startCaching() <-cacher.cacheStartedCh rc.downloaders[chunkView.FileId] = cacher - + cached++ } return diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index c20f9eca8..a19b901f7 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -81,7 +81,7 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { fileSize := filer.FileSize(entry) entry.Attributes.FileSize = fileSize var resolveManifestErr error - fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks) + fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroupWithConcurrency(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks, fh.wfs.option.ConcurrentReaders) if resolveManifestErr != nil { glog.Warningf("failed to resolve manifest chunks in %+v", entry) } diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 21c54841a..80e062c60 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -42,6 +42,7 @@ type Option struct { DiskType types.DiskType ChunkSizeLimit int64 ConcurrentWriters int + ConcurrentReaders int CacheDirForRead string CacheSizeMBForRead int64 CacheDirForWrite string