Browse Source

filer: propagate context for reader cancellation

Address review comment: pass context through ReadChunkAt call chain so
that a reader can cancel its wait for a download. The key distinction is:

- Download uses context.Background() - shared resource, always completes
- Reader wait uses request context - can be cancelled individually

If a reader cancels, it stops waiting and returns ctx.Err(), but the
download continues to completion for other readers waiting on the same
chunk. This properly handles the shared resource semantics while still
allowing individual reader cancellation.
pull/7627/head
chrislu 6 days ago
parent
commit
a8c5253ebe
  1. 4
      weed/filer/reader_at.go
  2. 24
      weed/filer/reader_cache.go

4
weed/filer/reader_at.go

@ -329,7 +329,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
}
shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
n, err = c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
if c.lastChunkFid != chunkView.FileId {
if chunkView.OffsetInChunk == 0 { // start of a new chunk
if c.lastChunkFid != "" {
@ -350,7 +350,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
// It doesn't update lastChunkFid or trigger prefetch (handled by the caller)
func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) {
shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
return c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
return c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
}
func zero(buffer []byte, start, length int64) int {

24
weed/filer/reader_cache.go

@ -94,12 +94,12 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
return
}
func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
rc.Lock()
if cacher, found := rc.downloaders[fileId]; found {
rc.Unlock()
return cacher.readChunkAt(buffer, offset)
return cacher.readChunkAt(ctx, buffer, offset)
}
if shouldCache || rc.lookupFileIdFn == nil {
n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
@ -133,7 +133,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
rc.downloaders[fileId] = cacher
rc.Unlock()
return cacher.readChunkAt(buffer, offset)
return cacher.readChunkAt(ctx, buffer, offset)
}
func (rc *ReaderCache) UnCache(fileId string) {
@ -230,15 +230,21 @@ func (s *SingleChunkCacher) destroy() {
}
// readChunkAt reads data from the cached chunk.
// It waits for the download to complete if it's still in progress,
// using a channel for efficient waiting that integrates with context cancellation.
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
// It waits for the download to complete if it's still in progress.
// The ctx parameter allows the reader to cancel its wait (but the download continues
// for other readers - see comment in startCaching about shared resource semantics).
func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
s.wg.Add(1)
defer s.wg.Done()
// Wait for download to complete using channel
// This allows for future context cancellation support
<-s.done
// Wait for download to complete, but allow reader cancellation
select {
case <-s.done:
// Download completed
case <-ctx.Done():
// Reader cancelled - download continues for other readers
return 0, ctx.Err()
}
s.Lock()
defer s.Unlock()

Loading…
Cancel
Save