Browse Source

track write and read by timestamp

pull/4089/head
chrislu 2 years ago
parent
commit
7398b046ce
  1. 61
      weed/mount/page_writer/chunk_interval_list.go
  2. 40
      weed/mount/page_writer/chunk_interval_list_test.go
  3. 6
      weed/mount/page_writer/page_chunk_mem.go
  4. 8
      weed/mount/page_writer/page_chunk_swapfile.go

61
weed/mount/page_writer/chunk_interval_list.go

@ -8,6 +8,7 @@ import (
type ChunkWrittenInterval struct {
StartOffset int64
stopOffset int64
TsNs int64
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
@ -42,10 +43,11 @@ func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
return list
}
func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset, tsNs int64) {
interval := &ChunkWrittenInterval{
StartOffset: startOffset,
stopOffset: stopOffset,
TsNs: tsNs,
}
list.addInterval(interval)
}
@ -63,49 +65,34 @@ func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
p := list.head
for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
for ; p.next != nil && p.next.stopOffset <= interval.StartOffset; p = p.next {
}
q := list.tail
for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
for ; q.prev != nil && q.prev.StartOffset >= interval.stopOffset; q = q.prev {
}
if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
// merge p and q together
p.stopOffset = q.stopOffset
unlinkNodesBetween(p, q.next)
return
// left side
// interval after p.next start
if p.next.StartOffset < interval.StartOffset {
p.next.stopOffset = interval.StartOffset
p.next.next = interval
interval.prev = p.next
} else {
p.next = interval
interval.prev = p
}
if interval.StartOffset <= p.stopOffset {
// merge new interval into p
p.stopOffset = interval.stopOffset
unlinkNodesBetween(p, q)
return
}
if q.StartOffset <= interval.stopOffset {
// merge new interval into q
q.StartOffset = interval.StartOffset
unlinkNodesBetween(p, q)
return
}
// add the new interval between p and q
unlinkNodesBetween(p, q)
p.next = interval
interval.prev = p
q.prev = interval
interval.next = q
}
// unlinkNodesBetween remove all nodes after start and before stop, exclusive
func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
if start.next == stop {
return
// right side
// interval ends before p.prev
if q.prev.stopOffset > interval.stopOffset {
q.prev.StartOffset = interval.stopOffset
q.prev.prev = interval
interval.next = q.prev
} else {
q.prev = interval
interval.next = q
}
start.next.prev = nil
start.next = stop
stop.prev.next = nil
stop.prev = start
}
func (list *ChunkWrittenIntervalList) size() int {

40
weed/mount/page_writer/chunk_interval_list_test.go

@ -10,40 +10,40 @@ func Test_PageChunkWrittenIntervalList(t *testing.T) {
assert.Equal(t, 0, list.size(), "empty list")
list.MarkWritten(0, 5)
list.MarkWritten(0, 5, 1)
assert.Equal(t, 1, list.size(), "one interval")
list.MarkWritten(0, 5)
list.MarkWritten(0, 5, 2)
assert.Equal(t, 1, list.size(), "duplicated interval2")
list.MarkWritten(95, 100)
list.MarkWritten(95, 100, 3)
assert.Equal(t, 2, list.size(), "two intervals")
list.MarkWritten(50, 60)
list.MarkWritten(50, 60, 4)
assert.Equal(t, 3, list.size(), "three intervals")
list.MarkWritten(50, 55)
assert.Equal(t, 3, list.size(), "three intervals merge")
list.MarkWritten(50, 55, 5)
assert.Equal(t, 4, list.size(), "three intervals merge")
list.MarkWritten(40, 50)
assert.Equal(t, 3, list.size(), "three intervals grow forward")
list.MarkWritten(40, 50, 6)
assert.Equal(t, 5, list.size(), "three intervals grow forward")
list.MarkWritten(50, 65)
assert.Equal(t, 3, list.size(), "three intervals grow backward")
list.MarkWritten(50, 65, 7)
assert.Equal(t, 4, list.size(), "three intervals grow backward")
list.MarkWritten(70, 80)
assert.Equal(t, 4, list.size(), "four intervals")
list.MarkWritten(70, 80, 8)
assert.Equal(t, 5, list.size(), "four intervals")
list.MarkWritten(60, 70)
assert.Equal(t, 3, list.size(), "three intervals merged")
list.MarkWritten(60, 70, 9)
assert.Equal(t, 6, list.size(), "three intervals merged")
list.MarkWritten(59, 71)
assert.Equal(t, 3, list.size(), "covered three intervals")
list.MarkWritten(59, 71, 10)
assert.Equal(t, 6, list.size(), "covered three intervals")
list.MarkWritten(5, 59)
assert.Equal(t, 2, list.size(), "covered two intervals")
list.MarkWritten(5, 59, 11)
assert.Equal(t, 5, list.size(), "covered two intervals")
list.MarkWritten(70, 99)
assert.Equal(t, 1, list.size(), "covered one intervals")
list.MarkWritten(70, 99, 12)
assert.Equal(t, 5, list.size(), "covered one intervals")
}

6
weed/mount/page_writer/page_chunk_mem.go

@ -51,7 +51,7 @@ func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
innerOffset := offset % mc.chunkSize
n = copy(mc.buf[innerOffset:], src)
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
return
}
@ -64,11 +64,11 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
if mc.lastModifiedTsNs > tsNs {
if t.TsNs >= tsNs {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop)
} else {
println("read old data1", tsNs-mc.lastModifiedTsNs, "ns")
println("read old data1", tsNs-t.TsNs, "ns")
}
}
}

8
weed/mount/page_writer/page_chunk_swapfile.go

@ -101,7 +101,7 @@ func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n in
var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
if err == nil {
sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
} else {
glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
}
@ -117,7 +117,7 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
if sc.lastModifiedTsNs > tsNs {
if t.TsNs >= tsNs {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
@ -125,7 +125,7 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in
}
maxStop = max(maxStop, logicStop)
} else {
println("read old data2", tsNs-sc.lastModifiedTsNs, "ns")
println("read old data2", tsNs-t.TsNs, "ns")
}
}
}
@ -153,7 +153,7 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
data := mem.Allocate(int(t.Size()))
sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
reader := util.NewBytesReader(data)
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), sc.lastModifiedTsNs, func() {
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), t.TsNs, func() {
})
mem.Free(data)
}

Loading…
Cancel
Save