From bca9bd646c1bb2fed8cd1238d94fbe5babd44164 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 4 Dec 2025 19:59:12 -0800 Subject: [PATCH] address review comments - Use channel (done) instead of sync.Cond for download completion signaling This integrates better with context cancellation patterns - Remove redundant groupErr check in reader_at.go (errors are already captured in task.err) - Remove buggy URL encoding logic from retriedFetchChunkDataDirect (The existing url.PathEscape on full URL is a pre-existing bug that should be fixed separately) --- weed/filer/reader_at.go | 6 +-- weed/filer/reader_cache.go | 48 +++++++++-------------- weed/util/http/http_global_client_util.go | 4 -- 3 files changed, 20 insertions(+), 38 deletions(-) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 6d091286d..917f9e029 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -278,7 +278,7 @@ func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n i } // Wait for all chunk reads to complete - groupErr := g.Wait() + _ = g.Wait() // Aggregate results (order is preserved since we read directly into buffer positions) for _, task := range tasks { @@ -289,10 +289,6 @@ func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n i } } - if groupErr != nil && err == nil { - err = groupErr - } - if err != nil { return n, ts, err } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index d6a57dc29..f291a7e7a 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -25,7 +25,6 @@ type ReaderCache struct { type SingleChunkCacher struct { completedTimeNew int64 sync.Mutex - cond *sync.Cond parent *ReaderCache chunkFileId string data []byte @@ -34,9 +33,9 @@ type SingleChunkCacher struct { isGzipped bool chunkSize int shouldCache bool - isComplete bool // indicates whether the download has completed (success or failure) wg sync.WaitGroup cacheStartedCh chan struct{} + done chan struct{} // signals when download is complete } func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { @@ -158,7 +157,7 @@ func (rc *ReaderCache) destroy() { } func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher { - s := &SingleChunkCacher{ + return &SingleChunkCacher{ parent: parent, chunkFileId: fileId, cipherKey: cipherKey, @@ -166,14 +165,13 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, chunkSize: chunkSize, shouldCache: shouldCache, cacheStartedCh: make(chan struct{}), + done: make(chan struct{}), } - s.cond = sync.NewCond(&s.Mutex) - return s } // startCaching downloads the chunk data in the background. // It does NOT hold the lock during the HTTP download to allow concurrent readers -// to wait efficiently using the condition variable. +// to wait efficiently using the done channel. func (s *SingleChunkCacher) startCaching() { s.wg.Add(1) defer s.wg.Done() @@ -185,9 +183,8 @@ func (s *SingleChunkCacher) startCaching() { if err != nil { s.Lock() s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err) - s.isComplete = true - s.cond.Broadcast() // wake up any waiting readers s.Unlock() + close(s.done) // signal completion return } @@ -198,25 +195,19 @@ func (s *SingleChunkCacher) startCaching() { // Now acquire lock to update state s.Lock() - defer s.Unlock() - if fetchErr != nil { mem.Free(data) s.err = fetchErr - s.isComplete = true - s.cond.Broadcast() // wake up any waiting readers - return - } - - s.data = data - s.isComplete = true - - if s.shouldCache { - s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) + } else { + s.data = data + if s.shouldCache { + s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) + } + atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano()) } - atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano()) + s.Unlock() - s.cond.Broadcast() // wake up any waiting readers + close(s.done) // signal completion to all waiting readers } func (s *SingleChunkCacher) destroy() { @@ -234,19 +225,18 @@ func (s *SingleChunkCacher) destroy() { // readChunkAt reads data from the cached chunk. // It waits for the download to complete if it's still in progress, -// using a condition variable for efficient waiting. +// using a channel for efficient waiting that integrates with context cancellation. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { s.wg.Add(1) defer s.wg.Done() + + // Wait for download to complete using channel + // This allows for future context cancellation support + <-s.done + s.Lock() defer s.Unlock() - // Wait for download to complete using condition variable - // This is more efficient than spinning or holding a lock during download - for !s.isComplete { - s.cond.Wait() - } - if s.err != nil { return 0, s.err } diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index c48388ac9..128a81143 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -578,10 +578,6 @@ func retriedFetchChunkDataDirect(ctx context.Context, buffer []byte, urlStrings default: } - if strings.Contains(urlString, "%") { - urlString = url.PathEscape(urlString) - } - n, shouldRetry, err = readUrlDirectToBuffer(ctx, urlString+"?readDeleted=true", jwt, buffer) if err == nil { return n, nil