From 9ffc8bcb54b4cbed9cccf21476a02c77b975d5e8 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Wed, 28 May 2025 20:42:00 +0200 Subject: [PATCH] Further improve memory usage of `needle_map.CompactMap()`. (#6825) --- weed/storage/needle_map/compact_map.go | 61 +++++++++++---------- weed/storage/needle_map/compact_map_test.go | 15 ++--- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go index 10d0802ad..a1288cadb 100644 --- a/weed/storage/needle_map/compact_map.go +++ b/weed/storage/needle_map/compact_map.go @@ -8,7 +8,7 @@ import ( ) const ( - batch = 10000 + MaxSectionBucketSize = 10000 ) type SectionalNeedleId uint32 @@ -28,15 +28,14 @@ type CompactSection struct { overflow Overflow start NeedleId end NeedleId - counter int } type Overflow []SectionalNeedleValue func NewCompactSection(start NeedleId) *CompactSection { return &CompactSection{ - values: []SectionalNeedleValue{}, - overflow: Overflow([]SectionalNeedleValue{}), + values: make([]SectionalNeedleValue, 0), + overflow: Overflow(make([]SectionalNeedleValue, 0)), start: start, } } @@ -57,9 +56,13 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset return } - needOverflow := cs.counter >= batch - needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey - if !needOverflow { + var lkey SectionalNeedleId + if len(cs.values) > 0 { + lkey = cs.values[len(cs.values)-1].Key + } + + switch { + case len(cs.values) < MaxSectionBucketSize && lkey <= skey: // non-overflow insert cs.values = append(cs.values, SectionalNeedleValue{ Key: skey, @@ -67,17 +70,13 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Size: size, OffsetHigher: offset.OffsetHigher, }) - cs.counter++ - return - } - - lookBackIndex := cs.counter - 128 - if lookBackIndex < 0 { - lookBackIndex = 0 - } - if cs.counter < batch && cs.values[lookBackIndex].Key < skey { + case len(cs.values) < MaxSectionBucketSize: // still has capacity and only partially out of order - for ; lookBackIndex < cs.counter; lookBackIndex++ { + lookBackIndex := len(cs.values) - 128 + if lookBackIndex < 0 { + lookBackIndex = 0 + } + for ; lookBackIndex < len(cs.values); lookBackIndex++ { if cs.values[lookBackIndex].Key >= skey { break } @@ -86,17 +85,21 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:]) cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher - cs.counter++ + default: + // overflow insert + if oldValue, found := cs.findOverflowEntry(skey); found { + oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size + } + cs.setOverflowEntry(skey, offset, size) return } - // overflow insert - //println("start", cs.start, "counter", cs.counter, "key", key) - if oldValue, found := cs.findOverflowEntry(skey); found { - oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size + // if we maxed out our values bucket, pin its capacity to minimize memory usage + if len(cs.values) == MaxSectionBucketSize { + bucket := make([]SectionalNeedleValue, len(cs.values)) + copy(bucket, cs.values) + cs.values = bucket } - cs.setOverflowEntry(skey, offset, size) - return } @@ -177,10 +180,10 @@ func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) { return nil, false } func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int { - x := sort.Search(cs.counter, func(i int) bool { + x := sort.Search(len(cs.values), func(i int) bool { return cs.values[i].Key >= key }) - if x == cs.counter { + if x >= len(cs.values) { return -1 } if cs.values[x].Key > key { @@ -242,7 +245,7 @@ func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int { return -5 } if cm.list[h].start <= key { - if cm.list[h].counter < batch || key <= cm.list[h].end { + if len(cm.list[h].values) < MaxSectionBucketSize || key <= cm.list[h].end { return h } return -4 @@ -267,7 +270,7 @@ func (cm *CompactMap) AscendingVisit(visit func(NeedleValue) error) error { for _, cs := range cm.list { cs.RLock() var i, j int - for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values) && j < cs.counter; { + for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values); { if cs.overflow[i].Key < cs.values[j].Key { if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil { cs.RUnlock() @@ -290,7 +293,7 @@ func (cm *CompactMap) AscendingVisit(visit func(NeedleValue) error) error { return err } } - for ; j < len(cs.values) && j < cs.counter; j++ { + for ; j < len(cs.values); j++ { if err := visit(toNeedleValue(cs.values[j], cs)); err != nil { cs.RUnlock() return err diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go index 4863e1767..43800aaad 100644 --- a/weed/storage/needle_map/compact_map_test.go +++ b/weed/storage/needle_map/compact_map_test.go @@ -2,11 +2,12 @@ package needle_map import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/sequence" - . "github.com/seaweedfs/seaweedfs/weed/storage/types" "log" "os" "testing" + + "github.com/seaweedfs/seaweedfs/weed/sequence" + . "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func TestSnowflakeSequencer(t *testing.T) { @@ -65,15 +66,15 @@ func TestIssue52(t *testing.T) { func TestCompactMap(t *testing.T) { m := NewCompactMap() - for i := uint32(0); i < 100*batch; i += 2 { + for i := uint32(0); i < 100*MaxSectionBucketSize; i += 2 { m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) } - for i := uint32(0); i < 100*batch; i += 37 { + for i := uint32(0); i < 100*MaxSectionBucketSize; i += 37 { m.Delete(NeedleId(i)) } - for i := uint32(0); i < 10*batch; i += 3 { + for i := uint32(0); i < 10*MaxSectionBucketSize; i += 3 { m.Set(NeedleId(i), ToOffset(int64(i+11)), Size(i+5)) } @@ -83,7 +84,7 @@ func TestCompactMap(t *testing.T) { // } // } - for i := uint32(0); i < 10*batch; i++ { + for i := uint32(0); i < 10*MaxSectionBucketSize; i++ { v, ok := m.Get(NeedleId(i)) if i%3 == 0 { if !ok { @@ -103,7 +104,7 @@ func TestCompactMap(t *testing.T) { } } - for i := uint32(10 * batch); i < 100*batch; i++ { + for i := uint32(10 * MaxSectionBucketSize); i < 100*MaxSectionBucketSize; i++ { v, ok := m.Get(NeedleId(i)) if i%37 == 0 { if ok && v.Size.IsValid() {