
mount: improve read throughput with parallel chunk fetching

This addresses issue #7504, where a single weed mount FUSE instance
does not fully utilize the node's network bandwidth when reading large files.

Changes:
- Add -concurrentReaders mount option (default: 16) to control the
  maximum number of parallel chunk fetches during read operations
- Implement parallel section reading in ChunkGroup.ReadDataAt() using
  errgroup for better throughput when reading across multiple sections
- Enhance ReaderCache with MaybeCacheMany() to prefetch multiple chunks
  ahead in parallel during sequential reads (now prefetches 4 chunks)
- Increase ReaderCache limit dynamically based on concurrentReaders
  to support higher read parallelism

The bottleneck was that chunks were fetched sequentially even when they
resided on different volume servers. By introducing parallel chunk
fetching, a single mount instance can now better saturate the available
network bandwidth.
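
As a usage sketch (the filer address and mount point below are placeholders,
not part of this change), a bandwidth-rich node could raise the limit like so:

    weed mount -filer=localhost:8888 -dir=/mnt/weed -concurrentReaders=32

When mounting via the FUSE helper, the same knob is exposed as the mount
option concurrentReaders (parsed in fuse_std.go below); leaving it unset
keeps the default of 16.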

Fixes: #7504
Branch: pull/7569/head
Author: Chris Lu
Parent commit: e457676c8d
Changed files:
  weed/command/fuse_std.go      |   7
  weed/command/mount.go         |   2
  weed/command/mount_std.go     |   1
  weed/filer/filechunk_group.go | 137
  weed/filer/reader_at.go       |   4
  weed/filer/reader_cache.go    |  15
  weed/mount/filehandle.go      |   2
  weed/mount/weedfs.go          |   1

weed/command/fuse_std.go (7 changed lines)

@@ -155,6 +155,13 @@ func runFuse(cmd *Command, args []string) bool {
         } else {
             panic(fmt.Errorf("concurrentWriters: %s", err))
         }
+    case "concurrentReaders":
+        if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+            intValue := int(parsed)
+            mountOptions.concurrentReaders = &intValue
+        } else {
+            panic(fmt.Errorf("concurrentReaders: %s", err))
+        }
     case "cacheDir":
         mountOptions.cacheDirForRead = &parameter.value
     case "cacheCapacityMB":

weed/command/mount.go (2 changed lines)

@@ -17,6 +17,7 @@ type MountOptions struct {
     ttlSec            *int
     chunkSizeLimitMB  *int
     concurrentWriters *int
+    concurrentReaders *int
     cacheMetaTtlSec   *int
     cacheDirForRead   *string
     cacheDirForWrite  *string
@@ -65,6 +66,7 @@ func init() {
     mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
     mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
     mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers")
+    mountOptions.concurrentReaders = cmdMount.Flag.Int("concurrentReaders", 16, "limit concurrent chunk fetches for read operations")
     mountOptions.cacheDirForRead = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
     mountOptions.cacheSizeMBForRead = cmdMount.Flag.Int64("cacheCapacityMB", 128, "file chunk read cache capacity in MB")
     mountOptions.cacheDirForWrite = cmdMount.Flag.String("cacheDirWrite", "", "buffer writes mostly for large files")

weed/command/mount_std.go (1 changed line)

@@ -236,6 +236,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
         DiskType:           types.ToDiskType(*option.diskType),
         ChunkSizeLimit:     int64(chunkSizeLimitMB) * 1024 * 1024,
         ConcurrentWriters:  *option.concurrentWriters,
+        ConcurrentReaders:  *option.concurrentReaders,
         CacheDirForRead:    *option.cacheDirForRead,
         CacheSizeMBForRead: *option.cacheSizeMBForRead,
         CacheDirForWrite:   cacheDirForWrite,

weed/filer/filechunk_group.go (137 changed lines)

@@ -5,23 +5,46 @@ import (
     "io"
     "sync"
+    "golang.org/x/sync/errgroup"

     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
     "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
     "github.com/seaweedfs/seaweedfs/weed/wdclient"
 )

 type ChunkGroup struct {
-    lookupFn     wdclient.LookupFileIdFunctionType
-    sections     map[SectionIndex]*FileChunkSection
-    sectionsLock sync.RWMutex
-    readerCache  *ReaderCache
+    lookupFn          wdclient.LookupFileIdFunctionType
+    sections          map[SectionIndex]*FileChunkSection
+    sectionsLock      sync.RWMutex
+    readerCache       *ReaderCache
+    concurrentReaders int
 }

+// NewChunkGroup creates a ChunkGroup with default concurrency settings.
+// For better read performance, use NewChunkGroupWithConcurrency instead.
 func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
+    return NewChunkGroupWithConcurrency(lookupFn, chunkCache, chunks, 16)
+}
+
+// NewChunkGroupWithConcurrency creates a ChunkGroup with configurable concurrency.
+// concurrentReaders controls:
+//   - Maximum parallel chunk fetches during read operations
+//   - Read-ahead prefetch parallelism
+//   - Number of concurrent section reads for large files
+func NewChunkGroupWithConcurrency(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) {
+    if concurrentReaders <= 0 {
+        concurrentReaders = 16
+    }
+    // ReaderCache limit should be at least concurrentReaders to allow parallel prefetching
+    readerCacheLimit := concurrentReaders * 2
+    if readerCacheLimit < 32 {
+        readerCacheLimit = 32
+    }
     group := &ChunkGroup{
-        lookupFn:    lookupFn,
-        sections:    make(map[SectionIndex]*FileChunkSection),
-        readerCache: NewReaderCache(32, chunkCache, lookupFn),
+        lookupFn:          lookupFn,
+        sections:          make(map[SectionIndex]*FileChunkSection),
+        readerCache:       NewReaderCache(readerCacheLimit, chunkCache, lookupFn),
+        concurrentReaders: concurrentReaders,
     }

     err := group.SetChunks(chunks)
@@ -54,6 +77,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
     defer group.sectionsLock.RUnlock()

     sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
+    numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+    // For single section or when concurrency is disabled, use sequential reading
+    if numSections <= 1 || group.concurrentReaders <= 1 {
+        return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+    }
+
+    // For multiple sections, use parallel reading
+    return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+}
+
+// readDataAtSequential reads sections sequentially (original behavior)
+func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
     for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
         section, found := group.sections[si]
         rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
@@ -78,6 +114,93 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
     return
 }

+// sectionReadResult holds the result of a section read operation
+type sectionReadResult struct {
+    sectionIndex SectionIndex
+    n            int
+    tsNs         int64
+    err          error
+}
+
+// readDataAtParallel reads multiple sections in parallel for better throughput
+func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
+    numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+    // Limit concurrency
+    maxConcurrent := group.concurrentReaders
+    if maxConcurrent > numSections {
+        maxConcurrent = numSections
+    }
+
+    g, gCtx := errgroup.WithContext(ctx)
+    g.SetLimit(maxConcurrent)
+
+    results := make([]sectionReadResult, numSections)
+
+    for i := 0; i < numSections; i++ {
+        si := sectionIndexStart + SectionIndex(i)
+        idx := i
+        section, found := group.sections[si]
+        rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
+        if rangeStart >= rangeStop {
+            continue
+        }
+
+        if !found {
+            // Zero-fill missing sections synchronously
+            rangeStop = min(rangeStop, fileSize)
+            for j := rangeStart; j < rangeStop; j++ {
+                buff[j-offset] = 0
+            }
+            results[idx] = sectionReadResult{
+                sectionIndex: si,
+                n:            int(rangeStop - rangeStart),
+                tsNs:         0,
+                err:          nil,
+            }
+            continue
+        }
+
+        // Capture variables for closure
+        sectionCopy := section
+        buffSlice := buff[rangeStart-offset : rangeStop-offset]
+        rangeStartCopy := rangeStart
+
+        g.Go(func() error {
+            xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy)
+            results[idx] = sectionReadResult{
+                sectionIndex: si,
+                n:            xn,
+                tsNs:         xTsNs,
+                err:          xErr,
+            }
+            if xErr != nil && xErr != io.EOF {
+                return xErr
+            }
+            return nil
+        })
+    }
+
+    // Wait for all goroutines to complete
+    groupErr := g.Wait()
+
+    // Aggregate results
+    for _, result := range results {
+        if result.err != nil && err == nil {
+            err = result.err
+        }
+        n += result.n
+        tsNs = max(tsNs, result.tsNs)
+    }
+
+    if groupErr != nil && err == nil {
+        err = groupErr
+    }
+
+    return
+}
+
 func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
     group.sectionsLock.RLock()
     defer group.sectionsLock.RUnlock()

weed/filer/reader_at.go (4 changed lines)

@@ -247,7 +247,9 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
             c.readerCache.UnCache(c.lastChunkFid)
         }
         if nextChunkViews != nil {
-            c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning
+            // Prefetch multiple chunks ahead for better sequential read throughput
+            // This keeps the network pipeline full with parallel chunk fetches
+            c.readerCache.MaybeCacheMany(nextChunkViews, 4)
         }
     }
 }

weed/filer/reader_cache.go (15 changed lines)

@@ -47,9 +47,19 @@ func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
 }

 func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
+    rc.MaybeCacheMany(chunkViews, 1)
+}
+
+// MaybeCacheMany prefetches up to 'count' chunks ahead in parallel.
+// This improves read throughput for sequential reads by keeping the
+// network pipeline full with parallel chunk fetches.
+func (rc *ReaderCache) MaybeCacheMany(chunkViews *Interval[*ChunkView], count int) {
     if rc.lookupFileIdFn == nil {
         return
     }
+    if count <= 0 {
+        count = 1
+    }

     rc.Lock()
     defer rc.Unlock()
@@ -58,7 +68,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
         return
     }

-    for x := chunkViews; x != nil; x = x.Next {
+    cached := 0
+    for x := chunkViews; x != nil && cached < count; x = x.Next {
         chunkView := x.Value
         if _, found := rc.downloaders[chunkView.FileId]; found {
             continue
@@ -80,7 +91,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
         go cacher.startCaching()
         <-cacher.cacheStartedCh
         rc.downloaders[chunkView.FileId] = cacher
+        cached++
     }

     return

weed/mount/filehandle.go (2 changed lines)

@@ -81,7 +81,7 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
     fileSize := filer.FileSize(entry)
     entry.Attributes.FileSize = fileSize
     var resolveManifestErr error
-    fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
+    fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroupWithConcurrency(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks, fh.wfs.option.ConcurrentReaders)
     if resolveManifestErr != nil {
         glog.Warningf("failed to resolve manifest chunks in %+v", entry)
     }

weed/mount/weedfs.go (1 changed line)

@@ -42,6 +42,7 @@ type Option struct {
     DiskType           types.DiskType
     ChunkSizeLimit     int64
     ConcurrentWriters  int
+    ConcurrentReaders  int
     CacheDirForRead    string
     CacheSizeMBForRead int64
     CacheDirForWrite   string
