From 0388d421d2a50ce4c3368457ab850a982744ea7b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 30 Dec 2018 00:51:44 -0800 Subject: [PATCH] caching visible intervals for read speeds up 4x in single thread mode speeds up 30% in 32 threads mode --- weed/filer2/filechunks.go | 26 ++++++++++++++++---------- weed/filer2/filechunks_test.go | 22 +++++++++++----------- weed/filesys/dir.go | 13 +++++++------ weed/filesys/file.go | 18 +++++++++++------- weed/filesys/filehandle.go | 12 +++++++++--- 5 files changed, 54 insertions(+), 37 deletions(-) diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 39e43cf3c..1cdd4d852 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -33,7 +33,7 @@ func ETag(chunks []*filer_pb.FileChunk) (etag string) { func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { - visibles := nonOverlappingVisibleIntervals(chunks) + visibles := NonOverlappingVisibleIntervals(chunks) fileIds := make(map[string]bool) for _, interval := range visibles { @@ -75,7 +75,13 @@ type ChunkView struct { func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) { - visibles := nonOverlappingVisibleIntervals(chunks) + visibles := NonOverlappingVisibleIntervals(chunks) + + return ViewFromVisibleIntervals(visibles, offset, size) + +} + +func ViewFromVisibleIntervals(visibles []*VisibleInterval, offset int64, size int) (views []*ChunkView) { stop := offset + int64(size) @@ -97,7 +103,7 @@ func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views } -func logPrintf(name string, visibles []*visibleInterval) { +func logPrintf(name string, visibles []*VisibleInterval) { /* log.Printf("%s len %d", name, len(visibles)) for _, v := range visibles { @@ -108,11 +114,11 @@ func logPrintf(name string, visibles []*visibleInterval) { var bufPool = sync.Pool{ New: func() interface{} { - return new(visibleInterval) + return new(VisibleInterval) }, } -func mergeIntoVisibles(visibles, newVisibles []*visibleInterval, chunk *filer_pb.FileChunk) []*visibleInterval { +func mergeIntoVisibles(visibles, newVisibles []*VisibleInterval, chunk *filer_pb.FileChunk) []*VisibleInterval { newV := newVisibleInterval( chunk.Offset, @@ -173,13 +179,13 @@ func mergeIntoVisibles(visibles, newVisibles []*visibleInterval, chunk *filer_pb return newVisibles } -func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) { +func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*VisibleInterval) { sort.Slice(chunks, func(i, j int) bool { return chunks[i].Mtime < chunks[j].Mtime }) - var newVislbles []*visibleInterval + var newVislbles []*VisibleInterval for _, chunk := range chunks { newVislbles = mergeIntoVisibles(visibles, newVislbles, chunk) t := visibles[:0] @@ -196,7 +202,7 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v // find non-overlapping visible intervals // visible interval map to one file chunk -type visibleInterval struct { +type VisibleInterval struct { start int64 stop int64 modifiedTime int64 @@ -204,8 +210,8 @@ type visibleInterval struct { isFullChunk bool } -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) *visibleInterval { - return &visibleInterval{ +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) *VisibleInterval { + return &VisibleInterval{ start: start, stop: stop, fileId: fileId, diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go index c2dd18515..e75e60753 100644 --- a/weed/filer2/filechunks_test.go +++ b/weed/filer2/filechunks_test.go @@ -63,7 +63,7 @@ func TestIntervalMerging(t *testing.T) { testcases := []struct { Chunks []*filer_pb.FileChunk - Expected []*visibleInterval + Expected []*VisibleInterval }{ // case 0: normal { @@ -72,7 +72,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 100, fileId: "abc"}, {start: 100, stop: 200, fileId: "asdf"}, {start: 200, stop: 300, fileId: "fsad"}, @@ -84,7 +84,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 200, fileId: "asdf"}, }, }, @@ -94,7 +94,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 50, fileId: "asdf"}, {start: 50, stop: 100, fileId: "abc"}, }, @@ -106,7 +106,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 50, fileId: "asdf"}, {start: 50, stop: 300, fileId: "xxxx"}, }, @@ -118,7 +118,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 200, fileId: "asdf"}, {start: 250, stop: 500, fileId: "xxxx"}, }, @@ -131,7 +131,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 70, Size: 150, FileId: "abc", Mtime: 143}, {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 200, fileId: "asdf"}, {start: 200, stop: 220, fileId: "abc"}, }, @@ -143,7 +143,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 100, fileId: "abc"}, }, }, @@ -157,7 +157,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 8388608, Size: 3145728, FileId: "5,02982f80de50", Mtime: 160}, {Offset: 11534336, Size: 2842193, FileId: "7,0299ad723803", Mtime: 170}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 2097152, fileId: "3,029565bf3092"}, {start: 2097152, stop: 5242880, fileId: "6,029632f47ae2"}, {start: 5242880, stop: 8388608, fileId: "2,029734c5aa10"}, @@ -174,7 +174,7 @@ func TestIntervalMerging(t *testing.T) { {Offset: 208896, Size: 339968 - 208896, FileId: "2,0b4031a72689", Mtime: 150}, {Offset: 339968, Size: 471040 - 339968, FileId: "3,0b416a557362", Mtime: 160}, }, - Expected: []*visibleInterval{ + Expected: []*VisibleInterval{ {start: 0, stop: 77824, fileId: "4,0b3df938e301"}, {start: 77824, stop: 208896, fileId: "4,0b3f0c7202f0"}, {start: 208896, stop: 339968, fileId: "2,0b4031a72689"}, @@ -186,7 +186,7 @@ func TestIntervalMerging(t *testing.T) { for i, testcase := range testcases { log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) - intervals := nonOverlappingVisibleIntervals(testcase.Chunks) + intervals := NonOverlappingVisibleIntervals(testcase.Chunks) for x, interval := range intervals { log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", i, x, interval.start, interval.stop, interval.fileId) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 810916967..267bd44a1 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -7,11 +7,11 @@ import ( "path/filepath" "time" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" ) type Dir struct { @@ -101,10 +101,11 @@ func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error { func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File { return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - entry: entry, + Name: name, + dir: dir, + wfs: dir.wfs, + entry: entry, + entryViewCache: nil, } } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index d0f94a21b..a9e763a1b 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -1,12 +1,12 @@ package filesys import ( - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" "context" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" "os" "path/filepath" "time" @@ -20,11 +20,12 @@ var _ = fs.NodeFsyncer(&File{}) var _ = fs.NodeSetattrer(&File{}) type File struct { - Name string - dir *Dir - wfs *WFS - entry *filer_pb.Entry - isOpen bool + Name string + dir *Dir + wfs *WFS + entry *filer_pb.Entry + entryViewCache []*filer2.VisibleInterval + isOpen bool } func (file *File) fullpath() string { @@ -82,6 +83,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f if req.Size == 0 { // fmt.Printf("truncate %v \n", fullPath) file.entry.Chunks = nil + file.entryViewCache = nil } file.entry.Attributes.FileSize = req.Size } @@ -138,6 +140,7 @@ func (file *File) maybeLoadAttributes(ctx context.Context) error { if item != nil && !item.Expired() { entry := item.Value().(*filer_pb.Entry) file.entry = entry + file.entryViewCache = nil // glog.V(1).Infof("file attr read cached %v attributes", file.Name) } else { err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { @@ -154,6 +157,7 @@ func (file *File) maybeLoadAttributes(ctx context.Context) error { } file.entry = resp.Entry + file.entryViewCache = nil glog.V(3).Infof("file attr %v %+v: %d", file.fullpath(), file.entry.Attributes, filer2.TotalSize(file.entry.Chunks)) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index a11fb06c6..8e1d6fadd 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -1,14 +1,14 @@ package filesys import ( - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" "net/http" "strings" "sync" @@ -58,7 +58,11 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus buff := make([]byte, req.Size) - chunkViews := filer2.ViewFromChunks(fh.f.entry.Chunks, req.Offset, req.Size) + if fh.f.entryViewCache == nil { + fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + } + + chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size) var vids []string for _, chunkView := range chunkViews { @@ -154,6 +158,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f for _, chunk := range chunks { fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk) + fh.f.entryViewCache = nil glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) fh.dirtyMetadata = true } @@ -188,6 +193,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { } if chunk != nil { fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk) + fh.f.entryViewCache = nil } if !fh.dirtyMetadata {