Browse Source

address review comments (round 2)

- Return io.ErrUnexpectedEOF when HTTP response is truncated
  This prevents silent data corruption from incomplete reads
- Simplify errgroup error handling by using g.Wait() error directly
  Remove redundant task.err field and manual error aggregation loop
- Define minReadConcurrency constant instead of magic number 4
  Improves code readability and maintainability

Note: Context propagation to startCaching() is intentionally NOT changed.
The downloaded chunk is a shared resource that may be used by multiple
readers. Using context.Background() ensures the download completes even
if one reader cancels, preventing data loss for other waiting readers.
pull/7627/head
chrislu 6 days ago
parent
commit
f908a006fc
  1. 25
      weed/filer/reader_at.go
  2. 5
      weed/util/http/http_global_client_util.go

25
weed/filer/reader_at.go

@ -21,6 +21,11 @@ import (
// the prefetch count is derived from the -concurrentReaders option.
const DefaultPrefetchCount = 4
// minReadConcurrency is the minimum number of parallel chunk fetches.
// This ensures at least some parallelism even when prefetchCount is low,
// improving throughput for reads spanning multiple chunks.
const minReadConcurrency = 4
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews *IntervalList[*ChunkView]
@ -185,7 +190,6 @@ type chunkReadTask struct {
chunkOffset uint64 // offset within the chunk to read from
bytesRead int
modifiedTsNs int64
err error
}
func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {
@ -252,10 +256,10 @@ func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n i
// This significantly improves throughput when chunks are on different volume servers
g, gCtx := errgroup.WithContext(ctx)
// Limit concurrency to prefetchCount to avoid overwhelming the system
// Limit concurrency to avoid overwhelming the system
concurrency := c.prefetchCount
if concurrency < 4 {
concurrency = 4
if concurrency < minReadConcurrency {
concurrency = minReadConcurrency
}
if concurrency > len(tasks) {
concurrency = len(tasks)
@ -269,24 +273,19 @@ func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n i
copied, readErr := c.readChunkSliceAtForParallel(gCtx, p[task.bufferStart:task.bufferEnd], task.chunk, task.chunkOffset)
task.bytesRead = copied
task.modifiedTsNs = task.chunk.ModifiedTsNs
task.err = readErr
if readErr != nil {
return readErr
}
return nil
return readErr
})
}
// Wait for all chunk reads to complete
_ = g.Wait()
if waitErr := g.Wait(); waitErr != nil {
err = waitErr
}
// Aggregate results (order is preserved since we read directly into buffer positions)
for _, task := range tasks {
n += task.bytesRead
ts = max(ts, task.modifiedTsNs)
if task.err != nil && err == nil {
err = task.err
}
}
if err != nil {

5
weed/util/http/http_global_client_util.go

@ -645,6 +645,11 @@ func readUrlDirectToBuffer(ctx context.Context, fileUrl, jwt string, buffer []by
totalRead += m
if readErr != nil {
if readErr == io.EOF {
// Return io.ErrUnexpectedEOF if we haven't filled the buffer
// This prevents silent data corruption from truncated responses
if totalRead < len(buffer) {
return totalRead, true, io.ErrUnexpectedEOF
}
return totalRead, false, nil
}
return totalRead, true, readErr

Loading…
Cancel
Save