@@ -2,11 +2,12 @@ package filer
 import (
 	"fmt"
+	"sync"
+	"time"
+
 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
 	"github.com/seaweedfs/seaweedfs/weed/util/mem"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
-	"sync"
-	"time"
 )
 
 type ReaderCache struct {
@@ -19,17 +20,17 @@ type ReaderCache struct {
 type SingleChunkCacher struct {
 	sync.Mutex
-	cond          *sync.Cond
-	parent        *ReaderCache
-	chunkFileId   string
-	data          []byte
-	err           error
-	cipherKey     []byte
-	isGzipped     bool
-	chunkSize     int
-	shouldCache   bool
-	wg            sync.WaitGroup
-	completedTime time.Time
+	parent         *ReaderCache
+	chunkFileId    string
+	data           []byte
+	err            error
+	cipherKey      []byte
+	isGzipped      bool
+	chunkSize      int
+	shouldCache    bool
+	wg             sync.WaitGroup
+	cacheStartedCh chan struct{}
+	completedTime  time.Time
 }
 
 func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -62,9 +63,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
 		// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
 		// cache this chunk if not yet
 		cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
-		cacher.wg.Add(1)
 		go cacher.startCaching()
-		cacher.wg.Wait()
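+		// block until startCaching has begun and holds the cacher's lock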
+		<-cacher.cacheStartedCh
 		rc.downloaders[chunkView.FileId] = cacher
 	}
@@ -87,6 +87,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
 		}
 	}
+	// clean up old downloaders
 	if len(rc.downloaders) >= rc.limit {
 		oldestFid, oldestTime := "", time.Now()
 		for fid, downloader := range rc.downloaders {
@@ -106,9 +107,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
 	// glog.V(4).Infof("cache1 %s", fileId)
 	cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
-	cacher.wg.Add(1)
 	go cacher.startCaching()
-	cacher.wg.Wait()
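+	// the cacher keeps its lock for the whole download, so readChunkAt below blocks until the chunk is ready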
+	<-cacher.cacheStartedCh
 	rc.downloaders[fileId] = cacher
 	return cacher.readChunkAt(buffer, offset)
@@ -135,23 +135,24 @@ func (rc *ReaderCache) destroy() {
 }
 
 func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
-	t := &SingleChunkCacher{
-		parent:      parent,
-		chunkFileId: fileId,
-		cipherKey:   cipherKey,
-		isGzipped:   isGzipped,
-		chunkSize:   chunkSize,
-		shouldCache: shouldCache,
+	return &SingleChunkCacher{
+		parent:         parent,
+		chunkFileId:    fileId,
+		cipherKey:      cipherKey,
+		isGzipped:      isGzipped,
+		chunkSize:      chunkSize,
+		shouldCache:    shouldCache,
+		cacheStartedCh: make(chan struct{}),
 	}
-	t.cond = sync.NewCond(t)
-	return t
 }
 
 func (s *SingleChunkCacher) startCaching() {
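+	// wg now tracks the in-flight download (and readers) so destroy() can wait before freeing s.data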
+	s.wg.Add(1)
+	defer s.wg.Done()
 	s.Lock()
 	defer s.Unlock()
-	s.wg.Done() // means this has been started
+	s.cacheStartedCh <- struct{}{} // means this has been started
 	urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
 	if err != nil {
@@ -168,16 +169,17 @@ func (s *SingleChunkCacher) startCaching() {
 		return
 	}
+	s.completedTime = time.Now()
 	if s.shouldCache {
 		s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
 	}
-	s.cond.Broadcast()
-	s.completedTime = time.Now()
 	return
 }
 
 func (s *SingleChunkCacher) destroy() {
+	// wait for all reads to finish before destroying the data
+	s.wg.Wait()
 	s.Lock()
 	defer s.Unlock()
@@ -185,16 +187,15 @@ func (s *SingleChunkCacher) destroy() {
 		mem.Free(s.data)
 		s.data = nil
 	}
+	close(s.cacheStartedCh)
 }
 
 func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
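+	// count this read so destroy() waits for it before freeing s.data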
+	s.wg.Add(1)
+	defer s.wg.Done()
 	s.Lock()
 	defer s.Unlock()
-	for s.completedTime.IsZero() {
-		s.cond.Wait()
-	}
 	if s.err != nil {
 		return 0, s.err
 	}