From cc66e25cd28063ba8497ea3cf7eca084b60a661b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 20 May 2018 17:06:09 -0700 Subject: [PATCH] merge intervals --- weed/filer2/filechunks.go | 356 ++++++++++++++++++++++++++++++- weed/filer2/filechunks_test.go | 112 ++++++++++ weed/server/filer_grpc_server.go | 5 +- 3 files changed, 460 insertions(+), 13 deletions(-) create mode 100644 weed/filer2/filechunks_test.go diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 9ddbe236e..257aba548 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -1,10 +1,13 @@ package filer2 -import "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "sort" + "log" + "math" +) -type Chunks []*filer_pb.FileChunk - -func (chunks Chunks) TotalSize() (size uint64) { +func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { for _, c := range chunks { t := uint64(c.Offset + int64(c.Size)) if size < t { @@ -14,12 +17,345 @@ func (chunks Chunks) TotalSize() (size uint64) { return } -func (chunks Chunks) Len() int { - return len(chunks) +func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { + return } -func (chunks Chunks) Swap(i, j int) { - chunks[i], chunks[j] = chunks[j], chunks[i] + +func mergeToVisibleIntervals(visibles []*visibleInterval, chunk *filer_pb.FileChunk) (merged []*visibleInterval) { + if len(visibles) == 0 { + return []*visibleInterval{newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId, chunk.Mtime)} + } + + log.Printf("merge chunk %+v => %d", chunk, len(visibles)) + for _, v := range visibles { + log.Printf("=> %+v", v) + } + + var nonOverlappingStop int + + // find merge candidates + var mergeCandidates []int + for t := len(visibles) - 1; t >= 0; t-- { + if visibles[t].stop > chunk.Offset { + mergeCandidates = append(mergeCandidates, t) + } else { + nonOverlappingStop = t + break + } + } + log.Printf("merged candidates: %+v, starting from %d", mergeCandidates, nonOverlappingStop) + + if len(mergeCandidates) == 0 { + merged = append(visibles, newVisibleInterval( + chunk.Offset, + chunk.Offset+int64(chunk.Size), + chunk.FileId, + chunk.Mtime, + )) + return + } + + // reverse merge candidates + i, j := 0, len(mergeCandidates)-1 + for i < j { + mergeCandidates[i], mergeCandidates[j] = mergeCandidates[j], mergeCandidates[i] + i++ + j-- + } + log.Printf("reversed merged candidates: %+v", mergeCandidates) + + // add chunk into a possibly connected intervals + var overlappingIntevals []*visibleInterval + for i = 0; i < len(mergeCandidates); i++ { + interval := visibles[mergeCandidates[i]] + if interval.modifiedTime >= chunk.Mtime { + log.Printf("overlappingIntevals add existing interval: [%d,%d)", interval.start, interval.stop) + overlappingIntevals = append(overlappingIntevals, interval) + } else { + start := max(interval.start, chunk.Offset) + stop := min(interval.stop, chunk.Offset+int64(chunk.Size)) + if interval.start <= chunk.Offset { + if interval.start < start { + log.Printf("overlappingIntevals add 1: [%d,%d)", interval.start, start) + overlappingIntevals = append(overlappingIntevals, newVisibleInterval( + interval.start, + start, + interval.fileId, + interval.modifiedTime, + )) + } + log.Printf("overlappingIntevals add 2: [%d,%d)", start, stop) + overlappingIntevals = append(overlappingIntevals, newVisibleInterval( + start, + stop, + chunk.FileId, + chunk.Mtime, + )) + if interval.stop < stop { + log.Printf("overlappingIntevals add 3: [%d,%d)", interval.stop, stop) + overlappingIntevals = append(overlappingIntevals, newVisibleInterval( + interval.stop, + stop, + interval.fileId, + interval.modifiedTime, + )) + } + } + } + } + logPrintf("overlappingIntevals", overlappingIntevals) + + // merge connected intervals + merged = visibles[:nonOverlappingStop] + var lastInterval *visibleInterval + var prevIntervalIndex int + for i, interval := range overlappingIntevals { + if i == 0 { + prevIntervalIndex = i + continue + } + if overlappingIntevals[prevIntervalIndex].fileId != interval.fileId { + merged = append(merged, newVisibleInterval( + overlappingIntevals[prevIntervalIndex].start, + interval.start, + overlappingIntevals[prevIntervalIndex].fileId, + overlappingIntevals[prevIntervalIndex].modifiedTime, + )) + prevIntervalIndex = i + } + } + + if lastInterval != nil { + merged = append(merged, newVisibleInterval( + overlappingIntevals[prevIntervalIndex].start, + lastInterval.start, + overlappingIntevals[prevIntervalIndex].fileId, + overlappingIntevals[prevIntervalIndex].modifiedTime, + )) + } + + logPrintf("merged", merged) + + return +} + +func logPrintf(name string, visibles []*visibleInterval) { + log.Printf("%s len %d", name, len(visibles)) + for _, v := range visibles { + log.Printf("%s: => %+v", name, v) + } } -func (chunks Chunks) Less(i, j int) bool { - return chunks[i].Offset < chunks[j].Offset + +func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) { + + sort.Slice(chunks, func(i, j int) bool { + if chunks[i].Offset < chunks[j].Offset { + return true + } + if chunks[i].Offset == chunks[j].Offset { + return chunks[i].Mtime < chunks[j].Mtime + } + return false + }) + + if len(chunks) == 0 { + return + } + + var parallelIntervals, intervals []*visibleInterval + var minStopInterval, upToDateInterval *visibleInterval + watermarkStart := chunks[0].Offset + for _, chunk := range chunks { + log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) + logPrintf("parallelIntervals", parallelIntervals) + for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset { + logPrintf("parallelIntervals loop 1", parallelIntervals) + logPrintf("parallelIntervals loop 1 intervals", intervals) + minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals) + nextStop := min(minStopInterval.stop, chunk.Offset) + intervals = append(intervals, newVisibleInterval( + max(watermarkStart, minStopInterval.start), + nextStop, + upToDateInterval.fileId, + upToDateInterval.modifiedTime, + )) + watermarkStart = nextStop + logPrintf("parallelIntervals loop intervals =>", intervals) + + // remove processed intervals, possibly multiple + var remaining []*visibleInterval + for _, interval := range parallelIntervals { + if interval.stop != watermarkStart { + remaining = append(remaining, newVisibleInterval( + interval.start, + interval.stop, + interval.fileId, + interval.modifiedTime, + )) + } + } + parallelIntervals = remaining + logPrintf("parallelIntervals loop 2", parallelIntervals) + logPrintf("parallelIntervals loop 2 intervals", intervals) + } + parallelIntervals = append(parallelIntervals, newVisibleInterval( + chunk.Offset, + chunk.Offset+int64(chunk.Size), + chunk.FileId, + chunk.Mtime, + )) + } + + logPrintf("parallelIntervals loop 3", parallelIntervals) + logPrintf("parallelIntervals loop 3 intervals", intervals) + for len(parallelIntervals) > 0 { + minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals) + intervals = append(intervals, newVisibleInterval( + max(watermarkStart, minStopInterval.start), + minStopInterval.stop, + upToDateInterval.fileId, + upToDateInterval.modifiedTime, + )) + watermarkStart = minStopInterval.stop + + // remove processed intervals, possibly multiple + var remaining []*visibleInterval + for _, interval := range parallelIntervals { + if interval.stop != watermarkStart { + remaining = append(remaining, newVisibleInterval( + interval.start, + interval.stop, + interval.fileId, + interval.modifiedTime, + )) + } + } + parallelIntervals = remaining + } + logPrintf("parallelIntervals loop 4", parallelIntervals) + logPrintf("intervals", intervals) + + // merge connected intervals, now the intervals are non-intersecting + var lastInterval *visibleInterval + var prevIntervalIndex int + for i, interval := range intervals { + if i == 0 { + prevIntervalIndex = i + continue + } + if intervals[i-1].fileId != interval.fileId || + intervals[i-1].stop < intervals[i].start { + visibles = append(visibles, newVisibleInterval( + intervals[prevIntervalIndex].start, + intervals[i-1].stop, + intervals[prevIntervalIndex].fileId, + intervals[prevIntervalIndex].modifiedTime, + )) + prevIntervalIndex = i + } + lastInterval = intervals[i] + logPrintf("intervals loop 1 visibles", visibles) + } + + if lastInterval != nil { + visibles = append(visibles, newVisibleInterval( + intervals[prevIntervalIndex].start, + lastInterval.stop, + intervals[prevIntervalIndex].fileId, + intervals[prevIntervalIndex].modifiedTime, + )) + } + + logPrintf("visibles", visibles) + + return +} + +func findMinStopInterval(intervals []*visibleInterval) (minStopInterval, upToDateInterval *visibleInterval) { + var latestMtime int64 + latestIntervalIndex := 0 + minStop := int64(math.MaxInt64) + minIntervalIndex := 0 + for i, interval := range intervals { + if minStop > interval.stop { + minIntervalIndex = i + minStop = interval.stop + } + if latestMtime < interval.modifiedTime { + latestMtime = interval.modifiedTime + latestIntervalIndex = i + } + } + minStopInterval = intervals[minIntervalIndex] + upToDateInterval = intervals[latestIntervalIndex] + return +} + +func nonOverlappingVisibleIntervals0(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) { + + sort.Slice(chunks, func(i, j int) bool { + if chunks[i].Offset < chunks[j].Offset { + return true + } + if chunks[i].Offset == chunks[j].Offset { + return chunks[i].Mtime < chunks[j].Mtime + } + return false + }) + + for _, c := range chunks { + visibles = mergeToVisibleIntervals(visibles, c) + } + + return +} + +// find non-overlapping visible intervals +// visible interval map to one file chunk + +type visibleInterval struct { + start int64 + stop int64 + modifiedTime int64 + fileId string +} + +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval { + return &visibleInterval{start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime} +} + +type stackOfChunkIds struct { + ids []int +} + +func (s *stackOfChunkIds) isEmpty() bool { + return len(s.ids) == 0 +} + +func (s *stackOfChunkIds) pop() int { + t := s.ids[len(s.ids)-1] + s.ids = s.ids[:len(s.ids)-1] + return t +} + +func (s *stackOfChunkIds) push(x int) { + s.ids = append(s.ids, x) +} + +func (s *stackOfChunkIds) peek() int { + return s.ids[len(s.ids)-1] +} + +func min(x, y int64) int64 { + if x <= y { + return x + } + return y +} + +func max(x, y int64) int64 { + if x > y { + return x + } + return y } diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go new file mode 100644 index 000000000..e5cb8810f --- /dev/null +++ b/weed/filer2/filechunks_test.go @@ -0,0 +1,112 @@ +package filer2 + +import ( + "testing" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "log" +) + +func TestIntervalMerging(t *testing.T) { + + testcases := []struct { + Chunks []*filer_pb.FileChunk + Expected []*visibleInterval + }{ + // case 0: normal + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, + {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, + }, + Expected: []*visibleInterval{ + {start: 0, stop: 100, fileId: "abc"}, + {start: 100, stop: 200, fileId: "asdf"}, + {start: 200, stop: 300, fileId: "fsad"}, + }, + }, + // case 1: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + }, + Expected: []*visibleInterval{ + {start: 0, stop: 200, fileId: "asdf"}, + }, + }, + // case 2: updates overwrite part of previous chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134}, + }, + Expected: []*visibleInterval{ + {start: 0, stop: 50, fileId: "asdf"}, + {start: 50, stop: 100, fileId: "abc"}, + }, + }, + // case 3: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154}, + }, + Expected: []*visibleInterval{ + {start: 0, stop: 50, fileId: "asdf"}, + {start: 50, stop: 300, fileId: "xxxx"}, + }, + }, + // case 4: updates far away from prev chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154}, + }, + Expected: []*visibleInterval{ + {start: 0, stop: 200, fileId: "asdf"}, + {start: 250, stop: 500, fileId: "xxxx"}, + }, + }, + // case 5: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184}, + {Offset: 70, Size: 150, FileId: "abc", Mtime: 143}, + {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134}, + }, + Expected: []*visibleInterval{ + {start: 0, stop: 200, fileId: "asdf"}, + {start: 200, stop: 220, fileId: "abc"}, + }, + }, + } + + for i, testcase := range testcases { + log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) + intervals := nonOverlappingVisibleIntervals(testcase.Chunks) + for x, interval := range intervals { + log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", + i, x, interval.start, interval.stop, interval.fileId) + if interval.start != testcase.Expected[x].start { + t.Fatalf("failed on test case %d, interval %d, start %d, expect %d", + i, x, interval.start, testcase.Expected[x].start) + } + if interval.stop != testcase.Expected[x].stop { + t.Fatalf("failed on test case %d, interval %d, stop %d, expect %d", + i, x, interval.stop, testcase.Expected[x].stop) + } + if interval.fileId != testcase.Expected[x].fileId { + t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s", + i, x, interval.fileId, testcase.Expected[x].fileId) + } + } + if len(intervals) != len(testcase.Expected) { + t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected)) + } + } + +} diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 46a534ffa..6cc5021d6 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -42,13 +42,12 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie resp := &filer_pb.ListEntriesResponse{} for _, entry := range entries { - glog.V(0).Infof("%s attr=%v size=%d", entry.Name(), entry.Attr, filer2.Chunks(entry.Chunks).TotalSize()) resp.Entries = append(resp.Entries, &filer_pb.Entry{ Name: entry.Name(), IsDirectory: entry.IsDirectory(), Chunks: entry.Chunks, Attributes: &filer_pb.FuseAttributes{ - FileSize: filer2.Chunks(entry.Chunks).TotalSize(), + FileSize: filer2.TotalSize(entry.Chunks), Mtime: entry.Mtime.Unix(), Gid: entry.Gid, Uid: entry.Uid, @@ -71,7 +70,7 @@ func (fs *FilerServer) GetFileAttributes(ctx context.Context, req *filer_pb.GetF if !found { attributes.FileSize = 0 } else { - attributes.FileSize = filer2.Chunks(entry.Chunks).TotalSize() + attributes.FileSize = filer2.TotalSize(entry.Chunks) attributes.FileMode = uint32(entry.Mode) attributes.Uid = entry.Uid attributes.Gid = entry.Gid