From 551d00d51a14438abc3e182166a4850ec1104a44 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 26 Feb 2022 23:20:45 -0800 Subject: [PATCH] prefetch other chunks when stream reading --- weed/filer/reader_at.go | 13 +++++---- weed/filer/reader_cache.go | 56 ++++++++++++++++++++------------------ 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index a38a0bfd5..8ee627a21 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -21,6 +21,7 @@ type ChunkReadAt struct { fileSize int64 readerCache *ReaderCache readerPattern *ReaderPattern + lastChunkFid string } var _ = io.ReaderAt(&ChunkReadAt{}) @@ -85,7 +86,7 @@ func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chun return &ChunkReadAt{ chunkViews: chunkViews, fileSize: fileSize, - readerCache: newReaderCache(5, chunkCache, lookupFn), + readerCache: newReaderCache(32, chunkCache, lookupFn), readerPattern: NewReaderPattern(), } } @@ -167,12 +168,12 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next } n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0) - for i, nextChunk := range nextChunkViews { - if i < 2 { - c.readerCache.MaybeCache(nextChunk.FileId, nextChunk.CipherKey, nextChunk.IsGzipped, int(nextChunk.ChunkSize)) - } else { - break + if c.lastChunkFid != "" && c.lastChunkFid != chunkView.FileId { + if chunkView.Offset == 0 { // start of a new chunk + c.readerCache.UnCache(c.lastChunkFid) + c.readerCache.MaybeCache(nextChunkViews) } } + c.lastChunkFid = chunkView.FileId return } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 1a0dc6a31..4f2c52303 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -40,41 +40,33 @@ func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn } } -func (rc *ReaderCache) MaybeCache(fileId string, cipherKey []byte, isGzipped bool, chunkSize int) { - rc.Lock() - defer rc.Unlock() - if _, found := rc.downloaders[fileId]; found { - return - } +func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { if rc.lookupFileIdFn == nil { return } - // if too many, delete one of them? - if len(rc.downloaders) >= rc.limit { - oldestFid, oldestTime := "", time.Now() - for fid, downloader := range rc.downloaders { - if !downloader.completedTime.IsZero() { - if downloader.completedTime.Before(oldestTime) { - oldestFid, oldestTime = fid, downloader.completedTime - } - } + rc.Lock() + defer rc.Unlock() + + for _, chunkView := range chunkViews { + if _, found := rc.downloaders[chunkView.FileId]; found { + continue } - if oldestFid != "" { - oldDownloader := rc.downloaders[oldestFid] - delete(rc.downloaders, oldestFid) - oldDownloader.destroy() - } else { + + if len(rc.downloaders) >= rc.limit { // if still no slots, return return } - } - cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, false) - cacher.wg.Add(1) - go cacher.startCaching() - cacher.wg.Wait() - rc.downloaders[fileId] = cacher + // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset) + // cache this chunk if not yet + cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false) + cacher.wg.Add(1) + go cacher.startCaching() + cacher.wg.Wait() + rc.downloaders[chunkView.FileId] = cacher + + } return } @@ -108,6 +100,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt } } + // glog.V(4).Infof("cache1 %s", fileId) + cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache) cacher.wg.Add(1) go cacher.startCaching() @@ -117,6 +111,16 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt return cacher.readChunkAt(buffer, offset) } +func (rc *ReaderCache) UnCache(fileId string) { + rc.Lock() + defer rc.Unlock() + // glog.V(4).Infof("uncache %s", fileId) + if downloader, found := rc.downloaders[fileId]; found { + downloader.destroy() + delete(rc.downloaders, fileId) + } +} + func (rc *ReaderCache) destroy() { rc.Lock() defer rc.Unlock()