diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 93fa76a2e..6d091286d 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -7,6 +7,8 @@ import (
 	"math/rand"
 	"sync"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
@@ -175,67 +177,157 @@ func (c *ChunkReadAt) ReadAtWithTime(ctx context.Context, p []byte, offset int64
 	return c.doReadAt(ctx, p, offset)
 }
 
+// chunkReadTask describes one chunk read planned by doReadAt: the chunk to fetch,
+// where its bytes land in the output buffer, and the outcome of the read.
+type chunkReadTask struct {
+	chunk        *ChunkView
+	bufferStart  int64  // start position in the output buffer
+	bufferEnd    int64  // end position in the output buffer
+	chunkOffset  uint64 // offset within the chunk to read from
+	bytesRead    int    // bytes copied by the read (set by the parallel path)
+	modifiedTsNs int64  // chunk.ModifiedTsNs, recorded by the parallel path
+	err          error  // error returned by the read (set by the parallel path)
+}
+
+// doReadAt fills p with file content starting at offset and returns the number of
+// bytes written into p, the modification timestamp taken from the chunks read, and
+// io.EOF when the requested range reaches or passes c.fileSize.
+//
+// It first plans the work (one chunkReadTask per overlapping chunk view, plus the
+// gaps between them), zero-fills the gaps, and then reads the chunks: one at a time
+// for a single task or in random access mode, otherwise concurrently through an
+// errgroup. On a read error n counts only the contiguous bytes filled up to the failure.
 func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {
 
+	// Collect all chunk read tasks
+	var tasks []*chunkReadTask
+	var gaps []struct{ start, length int64 } // gaps that need zero-filling
+
 	startOffset, remaining := offset, int64(len(p))
-	var nextChunks *Interval[*ChunkView]
+	var lastChunk *Interval[*ChunkView]
+
 	for x := c.chunkViews.Front(); x != nil; x = x.Next {
 		chunk := x.Value
 		if remaining <= 0 {
 			break
 		}
-		if x.Next != nil {
-			nextChunks = x.Next
-		}
+		lastChunk = x
+
+		// Handle gap before this chunk
 		if startOffset < chunk.ViewOffset {
 			gap := chunk.ViewOffset - startOffset
-			glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset)
-			n += zero(p, startOffset-offset, gap)
+			gaps = append(gaps, struct{ start, length int64 }{startOffset - offset, gap})
 			startOffset, remaining = chunk.ViewOffset, remaining-gap
 			if remaining <= 0 {
 				break
 			}
 		}
-		// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
+
 		chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
 		if chunkStart >= chunkStop {
 			continue
 		}
-		// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
+
 		bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
-		ts = chunk.ModifiedTsNs
-		copied, err := c.readChunkSliceAt(ctx, p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
-		if err != nil {
-			glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
-			return copied, ts, err
+		tasks = append(tasks, &chunkReadTask{
+			chunk:       chunk,
+			bufferStart: startOffset - offset,
+			bufferEnd:   chunkStop - chunkStart + startOffset - offset,
+			chunkOffset: uint64(bufferOffset),
+		})
+
+		startOffset, remaining = chunkStop, remaining-(chunkStop-chunkStart)
+	}
+
+	// Zero-fill gaps
+	for _, gap := range gaps {
+		glog.V(4).Infof("zero [%d,%d)", offset+gap.start, offset+gap.start+gap.length)
+		n += zero(p, gap.start, gap.length)
+	}
+
+	// If only one chunk or random access mode, use sequential reading
+	if len(tasks) <= 1 || c.readerPattern.IsRandomMode() {
+		for _, task := range tasks {
+			copied, readErr := c.readChunkSliceAt(ctx, p[task.bufferStart:task.bufferEnd], task.chunk, nil, task.chunkOffset)
+			if readErr != nil {
+				glog.Errorf("fetching chunk %+v: %v\n", task.chunk, readErr)
+				// n already includes gaps that lie after this chunk, so report the
+				// contiguous prefix of p that was actually filled instead.
+				return int(task.bufferStart) + copied, task.chunk.ModifiedTsNs, readErr
+			}
+			n += copied
+			ts = task.chunk.ModifiedTsNs
+		}
+	} else {
+		// Parallel chunk fetching for multiple chunks
+		// This significantly improves throughput when chunks are on different volume servers
+		g, gCtx := errgroup.WithContext(ctx)
+
+		// Bound the number of concurrent reads: prefetchCount, but at least 4 and
+		// never more than the number of tasks
+		concurrency := c.prefetchCount
+		if concurrency < 4 {
+			concurrency = 4
+		}
+		if concurrency > len(tasks) {
+			concurrency = len(tasks)
+		}
+		g.SetLimit(concurrency)
+
+		for _, task := range tasks {
+			task := task // capture for closure
+			g.Go(func() error {
+				// Read directly into the correct position in the output buffer
+				task.bytesRead, task.err = c.readChunkSliceAtForParallel(gCtx, p[task.bufferStart:task.bufferEnd], task.chunk, task.chunkOffset)
+				task.modifiedTsNs = task.chunk.ModifiedTsNs
+				return task.err
+			})
+		}
+
+		// Wait for all chunk reads to complete. g.Wait returns the first error any
+		// read returned; a task's own err may merely be the cancellation of gCtx
+		// that followed it, so the group error is the one to report.
+		err = g.Wait()
+
+		// Aggregate results (order is preserved since we read directly into buffer positions)
+		for _, task := range tasks {
+			if task.err != nil {
+				// Bytes past the first failed read are not a contiguous prefix of p
+				n = int(task.bufferStart) + task.bytesRead
+				break
+			}
+			n += task.bytesRead
+			ts = max(ts, task.modifiedTsNs)
 		}
 
-		n += copied
-		startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
+		if err != nil {
+			return n, ts, err
+		}
 	}
 
-	// glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+	// Trigger prefetch for sequential reads
+	if lastChunk != nil && lastChunk.Next != nil && c.prefetchCount > 0 && !c.readerPattern.IsRandomMode() {
+		c.readerCache.MaybeCache(lastChunk.Next, c.prefetchCount)
+	}
 
-	// zero the remaining bytes if a gap exists at the end of the last chunk (or a fully sparse file)
-	if err == nil && remaining > 0 {
+	// Zero the remaining bytes if a gap exists at the end
+	if remaining > 0 {
 		var delta int64
 		if c.fileSize >= startOffset {
 			delta = min(remaining, c.fileSize-startOffset)
-			startOffset -= offset
-		}
-		if delta > 0 {
-			glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
-			n += zero(p, startOffset, delta)
+			bufStart := startOffset - offset
+			if delta > 0 {
+				glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
+				n += zero(p, bufStart, delta)
+			}
 		}
 	}
 
 	if err == nil && offset+int64(len(p)) >= c.fileSize {
 		err = io.EOF
 	}
-	// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
 
 	return
-
 }
 
 func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
@@ -266,6 +358,18 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
 	return
 }
 
+// readChunkSliceAtForParallel is a simplified version for parallel chunk fetching
+// It doesn't update lastChunkFid or trigger prefetch (handled by the caller)
+// ReadChunkAt is not given ctx, so ctx is checked up front: reads still waiting for a
+// concurrency slot are skipped once a sibling read has failed or the caller has cancelled.
+func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) {
+	if err = ctx.Err(); err != nil {
+		return 0, err
+	}
+	shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
+	return c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+}
+
 func zero(buffer []byte, start, length int64) int {
 	if length <= 0 {
 		return 0