diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index b13690e0c..fda0c9766 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -329,7 +329,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
 	}
 
 	shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
-	n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+	n, err = c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
 	if c.lastChunkFid != chunkView.FileId {
 		if chunkView.OffsetInChunk == 0 { // start of a new chunk
 			if c.lastChunkFid != "" {
@@ -350,7 +350,7 @@
 // It doesn't update lastChunkFid or trigger prefetch (handled by the caller)
 func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) {
 	shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
-	return c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+	return c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
 }
 
 func zero(buffer []byte, start, length int64) int {
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index d1e7226dc..2f4def8aa 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -94,12 +94,12 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
 	return
 }
 
-func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
+func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
 	rc.Lock()
 
 	if cacher, found := rc.downloaders[fileId]; found {
 		rc.Unlock()
-		return cacher.readChunkAt(buffer, offset)
+		return cacher.readChunkAt(ctx, buffer, offset)
 	}
 	if shouldCache || rc.lookupFileIdFn == nil {
 		n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
@@ -133,7 +133,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
 	rc.downloaders[fileId] = cacher
 	rc.Unlock()
 
-	return cacher.readChunkAt(buffer, offset)
+	return cacher.readChunkAt(ctx, buffer, offset)
 }
 
 func (rc *ReaderCache) UnCache(fileId string) {
@@ -230,15 +230,21 @@ func (s *SingleChunkCacher) destroy() {
 }
 
 // readChunkAt reads data from the cached chunk.
-// It waits for the download to complete if it's still in progress,
-// using a channel for efficient waiting that integrates with context cancellation.
-func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+// It waits for the download to complete if it's still in progress.
+// The ctx parameter allows the reader to cancel its wait (but the download continues
+// for other readers - see comment in startCaching about shared resource semantics).
+func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
 	s.wg.Add(1)
 	defer s.wg.Done()
 
-	// Wait for download to complete using channel
-	// This allows for future context cancellation support
-	<-s.done
+	// Wait for download to complete, but allow reader cancellation
+	select {
+	case <-s.done:
+		// Download completed
+	case <-ctx.Done():
+		// Reader cancelled - download continues for other readers
+		return 0, ctx.Err()
+	}
 
 	s.Lock()
 	defer s.Unlock()
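For context, the pattern the new readChunkAt relies on is a shared single-flight download: one goroutine fetches the chunk and closes a done channel, many readers wait on that channel, and each reader may abandon its wait via its own context without affecting anyone else. A minimal standalone sketch of those semantics follows; the names (sharedDownload, startDownload, read) are illustrative only and are not from this patch or the SeaweedFS codebase.

package main

import (
	"context"
	"fmt"
	"time"
)

// sharedDownload mimics the SingleChunkCacher shape: one download,
// many readers waiting on the same done channel.
type sharedDownload struct {
	done chan struct{} // closed exactly once when the download finishes
	data []byte
}

func startDownload() *sharedDownload {
	d := &sharedDownload{done: make(chan struct{})}
	go func() {
		time.Sleep(100 * time.Millisecond) // stand-in for the volume-server fetch
		d.data = []byte("chunk data")
		close(d.done) // broadcast: wakes every waiting reader at once
	}()
	return d
}

// read waits for the shared download, but lets this reader give up early.
// Cancelling ctx abandons only this reader's wait; the download keeps
// running for other readers, mirroring the patched readChunkAt.
func (d *sharedDownload) read(ctx context.Context) ([]byte, error) {
	select {
	case <-d.done:
		return d.data, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	d := startDownload()

	// Reader A gives up after 10ms; reader B waits long enough to succeed.
	ctxA, cancelA := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancelA()
	if _, err := d.read(ctxA); err != nil {
		fmt.Println("reader A:", err) // context deadline exceeded
	}

	if data, err := d.read(context.Background()); err == nil {
		fmt.Println("reader B:", string(data)) // chunk data
	}
}

Because close(done) broadcasts to all waiters, a reader whose context fires simply stops selecting; the download goroutine and the remaining readers are untouched, which is exactly the "download continues for other readers" behavior the added comments describe.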