diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index d475e6e11..b70942edc 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -199,7 +199,7 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next
 		return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
 	}
 
-	n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.ViewOffset == 0)
+	n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), (uint64(chunkView.ViewOffset)+chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache())
 	if c.lastChunkFid != chunkView.FileId {
 		if chunkView.OffsetInChunk == 0 { // start of a new chunk
 			if c.lastChunkFid != "" {
diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go
index 8bc383184..0d95d1aad 100644
--- a/weed/filer/reader_at_test.go
+++ b/weed/filer/reader_at_test.go
@@ -31,6 +31,14 @@ func (m *mockChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64)
 func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
 }
 
+func (m *mockChunkCache) GetMaxFilePartSizeInCache() (uint64) {
+	return 0
+}
+
+func (m *mockChunkCache) IsInCache(fileId string, lockNeeded bool) (answer bool) {
+	return false
+}
+
 func TestReaderAt(t *testing.T) {
 
 	visibles := NewIntervalList[*VisibleInterval]()
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index a3df8e0bd..716e796c9 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -6,6 +6,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"github.com/seaweedfs/seaweedfs/weed/util/mem"
@@ -61,6 +62,10 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
 		if _, found := rc.downloaders[chunkView.FileId]; found {
 			continue
 		}
+		if rc.chunkCache.IsInCache(chunkView.FileId, true) {
+			glog.V(4).Infof("%s is in cache", chunkView.FileId)
+			continue
+		}
 
 		if len(rc.downloaders) >= rc.limit {
 			// abort when slots are filled
@@ -69,7 +74,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
 
 		// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
 		// cache this chunk if not yet
-		cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), chunkView.ViewOffset == 0)
+		cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), (uint64(chunkView.ViewOffset)+chunkView.ChunkSize) <= rc.chunkCache.GetMaxFilePartSizeInCache())
 		go cacher.startCaching()
 		<-cacher.cacheStartedCh
 		rc.downloaders[chunkView.FileId] = cacher
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 866455a24..37dde1950 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -13,6 +13,8 @@ var ErrorOutOfBounds = errors.New("attempt to read out of bounds")
 type ChunkCache interface {
 	ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
 	SetChunk(fileId string, data []byte)
+	IsInCache(fileId string, lockNeeded bool) (answer bool)
+	GetMaxFilePartSizeInCache() (answer uint64)
 }
 
 // a global cache for recently accessed file chunks
@@ -23,6 +25,7 @@ type TieredChunkCache struct {
 	onDiskCacheSizeLimit0 uint64
 	onDiskCacheSizeLimit1 uint64
 	onDiskCacheSizeLimit2 uint64
+	maxFilePartSizeInCache uint64
 }
 
 var _ ChunkCache = &TieredChunkCache{}
@@ -39,10 +42,49 @@ func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, uni
 	c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
 	c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
 	c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
+	c.maxFilePartSizeInCache = uint64(unitSize*diskSizeInUnit) / 4
 
 	return c
 }
 
+func (c *TieredChunkCache) GetMaxFilePartSizeInCache() (answer uint64) {
+	return c.maxFilePartSizeInCache
+}
+
+func (c *TieredChunkCache) IsInCache(fileId string, lockNeeded bool) (answer bool) {
+	if c == nil {
+		return false
+	}
+
+	if lockNeeded {
+		c.RLock()
+		defer c.RUnlock()
+	}
+
+	item := c.memCache.cache.Get(fileId)
+	if item != nil {
+		glog.V(4).Infof("fileId %s is in memcache", fileId)
+		return true
+	}
+
+	fid, err := needle.ParseFileIdFromString(fileId)
+	if err != nil {
+		glog.V(4).Infof("failed to parse file id %s", fileId)
+		return false
+	}
+
+	for i, diskCacheLayer := range c.diskCaches {
+		for k, v := range diskCacheLayer.diskCaches {
+			_, ok := v.nm.Get(fid.Key)
+			if ok {
+				glog.V(4).Infof("fileId %s is in diskCaches[%d].volume[%d]", fileId, i, k)
+				return true
+			}
+		}
+	}
+	return false
+}
+
 func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
 	if c == nil {
 		return 0, nil
@@ -99,6 +141,10 @@ func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
 	defer c.Unlock()
 
 	glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))
 
+	if c.IsInCache(fileId, false) {
+		glog.V(4).Infof("fileId %s is already in cache", fileId)
+		return
+	}
 	c.doSetChunk(fileId, data)
 }
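
The main behavioral change is the last boolean argument to ReadChunkAt and newSingleChunkCacher: previously only a file's first chunk (chunkView.ViewOffset == 0) was marked for caching; now any chunk whose end offset (ViewOffset + ChunkSize) still fits under maxFilePartSizeInCache qualifies, and NewTieredChunkCache sets that limit to a quarter of the total disk cache size. A minimal standalone sketch of the admission predicate under those assumptions (shouldCacheChunk and the sample sizes are illustrative, not seaweedfs APIs):

package main

import "fmt"

// shouldCacheChunk mirrors the new admission rule from this diff:
// cache a chunk only if it lies entirely within the first
// maxFilePartSizeInCache bytes of the file.
func shouldCacheChunk(viewOffset int64, chunkSize, maxFilePartSizeInCache uint64) bool {
	return uint64(viewOffset)+chunkSize <= maxFilePartSizeInCache
}

func main() {
	// NewTieredChunkCache sets the limit to a quarter of the disk cache,
	// so a hypothetical 1 GiB cache yields a 256 MiB per-file limit.
	limit := uint64(1024*1024*1024) / 4

	fmt.Println(shouldCacheChunk(0, 4*1024*1024, limit))             // true: first chunk always fits
	fmt.Println(shouldCacheChunk(200*1024*1024, 4*1024*1024, limit)) // true: ends at 204 MiB, under the limit
	fmt.Println(shouldCacheChunk(300*1024*1024, 4*1024*1024, limit)) // false: past the 256 MiB limit
}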
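The other addition, IsInCache, lets MaybeCache skip scheduling a download for a chunk that is already resident and lets SetChunk skip a duplicate write. Note the lockNeeded flag: SetChunk already holds the write lock, so it passes false, while MaybeCache passes true. A rough standalone sketch of that guard pattern (memCache here is a hypothetical stand-in, not the TieredChunkCache implementation):

package main

import (
	"fmt"
	"sync"
)

// memCache is an illustrative single-tier stand-in for the tiered cache.
type memCache struct {
	sync.RWMutex
	chunks map[string][]byte
}

// IsInCache takes a lockNeeded flag so callers that already hold the
// write lock (like SetChunk below) can skip acquiring the read lock.
func (c *memCache) IsInCache(fileId string, lockNeeded bool) bool {
	if lockNeeded {
		c.RLock()
		defer c.RUnlock()
	}
	_, ok := c.chunks[fileId]
	return ok
}

func (c *memCache) SetChunk(fileId string, data []byte) {
	c.Lock()
	defer c.Unlock()
	// lockNeeded is false: the write lock is already held here.
	if c.IsInCache(fileId, false) {
		return // already cached; skip the duplicate write
	}
	c.chunks[fileId] = data
}

func main() {
	c := &memCache{chunks: map[string][]byte{}}
	c.SetChunk("3,01637037d6", []byte("chunk data"))
	c.SetChunk("3,01637037d6", []byte("chunk data")) // second call is a no-op
	fmt.Println(c.IsInCache("3,01637037d6", true))   // true
}

Passing false from SetChunk matters because sync.RWMutex is not reentrant: taking RLock while the same goroutine holds Lock would deadlock.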