@@ -7,6 +7,8 @@ import (
"math/rand"
"sync"
"golang.org/x/sync/errgroup"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -175,67 +177,150 @@ func (c *ChunkReadAt) ReadAtWithTime(ctx context.Context, p []byte, offset int64
	return c.doReadAt(ctx, p, offset)
}
// chunkReadTask represents a single chunk read operation for parallel processing
type chunkReadTask struct {
	chunk        *ChunkView
	bufferStart  int64  // start position in the output buffer
	bufferEnd    int64  // end position in the output buffer
	chunkOffset  uint64 // offset within the chunk to read from
	bytesRead    int
	modifiedTsNs int64
	err          error
}
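// doReadAt plans the read in two phases: it first walks the chunk list and turns it
// into chunkReadTasks (recording any gaps that need zero-filling), then executes the
// tasks either sequentially (single chunk or random access) or in parallel
// (multi-chunk sequential reads).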
func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {

	// Collect all chunk read tasks
	var tasks []*chunkReadTask
	var gaps []struct{ start, length int64 } // gaps that need zero-filling

	startOffset, remaining := offset, int64(len(p))
	var lastChunk *Interval[*ChunkView]
	for x := c.chunkViews.Front(); x != nil; x = x.Next {
		chunk := x.Value
		if remaining <= 0 {
			break
		}
		lastChunk = x
		// Handle gap before this chunk
		if startOffset < chunk.ViewOffset {
			gap := chunk.ViewOffset - startOffset
			gaps = append(gaps, struct{ start, length int64 }{startOffset - offset, gap})
			startOffset, remaining = chunk.ViewOffset, remaining-gap
			if remaining <= 0 {
				break
			}
		}
		// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
		chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
		if chunkStart >= chunkStop {
			continue
		}
		// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
		bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
		tasks = append(tasks, &chunkReadTask{
			chunk:       chunk,
			bufferStart: startOffset - offset,
			bufferEnd:   chunkStop - chunkStart + startOffset - offset,
			chunkOffset: uint64(bufferOffset),
		})
		startOffset, remaining = chunkStop, remaining-(chunkStop-chunkStart)
	}
	// Zero-fill gaps
	for _, gap := range gaps {
		glog.V(4).Infof("zero [%d,%d)", offset+gap.start, offset+gap.start+gap.length)
		n += zero(p, gap.start, gap.length)
	}
	// If only one chunk or random access mode, use sequential reading
	if len(tasks) <= 1 || c.readerPattern.IsRandomMode() {
		for _, task := range tasks {
			copied, readErr := c.readChunkSliceAt(ctx, p[task.bufferStart:task.bufferEnd], task.chunk, nil, task.chunkOffset)
			if readErr != nil {
				glog.Errorf("fetching chunk %+v: %v\n", task.chunk, readErr)
				return n + copied, task.chunk.ModifiedTsNs, readErr
			}
			n += copied
			ts = task.chunk.ModifiedTsNs
		}
	} else {
		// Parallel chunk fetching for multiple chunks
		// This significantly improves throughput when chunks are on different volume servers
		g, gCtx := errgroup.WithContext(ctx)

		// Limit concurrency to prefetchCount (with a floor of 4, capped at the number of tasks)
		// to avoid overwhelming the system
		concurrency := c.prefetchCount
		if concurrency < 4 {
			concurrency = 4
		}
		if concurrency > len(tasks) {
			concurrency = len(tasks)
		}
		g.SetLimit(concurrency)
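		// SetLimit caps how many chunk fetches run at once; once the limit is reached,
		// further g.Go calls block until a running fetch finishes.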
		for _, task := range tasks {
			task := task // capture for closure
			g.Go(func() error {
				// Read directly into the correct position in the output buffer
				copied, readErr := c.readChunkSliceAtForParallel(gCtx, p[task.bufferStart:task.bufferEnd], task.chunk, task.chunkOffset)
				task.bytesRead = copied
				task.modifiedTsNs = task.chunk.ModifiedTsNs
				task.err = readErr
				if readErr != nil {
					return readErr
				}
				return nil
			})
		}
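		// Note: the tasks' [bufferStart, bufferEnd) ranges never overlap (the planning loop
		// advances startOffset past each chunk), so the goroutines write to disjoint
		// sub-slices of p and need no synchronization beyond g.Wait().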
		// Wait for all chunk reads to complete
		groupErr := g.Wait()

		// Aggregate results (order is preserved since we read directly into buffer positions)
		for _, task := range tasks {
			n += task.bytesRead
			ts = max(ts, task.modifiedTsNs)
			if task.err != nil && err == nil {
				err = task.err
			}
		}
		// A per-task error takes precedence; fall back to the group error (e.g. a canceled context)
		if groupErr != nil && err == nil {
			err = groupErr
		}

		if err != nil {
			return n, ts, err
		}
	}
	// glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)

	// Trigger prefetch for sequential reads
	if lastChunk != nil && lastChunk.Next != nil && c.prefetchCount > 0 && !c.readerPattern.IsRandomMode() {
		c.readerCache.MaybeCache(lastChunk.Next, c.prefetchCount)
	}
	// Zero the remaining bytes if a gap exists at the end of the last chunk (or a fully sparse file)
	if remaining > 0 {
		var delta int64
		if c.fileSize >= startOffset {
			delta = min(remaining, c.fileSize-startOffset)
		}
		bufStart := startOffset - offset
		if delta > 0 {
			glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
			n += zero(p, bufStart, delta)
		}
	}
	if err == nil && offset+int64(len(p)) >= c.fileSize {
		err = io.EOF
	}
	// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)

	return
}
func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
@@ -266,6 +351,13 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
	return
}
// readChunkSliceAtForParallel is a simplified version for parallel chunk fetching
// It doesn't update lastChunkFid or trigger prefetch (handled by the caller)
func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) {
	shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
	return c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
}
func zero(buffer []byte, start, length int64) int {
	if length <= 0 {
		return 0