@ -25,6 +25,7 @@ type ReaderCache struct {
type SingleChunkCacher struct {
type SingleChunkCacher struct {
completedTimeNew int64
completedTimeNew int64
sync . Mutex
sync . Mutex
cond * sync . Cond
parent * ReaderCache
parent * ReaderCache
chunkFileId string
chunkFileId string
data [ ] byte
data [ ] byte
@ -33,6 +34,7 @@ type SingleChunkCacher struct {
isGzipped bool
isGzipped bool
chunkSize int
chunkSize int
shouldCache bool
shouldCache bool
isComplete bool // indicates whether the download has completed (success or failure)
wg sync . WaitGroup
wg sync . WaitGroup
cacheStartedCh chan struct { }
cacheStartedCh chan struct { }
}
}
@ -97,10 +99,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
rc . Lock ( )
rc . Lock ( )
if cacher , found := rc . downloaders [ fileId ] ; found {
if cacher , found := rc . downloaders [ fileId ] ; found {
if n , err := cacher . readChunkAt ( buffer , offset ) ; n != 0 && err == nil {
rc . Unlock ( )
return n , err
}
rc . Unlock ( )
return cacher . readChunkAt ( buffer , offset )
}
}
if shouldCache || rc . lookupFileIdFn == nil {
if shouldCache || rc . lookupFileIdFn == nil {
n , err := rc . chunkCache . ReadChunkAt ( buffer , fileId , uint64 ( offset ) )
n , err := rc . chunkCache . ReadChunkAt ( buffer , fileId , uint64 ( offset ) )
@ -158,7 +158,7 @@ func (rc *ReaderCache) destroy() {
}
}
func newSingleChunkCacher ( parent * ReaderCache , fileId string , cipherKey [ ] byte , isGzipped bool , chunkSize int , shouldCache bool ) * SingleChunkCacher {
func newSingleChunkCacher ( parent * ReaderCache , fileId string , cipherKey [ ] byte , isGzipped bool , chunkSize int , shouldCache bool ) * SingleChunkCacher {
return & SingleChunkCacher {
s := & SingleChunkCacher {
parent : parent ,
parent : parent ,
chunkFileId : fileId ,
chunkFileId : fileId ,
cipherKey : cipherKey ,
cipherKey : cipherKey ,
@ -167,37 +167,56 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte,
shouldCache : shouldCache ,
shouldCache : shouldCache ,
cacheStartedCh : make ( chan struct { } ) ,
cacheStartedCh : make ( chan struct { } ) ,
}
}
s . cond = sync . NewCond ( & s . Mutex )
return s
}
}
// startCaching downloads the chunk data in the background.
// It does NOT hold the lock during the HTTP download to allow concurrent readers
// to wait efficiently using the condition variable.
func ( s * SingleChunkCacher ) startCaching ( ) {
func ( s * SingleChunkCacher ) startCaching ( ) {
s . wg . Add ( 1 )
s . wg . Add ( 1 )
defer s . wg . Done ( )
defer s . wg . Done ( )
s . Lock ( )
defer s . Unlock ( )
s . cacheStartedCh <- struct { } { } // means this has been started
s . cacheStartedCh <- struct { } { } // signal that we've started
// Lookup file ID without holding the lock
urlStrings , err := s . parent . lookupFileIdFn ( context . Background ( ) , s . chunkFileId )
urlStrings , err := s . parent . lookupFileIdFn ( context . Background ( ) , s . chunkFileId )
if err != nil {
if err != nil {
s . Lock ( )
s . err = fmt . Errorf ( "operation LookupFileId %s failed, err: %v" , s . chunkFileId , err )
s . err = fmt . Errorf ( "operation LookupFileId %s failed, err: %v" , s . chunkFileId , err )
s . isComplete = true
s . cond . Broadcast ( ) // wake up any waiting readers
s . Unlock ( )
return
return
}
}
s . data = mem . Allocate ( s . chunkSize )
// Allocate buffer and download without holding the lock
// This allows multiple downloads to proceed in parallel
data := mem . Allocate ( s . chunkSize )
_ , fetchErr := util_http . RetriedFetchChunkData ( context . Background ( ) , data , urlStrings , s . cipherKey , s . isGzipped , true , 0 , s . chunkFileId )
_ , s . err = util_http . RetriedFetchChunkData ( context . Background ( ) , s . data , urlStrings , s . cipherKey , s . isGzipped , true , 0 , s . chunkFileId )
if s . err != nil {
mem . Free ( s . data )
s . data = nil
// Now acquire lock to update state
s . Lock ( )
defer s . Unlock ( )
if fetchErr != nil {
mem . Free ( data )
s . err = fetchErr
s . isComplete = true
s . cond . Broadcast ( ) // wake up any waiting readers
return
return
}
}
s . data = data
s . isComplete = true
if s . shouldCache {
if s . shouldCache {
s . parent . chunkCache . SetChunk ( s . chunkFileId , s . data )
s . parent . chunkCache . SetChunk ( s . chunkFileId , s . data )
}
}
atomic . StoreInt64 ( & s . completedTimeNew , time . Now ( ) . UnixNano ( ) )
atomic . StoreInt64 ( & s . completedTimeNew , time . Now ( ) . UnixNano ( ) )
return
s . cond . Broadcast ( ) // wake up any waiting readers
}
}
func ( s * SingleChunkCacher ) destroy ( ) {
func ( s * SingleChunkCacher ) destroy ( ) {
@ -213,12 +232,21 @@ func (s *SingleChunkCacher) destroy() {
}
}
}
}
// readChunkAt reads data from the cached chunk.
// It waits for the download to complete if it's still in progress,
// using a condition variable for efficient waiting.
func ( s * SingleChunkCacher ) readChunkAt ( buf [ ] byte , offset int64 ) ( int , error ) {
func ( s * SingleChunkCacher ) readChunkAt ( buf [ ] byte , offset int64 ) ( int , error ) {
s . wg . Add ( 1 )
s . wg . Add ( 1 )
defer s . wg . Done ( )
defer s . wg . Done ( )
s . Lock ( )
s . Lock ( )
defer s . Unlock ( )
defer s . Unlock ( )
// Wait for download to complete using condition variable
// This is more efficient than spinning or holding a lock during download
for ! s . isComplete {
s . cond . Wait ( )
}
if s . err != nil {
if s . err != nil {
return 0 , s . err
return 0 , s . err
}
}
@ -228,5 +256,4 @@ func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
}
}
return copy ( buf , s . data [ offset : ] ) , nil
return copy ( buf , s . data [ offset : ] ) , nil
}
}