diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go index 77fab75ef..ed95783a8 100644 --- a/weed/filesys/dirty_page_interval.go +++ b/weed/filesys/dirty_page_interval.go @@ -65,12 +65,61 @@ func (c *ContinuousIntervals) TotalSize() (total int64) { return } +func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { + var nodes []*IntervalNode + for t := list.Head; t != nil; t = t.Next { + nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) + if nodeStart >= nodeStop { + // skip non overlapping IntervalNode + continue + } + nodes = append(nodes, &IntervalNode{ + Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset], + Offset: nodeStart, + Size: nodeStop-nodeStart, + Next: nil, + }) + } + for i := 1; i < len(nodes); i++ { + nodes[i-1].Next = nodes[i] + } + return &IntervalLinkedList{ + Head: nodes[0], + Tail: nodes[len(nodes)-1], + } +} + func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { // TODO AddInterval needs to handle all possible out of order writes interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))} + var newLists []*IntervalLinkedList + for _, list := range c.lists { + // if list is to the left of new interval, add to the new list + if list.Tail.Offset+list.Tail.Size <= interval.Offset { + newLists = append(newLists, list) + } + // if list is to the right of new interval, add to the new list + if interval.Offset+interval.Size <= list.Head.Offset { + newLists = append(newLists, list) + } + // if new interval overwrite the right part of the list + if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size { + // create a new list of the left part of existing list + newLists = append(newLists, subList(list, list.Offset(), interval.Offset)) + } + // if new interval overwrite the left part of the list + if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size { + // create a new list of the right part of existing list + newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size)) + } + // skip anything that is fully overwritten by the new interval + } + + c.lists = newLists + // add the new interval to the lists, connecting neighbor lists var prevList, nextList *IntervalLinkedList for _, list := range c.lists { @@ -78,10 +127,6 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { nextList = list break } - if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size <= list.Head.Offset+list.Size() { - glog.V(0).Infof("unexpected [%d,%d) overlaps [%d,%d)", interval.Offset, interval.Offset+interval.Size, list.Head.Offset, list.Head.Offset+list.Size()) - break - } } for _, list := range c.lists { @@ -90,19 +135,6 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { prevList = list break } - if list.Head.Offset <= offset && offset < list.Head.Offset+list.Size() { - - // the new interval overwrites the old tail - dataStartIndex := list.Tail.Offset + list.Tail.Size - offset - glog.V(4).Infof("overlap data new [0,%d) same=%v", dataStartIndex, bytes.Compare(interval.Data[0:dataStartIndex], list.Tail.Data[len(list.Tail.Data)-int(dataStartIndex):])) - list.Tail.Data = list.Tail.Data[:len(list.Tail.Data)-int(dataStartIndex)] - list.Tail.Size -= dataStartIndex - glog.V(4).Infof("overlapping append as [%d,%d) dataSize=%d", interval.Offset, interval.Offset+interval.Size, len(interval.Data)) - - list.addNodeToTail(interval) - prevList = list - break - } } if prevList != nil && nextList != nil { diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go index 4f62f90c9..184be2f3b 100644 --- a/weed/filesys/dirty_page_interval_test.go +++ b/weed/filesys/dirty_page_interval_test.go @@ -5,7 +5,7 @@ import ( "testing" ) -func TestContinuousIntervals_AddInterval(t *testing.T) { +func TestContinuousIntervals_AddIntervalAppend(t *testing.T) { c := &ContinuousIntervals{} @@ -15,6 +15,38 @@ func TestContinuousIntervals_AddInterval(t *testing.T) { c.AddInterval(getBytes(23, 4), 2) expectedData(t, c, 0, 25, 25, 23, 23, 23, 23) + +} + +func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) { + + c := &ContinuousIntervals{} + + // 25, 25, 25, 25, 25 + c.AddInterval(getBytes(25, 5), 0) + // _, _, 23, 23 + c.AddInterval(getBytes(23, 2), 2) + + expectedData(t, c, 0, 25, 25, 23, 23, 25) + +} + +func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) { + + c := &ContinuousIntervals{} + + // 25, + c.AddInterval(getBytes(25, 1), 0) + // _, _, _, _, 23, 23 + c.AddInterval(getBytes(23, 2), 4) + // _, _, _, 24, 24, 24, 24 + c.AddInterval(getBytes(24, 4), 3) + + // _, 22, 22 + c.AddInterval(getBytes(22, 2), 1) + + expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24) + } func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) {