From f908a006fc47943f20f8454a552fdb0d5ce751ac Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 4 Dec 2025 21:01:35 -0800 Subject: [PATCH] address review comments (round 2) - Return io.ErrUnexpectedEOF when HTTP response is truncated This prevents silent data corruption from incomplete reads - Simplify errgroup error handling by using g.Wait() error directly Remove redundant task.err field and manual error aggregation loop - Define minReadConcurrency constant instead of magic number 4 Improves code readability and maintainability Note: Context propagation to startCaching() is intentionally NOT changed. The downloaded chunk is a shared resource that may be used by multiple readers. Using context.Background() ensures the download completes even if one reader cancels, preventing data loss for other waiting readers. --- weed/filer/reader_at.go | 25 +++++++++++------------ weed/util/http/http_global_client_util.go | 5 +++++ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 917f9e029..4e2719e00 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -21,6 +21,11 @@ import ( // the prefetch count is derived from the -concurrentReaders option. const DefaultPrefetchCount = 4 +// minReadConcurrency is the minimum number of parallel chunk fetches. +// This ensures at least some parallelism even when prefetchCount is low, +// improving throughput for reads spanning multiple chunks. +const minReadConcurrency = 4 + type ChunkReadAt struct { masterClient *wdclient.MasterClient chunkViews *IntervalList[*ChunkView] @@ -185,7 +190,6 @@ type chunkReadTask struct { chunkOffset uint64 // offset within the chunk to read from bytesRead int modifiedTsNs int64 - err error } func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) { @@ -252,10 +256,10 @@ func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n i // This significantly improves throughput when chunks are on different volume servers g, gCtx := errgroup.WithContext(ctx) - // Limit concurrency to prefetchCount to avoid overwhelming the system + // Limit concurrency to avoid overwhelming the system concurrency := c.prefetchCount - if concurrency < 4 { - concurrency = 4 + if concurrency < minReadConcurrency { + concurrency = minReadConcurrency } if concurrency > len(tasks) { concurrency = len(tasks) @@ -269,24 +273,19 @@ func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n i copied, readErr := c.readChunkSliceAtForParallel(gCtx, p[task.bufferStart:task.bufferEnd], task.chunk, task.chunkOffset) task.bytesRead = copied task.modifiedTsNs = task.chunk.ModifiedTsNs - task.err = readErr - if readErr != nil { - return readErr - } - return nil + return readErr }) } // Wait for all chunk reads to complete - _ = g.Wait() + if waitErr := g.Wait(); waitErr != nil { + err = waitErr + } // Aggregate results (order is preserved since we read directly into buffer positions) for _, task := range tasks { n += task.bytesRead ts = max(ts, task.modifiedTsNs) - if task.err != nil && err == nil { - err = task.err - } } if err != nil { diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index 128a81143..9fa2c6e40 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -645,6 +645,11 @@ func readUrlDirectToBuffer(ctx context.Context, fileUrl, jwt string, buffer []by totalRead += m if readErr != nil { if readErr == io.EOF { + // Return io.ErrUnexpectedEOF if we haven't filled the buffer + // This prevents silent data corruption from truncated responses + if totalRead < len(buffer) { + return totalRead, true, io.ErrUnexpectedEOF + } return totalRead, false, nil } return totalRead, true, readErr