Browse Source

look back when adding to sorted values

look back when adding to sorted values, before adding it to overflow
pull/2164/head
Chris Lu 4 years ago
parent
commit
24e11d1e90
  1. 20
      weed/storage/needle_map/compact_map.go
  2. 2
      weed/storage/needle_map/compact_map_cases_test.go

20
weed/storage/needle_map/compact_map.go

@ -64,11 +64,31 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset
needOverflow := cs.counter >= batch needOverflow := cs.counter >= batch
needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
if needOverflow { if needOverflow {
lookBackIndex := cs.counter - 128
if lookBackIndex < 0 {
lookBackIndex = 0
}
if cs.counter < batch && cs.values[lookBackIndex].Key < skey {
// still has capacity and only partially out of order
p := &cs.values[cs.counter]
p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
//println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
for x := cs.counter - 1; x >= lookBackIndex; x-- {
if cs.values[x].Key > cs.values[x+1].Key {
cs.values[x], cs.values[x+1] = cs.values[x+1], cs.values[x]
cs.valuesExtra[x], cs.valuesExtra[x+1] = cs.valuesExtra[x+1], cs.valuesExtra[x]
} else {
break
}
}
cs.counter++
} else {
//println("start", cs.start, "counter", cs.counter, "key", key) //println("start", cs.start, "counter", cs.counter, "key", key)
if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found { if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found {
oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size
} }
cs.setOverflowEntry(skey, offset, size) cs.setOverflowEntry(skey, offset, size)
}
} else { } else {
p := &cs.values[cs.counter] p := &cs.values[cs.counter]
p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size

2
weed/storage/needle_map/compact_map_cases_test.go

@ -22,7 +22,7 @@ func Test5bytesIndexLoading(t *testing.T) {
println("total entries:", rowCount) println("total entries:", rowCount)
key := types.NeedleId(0x671b905)
key := types.NeedleId(0x671b905) // 108116229
needle, found := m.Get(types.NeedleId(0x671b905)) needle, found := m.Get(types.NeedleId(0x671b905))

Loading…
Cancel
Save