diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go index a1288cadb..464838bd8 100644 --- a/weed/storage/needle_map/compact_map.go +++ b/weed/storage/needle_map/compact_map.go @@ -8,7 +8,8 @@ import ( ) const ( - MaxSectionBucketSize = 10000 + MaxSectionBucketSize = 1024 * 8 + LookBackWindowSize = 1024 // how many entries to look back when inserting into a section ) type SectionalNeedleId uint32 @@ -61,6 +62,7 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset lkey = cs.values[len(cs.values)-1].Key } + hasAdded := false switch { case len(cs.values) < MaxSectionBucketSize && lkey <= skey: // non-overflow insert @@ -70,28 +72,33 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Size: size, OffsetHigher: offset.OffsetHigher, }) + hasAdded = true case len(cs.values) < MaxSectionBucketSize: // still has capacity and only partially out of order - lookBackIndex := len(cs.values) - 128 + lookBackIndex := len(cs.values) - LookBackWindowSize if lookBackIndex < 0 { lookBackIndex = 0 } - for ; lookBackIndex < len(cs.values); lookBackIndex++ { - if cs.values[lookBackIndex].Key >= skey { - break + if cs.values[lookBackIndex].Key <= skey { + for ; lookBackIndex < len(cs.values); lookBackIndex++ { + if cs.values[lookBackIndex].Key >= skey { + break + } } + cs.values = append(cs.values, SectionalNeedleValue{}) + copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:]) + cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size + cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher + hasAdded = true } - cs.values = append(cs.values, SectionalNeedleValue{}) - copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:]) - cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size - cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher - default: - // overflow insert + } + + // overflow insert + if !hasAdded { if oldValue, found := cs.findOverflowEntry(skey); found { oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size } cs.setOverflowEntry(skey, offset, size) - return } // if we maxed out our values bucket, pin its capacity to minimize memory usage diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go index 43800aaad..174438fa3 100644 --- a/weed/storage/needle_map/compact_map_test.go +++ b/weed/storage/needle_map/compact_map_test.go @@ -219,3 +219,23 @@ func TestCompactSection_Get(t *testing.T) { t.Error(uint64(nv3.Size)) } } + +// Test after putting 1 ~ LookBackWindowSize*3 items in sequential order, but missing item LookBackWindowSize +// insert the item LookBackWindowSize in the middle of the sequence +func TestCompactSection_PutOutOfOrderItemBeyondLookBackWindow(t *testing.T) { + m := NewCompactMap() + + // put 1 ~ 10 + for i := 1; i <= LookBackWindowSize*3; i++ { + if i != LookBackWindowSize { + m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) + } + } + + m.Set(NeedleId(LookBackWindowSize), ToOffset(int64(LookBackWindowSize)), Size(LookBackWindowSize)) + + // check if 8 is in the right place + if v, ok := m.Get(NeedleId(LookBackWindowSize)); !ok || v.Offset != ToOffset(LookBackWindowSize) || v.Size != Size(LookBackWindowSize) { + t.Fatalf("expected to find LookBackWindowSize at offset %d with size %d, but got %v", LookBackWindowSize, LookBackWindowSize, v) + } +}