
address review comments

- Use a channel (done) instead of sync.Cond for download completion signaling
  This integrates better with context cancellation patterns (see the sketch after this list)
- Remove redundant groupErr check in reader_at.go (errors are already captured in task.err)
- Remove buggy URL encoding logic from retriedFetchChunkDataDirect
  (Applying url.PathEscape to the full URL is a pre-existing bug that should be fixed separately)
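
A minimal sketch of the signaling pattern described in the first point, using simplified stand-in names (downloader, startDownload, and read are illustrative, not the actual SeaweedFS types): closing a channel exactly once wakes every waiter, and unlike sync.Cond.Wait the receive can sit in a select next to ctx.Done():

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// downloader is a simplified stand-in for SingleChunkCacher.
type downloader struct {
	done chan struct{} // closed exactly once when the download finishes
	data []byte
	err  error
}

func startDownload() *downloader {
	d := &downloader{done: make(chan struct{})}
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend to fetch a chunk
		d.data = []byte("chunk data")
		close(d.done) // wakes every waiter at once, like a final Broadcast
	}()
	return d
}

// read blocks until the download completes, but can also give up when the
// context is cancelled, which a sync.Cond.Wait loop cannot express directly.
func (d *downloader) read(ctx context.Context) ([]byte, error) {
	select {
	case <-d.done:
		return d.data, d.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	d := startDownload()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	data, err := d.read(ctx)
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("cancelled while waiting")
		return
	}
	fmt.Printf("read %d bytes\n", len(data))
}
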
pull/7627/head
chrislu 1 month ago
commit bca9bd646c
3 changed files:
1. weed/filer/reader_at.go (6 changes)
2. weed/filer/reader_cache.go (48 changes)
3. weed/util/http/http_global_client_util.go (4 changes)

weed/filer/reader_at.go (6 changes)

@@ -278,7 +278,7 @@ func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n i
 	}
 	// Wait for all chunk reads to complete
-	groupErr := g.Wait()
+	_ = g.Wait()
 	// Aggregate results (order is preserved since we read directly into buffer positions)
 	for _, task := range tasks {
@@ -289,10 +289,6 @@ func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n i
 		}
 	}
-	if groupErr != nil && err == nil {
-		err = groupErr
-	}
 	if err != nil {
 		return n, ts, err
 	}

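For context on the groupErr change above: each goroutine records its failure on its own task, so the aggregate error returned by g.Wait() would only duplicate what the per-task loop already reports. A runnable sketch of that shape, with task and fetchChunk as simplified stand-ins for the real reader types:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type task struct {
	id  int
	n   int   // bytes read into this task's buffer slice
	err error // per-task error, inspected after g.Wait()
}

func fetchChunk(ctx context.Context, t *task) error {
	if err := ctx.Err(); err != nil {
		t.err = err
		return err
	}
	if t.id == 1 {
		t.err = fmt.Errorf("chunk %d failed", t.id) // captured on the task itself
		return t.err
	}
	t.n = 1024
	return nil
}

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	tasks := []*task{{id: 0}, {id: 1}, {id: 2}}
	for _, t := range tasks {
		t := t // capture per iteration (needed before Go 1.22)
		g.Go(func() error { return fetchChunk(ctx, t) })
	}
	// The first error is already stored on its task, so keeping the
	// aggregate result from Wait would only duplicate the loop below.
	_ = g.Wait()

	for _, t := range tasks {
		if t.err != nil {
			fmt.Printf("chunk %d: %v\n", t.id, t.err)
			return
		}
		fmt.Printf("chunk %d: read %d bytes\n", t.id, t.n)
	}
}
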
weed/filer/reader_cache.go (48 changes)

@@ -25,7 +25,6 @@ type ReaderCache struct {
 type SingleChunkCacher struct {
 	completedTimeNew int64
 	sync.Mutex
-	cond        *sync.Cond
 	parent      *ReaderCache
 	chunkFileId string
 	data        []byte
@@ -34,9 +33,9 @@ type SingleChunkCacher struct {
 	isGzipped   bool
 	chunkSize   int
 	shouldCache bool
-	isComplete  bool // indicates whether the download has completed (success or failure)
 	wg             sync.WaitGroup
 	cacheStartedCh chan struct{}
+	done           chan struct{} // signals when download is complete
 }

 func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -158,7 +157,7 @@ func (rc *ReaderCache) destroy() {
 }

 func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
-	s := &SingleChunkCacher{
+	return &SingleChunkCacher{
 		parent:      parent,
 		chunkFileId: fileId,
 		cipherKey:   cipherKey,
@@ -166,14 +165,13 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte,
 		chunkSize:      chunkSize,
 		shouldCache:    shouldCache,
 		cacheStartedCh: make(chan struct{}),
+		done:           make(chan struct{}),
 	}
-	s.cond = sync.NewCond(&s.Mutex)
-	return s
 }

 // startCaching downloads the chunk data in the background.
 // It does NOT hold the lock during the HTTP download to allow concurrent readers
-// to wait efficiently using the condition variable.
+// to wait efficiently using the done channel.
 func (s *SingleChunkCacher) startCaching() {
 	s.wg.Add(1)
 	defer s.wg.Done()
@@ -185,9 +183,8 @@ func (s *SingleChunkCacher) startCaching() {
 	if err != nil {
 		s.Lock()
 		s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
-		s.isComplete = true
-		s.cond.Broadcast() // wake up any waiting readers
 		s.Unlock()
+		close(s.done) // signal completion
 		return
 	}
@@ -198,25 +195,19 @@ func (s *SingleChunkCacher) startCaching() {
 	// Now acquire lock to update state
 	s.Lock()
+	defer s.Unlock()
 	if fetchErr != nil {
 		mem.Free(data)
 		s.err = fetchErr
-		s.isComplete = true
-		s.cond.Broadcast() // wake up any waiting readers
-		return
-	}
-	s.data = data
-	s.isComplete = true
-	if s.shouldCache {
-		s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+	} else {
+		s.data = data
+		if s.shouldCache {
+			s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+		}
+		atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
 	}
-	atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
-	s.Unlock()
-	s.cond.Broadcast() // wake up any waiting readers
+	close(s.done) // signal completion to all waiting readers
 }

 func (s *SingleChunkCacher) destroy() {
@@ -234,19 +225,18 @@ func (s *SingleChunkCacher) destroy() {
 // readChunkAt reads data from the cached chunk.
 // It waits for the download to complete if it's still in progress,
-// using a condition variable for efficient waiting.
+// using a channel for efficient waiting that integrates with context cancellation.
 func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
 	s.wg.Add(1)
 	defer s.wg.Done()

+	// Wait for download to complete using channel
+	// This allows for future context cancellation support
+	<-s.done
+
 	s.Lock()
 	defer s.Unlock()

-	// Wait for download to complete using condition variable
-	// This is more efficient than spinning or holding a lock during download
-	for !s.isComplete {
-		s.cond.Wait()
-	}
 	if s.err != nil {
 		return 0, s.err
 	}

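The updated comments leave room for context cancellation. One hypothetical way readChunkAt could grow a ctx parameter (this variant is not part of the commit; chunkCacher below is a simplified stand-in for SingleChunkCacher):

package sketch

import (
	"context"
	"io"
	"sync"
)

type chunkCacher struct {
	sync.Mutex
	done chan struct{} // closed by the downloader on success or failure
	data []byte
	err  error
}

func (s *chunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
	// The committed code does a bare <-s.done; a select adds cancellation.
	select {
	case <-s.done:
	case <-ctx.Done():
		return 0, ctx.Err() // stop waiting; the background download continues
	}

	s.Lock()
	defer s.Unlock()
	if s.err != nil {
		return 0, s.err
	}
	if offset >= int64(len(s.data)) {
		return 0, io.EOF
	}
	return copy(buf, s.data[offset:]), nil
}
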
weed/util/http/http_global_client_util.go (4 changes)

@@ -578,10 +578,6 @@ func retriedFetchChunkDataDirect(ctx context.Context, buffer []byte, urlStrings
 		default:
 		}

-		if strings.Contains(urlString, "%") {
-			urlString = url.PathEscape(urlString)
-		}
-
 		n, shouldRetry, err = readUrlDirectToBuffer(ctx, urlString+"?readDeleted=true", jwt, buffer)
 		if err == nil {
 			return n, nil

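To illustrate the pre-existing bug noted in the commit message: url.PathEscape encodes its entire argument as a single path segment, so applying it to a full URL also encodes the '/' separators and double-encodes any percent-escapes already present. A quick demonstration with a made-up volume server address:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// A made-up volume server URL whose file id already contains "%2A".
	u := "http://127.0.0.1:8080/3,01637037d6%2A"

	// PathEscape treats its whole input as ONE path segment: every "/"
	// becomes %2F, the comma becomes %2C, and the existing "%2A" is
	// double-encoded to %252A, so the result is no longer a usable URL.
	fmt.Println(url.PathEscape(u))
	// Output: http:%2F%2F127.0.0.1:8080%2F3%2C01637037d6%252A
}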