Browse Source

use one readerCache for the whole file

pull/4142/head
chrislu 2 years ago
parent
commit
bfe5d910c6
  1. 8
      weed/filer/filechunk_group.go
  2. 2
      weed/filer/filechunk_section.go
  3. 5
      weed/filer/reader_at.go
  4. 10
      weed/filer/reader_at_test.go
  5. 2
      weed/filer/reader_cache.go
  6. 5
      weed/mount/page_writer/upload_pipeline.go
  7. 9
      weed/server/webdav_server.go

8
weed/filer/filechunk_group.go

@ -12,13 +12,15 @@ type ChunkGroup struct {
chunkCache chunk_cache.ChunkCache chunkCache chunk_cache.ChunkCache
sections map[SectionIndex]*FileChunkSection sections map[SectionIndex]*FileChunkSection
sectionsLock sync.RWMutex sectionsLock sync.RWMutex
readerCache *ReaderCache
} }
func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
group := &ChunkGroup{ group := &ChunkGroup{
lookupFn: lookupFn,
chunkCache: chunkCache,
sections: make(map[SectionIndex]*FileChunkSection),
lookupFn: lookupFn,
chunkCache: chunkCache,
sections: make(map[SectionIndex]*FileChunkSection),
readerCache: NewReaderCache(32, chunkCache, lookupFn),
} }
err := group.SetChunks(chunks) err := group.SetChunks(chunks)

2
weed/filer/filechunk_section.go

@ -74,7 +74,7 @@ func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64)
} }
if section.reader == nil { if section.reader == nil {
section.reader = NewChunkReaderAtFromClient(group.lookupFn, section.chunkViews, group.chunkCache, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
section.reader = NewChunkReaderAtFromClient(group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
} }
section.reader.fileSize = fileSize section.reader.fileSize = fileSize
} }

5
weed/filer/reader_at.go

@ -10,7 +10,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/wdclient"
) )
@ -88,12 +87,12 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
} }
} }
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews *IntervalList[*ChunkView], chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
func NewChunkReaderAtFromClient(readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
return &ChunkReadAt{ return &ChunkReadAt{
chunkViews: chunkViews, chunkViews: chunkViews,
fileSize: fileSize, fileSize: fileSize,
readerCache: newReaderCache(32, chunkCache, lookupFn),
readerCache: readerCache,
readerPattern: NewReaderPattern(), readerPattern: NewReaderPattern(),
} }
} }

10
weed/filer/reader_at_test.go

@ -68,7 +68,7 @@ func TestReaderAt(t *testing.T) {
readerAt := &ChunkReadAt{ readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
fileSize: 10, fileSize: 10,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerCache: NewReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(), readerPattern: NewReaderPattern(),
} }
@ -115,7 +115,7 @@ func TestReaderAt0(t *testing.T) {
readerAt := &ChunkReadAt{ readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
fileSize: 10, fileSize: 10,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerCache: NewReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(), readerPattern: NewReaderPattern(),
} }
@ -141,7 +141,7 @@ func TestReaderAt1(t *testing.T) {
readerAt := &ChunkReadAt{ readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
fileSize: 20, fileSize: 20,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerCache: NewReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(), readerPattern: NewReaderPattern(),
} }
@ -174,7 +174,7 @@ func TestReaderAtGappedChunksDoNotLeak(t *testing.T) {
readerAt := &ChunkReadAt{ readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
fileSize: 9, fileSize: 9,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerCache: NewReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(), readerPattern: NewReaderPattern(),
} }
@ -186,7 +186,7 @@ func TestReaderAtSparseFileDoesNotLeak(t *testing.T) {
readerAt := &ChunkReadAt{ readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(NewIntervalList[*VisibleInterval](), 0, math.MaxInt64), chunkViews: ViewFromVisibleIntervals(NewIntervalList[*VisibleInterval](), 0, math.MaxInt64),
fileSize: 3, fileSize: 3,
readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerCache: NewReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(), readerPattern: NewReaderPattern(),
} }

2
weed/filer/reader_cache.go

@ -34,7 +34,7 @@ type SingleChunkCacher struct {
completedTimeNew int64 completedTimeNew int64
} }
func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
return &ReaderCache{ return &ReaderCache{
limit: limit, limit: limit,
chunkCache: chunkCache, chunkCache: chunkCache,

5
weed/mount/page_writer/upload_pipeline.go

@ -92,14 +92,17 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN
} }
*/ */
up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex) up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
} }
// fmt.Printf("isSequential:%v len(up.writableChunks):%v memChunkCounter:%v", isSequential, len(up.writableChunks), memChunkCounter)
if isSequential && if isSequential &&
len(up.writableChunks) < up.writableChunkLimit && len(up.writableChunks) < up.writableChunkLimit &&
atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
// fmt.Printf(" create mem chunk %d\n", logicChunkIndex)
} else { } else {
pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex) pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex)
// fmt.Printf(" create file chunk %d\n", logicChunkIndex)
} }
up.writableChunks[logicChunkIndex] = pageChunk up.writableChunks[logicChunkIndex] = pageChunk
} }

9
weed/server/webdav_server.go

@ -83,6 +83,7 @@ type WebDavFileSystem struct {
secret security.SigningKey secret security.SigningKey
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
chunkCache *chunk_cache.TieredChunkCache chunkCache *chunk_cache.TieredChunkCache
readerCache *filer.ReaderCache
signature int32 signature int32
} }
@ -119,11 +120,13 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
os.MkdirAll(cacheDir, os.FileMode(0755)) os.MkdirAll(cacheDir, os.FileMode(0755))
chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024) chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
return &WebDavFileSystem{
t := &WebDavFileSystem{
option: option, option: option,
chunkCache: chunkCache, chunkCache: chunkCache,
signature: util.RandomInt32(), signature: util.RandomInt32(),
}, nil
}
t.readerCache = filer.NewReaderCache(32, chunkCache, filer.LookupFn(t))
return t, nil
} }
var _ = filer_pb.FilerClient(&WebDavFileSystem{}) var _ = filer_pb.FilerClient(&WebDavFileSystem{})
@ -527,7 +530,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
} }
if f.reader == nil { if f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize) chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(f.fs.readerCache, chunkViews, fileSize)
} }
readSize, err = f.reader.ReadAt(p, f.off) readSize, err = f.reader.ReadAt(p, f.off)

Loading…
Cancel
Save