From a8c5253ebee68e1b1e3793b43a180c339eb0ff63 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 4 Dec 2025 22:08:07 -0800 Subject: [PATCH] filer: propagate context for reader cancellation Address review comment: pass context through ReadChunkAt call chain so that a reader can cancel its wait for a download. The key distinction is: - Download uses context.Background() - shared resource, always completes - Reader wait uses request context - can be cancelled individually If a reader cancels, it stops waiting and returns ctx.Err(), but the download continues to completion for other readers waiting on the same chunk. This properly handles the shared resource semantics while still allowing individual reader cancellation. --- weed/filer/reader_at.go | 4 ++-- weed/filer/reader_cache.go | 24 +++++++++++++++--------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index b13690e0c..fda0c9766 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -329,7 +329,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk } shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache() - n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache) + n, err = c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache) if c.lastChunkFid != chunkView.FileId { if chunkView.OffsetInChunk == 0 { // start of a new chunk if c.lastChunkFid != "" { @@ -350,7 +350,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk // It doesn't update lastChunkFid or trigger prefetch (handled by the caller) func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) { shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache() - return c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache) + return c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache) } func zero(buffer []byte, start, length int64) int { diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index d1e7226dc..2f4def8aa 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -94,12 +94,12 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) { return } -func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) { +func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) { rc.Lock() if cacher, found := rc.downloaders[fileId]; found { rc.Unlock() - return cacher.readChunkAt(buffer, offset) + return cacher.readChunkAt(ctx, buffer, offset) } if shouldCache || rc.lookupFileIdFn == nil { n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset)) @@ -133,7 +133,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt rc.downloaders[fileId] = cacher rc.Unlock() - return cacher.readChunkAt(buffer, offset) + return cacher.readChunkAt(ctx, buffer, offset) } func (rc *ReaderCache) UnCache(fileId string) { @@ -230,15 +230,21 @@ func (s *SingleChunkCacher) destroy() { } // readChunkAt reads data from the cached chunk. -// It waits for the download to complete if it's still in progress, -// using a channel for efficient waiting that integrates with context cancellation. -func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { +// It waits for the download to complete if it's still in progress. +// The ctx parameter allows the reader to cancel its wait (but the download continues +// for other readers - see comment in startCaching about shared resource semantics). +func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) { s.wg.Add(1) defer s.wg.Done() - // Wait for download to complete using channel - // This allows for future context cancellation support - <-s.done + // Wait for download to complete, but allow reader cancellation + select { + case <-s.done: + // Download completed + case <-ctx.Done(): + // Reader cancelled - download continues for other readers + return 0, ctx.Err() + } s.Lock() defer s.Unlock()