diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 93fa76a2e..5e8fd6154 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -7,6 +7,8 @@ import (
 	"math/rand"
 	"sync"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
@@ -19,6 +21,11 @@ import (
 // the prefetch count is derived from the -concurrentReaders option.
 const DefaultPrefetchCount = 4
 
+// minReadConcurrency is the minimum number of parallel chunk fetches.
+// This ensures at least some parallelism even when prefetchCount is low,
+// improving throughput for reads spanning multiple chunks.
+const minReadConcurrency = 4
+
 type ChunkReadAt struct {
 	masterClient *wdclient.MasterClient
 	chunkViews   *IntervalList[*ChunkView]
@@ -175,67 +182,139 @@ func (c *ChunkReadAt) ReadAtWithTime(ctx context.Context, p []byte, offset int64
 	return c.doReadAt(ctx, p, offset)
 }
 
+// chunkReadTask represents a single chunk read operation for parallel processing
+type chunkReadTask struct {
+	chunk        *ChunkView
+	bufferStart  int64  // start position in the output buffer
+	bufferEnd    int64  // end position in the output buffer
+	chunkOffset  uint64 // offset within the chunk to read from
+	bytesRead    int
+	modifiedTsNs int64
+}
+
 func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {
 
+	// Collect all chunk read tasks
+	var tasks []*chunkReadTask
+	var gaps []struct{ start, length int64 } // gaps that need zero-filling
+
 	startOffset, remaining := offset, int64(len(p))
-	var nextChunks *Interval[*ChunkView]
+	var lastChunk *Interval[*ChunkView]
+
 	for x := c.chunkViews.Front(); x != nil; x = x.Next {
 		chunk := x.Value
 		if remaining <= 0 {
 			break
 		}
-		if x.Next != nil {
-			nextChunks = x.Next
-		}
+		lastChunk = x
+
+		// Handle gap before this chunk
 		if startOffset < chunk.ViewOffset {
 			gap := chunk.ViewOffset - startOffset
-			glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset)
-			n += zero(p, startOffset-offset, gap)
+			gaps = append(gaps, struct{ start, length int64 }{startOffset - offset, gap})
 			startOffset, remaining = chunk.ViewOffset, remaining-gap
 			if remaining <= 0 {
 				break
 			}
 		}
-		// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
+
 		chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
 		if chunkStart >= chunkStop {
 			continue
 		}
-		// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
+
 		bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
-		ts = chunk.ModifiedTsNs
-		copied, err := c.readChunkSliceAt(ctx, p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
-		if err != nil {
-			glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
-			return copied, ts, err
+		tasks = append(tasks, &chunkReadTask{
+			chunk:       chunk,
+			bufferStart: startOffset - offset,
+			bufferEnd:   chunkStop - chunkStart + startOffset - offset,
+			chunkOffset: uint64(bufferOffset),
+		})
+
+		startOffset, remaining = chunkStop, remaining-(chunkStop-chunkStart)
+	}
+
+	// Zero-fill gaps
+	for _, gap := range gaps {
+		glog.V(4).Infof("zero [%d,%d)", offset+gap.start, offset+gap.start+gap.length)
+		n += zero(p, gap.start, gap.length)
+	}
+
+	// If only one chunk or random access mode, use sequential reading
+	if len(tasks) <= 1 || c.readerPattern.IsRandomMode() {
+		for _, task := range tasks {
+			copied, readErr := c.readChunkSliceAt(ctx, p[task.bufferStart:task.bufferEnd], task.chunk, nil, task.chunkOffset)
+			ts = max(ts, task.chunk.ModifiedTsNs)
+			if readErr != nil {
+				glog.Errorf("fetching chunk %+v: %v\n", task.chunk, readErr)
+				return n + copied, ts, readErr
+			}
+			n += copied
+		}
+	} else {
+		// Parallel chunk fetching for multiple chunks
+		// This significantly improves throughput when chunks are on different volume servers
+		g, gCtx := errgroup.WithContext(ctx)
+
+		// Limit concurrency to avoid overwhelming the system
+		concurrency := c.prefetchCount
+		if concurrency < minReadConcurrency {
+			concurrency = minReadConcurrency
+		}
+		if concurrency > len(tasks) {
+			concurrency = len(tasks)
+		}
+		g.SetLimit(concurrency)
+
+		for _, task := range tasks {
+			g.Go(func() error {
+				// Read directly into the correct position in the output buffer
+				copied, readErr := c.readChunkSliceAtForParallel(gCtx, p[task.bufferStart:task.bufferEnd], task.chunk, task.chunkOffset)
+				task.bytesRead = copied
+				task.modifiedTsNs = task.chunk.ModifiedTsNs
+				return readErr
+			})
 		}
-		n += copied
-		startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
+		// Wait for all chunk reads to complete
+		if waitErr := g.Wait(); waitErr != nil {
+			err = waitErr
+		}
+
+		// Aggregate results (order is preserved since we read directly into buffer positions)
+		for _, task := range tasks {
+			n += task.bytesRead
+			ts = max(ts, task.modifiedTsNs)
+		}
+
+		if err != nil {
+			return n, ts, err
+		}
 	}
-	// glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+	// Trigger prefetch for sequential reads
+	if lastChunk != nil && lastChunk.Next != nil && c.prefetchCount > 0 && !c.readerPattern.IsRandomMode() {
+		c.readerCache.MaybeCache(lastChunk.Next, c.prefetchCount)
+	}
 
-	// zero the remaining bytes if a gap exists at the end of the last chunk (or a fully sparse file)
-	if err == nil && remaining > 0 {
+	// Zero the remaining bytes if a gap exists at the end
+	if remaining > 0 {
 		var delta int64
 		if c.fileSize >= startOffset {
 			delta = min(remaining, c.fileSize-startOffset)
-			startOffset -= offset
-		}
-		if delta > 0 {
-			glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
-			n += zero(p, startOffset, delta)
+			bufStart := startOffset - offset
+			if delta > 0 {
+				glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
+				n += zero(p, bufStart, delta)
+			}
 		}
 	}
 
 	if err == nil && offset+int64(len(p)) >= c.fileSize {
 		err = io.EOF
 	}
-	// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
 	return
-
 }
 
 func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
@@ -249,7 +328,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
 	}
 
 	shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
-	n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+	n, err = c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
 	if c.lastChunkFid != chunkView.FileId {
 		if chunkView.OffsetInChunk == 0 { // start of a new chunk
 			if c.lastChunkFid != "" {
@@ -266,6 +345,13 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
 	return
 }
 
+// readChunkSliceAtForParallel is a simplified version for parallel chunk fetching
+// It doesn't update lastChunkFid or trigger prefetch (handled by the caller)
+func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) {
+	shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
+	return c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+}
+
 func zero(buffer []byte, start, length int64) int {
 	if length <= 0 {
 		return 0
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 605be5e73..66cbac1e3 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -35,6 +35,7 @@ type SingleChunkCacher struct {
 	shouldCache    bool
 	wg             sync.WaitGroup
 	cacheStartedCh chan struct{}
+	done           chan struct{} // signals when download is complete
 }
 
 func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -93,14 +94,18 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
 	return
 }
 
-func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
+func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
 	rc.Lock()
 	if cacher, found := rc.downloaders[fileId]; found {
-		if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
-			rc.Unlock()
+		rc.Unlock()
+		n, err := cacher.readChunkAt(ctx, buffer, offset)
+		if n > 0 || err != nil {
 			return n, err
 		}
+		// If n=0 and err=nil, the cacher couldn't provide data for this offset.
+		// Fall through to try chunkCache.
+		rc.Lock()
 	}
 	if shouldCache || rc.lookupFileIdFn == nil {
 		n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
@@ -134,7 +139,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
 	rc.downloaders[fileId] = cacher
 	rc.Unlock()
 
-	return cacher.readChunkAt(buffer, offset)
+	return cacher.readChunkAt(ctx, buffer, offset)
 }
 
 func (rc *ReaderCache) UnCache(fileId string) {
@@ -166,38 +171,53 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte,
 		chunkSize:      chunkSize,
 		shouldCache:    shouldCache,
 		cacheStartedCh: make(chan struct{}),
+		done:           make(chan struct{}),
 	}
 }
 
+// startCaching downloads the chunk data in the background.
+// It does NOT hold the lock during the HTTP download to allow concurrent readers
+// to wait efficiently using the done channel.
 func (s *SingleChunkCacher) startCaching() {
 	s.wg.Add(1)
 	defer s.wg.Done()
-	s.Lock()
-	defer s.Unlock()
+	defer close(s.done) // guarantee completion signal even on panic
 
-	s.cacheStartedCh <- struct{}{} // means this has been started
+	s.cacheStartedCh <- struct{}{} // signal that we've started
 
+	// Note: We intentionally use context.Background() here, NOT a request-specific context.
+	// The downloaded chunk is a shared resource - multiple concurrent readers may be waiting
+	// for this same download to complete. If we used a request context and that request was
+	// cancelled, it would abort the download and cause errors for all other waiting readers.
+	// The download should always complete once started to serve all potential consumers.
+
+	// Lookup file ID without holding the lock
 	urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
 	if err != nil {
+		s.Lock()
 		s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
+		s.Unlock()
 		return
 	}
 
-	s.data = mem.Allocate(s.chunkSize)
-
-	_, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
-	if s.err != nil {
-		mem.Free(s.data)
-		s.data = nil
-		return
-	}
+	// Allocate buffer and download without holding the lock
+	// This allows multiple downloads to proceed in parallel
	data := mem.Allocate(s.chunkSize)
+	_, fetchErr := util_http.RetriedFetchChunkData(context.Background(), data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
 
-	if s.shouldCache {
-		s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+	// Now acquire lock to update state
+	s.Lock()
+	if fetchErr != nil {
+		mem.Free(data)
+		s.err = fetchErr
+	} else {
+		s.data = data
+		if s.shouldCache {
+			s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+		}
+		atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
 	}
-	atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
-
-	return
+	s.Unlock()
 }
 
 func (s *SingleChunkCacher) destroy() {
@@ -209,13 +229,34 @@ func (s *SingleChunkCacher) destroy() {
 	if s.data != nil {
 		mem.Free(s.data)
 		s.data = nil
-		close(s.cacheStartedCh)
 	}
 }
 
-func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+// readChunkAt reads data from the cached chunk.
+// It waits for the download to complete if it's still in progress.
+// The ctx parameter allows the reader to cancel its wait (but the download continues
+// for other readers - see comment in startCaching about shared resource semantics).
+func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
 	s.wg.Add(1)
 	defer s.wg.Done()
+
+	// Wait for download to complete, but allow reader cancellation.
+	// Prioritize checking done first - if data is already available,
+	// return it even if context is also cancelled.
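+	// (If several select cases are ready at once, Go picks one at random;
+	// the non-blocking check below keeps a finished download from losing
+	// that race to a context that was cancelled at the same time.)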
+	select {
+	case <-s.done:
+		// Download already completed, proceed immediately
+	default:
+		// Download not complete, wait for it or context cancellation
+		select {
+		case <-s.done:
+			// Download completed
+		case <-ctx.Done():
+			// Reader cancelled while waiting - download continues for other readers
+			return 0, ctx.Err()
+		}
+	}
+
 	s.Lock()
 	defer s.Unlock()
 
@@ -228,5 +269,4 @@ func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
 	}
 
 	return copy(buf, s.data[offset:]), nil
-
 }
diff --git a/weed/filer/reader_cache_test.go b/weed/filer/reader_cache_test.go
new file mode 100644
index 000000000..0480de8a7
--- /dev/null
+++ b/weed/filer/reader_cache_test.go
@@ -0,0 +1,505 @@
+package filer
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// mockChunkCacheForReaderCache implements chunk cache for testing
+type mockChunkCacheForReaderCache struct {
+	data     map[string][]byte
+	hitCount int32
+	mu       sync.Mutex
+}
+
+func newMockChunkCacheForReaderCache() *mockChunkCacheForReaderCache {
+	return &mockChunkCacheForReaderCache{
+		data: make(map[string][]byte),
+	}
+}
+
+func (m *mockChunkCacheForReaderCache) GetChunk(fileId string, minSize uint64) []byte {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if d, ok := m.data[fileId]; ok {
+		atomic.AddInt32(&m.hitCount, 1)
+		return d
+	}
+	return nil
+}
+
+func (m *mockChunkCacheForReaderCache) ReadChunkAt(data []byte, fileId string, offset uint64) (int, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if d, ok := m.data[fileId]; ok && int(offset) < len(d) {
+		atomic.AddInt32(&m.hitCount, 1)
+		n := copy(data, d[offset:])
+		return n, nil
+	}
+	return 0, nil
+}
+
+func (m *mockChunkCacheForReaderCache) SetChunk(fileId string, data []byte) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.data[fileId] = data
+}
+
+func (m *mockChunkCacheForReaderCache) GetMaxFilePartSizeInCache() uint64 {
+	return 1024 * 1024 // 1MB
+}
+
+func (m *mockChunkCacheForReaderCache) IsInCache(fileId string, lockNeeded bool) bool {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	_, ok := m.data[fileId]
+	return ok
+}
+
+// TestReaderCacheContextCancellation tests that a reader can cancel its wait
+// while the download continues for other readers
+func TestReaderCacheContextCancellation(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+
+	// Create a ReaderCache - we can't easily test the full flow without mocking HTTP,
+	// but we can test the context cancellation in readChunkAt
+	rc := NewReaderCache(10, cache, nil)
+	defer rc.destroy()
+
+	// Pre-populate cache to avoid HTTP calls
+	testData := []byte("test data for context cancellation")
+	cache.SetChunk("test-file-1", testData)
+
+	// Test that context cancellation works
+	ctx, cancel := context.WithCancel(context.Background())
+
+	buffer := make([]byte, len(testData))
+	n, err := rc.ReadChunkAt(ctx, buffer, "test-file-1", nil, false, 0, len(testData), true)
+	if err != nil {
+		t.Errorf("Expected no error, got: %v", err)
+	}
+	if n != len(testData) {
+		t.Errorf("Expected %d bytes, got %d", len(testData), n)
+	}
+
+	// Cancel context and verify it doesn't affect already completed reads
+	cancel()
+
+	// A subsequent read with the cancelled context may still be served from the cache
+	buffer2 := make([]byte, len(testData))
+	n2, err2 := rc.ReadChunkAt(ctx, buffer2, "test-file-1", nil, false, 0, len(testData), true)
+	// Note: This may or may not error depending on whether it hits cache
+	_ = n2
+	_ = err2
+}
+
+// TestReaderCacheFallbackToChunkCache tests that when a cacher returns n=0, err=nil,
+// we fall back to the chunkCache
+func TestReaderCacheFallbackToChunkCache(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+
+	// Pre-populate the chunk cache with data
+	testData := []byte("fallback test data that should be found in chunk cache")
+	cache.SetChunk("fallback-file", testData)
+
+	rc := NewReaderCache(10, cache, nil)
+	defer rc.destroy()
+
+	// Read should hit the chunk cache
+	buffer := make([]byte, len(testData))
+	n, err := rc.ReadChunkAt(context.Background(), buffer, "fallback-file", nil, false, 0, len(testData), true)
+
+	if err != nil {
+		t.Errorf("Expected no error, got: %v", err)
+	}
+	if n != len(testData) {
+		t.Errorf("Expected %d bytes, got %d", len(testData), n)
+	}
+
+	// Verify cache was hit
+	if cache.hitCount == 0 {
+		t.Error("Expected chunk cache to be hit")
+	}
+}
+
+// TestReaderCacheMultipleReadersWaitForSameChunk tests that multiple readers
+// can wait for the same chunk download to complete
+func TestReaderCacheMultipleReadersWaitForSameChunk(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+
+	// Pre-populate cache so we don't need HTTP
+	testData := make([]byte, 1024)
+	for i := range testData {
+		testData[i] = byte(i % 256)
+	}
+	cache.SetChunk("shared-chunk", testData)
+
+	rc := NewReaderCache(10, cache, nil)
+	defer rc.destroy()
+
+	// Launch multiple concurrent readers for the same chunk
+	numReaders := 10
+	var wg sync.WaitGroup
+	errors := make(chan error, numReaders)
+	bytesRead := make(chan int, numReaders)
+
+	for i := 0; i < numReaders; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			buffer := make([]byte, len(testData))
+			n, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk", nil, false, 0, len(testData), true)
+			if err != nil {
+				errors <- err
+			}
+			bytesRead <- n
+		}()
+	}
+
+	wg.Wait()
+	close(errors)
+	close(bytesRead)
+
+	// Check for errors
+	for err := range errors {
+		t.Errorf("Reader got error: %v", err)
+	}
+
+	// Verify all readers got the expected data
+	for n := range bytesRead {
+		if n != len(testData) {
+			t.Errorf("Expected %d bytes, got %d", len(testData), n)
+		}
+	}
+}
+
+// TestReaderCachePartialRead tests reading at different offsets
+func TestReaderCachePartialRead(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+
+	testData := []byte("0123456789ABCDEFGHIJ")
+	cache.SetChunk("partial-read-file", testData)
+
+	rc := NewReaderCache(10, cache, nil)
+	defer rc.destroy()
+
+	tests := []struct {
+		name     string
+		offset   int64
+		size     int
+		expected []byte
+	}{
+		{"read from start", 0, 5, []byte("01234")},
+		{"read from middle", 5, 5, []byte("56789")},
+		{"read to end", 15, 5, []byte("FGHIJ")},
+		{"read single byte", 10, 1, []byte("A")},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			buffer := make([]byte, tt.size)
+			n, err := rc.ReadChunkAt(context.Background(), buffer, "partial-read-file", nil, false, tt.offset, len(testData), true)
+
+			if err != nil {
+				t.Errorf("Expected no error, got: %v", err)
+			}
+			if n != tt.size {
+				t.Errorf("Expected %d bytes, got %d", tt.size, n)
+			}
+			if string(buffer[:n]) != string(tt.expected) {
+				t.Errorf("Expected %q, got %q", tt.expected, buffer[:n])
+			}
+		})
+	}
+}
+
+// TestReaderCacheCleanup tests that old downloaders are cleaned up
+func TestReaderCacheCleanup(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+
+	// Create cache with limit of 3
+	rc := NewReaderCache(3, cache, nil)
+	defer rc.destroy()
+
+	// Add data for multiple files
+	for i := 0; i < 5; i++ {
+		fileId := string(rune('A' + i))
+		data := []byte("data for file " + fileId)
+		cache.SetChunk(fileId, data)
+	}
+
+	// Read from multiple files - should trigger cleanup when exceeding limit
+	for i := 0; i < 5; i++ {
+		fileId := string(rune('A' + i))
+		buffer := make([]byte, 20)
+		_, err := rc.ReadChunkAt(context.Background(), buffer, fileId, nil, false, 0, 20, true)
+		if err != nil {
+			t.Errorf("Read error for file %s: %v", fileId, err)
+		}
+	}
+
+	// Cache should still work - reads should succeed
+	for i := 0; i < 5; i++ {
+		fileId := string(rune('A' + i))
+		buffer := make([]byte, 20)
+		n, err := rc.ReadChunkAt(context.Background(), buffer, fileId, nil, false, 0, 20, true)
+		if err != nil {
+			t.Errorf("Second read error for file %s: %v", fileId, err)
+		}
+		if n == 0 {
+			t.Errorf("Expected data for file %s, got 0 bytes", fileId)
+		}
+	}
+}
+
+// TestSingleChunkCacherDoneSignal tests that done channel is always closed
+func TestSingleChunkCacherDoneSignal(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+	rc := NewReaderCache(10, cache, nil)
+	defer rc.destroy()
+
+	// Test that we can read even when data is in cache (done channel should work)
+	testData := []byte("done signal test")
+	cache.SetChunk("done-signal-test", testData)
+
+	// Multiple goroutines reading same chunk
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			buffer := make([]byte, len(testData))
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+
+			n, err := rc.ReadChunkAt(ctx, buffer, "done-signal-test", nil, false, 0, len(testData), true)
+			if err != nil && err != context.DeadlineExceeded {
+				t.Errorf("Unexpected error: %v", err)
+			}
+			if n == 0 && err == nil {
+				t.Error("Got 0 bytes with no error")
+			}
+		}()
+	}
+
+	// Should complete without hanging
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// Success
+	case <-time.After(10 * time.Second):
+		t.Fatal("Test timed out - done channel may not be signaled correctly")
+	}
+}
+
+// ============================================================================
+// Tests that exercise SingleChunkCacher concurrency logic
+// ============================================================================
+//
+// These tests use blocking lookupFileIdFn to exercise the wait/cancellation
+// logic in SingleChunkCacher without requiring HTTP calls.
+
+// TestSingleChunkCacherLookupError tests handling of lookup errors
+func TestSingleChunkCacherLookupError(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+
+	// Lookup function that returns an error
+	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
+		return nil, fmt.Errorf("lookup failed for %s", fileId)
+	}
+
+	rc := NewReaderCache(10, cache, lookupFn)
+	defer rc.destroy()
+
+	buffer := make([]byte, 100)
+	_, err := rc.ReadChunkAt(context.Background(), buffer, "error-test", nil, false, 0, 100, true)
+
+	if err == nil {
+		t.Error("Expected an error, got nil")
+	}
+}
+
+// TestSingleChunkCacherContextCancellationDuringLookup tests that a reader can
+// cancel its wait while the lookup is in progress. This exercises the actual
+// SingleChunkCacher wait/cancel logic.
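+// The reader blocks in the same done/ctx select whether the cacher is stuck
+// in the lookup or in the HTTP fetch, so cancelling during the lookup covers
+// the wait path for both stages.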
+func TestSingleChunkCacherContextCancellationDuringLookup(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+	lookupStarted := make(chan struct{})
+	lookupCanFinish := make(chan struct{})
+
+	// Lookup function that blocks to simulate slow operation
+	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
+		close(lookupStarted)
+		<-lookupCanFinish // Block until test allows completion
+		return nil, fmt.Errorf("lookup completed but reader should have cancelled")
+	}
+
+	rc := NewReaderCache(10, cache, lookupFn)
+	defer rc.destroy()
+	defer close(lookupCanFinish) // Ensure cleanup
+
+	ctx, cancel := context.WithCancel(context.Background())
+	readResult := make(chan error, 1)
+
+	go func() {
+		buffer := make([]byte, 100)
+		_, err := rc.ReadChunkAt(ctx, buffer, "cancel-during-lookup", nil, false, 0, 100, true)
+		readResult <- err
+	}()
+
+	// Wait for lookup to start, then cancel the reader's context
+	select {
+	case <-lookupStarted:
+		cancel() // Cancel the reader while lookup is blocked
+	case <-time.After(5 * time.Second):
+		t.Fatal("Lookup never started")
+	}
+
+	// Read should return with context.Canceled
+	select {
+	case err := <-readResult:
+		if err != context.Canceled {
+			t.Errorf("Expected context.Canceled, got: %v", err)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("Read did not complete after context cancellation")
+	}
+}
+
+// TestSingleChunkCacherMultipleReadersWaitForDownload tests that multiple readers
+// can wait for the same SingleChunkCacher download to complete. When lookup fails,
+// all readers should receive the same error.
+func TestSingleChunkCacherMultipleReadersWaitForDownload(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+	lookupStarted := make(chan struct{})
+	lookupCanFinish := make(chan struct{})
+	var lookupStartedOnce sync.Once
+
+	// Lookup function that blocks to simulate slow operation
+	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
+		lookupStartedOnce.Do(func() { close(lookupStarted) })
+		<-lookupCanFinish
+		return nil, fmt.Errorf("simulated lookup error")
+	}
+
+	rc := NewReaderCache(10, cache, lookupFn)
+	defer rc.destroy()
+
+	numReaders := 5
+	var wg sync.WaitGroup
+	errors := make(chan error, numReaders)
+
+	// Start multiple readers for the same chunk
+	for i := 0; i < numReaders; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			buffer := make([]byte, 100)
+			_, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk", nil, false, 0, 100, true)
+			errors <- err
+		}()
+	}
+
+	// Wait for lookup to start, then allow completion
+	select {
+	case <-lookupStarted:
+		close(lookupCanFinish)
+	case <-time.After(5 * time.Second):
+		close(lookupCanFinish)
+		t.Fatal("Lookup never started")
+	}
+
+	wg.Wait()
+	close(errors)
+
+	// All readers should receive an error
+	errorCount := 0
+	for err := range errors {
+		if err != nil {
+			errorCount++
+		}
+	}
+	if errorCount != numReaders {
+		t.Errorf("Expected %d errors, got %d", numReaders, errorCount)
+	}
+}
+
+// TestSingleChunkCacherOneReaderCancelsOthersContinue tests that when one reader
+// cancels, other readers waiting on the same chunk continue to wait.
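+// This is the shared-download property documented in startCaching: one
+// reader's cancellation must not tear down the in-flight download that
+// other readers are still waiting on.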
+func TestSingleChunkCacherOneReaderCancelsOthersContinue(t *testing.T) {
+	cache := newMockChunkCacheForReaderCache()
+	lookupStarted := make(chan struct{})
+	lookupCanFinish := make(chan struct{})
+	var lookupStartedOnce sync.Once
+
+	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
+		lookupStartedOnce.Do(func() { close(lookupStarted) })
+		<-lookupCanFinish
+		return nil, fmt.Errorf("simulated error after delay")
+	}
+
+	rc := NewReaderCache(10, cache, lookupFn)
+	defer rc.destroy()
+
+	cancelledReaderDone := make(chan error, 1)
+	otherReaderDone := make(chan error, 1)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	// Start reader that will be cancelled
+	go func() {
+		buffer := make([]byte, 100)
+		_, err := rc.ReadChunkAt(ctx, buffer, "shared-chunk-2", nil, false, 0, 100, true)
+		cancelledReaderDone <- err
+	}()
+
+	// Start reader that will NOT be cancelled
+	go func() {
+		buffer := make([]byte, 100)
+		_, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk-2", nil, false, 0, 100, true)
+		otherReaderDone <- err
+	}()
+
+	// Wait for lookup to start
+	select {
+	case <-lookupStarted:
+	case <-time.After(5 * time.Second):
+		t.Fatal("Lookup never started")
+	}
+
+	// Cancel the first reader
+	cancel()
+
+	// First reader should complete with context.Canceled quickly
+	select {
+	case err := <-cancelledReaderDone:
+		if err != context.Canceled {
+			t.Errorf("Cancelled reader: expected context.Canceled, got: %v", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Error("Cancelled reader did not complete quickly")
+	}
+
+	// Allow the download to complete
+	close(lookupCanFinish)
+
+	// Other reader should eventually complete (with error since lookup returns error)
+	select {
+	case err := <-otherReaderDone:
+		if err == nil || err == context.Canceled {
+			t.Errorf("Other reader: expected non-nil non-cancelled error, got: %v", err)
+		}
+		// Expected: "simulated error after delay"
+	case <-time.After(5 * time.Second):
+		t.Error("Other reader did not complete")
+	}
+}
diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go
index 3a969fdc8..a374c8a2b 100644
--- a/weed/util/http/http_global_client_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -487,6 +487,12 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 		)
 	}
 
+	// For unencrypted, non-gzipped full chunks, use direct buffer read
+	// This avoids the 64KB intermediate buffer and callback overhead
+	if cipherKey == nil && !isGzipped && isFullChunk {
+		return retriedFetchChunkDataDirect(ctx, buffer, urlStrings, string(jwt))
+	}
+
 	var shouldRetry bool
 
 	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
@@ -551,3 +557,105 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 
 	return n, err
 }
+
+// retriedFetchChunkDataDirect reads chunk data directly into the buffer without
+// intermediate buffering. This reduces memory copies and improves throughput
+// for large chunk reads.
+func retriedFetchChunkDataDirect(ctx context.Context, buffer []byte, urlStrings []string, jwt string) (n int, err error) {
+	var shouldRetry bool
+
+	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
+		select {
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		default:
+		}
+
+		for _, urlString := range urlStrings {
+			select {
+			case <-ctx.Done():
+				return 0, ctx.Err()
+			default:
+			}
+
+			n, shouldRetry, err = readUrlDirectToBuffer(ctx, urlString+"?readDeleted=true", jwt, buffer)
+			if err == nil {
+				return n, nil
+			}
+			if !shouldRetry {
+				break
+			}
+			glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
+		}
+
+		if err != nil && shouldRetry {
+			glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
+			timer := time.NewTimer(waitTime)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return 0, ctx.Err()
+			case <-timer.C:
+			}
+		} else {
+			break
+		}
+	}
+
+	return n, err
+}
+
+// readUrlDirectToBuffer reads HTTP response directly into the provided buffer,
+// avoiding intermediate buffer allocations and copies.
+func readUrlDirectToBuffer(ctx context.Context, fileUrl, jwt string, buffer []byte) (n int, retryable bool, err error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, fileUrl, nil)
+	if err != nil {
+		return 0, false, err
+	}
+	maybeAddAuth(req, jwt)
+	request_id.InjectToRequest(ctx, req)
+
+	r, err := GetGlobalHttpClient().Do(req)
+	if err != nil {
+		return 0, true, err
+	}
+	defer CloseResponse(r)
+
+	if r.StatusCode >= 400 {
+		if r.StatusCode == http.StatusNotFound {
+			return 0, true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
+		}
+		if r.StatusCode == http.StatusTooManyRequests {
+			return 0, false, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrTooManyRequests)
+		}
+		retryable = r.StatusCode >= 499
+		return 0, retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
+	}
+
+	// Read directly into the buffer without intermediate copying
+	// This is significantly faster for large chunks (16MB+)
+	var totalRead int
+	for totalRead < len(buffer) {
+		select {
+		case <-ctx.Done():
+			return totalRead, false, ctx.Err()
+		default:
+		}
+
+		m, readErr := r.Body.Read(buffer[totalRead:])
+		totalRead += m
+		if readErr != nil {
+			if readErr == io.EOF {
+				// Return io.ErrUnexpectedEOF if we haven't filled the buffer
+				// This prevents silent data corruption from truncated responses
+				if totalRead < len(buffer) {
+					return totalRead, true, io.ErrUnexpectedEOF
+				}
+				return totalRead, false, nil
+			}
+			return totalRead, true, readErr
+		}
+	}
+
+	return totalRead, false, nil
+}