# Commentary only: everything above the first `diff --git` line is outside the
# patch proper and is not part of any hunk.
#
# Target file: weed/filer/reader_cache.go
#
# What the hunks below change
#   * SingleChunkCacher gains two fields: `cond *sync.Cond` and
#     `isComplete bool`.
#   * newSingleChunkCacher builds the struct into a local `s`, sets
#     `s.cond = sync.NewCond(&s.Mutex)`, and returns `s` (previously it
#     returned the struct literal directly).
#   * startCaching no longer takes the cacher mutex for its whole body. The
#     lookupFileIdFn call and RetriedFetchChunkData run without the mutex and
#     fetch into a local `data` buffer. The mutex is then taken only to
#     publish the outcome:
#       - lookup error: set `s.err`, set `isComplete`, Broadcast, unlock;
#       - fetch error:  free the local buffer, set `s.err`, set `isComplete`,
#                       Broadcast;
#       - success:      set `s.data`, set `isComplete`, call SetChunk when
#                       `shouldCache` is true, store `completedTimeNew`,
#                       Broadcast.
#     The trailing bare `return` is removed.
#   * readChunkAt, after taking the mutex, loops on `s.cond.Wait()` until
#     `isComplete` is true and only then runs the existing `s.err` check.
#     The last hunk also drops a blank line before the closing brace.
#   * ReaderCache.ReadChunkAt: when a downloader is found for `fileId`, the new
#     code calls `rc.Unlock()` and returns `cacher.readChunkAt(buffer, offset)`
#     directly. The removed code called readChunkAt while rc was still locked
#     and returned only when `n != 0 && err == nil`; otherwise execution
#     continued to the `shouldCache || rc.lookupFileIdFn == nil` branch.
#
# NOTE(review): with this change a found downloader's result is returned even
#   when it is n == 0 or a non-nil error; the fall-through that followed the
#   removed `if` is no longer reached in that case. Confirm this is intended
#   and that callers cope with the error / zero-length result.
# NOTE(review): `s.wg.Add(1)` at the top of readChunkAt now executes after
#   `rc.Unlock()`. The body of destroy() is not shown in these hunks; if it
#   waits on `s.wg`, verify that Add cannot race with that Wait.
# NOTE(review): readers block until `isComplete` is set. Every return path of
#   startCaching shown here sets it explicitly, but it is not set via `defer`,
#   so a panic inside the lookup or fetch call would leave waiters blocked.
#   Consider setting the flag and broadcasting from a deferred function.
# NOTE(review): the lookup failure is wrapped with `%v`, so the underlying
#   error is not reachable through errors.Is / errors.As; `%w` would keep it.
# NOTE(review): in this copy the line breaks inside the hunks appear to have
#   been collapsed onto three long lines, so the `@@` line counts no longer
#   match the physical lines. The patch presumably has to be regenerated from
#   the commit before it can be applied - confirm.
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 605be5e73..d6a57dc29 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -25,6 +25,7 @@ type ReaderCache struct { type SingleChunkCacher struct { completedTimeNew int64 sync.Mutex + cond *sync.Cond parent *ReaderCache chunkFileId string data []byte @@ -33,6 +34,7 @@ type SingleChunkCacher struct { isGzipped bool chunkSize int shouldCache bool + isComplete bool // indicates whether the download has completed (success or failure) wg sync.WaitGroup cacheStartedCh chan struct{} } @@ -97,10 +99,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt rc.Lock() if cacher, found := rc.downloaders[fileId]; found { - if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil { - rc.Unlock() - return n, err - } + rc.Unlock() + return cacher.readChunkAt(buffer, offset) } if shouldCache || rc.lookupFileIdFn == nil { n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset)) @@ -158,7 +158,7 @@ func (rc *ReaderCache) destroy() { } func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher { - return &SingleChunkCacher{ + s := &SingleChunkCacher{ parent: parent, chunkFileId: fileId, cipherKey: cipherKey, @@ -167,37 +167,56 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, shouldCache: shouldCache, cacheStartedCh: make(chan struct{}), } + s.cond = sync.NewCond(&s.Mutex) + return s } +// startCaching downloads the chunk data in the background. +// It does NOT hold the lock during the HTTP download to allow concurrent readers +// to wait efficiently using the condition variable. 
func (s *SingleChunkCacher) startCaching() { s.wg.Add(1) defer s.wg.Done() - s.Lock() - defer s.Unlock() - s.cacheStartedCh <- struct{}{} // means this has been started + s.cacheStartedCh <- struct{}{} // signal that we've started + // Lookup file ID without holding the lock urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId) if err != nil { + s.Lock() s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err) + s.isComplete = true + s.cond.Broadcast() // wake up any waiting readers + s.Unlock() return } - s.data = mem.Allocate(s.chunkSize) + // Allocate buffer and download without holding the lock + // This allows multiple downloads to proceed in parallel + data := mem.Allocate(s.chunkSize) + _, fetchErr := util_http.RetriedFetchChunkData(context.Background(), data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId) - _, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId) - if s.err != nil { - mem.Free(s.data) - s.data = nil + // Now acquire lock to update state + s.Lock() + defer s.Unlock() + + if fetchErr != nil { + mem.Free(data) + s.err = fetchErr + s.isComplete = true + s.cond.Broadcast() // wake up any waiting readers return } + s.data = data + s.isComplete = true + if s.shouldCache { s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) } atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano()) - return + s.cond.Broadcast() // wake up any waiting readers } func (s *SingleChunkCacher) destroy() { @@ -213,12 +232,21 @@ func (s *SingleChunkCacher) destroy() { } } +// readChunkAt reads data from the cached chunk. +// It waits for the download to complete if it's still in progress, +// using a condition variable for efficient waiting. 
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { s.wg.Add(1) defer s.wg.Done() s.Lock() defer s.Unlock() + // Wait for download to complete using condition variable + // This is more efficient than spinning or holding a lock during download + for !s.isComplete { + s.cond.Wait() + } + if s.err != nil { return 0, s.err } @@ -228,5 +256,4 @@ func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { } return copy(buf, s.data[offset:]), nil - }