
mount: improve read throughput with parallel chunk fetching (#7627)

* filer: remove lock contention during chunk download

This addresses issue #7504 where a single weed mount FUSE instance
does not fully utilize node network bandwidth when reading large files.

The SingleChunkCacher was holding a mutex during the entire HTTP download,
causing readers to block until the download completed. This serialized
chunk reads even when multiple goroutines were downloading in parallel.

Changes:
- Add sync.Cond to SingleChunkCacher for efficient waiting
- Move HTTP download outside the critical section in startCaching()
- Use condition variable in readChunkAt() to wait for download completion
- Add isComplete flag to track download state

Now multiple chunk downloads can proceed truly in parallel, and readers
wait efficiently using the condition variable instead of blocking on
a mutex held during I/O operations.

Ref: #7504

* filer: parallel chunk fetching within doReadAt

This addresses issue #7504 by enabling parallel chunk downloads within
a single read operation.

Previously, doReadAt() processed chunks sequentially in a loop, meaning
each chunk had to be fully downloaded before the next one started.
This left significant network bandwidth unused when chunks resided on
different volume servers.

Changes:
- Collect all chunk read tasks upfront
- Use errgroup to fetch multiple chunks in parallel
- Each chunk reads directly into its correct buffer position
- Limit concurrency to prefetchCount, with a floor of 4 and a cap at the number of chunks, to avoid overwhelming the system
- Handle gaps and zero-filling before parallel fetch
- Trigger prefetch after parallel reads complete

For a read spanning N chunks on different volume servers, this can
now use up to N times the bandwidth of a single connection, subject to
the concurrency limit above.

Ref: #7504
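
A minimal sketch of the bounded parallel fetch described above, using only
the standard library: a semaphore channel plus sync.WaitGroup stands in for
the errgroup calls the commit uses. chunkTask, fetchChunk, and readParallel
are hypothetical names for illustration, not the code in reader_at.go.

```go
package main

import (
	"fmt"
	"sync"
)

// chunkTask is a hypothetical stand-in for the commit's chunkReadTask:
// it records which region of the output buffer one chunk fills.
type chunkTask struct {
	bufferStart, bufferEnd int
	fill                   byte // fake chunk content for this sketch
}

// fetchChunk is a hypothetical stand-in for a volume server read.
// It fills dst and reports how many bytes it wrote.
func fetchChunk(dst []byte, fill byte) (int, error) {
	for i := range dst {
		dst[i] = fill
	}
	return len(dst), nil
}

// readParallel runs at most limit fetches at a time. Each task writes into
// its own disjoint region of p, so the buffer itself needs no locking.
func readParallel(p []byte, tasks []chunkTask, limit int) (int, error) {
	if limit > len(tasks) {
		limit = len(tasks)
	}
	if limit < 1 {
		limit = 1
	}
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex // guards total and firstErr
	var firstErr error
	total := 0
	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit fetches are in flight
		go func(t chunkTask) {
			defer wg.Done()
			defer func() { <-sem }()
			n, err := fetchChunk(p[t.bufferStart:t.bufferEnd], t.fill)
			mu.Lock()
			total += n
			if err != nil && firstErr == nil {
				firstErr = err
			}
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return total, firstErr
}

func main() {
	p := make([]byte, 12)
	tasks := []chunkTask{{0, 4, 'a'}, {4, 8, 'b'}, {8, 12, 'c'}}
	n, err := readParallel(p, tasks, 2)
	fmt.Println(n, err, string(p))
}
```

Because every task owns a fixed slice of the output buffer, the result is
ordered correctly no matter which fetch finishes first.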

* http: direct buffer read to reduce memory copies

This addresses issue #7504 by reducing memory copy overhead during
chunk downloads.

Previously, RetriedFetchChunkData used ReadUrlAsStream which:
1. Allocated a 64KB intermediate buffer
2. Read data in 64KB chunks
3. Called a callback to copy each chunk to the destination

For a 16MB chunk, this meant 256 copy operations plus the callback
overhead. Profiling showed significant time spent in memmove.

Changes:
- Add readUrlDirectToBuffer() that reads directly into the destination
- Add retriedFetchChunkDataDirect() for unencrypted, non-gzipped chunks
- Automatically use direct read path when possible (cipher=nil, gzip=false)
- Use http.NewRequestWithContext for proper cancellation

For unencrypted chunks (the common case), this eliminates the
intermediate buffer entirely, reading HTTP response bytes directly
into the final destination buffer.

Ref: #7504

* address review comments

- Use channel (done) instead of sync.Cond for download completion signaling
  This integrates better with context cancellation patterns
- Remove redundant groupErr check in reader_at.go (errors are already captured in task.err)
- Remove buggy URL encoding logic from retriedFetchChunkDataDirect
  (The existing url.PathEscape on full URL is a pre-existing bug that should be fixed separately)

* address review comments (round 2)

- Return io.ErrUnexpectedEOF when HTTP response is truncated
  This prevents silent data corruption from incomplete reads
- Simplify errgroup error handling by using g.Wait() error directly
  Remove redundant task.err field and manual error aggregation loop
- Define minReadConcurrency constant instead of magic number 4
  Improves code readability and maintainability

Note: Context propagation to startCaching() is intentionally NOT changed.
The downloaded chunk is a shared resource that may be used by multiple
readers. Using context.Background() ensures the download completes even
if one reader cancels, preventing errors for other waiting readers.

* http: inject request ID for observability in direct read path

Add request_id.InjectToRequest() call to readUrlDirectToBuffer() for
consistency with ReadUrlAsStream path. This ensures full-chunk reads
carry the same tracing/correlation headers for server logs and metrics.

* filer: consistent timestamp handling in sequential read path

Use max(ts, task.chunk.ModifiedTsNs) in sequential path to match
parallel path behavior. Also update ts before error check so that
on failure, the returned timestamp reflects the max of all chunks
processed so far.
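
A small sketch of the ordering this describes: the timestamp is folded in
before the error check, so a failed read still reports the newest timestamp
seen. The chunk type and readChunk helper are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// chunk is a hypothetical stand-in for a chunk view.
type chunk struct {
	size         int
	modifiedTsNs int64
	fail         bool // simulate a failed read
}

// readChunk is a hypothetical stand-in for the actual chunk read.
func readChunk(c chunk) (int, error) {
	if c.fail {
		return 0, errors.New("simulated read failure")
	}
	return c.size, nil
}

// readSequential updates ts before checking the error, so ts covers every
// chunk attempted so far, including the one that failed.
func readSequential(chunks []chunk) (n int, ts int64, err error) {
	for _, c := range chunks {
		copied, readErr := readChunk(c)
		if c.modifiedTsNs > ts {
			ts = c.modifiedTsNs
		}
		if readErr != nil {
			return n + copied, ts, readErr
		}
		n += copied
	}
	return n, ts, nil
}

func main() {
	n, ts, err := readSequential([]chunk{{4, 100, false}, {4, 300, true}, {4, 200, false}})
	fmt.Println(n, ts, err)
}
```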

* filer: document why context.Background() is used in startCaching

Add comment explaining the intentional design decision: the downloaded
chunk is a shared resource that may be used by multiple concurrent
readers. Using context.Background() ensures the download completes
even if one reader cancels, preventing errors for other waiting readers.

* filer: propagate context for reader cancellation

Address review comment: pass context through ReadChunkAt call chain so
that a reader can cancel its wait for a download. The key distinction is:

- Download uses context.Background() - shared resource, always completes
- Reader wait uses request context - can be cancelled individually

If a reader cancels, it stops waiting and returns ctx.Err(), but the
download continues to completion for other readers waiting on the same
chunk. This properly handles the shared resource semantics while still
allowing individual reader cancellation.

* filer: use defer for close(done) to guarantee signal on panic

Move close(s.done) to a defer statement at the start of startCaching()
to ensure the completion signal is always sent, even if an unexpected
panic occurs. This prevents readers from blocking indefinitely.
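
A minimal sketch of why the defer matters. startCachingSketch is a
hypothetical stand-in for startCaching(); the recover wrapper exists only so
this example can observe the channel state after a panic and is an
assumption of the sketch, not something the commit adds.

```go
package main

import "fmt"

// startCachingSketch closes done via defer, so the signal is sent even if
// work panics partway through.
func startCachingSketch(done chan struct{}, work func()) {
	defer close(done)
	work()
}

// runAndRecover calls startCachingSketch and swallows any panic so the
// caller can inspect what happened. Illustration only.
func runAndRecover(done chan struct{}, work func()) (recovered any) {
	defer func() { recovered = recover() }()
	startCachingSketch(done, work)
	return nil
}

// isClosed reports whether done has been closed, without blocking.
func isClosed(done <-chan struct{}) bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

func main() {
	done := make(chan struct{})
	r := runAndRecover(done, func() { panic("boom") })
	fmt.Println("recovered:", r, "done closed:", isClosed(done))
}
```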

* filer: remove unnecessary code

- Remove close(s.cacheStartedCh) in destroy(): the channel is only used
  for one-time synchronization, so closing it provides no benefit
- Remove the task := task loop variable capture: Go 1.22+ gives each
  iteration its own loop variable, so the capture is no longer necessary
  (go.mod specifies Go 1.24.0)

* filer: restore fallback to chunkCache when cacher returns no data

Fix critical issue where ReadChunkAt would return 0,nil immediately
if SingleChunkCacher couldn't provide data for the requested offset,
without trying the chunkCache fallback. Now if cacher.readChunkAt
returns n=0 and err=nil, we fall through to try chunkCache.
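
The fall-through rule can be sketched with two function values. readFn and
readWithFallback are hypothetical names; the real code works on
SingleChunkCacher and the chunk cache rather than closures.

```go
package main

import "fmt"

// readFn is a hypothetical signature shared by both data sources.
type readFn func(buf []byte, offset int64) (int, error)

// readWithFallback returns the cacher's result only if it produced data or
// an error. A result of n=0, err=nil means "nothing for this offset", so the
// chunk cache is tried next.
func readWithFallback(cacher, chunkCache readFn, buf []byte, offset int64) (int, error) {
	if cacher != nil {
		n, err := cacher(buf, offset)
		if n > 0 || err != nil {
			return n, err
		}
	}
	return chunkCache(buf, offset)
}

func main() {
	emptyCacher := func(buf []byte, offset int64) (int, error) { return 0, nil }
	cache := func(buf []byte, offset int64) (int, error) {
		return copy(buf, "cached bytes"[offset:]), nil
	}
	buf := make([]byte, 5)
	n, err := readWithFallback(emptyCacher, cache, buf, 7)
	fmt.Println(string(buf[:n]), err)
}
```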

* filer: add comprehensive tests for ReaderCache

Tests cover:
- Context cancellation while waiting for download
- Fallback to chunkCache when cacher returns n=0, err=nil
- Multiple concurrent readers waiting for same chunk
- Partial reads at different offsets
- Downloader cleanup when exceeding cache limit
- Done channel signaling (no hangs on completion)

* filer: prioritize done channel over context cancellation

If data is already available (done channel closed), return it even if
the reader's context is also cancelled. This avoids unnecessary errors
when the download has already completed.

* filer: add lookup error test and document test limitations

Add TestSingleChunkCacherLookupError to test error handling when lookup
fails. Document that full HTTP integration tests for SingleChunkCacher
require global HTTP client initialization which is complex in unit tests.
The download path is tested via FUSE integration tests.

* filer: add tests that exercise SingleChunkCacher concurrency logic

Add tests that use blocking lookupFileIdFn to exercise the actual
SingleChunkCacher wait/cancellation logic:

- TestSingleChunkCacherContextCancellationDuringLookup: tests reader
  cancellation while lookup is blocked
- TestSingleChunkCacherMultipleReadersWaitForDownload: tests multiple
  readers waiting on the same download
- TestSingleChunkCacherOneReaderCancelsOthersContinue: tests that when
  one reader cancels, other readers continue waiting

These tests properly exercise the done channel wait/cancel logic without
requiring HTTP calls - the blocking lookup simulates a slow download.
pull/7632/head
Chris Lu 5 days ago
committed by GitHub
parent
commit
5c1de633cb
Changed files:
  1. weed/filer/reader_at.go (138 lines changed)
  2. weed/filer/reader_cache.go (86 lines changed)
  3. weed/filer/reader_cache_test.go (505 lines changed)
  4. weed/util/http/http_global_client_util.go (108 lines changed)

138
weed/filer/reader_at.go

@ -7,6 +7,8 @@ import (
"math/rand"
"sync"
"golang.org/x/sync/errgroup"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@ -19,6 +21,11 @@ import (
// the prefetch count is derived from the -concurrentReaders option.
const DefaultPrefetchCount = 4
// minReadConcurrency is the minimum number of parallel chunk fetches.
// This ensures at least some parallelism even when prefetchCount is low,
// improving throughput for reads spanning multiple chunks.
const minReadConcurrency = 4
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews *IntervalList[*ChunkView]
@ -175,67 +182,139 @@ func (c *ChunkReadAt) ReadAtWithTime(ctx context.Context, p []byte, offset int64
return c.doReadAt(ctx, p, offset)
}
// chunkReadTask represents a single chunk read operation for parallel processing
type chunkReadTask struct {
chunk *ChunkView
bufferStart int64 // start position in the output buffer
bufferEnd int64 // end position in the output buffer
chunkOffset uint64 // offset within the chunk to read from
bytesRead int
modifiedTsNs int64
}
func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {
// Collect all chunk read tasks
var tasks []*chunkReadTask
var gaps []struct{ start, length int64 } // gaps that need zero-filling
startOffset, remaining := offset, int64(len(p))
var nextChunks *Interval[*ChunkView]
var lastChunk *Interval[*ChunkView]
for x := c.chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
if remaining <= 0 {
break
}
if x.Next != nil {
nextChunks = x.Next
}
lastChunk = x
// Handle gap before this chunk
if startOffset < chunk.ViewOffset {
gap := chunk.ViewOffset - startOffset
glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset)
n += zero(p, startOffset-offset, gap)
gaps = append(gaps, struct{ start, length int64 }{startOffset - offset, gap})
startOffset, remaining = chunk.ViewOffset, remaining-gap
if remaining <= 0 {
break
}
}
// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
if chunkStart >= chunkStop {
continue
}
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
ts = chunk.ModifiedTsNs
copied, err := c.readChunkSliceAt(ctx, p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
return copied, ts, err
tasks = append(tasks, &chunkReadTask{
chunk: chunk,
bufferStart: startOffset - offset,
bufferEnd: chunkStop - chunkStart + startOffset - offset,
chunkOffset: uint64(bufferOffset),
})
startOffset, remaining = chunkStop, remaining-(chunkStop-chunkStart)
}
// Zero-fill gaps
for _, gap := range gaps {
glog.V(4).Infof("zero [%d,%d)", offset+gap.start, offset+gap.start+gap.length)
n += zero(p, gap.start, gap.length)
}
// If only one chunk or random access mode, use sequential reading
if len(tasks) <= 1 || c.readerPattern.IsRandomMode() {
for _, task := range tasks {
copied, readErr := c.readChunkSliceAt(ctx, p[task.bufferStart:task.bufferEnd], task.chunk, nil, task.chunkOffset)
ts = max(ts, task.chunk.ModifiedTsNs)
if readErr != nil {
glog.Errorf("fetching chunk %+v: %v\n", task.chunk, readErr)
return n + copied, ts, readErr
}
n += copied
}
} else {
// Parallel chunk fetching for multiple chunks
// This significantly improves throughput when chunks are on different volume servers
g, gCtx := errgroup.WithContext(ctx)
// Limit concurrency to avoid overwhelming the system
concurrency := c.prefetchCount
if concurrency < minReadConcurrency {
concurrency = minReadConcurrency
}
if concurrency > len(tasks) {
concurrency = len(tasks)
}
g.SetLimit(concurrency)
for _, task := range tasks {
g.Go(func() error {
// Read directly into the correct position in the output buffer
copied, readErr := c.readChunkSliceAtForParallel(gCtx, p[task.bufferStart:task.bufferEnd], task.chunk, task.chunkOffset)
task.bytesRead = copied
task.modifiedTsNs = task.chunk.ModifiedTsNs
return readErr
})
}
n += copied
startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
// Wait for all chunk reads to complete
if waitErr := g.Wait(); waitErr != nil {
err = waitErr
}
// Aggregate results (order is preserved since we read directly into buffer positions)
for _, task := range tasks {
n += task.bytesRead
ts = max(ts, task.modifiedTsNs)
}
if err != nil {
return n, ts, err
}
}
// glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
// Trigger prefetch for sequential reads
if lastChunk != nil && lastChunk.Next != nil && c.prefetchCount > 0 && !c.readerPattern.IsRandomMode() {
c.readerCache.MaybeCache(lastChunk.Next, c.prefetchCount)
}
// zero the remaining bytes if a gap exists at the end of the last chunk (or a fully sparse file)
if err == nil && remaining > 0 {
// Zero the remaining bytes if a gap exists at the end
if remaining > 0 {
var delta int64
if c.fileSize >= startOffset {
delta = min(remaining, c.fileSize-startOffset)
startOffset -= offset
}
if delta > 0 {
glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
n += zero(p, startOffset, delta)
bufStart := startOffset - offset
if delta > 0 {
glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
n += zero(p, bufStart, delta)
}
}
}
if err == nil && offset+int64(len(p)) >= c.fileSize {
err = io.EOF
}
// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
return
}
func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
@ -249,7 +328,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
}
shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
n, err = c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
if c.lastChunkFid != chunkView.FileId {
if chunkView.OffsetInChunk == 0 { // start of a new chunk
if c.lastChunkFid != "" {
@ -266,6 +345,13 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
return
}
// readChunkSliceAtForParallel is a simplified version for parallel chunk fetching
// It doesn't update lastChunkFid or trigger prefetch (handled by the caller)
func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) {
shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
return c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
}
func zero(buffer []byte, start, length int64) int {
if length <= 0 {
return 0

86
weed/filer/reader_cache.go

@ -35,6 +35,7 @@ type SingleChunkCacher struct {
shouldCache bool
wg sync.WaitGroup
cacheStartedCh chan struct{}
done chan struct{} // signals when download is complete
}
func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@ -93,14 +94,18 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
return
}
func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
rc.Lock()
if cacher, found := rc.downloaders[fileId]; found {
if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
rc.Unlock()
rc.Unlock()
n, err := cacher.readChunkAt(ctx, buffer, offset)
if n > 0 || err != nil {
return n, err
}
// If n=0 and err=nil, the cacher couldn't provide data for this offset.
// Fall through to try chunkCache.
rc.Lock()
}
if shouldCache || rc.lookupFileIdFn == nil {
n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
@ -134,7 +139,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
rc.downloaders[fileId] = cacher
rc.Unlock()
return cacher.readChunkAt(buffer, offset)
return cacher.readChunkAt(ctx, buffer, offset)
}
func (rc *ReaderCache) UnCache(fileId string) {
@ -166,38 +171,53 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte,
chunkSize: chunkSize,
shouldCache: shouldCache,
cacheStartedCh: make(chan struct{}),
done: make(chan struct{}),
}
}
// startCaching downloads the chunk data in the background.
// It does NOT hold the lock during the HTTP download to allow concurrent readers
// to wait efficiently using the done channel.
func (s *SingleChunkCacher) startCaching() {
s.wg.Add(1)
defer s.wg.Done()
s.Lock()
defer s.Unlock()
defer close(s.done) // guarantee completion signal even on panic
s.cacheStartedCh <- struct{}{} // means this has been started
s.cacheStartedCh <- struct{}{} // signal that we've started
// Note: We intentionally use context.Background() here, NOT a request-specific context.
// The downloaded chunk is a shared resource - multiple concurrent readers may be waiting
// for this same download to complete. If we used a request context and that request was
// cancelled, it would abort the download and cause errors for all other waiting readers.
// The download should always complete once started to serve all potential consumers.
// Lookup file ID without holding the lock
urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
if err != nil {
s.Lock()
s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
s.Unlock()
return
}
s.data = mem.Allocate(s.chunkSize)
_, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
if s.err != nil {
mem.Free(s.data)
s.data = nil
return
}
// Allocate buffer and download without holding the lock
// This allows multiple downloads to proceed in parallel
data := mem.Allocate(s.chunkSize)
_, fetchErr := util_http.RetriedFetchChunkData(context.Background(), data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
if s.shouldCache {
s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
// Now acquire lock to update state
s.Lock()
if fetchErr != nil {
mem.Free(data)
s.err = fetchErr
} else {
s.data = data
if s.shouldCache {
s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
}
atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
}
atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
return
s.Unlock()
}
func (s *SingleChunkCacher) destroy() {
@ -209,13 +229,34 @@ func (s *SingleChunkCacher) destroy() {
if s.data != nil {
mem.Free(s.data)
s.data = nil
close(s.cacheStartedCh)
}
}
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
// readChunkAt reads data from the cached chunk.
// It waits for the download to complete if it's still in progress.
// The ctx parameter allows the reader to cancel its wait (but the download continues
// for other readers - see comment in startCaching about shared resource semantics).
func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
s.wg.Add(1)
defer s.wg.Done()
// Wait for download to complete, but allow reader cancellation.
// Prioritize checking done first - if data is already available,
// return it even if context is also cancelled.
select {
case <-s.done:
// Download already completed, proceed immediately
default:
// Download not complete, wait for it or context cancellation
select {
case <-s.done:
// Download completed
case <-ctx.Done():
// Reader cancelled while waiting - download continues for other readers
return 0, ctx.Err()
}
}
s.Lock()
defer s.Unlock()
@ -228,5 +269,4 @@ func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
}
return copy(buf, s.data[offset:]), nil
}

505
weed/filer/reader_cache_test.go

@ -0,0 +1,505 @@
package filer
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
// mockChunkCacheForReaderCache implements chunk cache for testing
type mockChunkCacheForReaderCache struct {
data map[string][]byte
hitCount int32
mu sync.Mutex
}
func newMockChunkCacheForReaderCache() *mockChunkCacheForReaderCache {
return &mockChunkCacheForReaderCache{
data: make(map[string][]byte),
}
}
func (m *mockChunkCacheForReaderCache) GetChunk(fileId string, minSize uint64) []byte {
m.mu.Lock()
defer m.mu.Unlock()
if d, ok := m.data[fileId]; ok {
atomic.AddInt32(&m.hitCount, 1)
return d
}
return nil
}
func (m *mockChunkCacheForReaderCache) ReadChunkAt(data []byte, fileId string, offset uint64) (int, error) {
m.mu.Lock()
defer m.mu.Unlock()
if d, ok := m.data[fileId]; ok && int(offset) < len(d) {
atomic.AddInt32(&m.hitCount, 1)
n := copy(data, d[offset:])
return n, nil
}
return 0, nil
}
func (m *mockChunkCacheForReaderCache) SetChunk(fileId string, data []byte) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[fileId] = data
}
func (m *mockChunkCacheForReaderCache) GetMaxFilePartSizeInCache() uint64 {
return 1024 * 1024 // 1MB
}
func (m *mockChunkCacheForReaderCache) IsInCache(fileId string, lockNeeded bool) bool {
m.mu.Lock()
defer m.mu.Unlock()
_, ok := m.data[fileId]
return ok
}
// TestReaderCacheContextCancellation tests that a reader can cancel its wait
// while the download continues for other readers
func TestReaderCacheContextCancellation(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
// Create a ReaderCache - we can't easily test the full flow without mocking HTTP,
// but we can test the context cancellation in readChunkAt
rc := NewReaderCache(10, cache, nil)
defer rc.destroy()
// Pre-populate cache to avoid HTTP calls
testData := []byte("test data for context cancellation")
cache.SetChunk("test-file-1", testData)
// Test that context cancellation works
ctx, cancel := context.WithCancel(context.Background())
buffer := make([]byte, len(testData))
n, err := rc.ReadChunkAt(ctx, buffer, "test-file-1", nil, false, 0, len(testData), true)
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
if n != len(testData) {
t.Errorf("Expected %d bytes, got %d", len(testData), n)
}
// Cancel context and verify it doesn't affect already completed reads
cancel()
// Subsequent read with cancelled context should still work from cache
buffer2 := make([]byte, len(testData))
n2, err2 := rc.ReadChunkAt(ctx, buffer2, "test-file-1", nil, false, 0, len(testData), true)
// Note: This may or may not error depending on whether it hits cache
_ = n2
_ = err2
}
// TestReaderCacheFallbackToChunkCache tests that when a cacher returns n=0, err=nil,
// we fall back to the chunkCache
func TestReaderCacheFallbackToChunkCache(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
// Pre-populate the chunk cache with data
testData := []byte("fallback test data that should be found in chunk cache")
cache.SetChunk("fallback-file", testData)
rc := NewReaderCache(10, cache, nil)
defer rc.destroy()
// Read should hit the chunk cache
buffer := make([]byte, len(testData))
n, err := rc.ReadChunkAt(context.Background(), buffer, "fallback-file", nil, false, 0, len(testData), true)
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
if n != len(testData) {
t.Errorf("Expected %d bytes, got %d", len(testData), n)
}
// Verify cache was hit
if cache.hitCount == 0 {
t.Error("Expected chunk cache to be hit")
}
}
// TestReaderCacheMultipleReadersWaitForSameChunk tests that multiple readers
// can wait for the same chunk download to complete
func TestReaderCacheMultipleReadersWaitForSameChunk(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
// Pre-populate cache so we don't need HTTP
testData := make([]byte, 1024)
for i := range testData {
testData[i] = byte(i % 256)
}
cache.SetChunk("shared-chunk", testData)
rc := NewReaderCache(10, cache, nil)
defer rc.destroy()
// Launch multiple concurrent readers for the same chunk
numReaders := 10
var wg sync.WaitGroup
errors := make(chan error, numReaders)
bytesRead := make(chan int, numReaders)
for i := 0; i < numReaders; i++ {
wg.Add(1)
go func() {
defer wg.Done()
buffer := make([]byte, len(testData))
n, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk", nil, false, 0, len(testData), true)
if err != nil {
errors <- err
}
bytesRead <- n
}()
}
wg.Wait()
close(errors)
close(bytesRead)
// Check for errors
for err := range errors {
t.Errorf("Reader got error: %v", err)
}
// Verify all readers got the expected data
for n := range bytesRead {
if n != len(testData) {
t.Errorf("Expected %d bytes, got %d", len(testData), n)
}
}
}
// TestReaderCachePartialRead tests reading at different offsets
func TestReaderCachePartialRead(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
testData := []byte("0123456789ABCDEFGHIJ")
cache.SetChunk("partial-read-file", testData)
rc := NewReaderCache(10, cache, nil)
defer rc.destroy()
tests := []struct {
name string
offset int64
size int
expected []byte
}{
{"read from start", 0, 5, []byte("01234")},
{"read from middle", 5, 5, []byte("56789")},
{"read to end", 15, 5, []byte("FGHIJ")},
{"read single byte", 10, 1, []byte("A")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buffer := make([]byte, tt.size)
n, err := rc.ReadChunkAt(context.Background(), buffer, "partial-read-file", nil, false, tt.offset, len(testData), true)
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
if n != tt.size {
t.Errorf("Expected %d bytes, got %d", tt.size, n)
}
if string(buffer[:n]) != string(tt.expected) {
t.Errorf("Expected %q, got %q", tt.expected, buffer[:n])
}
})
}
}
// TestReaderCacheCleanup tests that old downloaders are cleaned up
func TestReaderCacheCleanup(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
// Create cache with limit of 3
rc := NewReaderCache(3, cache, nil)
defer rc.destroy()
// Add data for multiple files
for i := 0; i < 5; i++ {
fileId := string(rune('A' + i))
data := []byte("data for file " + fileId)
cache.SetChunk(fileId, data)
}
// Read from multiple files - should trigger cleanup when exceeding limit
for i := 0; i < 5; i++ {
fileId := string(rune('A' + i))
buffer := make([]byte, 20)
_, err := rc.ReadChunkAt(context.Background(), buffer, fileId, nil, false, 0, 20, true)
if err != nil {
t.Errorf("Read error for file %s: %v", fileId, err)
}
}
// Cache should still work - reads should succeed
for i := 0; i < 5; i++ {
fileId := string(rune('A' + i))
buffer := make([]byte, 20)
n, err := rc.ReadChunkAt(context.Background(), buffer, fileId, nil, false, 0, 20, true)
if err != nil {
t.Errorf("Second read error for file %s: %v", fileId, err)
}
if n == 0 {
t.Errorf("Expected data for file %s, got 0 bytes", fileId)
}
}
}
// TestSingleChunkCacherDoneSignal tests that done channel is always closed
func TestSingleChunkCacherDoneSignal(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
rc := NewReaderCache(10, cache, nil)
defer rc.destroy()
// Test that we can read even when data is in cache (done channel should work)
testData := []byte("done signal test")
cache.SetChunk("done-signal-test", testData)
// Multiple goroutines reading same chunk
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
buffer := make([]byte, len(testData))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
n, err := rc.ReadChunkAt(ctx, buffer, "done-signal-test", nil, false, 0, len(testData), true)
if err != nil && err != context.DeadlineExceeded {
t.Errorf("Unexpected error: %v", err)
}
if n == 0 && err == nil {
t.Error("Got 0 bytes with no error")
}
}()
}
// Should complete without hanging
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
// Success
case <-time.After(10 * time.Second):
t.Fatal("Test timed out - done channel may not be signaled correctly")
}
}
// ============================================================================
// Tests that exercise SingleChunkCacher concurrency logic
// ============================================================================
//
// These tests use blocking lookupFileIdFn to exercise the wait/cancellation
// logic in SingleChunkCacher without requiring HTTP calls.
// TestSingleChunkCacherLookupError tests handling of lookup errors
func TestSingleChunkCacherLookupError(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
// Lookup function that returns an error
lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
return nil, fmt.Errorf("lookup failed for %s", fileId)
}
rc := NewReaderCache(10, cache, lookupFn)
defer rc.destroy()
buffer := make([]byte, 100)
_, err := rc.ReadChunkAt(context.Background(), buffer, "error-test", nil, false, 0, 100, true)
if err == nil {
t.Error("Expected an error, got nil")
}
}
// TestSingleChunkCacherContextCancellationDuringLookup tests that a reader can
// cancel its wait while the lookup is in progress. This exercises the actual
// SingleChunkCacher wait/cancel logic.
func TestSingleChunkCacherContextCancellationDuringLookup(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
lookupStarted := make(chan struct{})
lookupCanFinish := make(chan struct{})
// Lookup function that blocks to simulate slow operation
lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
close(lookupStarted)
<-lookupCanFinish // Block until test allows completion
return nil, fmt.Errorf("lookup completed but reader should have cancelled")
}
rc := NewReaderCache(10, cache, lookupFn)
defer rc.destroy()
defer close(lookupCanFinish) // Ensure cleanup
ctx, cancel := context.WithCancel(context.Background())
readResult := make(chan error, 1)
go func() {
buffer := make([]byte, 100)
_, err := rc.ReadChunkAt(ctx, buffer, "cancel-during-lookup", nil, false, 0, 100, true)
readResult <- err
}()
// Wait for lookup to start, then cancel the reader's context
select {
case <-lookupStarted:
cancel() // Cancel the reader while lookup is blocked
case <-time.After(5 * time.Second):
t.Fatal("Lookup never started")
}
// Read should return with context.Canceled
select {
case err := <-readResult:
if err != context.Canceled {
t.Errorf("Expected context.Canceled, got: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatal("Read did not complete after context cancellation")
}
}
// TestSingleChunkCacherMultipleReadersWaitForDownload tests that multiple readers
// can wait for the same SingleChunkCacher download to complete. When lookup fails,
// all readers should receive the same error.
func TestSingleChunkCacherMultipleReadersWaitForDownload(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
lookupStarted := make(chan struct{})
lookupCanFinish := make(chan struct{})
var lookupStartedOnce sync.Once
// Lookup function that blocks to simulate slow operation
lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
lookupStartedOnce.Do(func() { close(lookupStarted) })
<-lookupCanFinish
return nil, fmt.Errorf("simulated lookup error")
}
rc := NewReaderCache(10, cache, lookupFn)
defer rc.destroy()
numReaders := 5
var wg sync.WaitGroup
errs := make(chan error, numReaders) // named errs to avoid shadowing the standard errors package
// Start multiple readers for the same chunk
for i := 0; i < numReaders; i++ {
wg.Add(1)
go func() {
defer wg.Done()
buffer := make([]byte, 100)
_, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk", nil, false, 0, 100, true)
errs <- err
}()
}
// Wait for lookup to start, then allow completion
select {
case <-lookupStarted:
close(lookupCanFinish)
case <-time.After(5 * time.Second):
close(lookupCanFinish)
t.Fatal("Lookup never started")
}
wg.Wait()
close(errs)
// All readers should receive an error
errorCount := 0
for err := range errs {
if err != nil {
errorCount++
}
}
if errorCount != numReaders {
t.Errorf("Expected %d errors, got %d", numReaders, errorCount)
}
}
// TestSingleChunkCacherOneReaderCancelsOthersContinue tests that when one reader
// cancels, other readers waiting on the same chunk continue to wait.
func TestSingleChunkCacherOneReaderCancelsOthersContinue(t *testing.T) {
cache := newMockChunkCacheForReaderCache()
lookupStarted := make(chan struct{})
lookupCanFinish := make(chan struct{})
var lookupStartedOnce sync.Once
lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
lookupStartedOnce.Do(func() { close(lookupStarted) })
<-lookupCanFinish
return nil, fmt.Errorf("simulated error after delay")
}
rc := NewReaderCache(10, cache, lookupFn)
defer rc.destroy()
cancelledReaderDone := make(chan error, 1)
otherReaderDone := make(chan error, 1)
ctx, cancel := context.WithCancel(context.Background())
// Start reader that will be cancelled
go func() {
buffer := make([]byte, 100)
_, err := rc.ReadChunkAt(ctx, buffer, "shared-chunk-2", nil, false, 0, 100, true)
cancelledReaderDone <- err
}()
// Start reader that will NOT be cancelled
go func() {
buffer := make([]byte, 100)
_, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk-2", nil, false, 0, 100, true)
otherReaderDone <- err
}()
// Wait for lookup to start
select {
case <-lookupStarted:
case <-time.After(5 * time.Second):
close(lookupCanFinish) // Unblock the lookup so its goroutine does not leak on failure
t.Fatal("Lookup never started")
}
// Cancel the first reader
cancel()
// First reader should complete with context.Canceled quickly
select {
case err := <-cancelledReaderDone:
if err != context.Canceled {
t.Errorf("Cancelled reader: expected context.Canceled, got: %v", err)
}
case <-time.After(2 * time.Second):
t.Error("Cancelled reader did not complete quickly")
}
// Allow the download to complete
close(lookupCanFinish)
// Other reader should eventually complete (with error since lookup returns error)
select {
case err := <-otherReaderDone:
if err == nil || err == context.Canceled {
t.Errorf("Other reader: expected non-nil non-cancelled error, got: %v", err)
}
// Expected: "simulated error after delay"
case <-time.After(5 * time.Second):
t.Error("Other reader did not complete")
}
}

108
weed/util/http/http_global_client_util.go

@@ -487,6 +487,12 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
)
}
// For unencrypted, non-gzipped full chunks, use direct buffer read
// This avoids the 64KB intermediate buffer and callback overhead
if cipherKey == nil && !isGzipped && isFullChunk {
return retriedFetchChunkDataDirect(ctx, buffer, urlStrings, string(jwt))
}
var shouldRetry bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
@@ -551,3 +557,105 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
return n, err
}
// retriedFetchChunkDataDirect reads chunk data directly into the buffer without
// intermediate buffering. This reduces memory copies and improves throughput
// for large chunk reads.
func retriedFetchChunkDataDirect(ctx context.Context, buffer []byte, urlStrings []string, jwt string) (n int, err error) {
var shouldRetry bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
for _, urlString := range urlStrings {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
n, shouldRetry, err = readUrlDirectToBuffer(ctx, urlString+"?readDeleted=true", jwt, buffer)
if err == nil {
return n, nil
}
if !shouldRetry {
break
}
glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
}
if err != nil && shouldRetry {
glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
timer := time.NewTimer(waitTime)
select {
case <-ctx.Done():
timer.Stop()
return 0, ctx.Err()
case <-timer.C:
}
} else {
break
}
}
return n, err
}
// readUrlDirectToBuffer reads HTTP response directly into the provided buffer,
// avoiding intermediate buffer allocations and copies.
func readUrlDirectToBuffer(ctx context.Context, fileUrl, jwt string, buffer []byte) (n int, retryable bool, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fileUrl, nil)
if err != nil {
return 0, false, err
}
maybeAddAuth(req, jwt)
request_id.InjectToRequest(ctx, req)
r, err := GetGlobalHttpClient().Do(req)
if err != nil {
return 0, true, err
}
defer CloseResponse(r)
if r.StatusCode >= 400 {
if r.StatusCode == http.StatusNotFound {
return 0, true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
}
if r.StatusCode == http.StatusTooManyRequests {
return 0, false, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrTooManyRequests)
}
retryable = r.StatusCode >= 499
return 0, retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
// Read directly into the caller's buffer. Skipping the intermediate copy
// matters most for large chunks (16MB+).
var totalRead int
for totalRead < len(buffer) {
select {
case <-ctx.Done():
return totalRead, false, ctx.Err()
default:
}
m, readErr := r.Body.Read(buffer[totalRead:])
totalRead += m
if readErr != nil {
if readErr == io.EOF {
// Return io.ErrUnexpectedEOF if we haven't filled the buffer
// This prevents silent data corruption from truncated responses
if totalRead < len(buffer) {
return totalRead, true, io.ErrUnexpectedEOF
}
return totalRead, false, nil
}
return totalRead, true, readErr
}
}
return totalRead, false, nil
}