diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go index a9d64c8e4..b31c013d9 100644 --- a/weed/mount/page_writer/chunk_interval_list.go +++ b/weed/mount/page_writer/chunk_interval_list.go @@ -8,6 +8,7 @@ import ( type ChunkWrittenInterval struct { StartOffset int64 stopOffset int64 + TsNs int64 prev *ChunkWrittenInterval next *ChunkWrittenInterval } @@ -42,10 +43,11 @@ func newChunkWrittenIntervalList() *ChunkWrittenIntervalList { return list } -func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) { +func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset, tsNs int64) { interval := &ChunkWrittenInterval{ StartOffset: startOffset, stopOffset: stopOffset, + TsNs: tsNs, } list.addInterval(interval) } @@ -63,49 +65,34 @@ func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) { func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) { p := list.head - for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next { + for ; p.next != nil && p.next.stopOffset <= interval.StartOffset; p = p.next { } q := list.tail - for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev { + for ; q.prev != nil && q.prev.StartOffset >= interval.stopOffset; q = q.prev { } - if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset { - // merge p and q together - p.stopOffset = q.stopOffset - unlinkNodesBetween(p, q.next) - return + // left side + // interval after p.next start + if p.next.StartOffset < interval.StartOffset { + p.next.stopOffset = interval.StartOffset + p.next.next = interval + interval.prev = p.next + } else { + p.next = interval + interval.prev = p } - if interval.StartOffset <= p.stopOffset { - // merge new interval into p - p.stopOffset = interval.stopOffset - unlinkNodesBetween(p, q) - return - } - if q.StartOffset <= interval.stopOffset { - // merge new interval into q - q.StartOffset = interval.StartOffset - unlinkNodesBetween(p, q) - return - } - - // add the new interval between p and q - unlinkNodesBetween(p, q) - p.next = interval - interval.prev = p - q.prev = interval - interval.next = q -} - -// unlinkNodesBetween remove all nodes after start and before stop, exclusive -func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) { - if start.next == stop { - return + // right side + // interval ends before p.prev + if q.prev.stopOffset > interval.stopOffset { + q.prev.StartOffset = interval.stopOffset + q.prev.prev = interval + interval.next = q.prev + } else { + q.prev = interval + interval.next = q } - start.next.prev = nil - start.next = stop - stop.prev.next = nil - stop.prev = start + } func (list *ChunkWrittenIntervalList) size() int { diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go index b22f5eb5d..aa2183792 100644 --- a/weed/mount/page_writer/chunk_interval_list_test.go +++ b/weed/mount/page_writer/chunk_interval_list_test.go @@ -10,40 +10,40 @@ func Test_PageChunkWrittenIntervalList(t *testing.T) { assert.Equal(t, 0, list.size(), "empty list") - list.MarkWritten(0, 5) + list.MarkWritten(0, 5, 1) assert.Equal(t, 1, list.size(), "one interval") - list.MarkWritten(0, 5) + list.MarkWritten(0, 5, 2) assert.Equal(t, 1, list.size(), "duplicated interval2") - list.MarkWritten(95, 100) + list.MarkWritten(95, 100, 3) assert.Equal(t, 2, list.size(), "two intervals") - list.MarkWritten(50, 60) + list.MarkWritten(50, 60, 4) assert.Equal(t, 3, list.size(), "three intervals") - list.MarkWritten(50, 55) - assert.Equal(t, 3, list.size(), "three intervals merge") + list.MarkWritten(50, 55, 5) + assert.Equal(t, 4, list.size(), "three intervals merge") - list.MarkWritten(40, 50) - assert.Equal(t, 3, list.size(), "three intervals grow forward") + list.MarkWritten(40, 50, 6) + assert.Equal(t, 5, list.size(), "three intervals grow forward") - list.MarkWritten(50, 65) - assert.Equal(t, 3, list.size(), "three intervals grow backward") + list.MarkWritten(50, 65, 7) + assert.Equal(t, 4, list.size(), "three intervals grow backward") - list.MarkWritten(70, 80) - assert.Equal(t, 4, list.size(), "four intervals") + list.MarkWritten(70, 80, 8) + assert.Equal(t, 5, list.size(), "four intervals") - list.MarkWritten(60, 70) - assert.Equal(t, 3, list.size(), "three intervals merged") + list.MarkWritten(60, 70, 9) + assert.Equal(t, 6, list.size(), "three intervals merged") - list.MarkWritten(59, 71) - assert.Equal(t, 3, list.size(), "covered three intervals") + list.MarkWritten(59, 71, 10) + assert.Equal(t, 6, list.size(), "covered three intervals") - list.MarkWritten(5, 59) - assert.Equal(t, 2, list.size(), "covered two intervals") + list.MarkWritten(5, 59, 11) + assert.Equal(t, 5, list.size(), "covered two intervals") - list.MarkWritten(70, 99) - assert.Equal(t, 1, list.size(), "covered one intervals") + list.MarkWritten(70, 99, 12) + assert.Equal(t, 5, list.size(), "covered one intervals") } diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index ae0cfbcd8..4f49de298 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -51,7 +51,7 @@ func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) { innerOffset := offset % mc.chunkSize n = copy(mc.buf[innerOffset:], src) - mc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) + mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs) return } @@ -64,11 +64,11 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset) logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) if logicStart < logicStop { - if mc.lastModifiedTsNs > tsNs { + if t.TsNs >= tsNs { copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) maxStop = max(maxStop, logicStop) } else { - println("read old data1", tsNs-mc.lastModifiedTsNs, "ns") + println("read old data1", tsNs-t.TsNs, "ns") } } } diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index 0a96fc95c..61eb338fc 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -101,7 +101,7 @@ func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n in var err error n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) if err == nil { - sc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) + sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs) } else { glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err) } @@ -117,7 +117,7 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in logicStart := max(off, chunkStartOffset+t.StartOffset) logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) if logicStart < logicStop { - if sc.lastModifiedTsNs > tsNs { + if t.TsNs >= tsNs { actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) @@ -125,7 +125,7 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in } maxStop = max(maxStop, logicStop) } else { - println("read old data2", tsNs-sc.lastModifiedTsNs, "ns") + println("read old data2", tsNs-t.TsNs, "ns") } } } @@ -153,7 +153,7 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { data := mem.Allocate(int(t.Size())) sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) reader := util.NewBytesReader(data) - saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), sc.lastModifiedTsNs, func() { + saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), t.TsNs, func() { }) mem.Free(data) }