Browse Source

Further improve memory usage of `needle_map.CompactMap()`. (#6825)

pull/5490/merge
Lisandro Pin 1 week ago
committed by GitHub
parent
commit
9ffc8bcb54
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 61
      weed/storage/needle_map/compact_map.go
  2. 15
      weed/storage/needle_map/compact_map_test.go

61
weed/storage/needle_map/compact_map.go

@ -8,7 +8,7 @@ import (
)
const (
batch = 10000
MaxSectionBucketSize = 10000
)
type SectionalNeedleId uint32
@ -28,15 +28,14 @@ type CompactSection struct {
overflow Overflow
start NeedleId
end NeedleId
counter int
}
type Overflow []SectionalNeedleValue
func NewCompactSection(start NeedleId) *CompactSection {
return &CompactSection{
values: []SectionalNeedleValue{},
overflow: Overflow([]SectionalNeedleValue{}),
values: make([]SectionalNeedleValue, 0),
overflow: Overflow(make([]SectionalNeedleValue, 0)),
start: start,
}
}
@ -57,9 +56,13 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset
return
}
needOverflow := cs.counter >= batch
needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
if !needOverflow {
var lkey SectionalNeedleId
if len(cs.values) > 0 {
lkey = cs.values[len(cs.values)-1].Key
}
switch {
case len(cs.values) < MaxSectionBucketSize && lkey <= skey:
// non-overflow insert
cs.values = append(cs.values, SectionalNeedleValue{
Key: skey,
@ -67,17 +70,13 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset
Size: size,
OffsetHigher: offset.OffsetHigher,
})
cs.counter++
return
}
lookBackIndex := cs.counter - 128
if lookBackIndex < 0 {
lookBackIndex = 0
}
if cs.counter < batch && cs.values[lookBackIndex].Key < skey {
case len(cs.values) < MaxSectionBucketSize:
// still has capacity and only partially out of order
for ; lookBackIndex < cs.counter; lookBackIndex++ {
lookBackIndex := len(cs.values) - 128
if lookBackIndex < 0 {
lookBackIndex = 0
}
for ; lookBackIndex < len(cs.values); lookBackIndex++ {
if cs.values[lookBackIndex].Key >= skey {
break
}
@ -86,17 +85,21 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset
copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:])
cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size
cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher
cs.counter++
default:
// overflow insert
if oldValue, found := cs.findOverflowEntry(skey); found {
oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size
}
cs.setOverflowEntry(skey, offset, size)
return
}
// overflow insert
//println("start", cs.start, "counter", cs.counter, "key", key)
if oldValue, found := cs.findOverflowEntry(skey); found {
oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size
// if we maxed out our values bucket, pin its capacity to minimize memory usage
if len(cs.values) == MaxSectionBucketSize {
bucket := make([]SectionalNeedleValue, len(cs.values))
copy(bucket, cs.values)
cs.values = bucket
}
cs.setOverflowEntry(skey, offset, size)
return
}
@ -177,10 +180,10 @@ func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
return nil, false
}
func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int {
x := sort.Search(cs.counter, func(i int) bool {
x := sort.Search(len(cs.values), func(i int) bool {
return cs.values[i].Key >= key
})
if x == cs.counter {
if x >= len(cs.values) {
return -1
}
if cs.values[x].Key > key {
@ -242,7 +245,7 @@ func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
return -5
}
if cm.list[h].start <= key {
if cm.list[h].counter < batch || key <= cm.list[h].end {
if len(cm.list[h].values) < MaxSectionBucketSize || key <= cm.list[h].end {
return h
}
return -4
@ -267,7 +270,7 @@ func (cm *CompactMap) AscendingVisit(visit func(NeedleValue) error) error {
for _, cs := range cm.list {
cs.RLock()
var i, j int
for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values) && j < cs.counter; {
for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values); {
if cs.overflow[i].Key < cs.values[j].Key {
if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil {
cs.RUnlock()
@ -290,7 +293,7 @@ func (cm *CompactMap) AscendingVisit(visit func(NeedleValue) error) error {
return err
}
}
for ; j < len(cs.values) && j < cs.counter; j++ {
for ; j < len(cs.values); j++ {
if err := visit(toNeedleValue(cs.values[j], cs)); err != nil {
cs.RUnlock()
return err

15
weed/storage/needle_map/compact_map_test.go

@ -2,11 +2,12 @@ package needle_map
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/sequence"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"log"
"os"
"testing"
"github.com/seaweedfs/seaweedfs/weed/sequence"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func TestSnowflakeSequencer(t *testing.T) {
@ -65,15 +66,15 @@ func TestIssue52(t *testing.T) {
func TestCompactMap(t *testing.T) {
m := NewCompactMap()
for i := uint32(0); i < 100*batch; i += 2 {
for i := uint32(0); i < 100*MaxSectionBucketSize; i += 2 {
m.Set(NeedleId(i), ToOffset(int64(i)), Size(i))
}
for i := uint32(0); i < 100*batch; i += 37 {
for i := uint32(0); i < 100*MaxSectionBucketSize; i += 37 {
m.Delete(NeedleId(i))
}
for i := uint32(0); i < 10*batch; i += 3 {
for i := uint32(0); i < 10*MaxSectionBucketSize; i += 3 {
m.Set(NeedleId(i), ToOffset(int64(i+11)), Size(i+5))
}
@ -83,7 +84,7 @@ func TestCompactMap(t *testing.T) {
// }
// }
for i := uint32(0); i < 10*batch; i++ {
for i := uint32(0); i < 10*MaxSectionBucketSize; i++ {
v, ok := m.Get(NeedleId(i))
if i%3 == 0 {
if !ok {
@ -103,7 +104,7 @@ func TestCompactMap(t *testing.T) {
}
}
for i := uint32(10 * batch); i < 100*batch; i++ {
for i := uint32(10 * MaxSectionBucketSize); i < 100*MaxSectionBucketSize; i++ {
v, ok := m.Get(NeedleId(i))
if i%37 == 0 {
if ok && v.Size.IsValid() {

Loading…
Cancel
Save