Browse Source

prefetch other chunks when stream reading

original_weed_mount
chrislu 3 years ago
parent
commit
551d00d51a
  1. 13
      weed/filer/reader_at.go
  2. 50
      weed/filer/reader_cache.go

13
weed/filer/reader_at.go

@ -21,6 +21,7 @@ type ChunkReadAt struct {
fileSize int64
readerCache *ReaderCache
readerPattern *ReaderPattern
lastChunkFid string
}
var _ = io.ReaderAt(&ChunkReadAt{})
@ -85,7 +86,7 @@ func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chun
return &ChunkReadAt{
chunkViews: chunkViews,
fileSize: fileSize,
readerCache: newReaderCache(5, chunkCache, lookupFn),
readerCache: newReaderCache(32, chunkCache, lookupFn),
readerPattern: NewReaderPattern(),
}
}
@ -167,12 +168,12 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next
}
n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
for i, nextChunk := range nextChunkViews {
if i < 2 {
c.readerCache.MaybeCache(nextChunk.FileId, nextChunk.CipherKey, nextChunk.IsGzipped, int(nextChunk.ChunkSize))
} else {
break
if c.lastChunkFid != "" && c.lastChunkFid != chunkView.FileId {
if chunkView.Offset == 0 { // start of a new chunk
c.readerCache.UnCache(c.lastChunkFid)
c.readerCache.MaybeCache(nextChunkViews)
}
}
c.lastChunkFid = chunkView.FileId
return
}

50
weed/filer/reader_cache.go

@ -40,41 +40,33 @@ func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
}
}
func (rc *ReaderCache) MaybeCache(fileId string, cipherKey []byte, isGzipped bool, chunkSize int) {
rc.Lock()
defer rc.Unlock()
if _, found := rc.downloaders[fileId]; found {
return
}
func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
if rc.lookupFileIdFn == nil {
return
}
// if too many, delete one of them?
if len(rc.downloaders) >= rc.limit {
oldestFid, oldestTime := "", time.Now()
for fid, downloader := range rc.downloaders {
if !downloader.completedTime.IsZero() {
if downloader.completedTime.Before(oldestTime) {
oldestFid, oldestTime = fid, downloader.completedTime
}
}
rc.Lock()
defer rc.Unlock()
for _, chunkView := range chunkViews {
if _, found := rc.downloaders[chunkView.FileId]; found {
continue
}
if oldestFid != "" {
oldDownloader := rc.downloaders[oldestFid]
delete(rc.downloaders, oldestFid)
oldDownloader.destroy()
} else {
if len(rc.downloaders) >= rc.limit {
// if still no slots, return
return
}
}
cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, false)
// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
// cache this chunk if not yet
cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
cacher.wg.Add(1)
go cacher.startCaching()
cacher.wg.Wait()
rc.downloaders[fileId] = cacher
rc.downloaders[chunkView.FileId] = cacher
}
return
}
@ -108,6 +100,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
}
}
// glog.V(4).Infof("cache1 %s", fileId)
cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
cacher.wg.Add(1)
go cacher.startCaching()
@ -117,6 +111,16 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
return cacher.readChunkAt(buffer, offset)
}
func (rc *ReaderCache) UnCache(fileId string) {
rc.Lock()
defer rc.Unlock()
// glog.V(4).Infof("uncache %s", fileId)
if downloader, found := rc.downloaders[fileId]; found {
downloader.destroy()
delete(rc.downloaders, fileId)
}
}
func (rc *ReaderCache) destroy() {
rc.Lock()
defer rc.Unlock()

Loading…
Cancel
Save