From 24e11d1e90c2bf6ef512fbf787490caeb59348de Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 28 Jun 2021 22:46:49 -0700 Subject: [PATCH] look back when adding to sorted values look back when adding to sorted values, before adding it to overflow --- weed/storage/needle_map/compact_map.go | 28 ++++++++++++++++--- .../needle_map/compact_map_cases_test.go | 2 +- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go index 9495a49ab..3d2047f99 100644 --- a/weed/storage/needle_map/compact_map.go +++ b/weed/storage/needle_map/compact_map.go @@ -64,11 +64,31 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset needOverflow := cs.counter >= batch needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey if needOverflow { - //println("start", cs.start, "counter", cs.counter, "key", key) - if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found { - oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size + lookBackIndex := cs.counter - 128 + if lookBackIndex < 0 { + lookBackIndex = 0 + } + if cs.counter < batch && cs.values[lookBackIndex].Key < skey { + // still has capacity and only partially out of order + p := &cs.values[cs.counter] + p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size + //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key) + for x := cs.counter - 1; x >= lookBackIndex; x-- { + if cs.values[x].Key > cs.values[x+1].Key { + cs.values[x], cs.values[x+1] = cs.values[x+1], cs.values[x] + cs.valuesExtra[x], cs.valuesExtra[x+1] = cs.valuesExtra[x+1], cs.valuesExtra[x] + } else { + break + } + } + cs.counter++ + } else { + //println("start", cs.start, "counter", cs.counter, "key", key) + if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found { + oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size + } + cs.setOverflowEntry(skey, offset, size) } - cs.setOverflowEntry(skey, offset, size) } else { p := &cs.values[cs.counter] p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size diff --git a/weed/storage/needle_map/compact_map_cases_test.go b/weed/storage/needle_map/compact_map_cases_test.go index a0142b57d..305925699 100644 --- a/weed/storage/needle_map/compact_map_cases_test.go +++ b/weed/storage/needle_map/compact_map_cases_test.go @@ -22,7 +22,7 @@ func Test5bytesIndexLoading(t *testing.T) { println("total entries:", rowCount) - key := types.NeedleId(0x671b905) + key := types.NeedleId(0x671b905) // 108116229 needle, found := m.Get(types.NeedleId(0x671b905))