diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go index ff72cb9b8..33514476a 100644 --- a/weed/storage/needle_map/compact_map.go +++ b/weed/storage/needle_map/compact_map.go @@ -1,330 +1,284 @@ package needle_map +/* CompactMap is an in-memory map of needle indeces, optimized for memory usage. + * + * It's implemented as a map of sorted indeces segments, which are in turn accessed through binary + * search. This guarantees a best-case scenario (ordered inserts/updates) of O(N) and a worst case + * scenario of O(log n) runtime, with memory usage unaffected by insert ordering. + * + * Note that even at O(log n), the clock time for both reads and writes is very low, so CompactMap + * will seldom bottleneck index operations. + */ + import ( + "fmt" + "math" + "slices" "sort" "sync" - . "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) const ( - MaxSectionBucketSize = 1024 * 8 - LookBackWindowSize = 1024 // how many entries to look back when inserting into a section + MaxCompactKey = math.MaxUint16 + SegmentChunkSize = 50000 // should be <= MaxCompactKey ) -type SectionalNeedleId uint32 - -const SectionalNeedleIdLimit = 1<<32 - 1 +type CompactKey uint16 +type CompactOffset [types.OffsetSize]byte +type CompactNeedleValue struct { + key CompactKey + offset CompactOffset + size types.Size +} -type SectionalNeedleValue struct { - Key SectionalNeedleId - OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G - Size Size `comment:"Size of the data portion"` - OffsetHigher OffsetHigher +type Chunk uint64 +type CompactMapSegment struct { + list []CompactNeedleValue + chunk Chunk + firstKey CompactKey + lastKey CompactKey } -type CompactSection struct { +type CompactMap struct { sync.RWMutex - values []SectionalNeedleValue - overflow Overflow - start NeedleId - end NeedleId + + segments map[Chunk]*CompactMapSegment } -type Overflow []SectionalNeedleValue +func (ck CompactKey) Key(chunk Chunk) types.NeedleId { + return (types.NeedleId(SegmentChunkSize) * types.NeedleId(chunk)) + types.NeedleId(ck) +} -func NewCompactSection(start NeedleId) *CompactSection { - return &CompactSection{ - values: make([]SectionalNeedleValue, 0), - overflow: Overflow(make([]SectionalNeedleValue, 0)), - start: start, - } +func OffsetToCompact(offset types.Offset) CompactOffset { + var co CompactOffset + types.OffsetToBytes(co[:], offset) + return co } -// return old entry size -func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) { - cs.Lock() - defer cs.Unlock() +func (co CompactOffset) Offset() types.Offset { + return types.BytesToOffset(co[:]) +} - if key > cs.end { - cs.end = key - } - skey := SectionalNeedleId(key - cs.start) - if i := cs.binarySearchValues(skey); i >= 0 { - // update - oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size - cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size - return +func (cnv CompactNeedleValue) NeedleValue(chunk Chunk) NeedleValue { + return NeedleValue{ + Key: cnv.key.Key(chunk), + Offset: cnv.offset.Offset(), + Size: cnv.size, } +} - var lkey SectionalNeedleId - if len(cs.values) > 0 { - lkey = cs.values[len(cs.values)-1].Key +func newCompactMapSegment(chunk Chunk) *CompactMapSegment { + return &CompactMapSegment{ + list: []CompactNeedleValue{}, + chunk: chunk, + firstKey: MaxCompactKey, + lastKey: 0, } +} - hasAdded := false - switch { - case len(cs.values) < MaxSectionBucketSize && lkey <= skey: - // non-overflow insert - cs.values = append(cs.values, SectionalNeedleValue{ - Key: skey, - OffsetLower: offset.OffsetLower, - Size: size, - OffsetHigher: offset.OffsetHigher, - }) - hasAdded = true - case len(cs.values) < MaxSectionBucketSize: - // still has capacity and only partially out of order - lookBackIndex := len(cs.values) - LookBackWindowSize - if lookBackIndex < 0 { - lookBackIndex = 0 - } - if cs.values[lookBackIndex].Key <= skey { - for ; lookBackIndex < len(cs.values); lookBackIndex++ { - if cs.values[lookBackIndex].Key >= skey { - break - } - } - cs.values = append(cs.values, SectionalNeedleValue{}) - copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:]) - cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size - cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher - hasAdded = true - } - } +func (cs *CompactMapSegment) len() int { + return len(cs.list) +} - // overflow insert - if !hasAdded { - if oldValue, found := cs.findOverflowEntry(skey); found { - oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size - } - cs.setOverflowEntry(skey, offset, size) - } else { - // if we maxed out our values bucket, pin its capacity to minimize memory usage - if len(cs.values) == MaxSectionBucketSize { - bucket := make([]SectionalNeedleValue, len(cs.values)) - copy(bucket, cs.values) - cs.values = bucket - } - } +func (cs *CompactMapSegment) cap() int { + return cap(cs.list) +} - return +func (cs *CompactMapSegment) compactKey(key types.NeedleId) CompactKey { + return CompactKey(key - (types.NeedleId(SegmentChunkSize) * types.NeedleId(cs.chunk))) } -func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size Size) { - needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size, OffsetHigher: offset.OffsetHigher} - insertCandidate := sort.Search(len(cs.overflow), func(i int) bool { - return cs.overflow[i].Key >= needleValue.Key - }) +// bsearchKey returns the CompactNeedleValue index for a given ID key. +// If the key is not found, it returns the index where it should be inserted instead. +func (cs *CompactMapSegment) bsearchKey(key types.NeedleId) (int, bool) { + ck := cs.compactKey(key) - if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key { - cs.overflow[insertCandidate] = needleValue - return + switch { + case len(cs.list) == 0: + return 0, false + case ck == cs.firstKey: + return 0, true + case ck <= cs.firstKey: + return 0, false + case ck == cs.lastKey: + return len(cs.list) - 1, true + case ck > cs.lastKey: + return len(cs.list), false } - cs.overflow = append(cs.overflow, SectionalNeedleValue{}) - copy(cs.overflow[insertCandidate+1:], cs.overflow[insertCandidate:]) - cs.overflow[insertCandidate] = needleValue -} - -func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) { - foundCandidate := sort.Search(len(cs.overflow), func(i int) bool { - return cs.overflow[i].Key >= key + i := sort.Search(len(cs.list), func(i int) bool { + return cs.list[i].key >= ck }) - if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key { - return cs.overflow[foundCandidate], true - } - return nv, false + return i, cs.list[i].key == ck } -func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) { - length := len(cs.overflow) - deleteCandidate := sort.Search(length, func(i int) bool { - return cs.overflow[i].Key >= key - }) - if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key { - if cs.overflow[deleteCandidate].Size.IsValid() { - cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size - } +// set inserts/updates a CompactNeedleValue. +// If the operation is an update, returns the overwritten value's previous offset and size. +func (cs *CompactMapSegment) set(key types.NeedleId, offset types.Offset, size types.Size) (oldOffset types.Offset, oldSize types.Size) { + i, found := cs.bsearchKey(key) + if found { + // update + o := cs.list[i].offset.Offset() + oldOffset.OffsetLower = o.OffsetLower + oldOffset.OffsetHigher = o.OffsetHigher + oldSize = cs.list[i].size + + o.OffsetLower = offset.OffsetLower + o.OffsetHigher = offset.OffsetHigher + cs.list[i].offset = OffsetToCompact(o) + cs.list[i].size = size + return } -} -// return old entry size -func (cs *CompactSection) Delete(key NeedleId) Size { - cs.Lock() - defer cs.Unlock() - ret := Size(0) - if key > cs.end { - return ret + // insert + if len(cs.list) >= SegmentChunkSize { + panic(fmt.Sprintf("attempted to write more than %d entries on CompactMapSegment %p!!!", SegmentChunkSize, cs)) } - skey := SectionalNeedleId(key - cs.start) - if i := cs.binarySearchValues(skey); i >= 0 { - if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() { - ret = cs.values[i].Size - cs.values[i].Size = -cs.values[i].Size - } + if len(cs.list) == SegmentChunkSize-1 { + // if we max out our segment storage, pin its capacity to minimize memory usage + nl := make([]CompactNeedleValue, SegmentChunkSize, SegmentChunkSize) + copy(nl, cs.list[:i]) + copy(nl[i+1:], cs.list[i:]) + cs.list = nl + } else { + cs.list = append(cs.list, CompactNeedleValue{}) + copy(cs.list[i+1:], cs.list[i:]) } - if v, found := cs.findOverflowEntry(skey); found { - cs.deleteOverflowEntry(skey) - ret = v.Size + + ck := cs.compactKey(key) + cs.list[i] = CompactNeedleValue{ + key: ck, + offset: OffsetToCompact(offset), + size: size, } - return ret -} -func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) { - cs.RLock() - defer cs.RUnlock() - if key > cs.end { - return nil, false + if ck < cs.firstKey { + cs.firstKey = ck } - skey := SectionalNeedleId(key - cs.start) - if v, ok := cs.findOverflowEntry(skey); ok { - nv := toNeedleValue(v, cs) - return &nv, true + if ck > cs.lastKey { + cs.lastKey = ck } - if i := cs.binarySearchValues(skey); i >= 0 { - nv := toNeedleValue(cs.values[i], cs) - return &nv, true + + return +} + +// get seeks a map entry by key. Returns an entry pointer, with a boolean specifiying if the entry was found. +func (cs *CompactMapSegment) get(key types.NeedleId) (*CompactNeedleValue, bool) { + if i, found := cs.bsearchKey(key); found { + return &cs.list[i], true } + return nil, false } -func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int { - x := sort.Search(len(cs.values), func(i int) bool { - return cs.values[i].Key >= key - }) - if x >= len(cs.values) { - return -1 - } - if cs.values[x].Key > key { - return -2 + +// delete deletes a map entry by key. Returns the entries' previous Size, if available. +func (cs *CompactMapSegment) delete(key types.NeedleId) types.Size { + if i, found := cs.bsearchKey(key); found { + if cs.list[i].size > 0 && cs.list[i].size.IsValid() { + ret := cs.list[i].size + cs.list[i].size = -cs.list[i].size + return ret + } } - return x -} -// This map assumes mostly inserting increasing keys -// This map assumes mostly inserting increasing keys -type CompactMap struct { - list []*CompactSection + return types.Size(0) } func NewCompactMap() *CompactMap { - return &CompactMap{} + return &CompactMap{ + segments: map[Chunk]*CompactMapSegment{}, + } } -func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) { - x := cm.binarySearchCompactSection(key) - if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit { - // println(x, "adding to existing", len(cm.list), "sections, starting", key) - cs := NewCompactSection(key) - cm.list = append(cm.list, cs) - x = len(cm.list) - 1 - //keep compact section sorted by start - for x >= 0 { - if x > 0 && cm.list[x-1].start > key { - cm.list[x] = cm.list[x-1] - // println("shift", x, "start", cs.start, "to", x-1) - x = x - 1 - } else { - cm.list[x] = cs - // println("cs", x, "start", cs.start) - break - } - } +func (cm *CompactMap) Len() int { + l := 0 + for _, s := range cm.segments { + l += s.len() } - // println(key, "set to section[", x, "].start", cm.list[x].start) - return cm.list[x].Set(key, offset, size) + return l } -func (cm *CompactMap) Delete(key NeedleId) Size { - x := cm.binarySearchCompactSection(key) - if x < 0 { - return Size(0) + +func (cm *CompactMap) Cap() int { + c := 0 + for _, s := range cm.segments { + c += s.cap() } - return cm.list[x].Delete(key) + return c } -func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) { - x := cm.binarySearchCompactSection(key) - if x < 0 { - return nil, false + +func (cm *CompactMap) String() string { + if cm.Len() == 0 { + return "empty" } - return cm.list[x].Get(key) + return fmt.Sprintf( + "%d/%d elements on %d segments, %.02f%% efficiency", + cm.Len(), cm.Cap(), len(cm.segments), + float64(100)*float64(cm.Len())/float64(cm.Cap())) } -func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int { - l, h := 0, len(cm.list)-1 - if h < 0 { - return -5 - } - if cm.list[h].start <= key { - if len(cm.list[h].values) < MaxSectionBucketSize || key <= cm.list[h].end { - return h - } - return -4 + +func (cm *CompactMap) segmentForKey(key types.NeedleId) *CompactMapSegment { + chunk := Chunk(key / SegmentChunkSize) + if cs, ok := cm.segments[chunk]; ok { + return cs } - for l <= h { - m := (l + h) / 2 - if key < cm.list[m].start { - h = m - 1 - } else { // cm.list[m].start <= key - if cm.list[m+1].start <= key { - l = m + 1 - } else { - return m - } - } + + cs := newCompactMapSegment(chunk) + cm.segments[chunk] = cs + return cs +} + +// Set inserts/updates a NeedleValue. +// If the operation is an update, returns the overwritten value's previous offset and size. +func (cm *CompactMap) Set(key types.NeedleId, offset types.Offset, size types.Size) (oldOffset types.Offset, oldSize types.Size) { + cm.RLock() + defer cm.RUnlock() + + cs := cm.segmentForKey(key) + return cs.set(key, offset, size) +} + +// Get seeks a map entry by key. Returns an entry pointer, with a boolean specifiying if the entry was found. +func (cm *CompactMap) Get(key types.NeedleId) (*NeedleValue, bool) { + cm.RLock() + defer cm.RUnlock() + + cs := cm.segmentForKey(key) + if cnv, found := cs.get(key); found { + nv := cnv.NeedleValue(cs.chunk) + return &nv, true } - return -3 + return nil, false } -// Visit visits all entries or stop if any error when visiting +// Delete deletes a map entry by key. Returns the entries' previous Size, if available. +func (cm *CompactMap) Delete(key types.NeedleId) types.Size { + cm.RLock() + defer cm.RUnlock() + + cs := cm.segmentForKey(key) + return cs.delete(key) +} + +// AscendingVisit runs a function on all entries, in ascending key order. Returns any errors hit while visiting. func (cm *CompactMap) AscendingVisit(visit func(NeedleValue) error) error { - for _, cs := range cm.list { - cs.RLock() - var i, j int - for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values); { - if cs.overflow[i].Key < cs.values[j].Key { - if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil { - cs.RUnlock() - return err - } - i++ - } else if cs.overflow[i].Key == cs.values[j].Key { - j++ - } else { - if err := visit(toNeedleValue(cs.values[j], cs)); err != nil { - cs.RUnlock() - return err - } - j++ - } - } - for ; i < len(cs.overflow); i++ { - if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil { - cs.RUnlock() - return err - } - } - for ; j < len(cs.values); j++ { - if err := visit(toNeedleValue(cs.values[j], cs)); err != nil { - cs.RUnlock() + cm.RLock() + defer cm.RUnlock() + + chunks := []Chunk{} + for c := range cm.segments { + chunks = append(chunks, c) + } + slices.Sort(chunks) + + for _, c := range chunks { + cs := cm.segments[c] + for _, cnv := range cs.list { + nv := cnv.NeedleValue(cs.chunk) + if err := visit(nv); err != nil { return err } } - cs.RUnlock() } return nil } - -func toNeedleValue(snv SectionalNeedleValue, cs *CompactSection) NeedleValue { - offset := Offset{ - OffsetHigher: snv.OffsetHigher, - OffsetLower: snv.OffsetLower, - } - return NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size} -} - -func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) SectionalNeedleValue { - return SectionalNeedleValue{ - Key: SectionalNeedleId(nv.Key - cs.start), - OffsetLower: nv.Offset.OffsetLower, - Size: nv.Size, - OffsetHigher: nv.Offset.OffsetHigher, - } -} diff --git a/weed/storage/needle_map/compact_map_cases_test.go b/weed/storage/needle_map/compact_map_cases_test.go index c1eed4fdf..15a6a7b83 100644 --- a/weed/storage/needle_map/compact_map_cases_test.go +++ b/weed/storage/needle_map/compact_map_cases_test.go @@ -5,11 +5,12 @@ package needle_map import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/stretchr/testify/assert" "log" "os" "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/stretchr/testify/assert" ) func Test5bytesIndexLoading(t *testing.T) { diff --git a/weed/storage/needle_map/compact_map_perf_test.go b/weed/storage/needle_map/compact_map_perf_test.go index e1fb1b035..06f7b0d72 100644 --- a/weed/storage/needle_map/compact_map_perf_test.go +++ b/weed/storage/needle_map/compact_map_perf_test.go @@ -43,6 +43,7 @@ func TestMemoryUsage(t *testing.T) { PrintMemUsage(totalRowCount) now := time.Now() + fmt.Printf("\tCompactMap = %s", m.String()) fmt.Printf("\tTaken = %v\n", now.Sub(startTime)) startTime = now } diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go index 174438fa3..ae3a43353 100644 --- a/weed/storage/needle_map/compact_map_test.go +++ b/weed/storage/needle_map/compact_map_test.go @@ -2,240 +2,476 @@ package needle_map import ( "fmt" - "log" - "os" + "math/rand" + "reflect" "testing" - "github.com/seaweedfs/seaweedfs/weed/sequence" - . "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) -func TestSnowflakeSequencer(t *testing.T) { - m := NewCompactMap() - seq, _ := sequence.NewSnowflakeSequencer("for_test", 1) - - for i := 0; i < 200000; i++ { - id := seq.NextFileId(1) - oldOffset, oldSize := m.Set(NeedleId(id), ToOffset(8), 3000073) - if oldSize != 0 { - t.Errorf("id %d oldOffset %v oldSize %d", id, oldOffset, oldSize) - } +func TestSegmentBsearchKey(t *testing.T) { + testSegment := &CompactMapSegment{ + list: []CompactNeedleValue{ + CompactNeedleValue{key: 10}, + CompactNeedleValue{key: 20}, + CompactNeedleValue{key: 21}, + CompactNeedleValue{key: 26}, + CompactNeedleValue{key: 30}, + }, + firstKey: 10, + lastKey: 30, } -} - -func TestOverflow2(t *testing.T) { - m := NewCompactMap() - _, oldSize := m.Set(NeedleId(150088), ToOffset(8), 3000073) - if oldSize != 0 { - t.Fatalf("expecting no previous data") - } - _, oldSize = m.Set(NeedleId(150088), ToOffset(8), 3000073) - if oldSize != 3000073 { - t.Fatalf("expecting previous data size is %d, not %d", 3000073, oldSize) - } - m.Set(NeedleId(150073), ToOffset(8), 3000073) - m.Set(NeedleId(150089), ToOffset(8), 3000073) - m.Set(NeedleId(150076), ToOffset(8), 3000073) - m.Set(NeedleId(150124), ToOffset(8), 3000073) - m.Set(NeedleId(150137), ToOffset(8), 3000073) - m.Set(NeedleId(150147), ToOffset(8), 3000073) - m.Set(NeedleId(150145), ToOffset(8), 3000073) - m.Set(NeedleId(150158), ToOffset(8), 3000073) - m.Set(NeedleId(150162), ToOffset(8), 3000073) - - m.AscendingVisit(func(value NeedleValue) error { - println("needle key:", value.Key) - return nil - }) -} - -func TestIssue52(t *testing.T) { - m := NewCompactMap() - m.Set(NeedleId(10002), ToOffset(10002), 10002) - if element, ok := m.Get(NeedleId(10002)); ok { - fmt.Printf("key %d ok %v %d, %v, %d\n", 10002, ok, element.Key, element.Offset, element.Size) + testCases := []struct { + name string + cs *CompactMapSegment + key types.NeedleId + wantIndex int + wantFound bool + }{ + { + name: "empty segment", + cs: newCompactMapSegment(0), + key: 123, + wantIndex: 0, + wantFound: false, + }, + { + name: "new key, insert at beggining", + cs: testSegment, + key: 5, + wantIndex: 0, + wantFound: false, + }, + { + name: "new key, insert at end", + cs: testSegment, + key: 100, + wantIndex: 5, + wantFound: false, + }, + { + name: "new key, insert second", + cs: testSegment, + key: 12, + wantIndex: 1, + wantFound: false, + }, + { + name: "new key, insert in middle", + cs: testSegment, + key: 23, + wantIndex: 3, + wantFound: false, + }, + { + name: "key #1", + cs: testSegment, + key: 10, + wantIndex: 0, + wantFound: true, + }, + { + name: "key #2", + cs: testSegment, + key: 20, + wantIndex: 1, + wantFound: true, + }, + { + name: "key #3", + cs: testSegment, + key: 21, + wantIndex: 2, + wantFound: true, + }, + { + name: "key #4", + cs: testSegment, + key: 26, + wantIndex: 3, + wantFound: true, + }, + { + name: "key #5", + cs: testSegment, + key: 30, + wantIndex: 4, + wantFound: true, + }, } - m.Set(NeedleId(10001), ToOffset(10001), 10001) - if element, ok := m.Get(NeedleId(10002)); ok { - fmt.Printf("key %d ok %v %d, %v, %d\n", 10002, ok, element.Key, element.Offset, element.Size) - } else { - t.Fatal("key 10002 missing after setting 10001") + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + index, found := tc.cs.bsearchKey(tc.key) + if got, want := index, tc.wantIndex; got != want { + t.Errorf("expected %v, got %v", want, got) + } + if got, want := found, tc.wantFound; got != want { + t.Errorf("expected %v, got %v", want, got) + } + }) } } -func TestCompactMap(t *testing.T) { - m := NewCompactMap() - for i := uint32(0); i < 100*MaxSectionBucketSize; i += 2 { - m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) +func TestSegmentSet(t *testing.T) { + testSegment := &CompactMapSegment{ + list: []CompactNeedleValue{ + CompactNeedleValue{key: 10, offset: OffsetToCompact(types.Uint32ToOffset(0)), size: 100}, + CompactNeedleValue{key: 20, offset: OffsetToCompact(types.Uint32ToOffset(100)), size: 200}, + CompactNeedleValue{key: 30, offset: OffsetToCompact(types.Uint32ToOffset(300)), size: 300}, + }, + firstKey: 10, + lastKey: 30, } - for i := uint32(0); i < 100*MaxSectionBucketSize; i += 37 { - m.Delete(NeedleId(i)) + if got, want := testSegment.len(), 3; got != want { + t.Errorf("got starting size %d, want %d", got, want) } - - for i := uint32(0); i < 10*MaxSectionBucketSize; i += 3 { - m.Set(NeedleId(i), ToOffset(int64(i+11)), Size(i+5)) + if got, want := testSegment.cap(), 3; got != want { + t.Errorf("got starting capacity %d, want %d", got, want) } - // for i := uint32(0); i < 100; i++ { - // if v := m.Get(Key(i)); v != nil { - // glog.V(4).Infoln(i, "=", v.Key, v.Offset, v.Size) - // } - // } + testSets := []struct { + name string + key types.NeedleId + offset types.Offset + size types.Size + wantOffset types.Offset + wantSize types.Size + }{ + { + name: "insert at beggining", + key: 5, offset: types.Uint32ToOffset(1000), size: 123, + wantOffset: types.Uint32ToOffset(0), wantSize: 0, + }, + { + name: "insert at end", + key: 51, offset: types.Uint32ToOffset(7000), size: 456, + wantOffset: types.Uint32ToOffset(0), wantSize: 0, + }, + { + name: "insert in middle", + key: 25, offset: types.Uint32ToOffset(8000), size: 789, + wantOffset: types.Uint32ToOffset(0), wantSize: 0, + }, + { + name: "update existing", + key: 30, offset: types.Uint32ToOffset(9000), size: 999, + wantOffset: types.Uint32ToOffset(300), wantSize: 300, + }, + } - for i := uint32(0); i < 10*MaxSectionBucketSize; i++ { - v, ok := m.Get(NeedleId(i)) - if i%3 == 0 { - if !ok { - t.Fatal("key", i, "missing!") - } - if v.Size != Size(i+5) { - t.Fatal("key", i, "size", v.Size) - } - } else if i%37 == 0 { - if ok && v.Size.IsValid() { - t.Fatal("key", i, "should have been deleted needle value", v) - } - } else if i%2 == 0 { - if v.Size != Size(i) { - t.Fatal("key", i, "size", v.Size) - } + for _, ts := range testSets { + offset, size := testSegment.set(ts.key, ts.offset, ts.size) + if offset != ts.wantOffset { + t.Errorf("%s: got offset %v, want %v", ts.name, offset, ts.wantOffset) + } + if size != ts.wantSize { + t.Errorf("%s: got size %v, want %v", ts.name, size, ts.wantSize) } } - for i := uint32(10 * MaxSectionBucketSize); i < 100*MaxSectionBucketSize; i++ { - v, ok := m.Get(NeedleId(i)) - if i%37 == 0 { - if ok && v.Size.IsValid() { - t.Fatal("key", i, "should have been deleted needle value", v) - } - } else if i%2 == 0 { - if v == nil { - t.Fatal("key", i, "missing") - } - if v.Size != Size(i) { - t.Fatal("key", i, "size", v.Size) - } - } + wantSegment := &CompactMapSegment{ + list: []CompactNeedleValue{ + CompactNeedleValue{key: 5, offset: OffsetToCompact(types.Uint32ToOffset(1000)), size: 123}, + CompactNeedleValue{key: 10, offset: OffsetToCompact(types.Uint32ToOffset(0)), size: 100}, + CompactNeedleValue{key: 20, offset: OffsetToCompact(types.Uint32ToOffset(100)), size: 200}, + CompactNeedleValue{key: 25, offset: OffsetToCompact(types.Uint32ToOffset(8000)), size: 789}, + CompactNeedleValue{key: 30, offset: OffsetToCompact(types.Uint32ToOffset(9000)), size: 999}, + CompactNeedleValue{key: 51, offset: OffsetToCompact(types.Uint32ToOffset(7000)), size: 456}, + }, + firstKey: 5, + lastKey: 51, + } + if !reflect.DeepEqual(testSegment, wantSegment) { + t.Errorf("got result segment %v, want %v", testSegment, wantSegment) } + if got, want := testSegment.len(), 6; got != want { + t.Errorf("got result size %d, want %d", got, want) + } + if got, want := testSegment.cap(), 6; got != want { + t.Errorf("got result capacity %d, want %d", got, want) + } } -func TestOverflow(t *testing.T) { - cs := NewCompactSection(1) - - cs.setOverflowEntry(1, ToOffset(12), 12) - cs.setOverflowEntry(2, ToOffset(12), 12) - cs.setOverflowEntry(3, ToOffset(12), 12) - cs.setOverflowEntry(4, ToOffset(12), 12) - cs.setOverflowEntry(5, ToOffset(12), 12) - - if cs.overflow[2].Key != 3 { - t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key) +func TestSegmentSetOrdering(t *testing.T) { + keys := []types.NeedleId{} + for i := 0; i < SegmentChunkSize; i++ { + keys = append(keys, types.NeedleId(i)) } - cs.setOverflowEntry(3, ToOffset(24), 24) + r := rand.New(rand.NewSource(123456789)) + r.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] }) - if cs.overflow[2].Key != 3 { - t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key) + cs := newCompactMapSegment(0) + for _, k := range keys { + _, _ = cs.set(k, types.Uint32ToOffset(123), 456) } - - if cs.overflow[2].Size != 24 { - t.Fatalf("expecting o[2] has size 24: %+v", cs.overflow[2].Size) + if got, want := cs.len(), SegmentChunkSize; got != want { + t.Errorf("expected size %d, got %d", want, got) + } + for i := 1; i < cs.len(); i++ { + if ka, kb := cs.list[i-1].key, cs.list[i].key; ka >= kb { + t.Errorf("found out of order entries at (%d, %d) = (%d, %d)", i-1, i, ka, kb) + } } +} - cs.deleteOverflowEntry(4) +func TestSegmentGet(t *testing.T) { + testSegment := &CompactMapSegment{ + list: []CompactNeedleValue{ + CompactNeedleValue{key: 10, offset: OffsetToCompact(types.Uint32ToOffset(0)), size: 100}, + CompactNeedleValue{key: 20, offset: OffsetToCompact(types.Uint32ToOffset(100)), size: 200}, + CompactNeedleValue{key: 30, offset: OffsetToCompact(types.Uint32ToOffset(300)), size: 300}, + }, + firstKey: 10, + lastKey: 30, + } - if len(cs.overflow) != 5 { - t.Fatalf("expecting 5 entries now: %+v", cs.overflow) + testCases := []struct { + name string + key types.NeedleId + wantValue *CompactNeedleValue + wantFound bool + }{ + { + name: "invalid key", + key: 99, + wantValue: nil, + wantFound: false, + }, + { + name: "key #1", + key: 10, + wantValue: &testSegment.list[0], + wantFound: true, + }, + { + name: "key #2", + key: 20, + wantValue: &testSegment.list[1], + wantFound: true, + }, + { + name: "key #3", + key: 30, + wantValue: &testSegment.list[2], + wantFound: true, + }, } - x, _ := cs.findOverflowEntry(5) - if x.Key != 5 { - t.Fatalf("expecting entry 5 now: %+v", x) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + value, found := testSegment.get(tc.key) + if got, want := value, tc.wantValue; got != want { + t.Errorf("got %v, want %v", got, want) + } + if got, want := found, tc.wantFound; got != want { + t.Errorf("got %v, want %v", got, want) + } + }) } +} - for i, x := range cs.overflow { - println("overflow[", i, "]:", x.Key) +func TestSegmentDelete(t *testing.T) { + testSegment := &CompactMapSegment{ + list: []CompactNeedleValue{ + CompactNeedleValue{key: 10, offset: OffsetToCompact(types.Uint32ToOffset(0)), size: 100}, + CompactNeedleValue{key: 20, offset: OffsetToCompact(types.Uint32ToOffset(100)), size: 200}, + CompactNeedleValue{key: 30, offset: OffsetToCompact(types.Uint32ToOffset(300)), size: 300}, + CompactNeedleValue{key: 40, offset: OffsetToCompact(types.Uint32ToOffset(600)), size: 400}, + }, + firstKey: 10, + lastKey: 40, } - println() - cs.deleteOverflowEntry(1) + testDeletes := []struct { + name string + key types.NeedleId + want types.Size + }{ + { + name: "invalid key", + key: 99, + want: 0, + }, + { + name: "delete key #2", + key: 20, + want: 200, + }, + { + name: "delete key #4", + key: 40, + want: 400, + }, + } - for i, x := range cs.overflow { - println("overflow[", i, "]:", x.Key, "size", x.Size) + for _, td := range testDeletes { + size := testSegment.delete(td.key) + if got, want := size, td.want; got != want { + t.Errorf("%s: got %v, want %v", td.name, got, want) + } } - println() - cs.setOverflowEntry(4, ToOffset(44), 44) - for i, x := range cs.overflow { - println("overflow[", i, "]:", x.Key) + wantSegment := &CompactMapSegment{ + list: []CompactNeedleValue{ + CompactNeedleValue{key: 10, offset: OffsetToCompact(types.Uint32ToOffset(0)), size: 100}, + CompactNeedleValue{key: 20, offset: OffsetToCompact(types.Uint32ToOffset(100)), size: -200}, + CompactNeedleValue{key: 30, offset: OffsetToCompact(types.Uint32ToOffset(300)), size: 300}, + CompactNeedleValue{key: 40, offset: OffsetToCompact(types.Uint32ToOffset(600)), size: -400}, + }, + firstKey: 10, + lastKey: 40, + } + if !reflect.DeepEqual(testSegment, wantSegment) { + t.Errorf("got result segment %v, want %v", testSegment, wantSegment) } - println() +} - cs.setOverflowEntry(1, ToOffset(11), 11) +func TestSegmentForKey(t *testing.T) { + testMap := NewCompactMap() + + tests := []struct { + name string + key types.NeedleId + want *CompactMapSegment + }{ + { + name: "first segment", + key: 12, + want: &CompactMapSegment{ + list: []CompactNeedleValue{}, + chunk: 0, + firstKey: MaxCompactKey, + lastKey: 0, + }, + }, + { + name: "second segment, gapless", + key: SegmentChunkSize + 34, + want: &CompactMapSegment{ + list: []CompactNeedleValue{}, + chunk: 1, + firstKey: MaxCompactKey, + lastKey: 0, + }, + }, + { + name: "gapped segment", + key: (5 * SegmentChunkSize) + 56, + want: &CompactMapSegment{ + list: []CompactNeedleValue{}, + chunk: 5, + firstKey: MaxCompactKey, + lastKey: 0, + }, + }, + } - for i, x := range cs.overflow { - println("overflow[", i, "]:", x.Key) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cs := testMap.segmentForKey(tc.key) + if !reflect.DeepEqual(cs, tc.want) { + t.Errorf("got segment %v, want %v", cs, tc.want) + } + }) } - println() + wantMap := &CompactMap{ + segments: map[Chunk]*CompactMapSegment{ + 0: &CompactMapSegment{ + list: []CompactNeedleValue{}, + chunk: 0, + firstKey: MaxCompactKey, + lastKey: 0, + }, + 1: &CompactMapSegment{ + list: []CompactNeedleValue{}, + chunk: 1, + firstKey: MaxCompactKey, + lastKey: 0, + }, + 5: &CompactMapSegment{ + list: []CompactNeedleValue{}, + chunk: 5, + firstKey: MaxCompactKey, + lastKey: 0, + }, + }, + } + if !reflect.DeepEqual(testMap, wantMap) { + t.Errorf("got map %v, want %v", testMap, wantMap) + } } -func TestCompactSection_Get(t *testing.T) { - var maps []*CompactMap - totalRowCount := uint64(0) - indexFile, ie := os.OpenFile("../../../test/data/sample.idx", - os.O_RDWR|os.O_RDONLY, 0644) - defer indexFile.Close() - if ie != nil { - log.Fatalln(ie) +func TestAscendingVisit(t *testing.T) { + cm := NewCompactMap() + for _, nid := range []types.NeedleId{20, 7, 40000, 300000, 0, 100, 500, 10000, 200000} { + cm.Set(nid, types.Uint32ToOffset(123), 456) } - m, rowCount := loadNewNeedleMap(indexFile) - maps = append(maps, m) - totalRowCount += rowCount - m.Set(1574318345753513987, ToOffset(10002), 10002) - nv, ok := m.Get(1574318345753513987) - if ok { - t.Log(uint64(nv.Key)) + got := []NeedleValue{} + err := cm.AscendingVisit(func(nv NeedleValue) error { + got = append(got, nv) + return nil + }) + if err != nil { + t.Errorf("got error %v, expected none", err) } - nv1, ok := m.Get(1574318350048481283) - if ok { - t.Error(uint64(nv1.Key)) + want := []NeedleValue{ + NeedleValue{Key: 0, Offset: types.Uint32ToOffset(123), Size: 456}, + NeedleValue{Key: 7, Offset: types.Uint32ToOffset(123), Size: 456}, + NeedleValue{Key: 20, Offset: types.Uint32ToOffset(123), Size: 456}, + NeedleValue{Key: 100, Offset: types.Uint32ToOffset(123), Size: 456}, + NeedleValue{Key: 500, Offset: types.Uint32ToOffset(123), Size: 456}, + NeedleValue{Key: 10000, Offset: types.Uint32ToOffset(123), Size: 456}, + NeedleValue{Key: 40000, Offset: types.Uint32ToOffset(123), Size: 456}, + NeedleValue{Key: 200000, Offset: types.Uint32ToOffset(123), Size: 456}, + NeedleValue{Key: 300000, Offset: types.Uint32ToOffset(123), Size: 456}, } - - m.Set(1574318350048481283, ToOffset(10002), 10002) - nv2, ok1 := m.Get(1574318350048481283) - if ok1 { - t.Log(uint64(nv2.Key)) + if !reflect.DeepEqual(got, want) { + t.Errorf("got values %v, want %v", got, want) } +} - m.Delete(nv2.Key) - nv3, has := m.Get(nv2.Key) - if has && nv3.Size > 0 { - t.Error(uint64(nv3.Size)) +func TestRandomInsert(t *testing.T) { + count := 8 * SegmentChunkSize + keys := []types.NeedleId{} + for i := 0; i < count; i++ { + keys = append(keys, types.NeedleId(i)) } -} -// Test after putting 1 ~ LookBackWindowSize*3 items in sequential order, but missing item LookBackWindowSize -// insert the item LookBackWindowSize in the middle of the sequence -func TestCompactSection_PutOutOfOrderItemBeyondLookBackWindow(t *testing.T) { - m := NewCompactMap() + r := rand.New(rand.NewSource(123456789)) + r.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] }) - // put 1 ~ 10 - for i := 1; i <= LookBackWindowSize*3; i++ { - if i != LookBackWindowSize { - m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) - } + cm := NewCompactMap() + for _, k := range keys { + _, _ = cm.Set(k, types.Uint32ToOffset(123), 456) + } + if got, want := cm.Len(), count; got != want { + t.Errorf("expected size %d, got %d", want, got) } - m.Set(NeedleId(LookBackWindowSize), ToOffset(int64(LookBackWindowSize)), Size(LookBackWindowSize)) + last := -1 + err := cm.AscendingVisit(func(nv NeedleValue) error { + key := int(nv.Key) + if key <= last { + return fmt.Errorf("found out of order entries (%d vs %d)", key, last) + } + last = key + return nil + }) + if err != nil { + t.Errorf("got error %v, expected none", err) + } - // check if 8 is in the right place - if v, ok := m.Get(NeedleId(LookBackWindowSize)); !ok || v.Offset != ToOffset(LookBackWindowSize) || v.Size != Size(LookBackWindowSize) { - t.Fatalf("expected to find LookBackWindowSize at offset %d with size %d, but got %v", LookBackWindowSize, LookBackWindowSize, v) + // Given that we've written a integer multiple of SegmentChunkSize, all + // segments should be fully utilized and capacity-adjusted. + if l, c := cm.Len(), cm.Cap(); l != c { + t.Errorf("map length (%d) doesn't match capacity (%d)", l, c) } } diff --git a/weed/storage/needle_map/old/compact_map.go b/weed/storage/needle_map/old/compact_map.go new file mode 100644 index 000000000..ca9892b0f --- /dev/null +++ b/weed/storage/needle_map/old/compact_map.go @@ -0,0 +1,332 @@ +package needle_map + +import ( + "sort" + "sync" + + . "github.com/seaweedfs/seaweedfs/weed/storage/types" + + new_map "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" +) + +const ( + MaxSectionBucketSize = 1024 * 8 + LookBackWindowSize = 1024 // how many entries to look back when inserting into a section +) + +type SectionalNeedleId uint32 + +const SectionalNeedleIdLimit = 1<<32 - 1 + +type SectionalNeedleValue struct { + Key SectionalNeedleId + OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G + Size Size `comment:"Size of the data portion"` + OffsetHigher OffsetHigher +} + +type CompactSection struct { + sync.RWMutex + values []SectionalNeedleValue + overflow Overflow + start NeedleId + end NeedleId +} + +type Overflow []SectionalNeedleValue + +func NewCompactSection(start NeedleId) *CompactSection { + return &CompactSection{ + values: make([]SectionalNeedleValue, 0), + overflow: Overflow(make([]SectionalNeedleValue, 0)), + start: start, + } +} + +// return old entry size +func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) { + cs.Lock() + defer cs.Unlock() + + if key > cs.end { + cs.end = key + } + skey := SectionalNeedleId(key - cs.start) + if i := cs.binarySearchValues(skey); i >= 0 { + // update + oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size + cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size + return + } + + var lkey SectionalNeedleId + if len(cs.values) > 0 { + lkey = cs.values[len(cs.values)-1].Key + } + + hasAdded := false + switch { + case len(cs.values) < MaxSectionBucketSize && lkey <= skey: + // non-overflow insert + cs.values = append(cs.values, SectionalNeedleValue{ + Key: skey, + OffsetLower: offset.OffsetLower, + Size: size, + OffsetHigher: offset.OffsetHigher, + }) + hasAdded = true + case len(cs.values) < MaxSectionBucketSize: + // still has capacity and only partially out of order + lookBackIndex := len(cs.values) - LookBackWindowSize + if lookBackIndex < 0 { + lookBackIndex = 0 + } + if cs.values[lookBackIndex].Key <= skey { + for ; lookBackIndex < len(cs.values); lookBackIndex++ { + if cs.values[lookBackIndex].Key >= skey { + break + } + } + cs.values = append(cs.values, SectionalNeedleValue{}) + copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:]) + cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size + cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher + hasAdded = true + } + } + + // overflow insert + if !hasAdded { + if oldValue, found := cs.findOverflowEntry(skey); found { + oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size + } + cs.setOverflowEntry(skey, offset, size) + } else { + // if we maxed out our values bucket, pin its capacity to minimize memory usage + if len(cs.values) == MaxSectionBucketSize { + bucket := make([]SectionalNeedleValue, len(cs.values)) + copy(bucket, cs.values) + cs.values = bucket + } + } + + return +} + +func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size Size) { + needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size, OffsetHigher: offset.OffsetHigher} + insertCandidate := sort.Search(len(cs.overflow), func(i int) bool { + return cs.overflow[i].Key >= needleValue.Key + }) + + if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key { + cs.overflow[insertCandidate] = needleValue + return + } + + cs.overflow = append(cs.overflow, SectionalNeedleValue{}) + copy(cs.overflow[insertCandidate+1:], cs.overflow[insertCandidate:]) + cs.overflow[insertCandidate] = needleValue +} + +func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) { + foundCandidate := sort.Search(len(cs.overflow), func(i int) bool { + return cs.overflow[i].Key >= key + }) + if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key { + return cs.overflow[foundCandidate], true + } + return nv, false +} + +func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) { + length := len(cs.overflow) + deleteCandidate := sort.Search(length, func(i int) bool { + return cs.overflow[i].Key >= key + }) + if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key { + if cs.overflow[deleteCandidate].Size.IsValid() { + cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size + } + } +} + +// return old entry size +func (cs *CompactSection) Delete(key NeedleId) Size { + cs.Lock() + defer cs.Unlock() + ret := Size(0) + if key > cs.end { + return ret + } + skey := SectionalNeedleId(key - cs.start) + if i := cs.binarySearchValues(skey); i >= 0 { + if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() { + ret = cs.values[i].Size + cs.values[i].Size = -cs.values[i].Size + } + } + if v, found := cs.findOverflowEntry(skey); found { + cs.deleteOverflowEntry(skey) + ret = v.Size + } + return ret +} +func (cs *CompactSection) Get(key NeedleId) (*new_map.NeedleValue, bool) { + cs.RLock() + defer cs.RUnlock() + if key > cs.end { + return nil, false + } + skey := SectionalNeedleId(key - cs.start) + if v, ok := cs.findOverflowEntry(skey); ok { + nv := toNeedleValue(v, cs) + return &nv, true + } + if i := cs.binarySearchValues(skey); i >= 0 { + nv := toNeedleValue(cs.values[i], cs) + return &nv, true + } + return nil, false +} +func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int { + x := sort.Search(len(cs.values), func(i int) bool { + return cs.values[i].Key >= key + }) + if x >= len(cs.values) { + return -1 + } + if cs.values[x].Key > key { + return -2 + } + return x +} + +// This map assumes mostly inserting increasing keys +// This map assumes mostly inserting increasing keys +type CompactMap struct { + list []*CompactSection +} + +func NewCompactMap() *CompactMap { + return &CompactMap{} +} + +func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) { + x := cm.binarySearchCompactSection(key) + if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit { + // println(x, "adding to existing", len(cm.list), "sections, starting", key) + cs := NewCompactSection(key) + cm.list = append(cm.list, cs) + x = len(cm.list) - 1 + //keep compact section sorted by start + for x >= 0 { + if x > 0 && cm.list[x-1].start > key { + cm.list[x] = cm.list[x-1] + // println("shift", x, "start", cs.start, "to", x-1) + x = x - 1 + } else { + cm.list[x] = cs + // println("cs", x, "start", cs.start) + break + } + } + } + // println(key, "set to section[", x, "].start", cm.list[x].start) + return cm.list[x].Set(key, offset, size) +} +func (cm *CompactMap) Delete(key NeedleId) Size { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return Size(0) + } + return cm.list[x].Delete(key) +} +func (cm *CompactMap) Get(key NeedleId) (*new_map.NeedleValue, bool) { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return nil, false + } + return cm.list[x].Get(key) +} +func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int { + l, h := 0, len(cm.list)-1 + if h < 0 { + return -5 + } + if cm.list[h].start <= key { + if len(cm.list[h].values) < MaxSectionBucketSize || key <= cm.list[h].end { + return h + } + return -4 + } + for l <= h { + m := (l + h) / 2 + if key < cm.list[m].start { + h = m - 1 + } else { // cm.list[m].start <= key + if cm.list[m+1].start <= key { + l = m + 1 + } else { + return m + } + } + } + return -3 +} + +// Visit visits all entries or stop if any error when visiting +func (cm *CompactMap) AscendingVisit(visit func(new_map.NeedleValue) error) error { + for _, cs := range cm.list { + cs.RLock() + var i, j int + for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values); { + if cs.overflow[i].Key < cs.values[j].Key { + if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil { + cs.RUnlock() + return err + } + i++ + } else if cs.overflow[i].Key == cs.values[j].Key { + j++ + } else { + if err := visit(toNeedleValue(cs.values[j], cs)); err != nil { + cs.RUnlock() + return err + } + j++ + } + } + for ; i < len(cs.overflow); i++ { + if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil { + cs.RUnlock() + return err + } + } + for ; j < len(cs.values); j++ { + if err := visit(toNeedleValue(cs.values[j], cs)); err != nil { + cs.RUnlock() + return err + } + } + cs.RUnlock() + } + return nil +} + +func toNeedleValue(snv SectionalNeedleValue, cs *CompactSection) new_map.NeedleValue { + offset := Offset{ + OffsetHigher: snv.OffsetHigher, + OffsetLower: snv.OffsetLower, + } + return new_map.NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size} +} + +func toSectionalNeedleValue(nv new_map.NeedleValue, cs *CompactSection) SectionalNeedleValue { + return SectionalNeedleValue{ + Key: SectionalNeedleId(nv.Key - cs.start), + OffsetLower: nv.Offset.OffsetLower, + Size: nv.Size, + OffsetHigher: nv.Offset.OffsetHigher, + } +} diff --git a/weed/storage/needle_map/old/compact_map_perf_test.go b/weed/storage/needle_map/old/compact_map_perf_test.go new file mode 100644 index 000000000..4728930db --- /dev/null +++ b/weed/storage/needle_map/old/compact_map_perf_test.go @@ -0,0 +1,92 @@ +package needle_map + +import ( + "fmt" + "log" + "os" + "runtime" + "testing" + "time" + + . "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +/* + +To see the memory usage: + +go test -run TestMemoryUsage +The Alloc section shows the in-use memory increase for each iteration. + +go test -run TestMemoryUsage -memprofile=mem.out +go tool pprof --alloc_space needle.test mem.out + + +*/ + +func TestMemoryUsage(t *testing.T) { + + var maps []*CompactMap + totalRowCount := uint64(0) + + startTime := time.Now() + for i := 0; i < 10; i++ { + indexFile, ie := os.OpenFile("../../../../test/data/sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + m, rowCount := loadNewNeedleMap(indexFile) + maps = append(maps, m) + totalRowCount += rowCount + + indexFile.Close() + + PrintMemUsage(totalRowCount) + now := time.Now() + fmt.Printf("\tTaken = %v\n", now.Sub(startTime)) + startTime = now + } + +} + +func loadNewNeedleMap(file *os.File) (*CompactMap, uint64) { + m := NewCompactMap() + bytes := make([]byte, NeedleMapEntrySize) + rowCount := uint64(0) + count, e := file.Read(bytes) + for count > 0 && e == nil { + for i := 0; i < count; i += NeedleMapEntrySize { + rowCount++ + key := BytesToNeedleId(bytes[i : i+NeedleIdSize]) + offset := BytesToOffset(bytes[i+NeedleIdSize : i+NeedleIdSize+OffsetSize]) + size := BytesToSize(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize]) + + if !offset.IsZero() { + m.Set(NeedleId(key), offset, size) + } else { + m.Delete(key) + } + } + + count, e = file.Read(bytes) + } + + return m, rowCount + +} + +func PrintMemUsage(totalRowCount uint64) { + + runtime.GC() + var m runtime.MemStats + runtime.ReadMemStats(&m) + // For info on each, see: https://golang.org/pkg/runtime/#MemStats + fmt.Printf("Each %.02f Bytes", float64(m.Alloc)/float64(totalRowCount)) + fmt.Printf("\tAlloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v", m.NumGC) +} +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} diff --git a/weed/storage/needle_map/old/compact_map_test.go b/weed/storage/needle_map/old/compact_map_test.go new file mode 100644 index 000000000..d2d0f1569 --- /dev/null +++ b/weed/storage/needle_map/old/compact_map_test.go @@ -0,0 +1,243 @@ +package needle_map + +import ( + "fmt" + "log" + "os" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/sequence" + . "github.com/seaweedfs/seaweedfs/weed/storage/types" + + new_map "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" +) + +func TestSnowflakeSequencer(t *testing.T) { + m := NewCompactMap() + seq, _ := sequence.NewSnowflakeSequencer("for_test", 1) + + for i := 0; i < 200000; i++ { + id := seq.NextFileId(1) + oldOffset, oldSize := m.Set(NeedleId(id), ToOffset(8), 3000073) + if oldSize != 0 { + t.Errorf("id %d oldOffset %v oldSize %d", id, oldOffset, oldSize) + } + } + +} + +func TestOverflow2(t *testing.T) { + m := NewCompactMap() + _, oldSize := m.Set(NeedleId(150088), ToOffset(8), 3000073) + if oldSize != 0 { + t.Fatalf("expecting no previous data") + } + _, oldSize = m.Set(NeedleId(150088), ToOffset(8), 3000073) + if oldSize != 3000073 { + t.Fatalf("expecting previous data size is %d, not %d", 3000073, oldSize) + } + m.Set(NeedleId(150073), ToOffset(8), 3000073) + m.Set(NeedleId(150089), ToOffset(8), 3000073) + m.Set(NeedleId(150076), ToOffset(8), 3000073) + m.Set(NeedleId(150124), ToOffset(8), 3000073) + m.Set(NeedleId(150137), ToOffset(8), 3000073) + m.Set(NeedleId(150147), ToOffset(8), 3000073) + m.Set(NeedleId(150145), ToOffset(8), 3000073) + m.Set(NeedleId(150158), ToOffset(8), 3000073) + m.Set(NeedleId(150162), ToOffset(8), 3000073) + + m.AscendingVisit(func(value new_map.NeedleValue) error { + println("needle key:", value.Key) + return nil + }) +} + +func TestIssue52(t *testing.T) { + m := NewCompactMap() + m.Set(NeedleId(10002), ToOffset(10002), 10002) + if element, ok := m.Get(NeedleId(10002)); ok { + fmt.Printf("key %d ok %v %d, %v, %d\n", 10002, ok, element.Key, element.Offset, element.Size) + } + m.Set(NeedleId(10001), ToOffset(10001), 10001) + if element, ok := m.Get(NeedleId(10002)); ok { + fmt.Printf("key %d ok %v %d, %v, %d\n", 10002, ok, element.Key, element.Offset, element.Size) + } else { + t.Fatal("key 10002 missing after setting 10001") + } +} + +func TestCompactMap(t *testing.T) { + m := NewCompactMap() + for i := uint32(0); i < 100*MaxSectionBucketSize; i += 2 { + m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) + } + + for i := uint32(0); i < 100*MaxSectionBucketSize; i += 37 { + m.Delete(NeedleId(i)) + } + + for i := uint32(0); i < 10*MaxSectionBucketSize; i += 3 { + m.Set(NeedleId(i), ToOffset(int64(i+11)), Size(i+5)) + } + + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // glog.V(4).Infoln(i, "=", v.Key, v.Offset, v.Size) + // } + // } + + for i := uint32(0); i < 10*MaxSectionBucketSize; i++ { + v, ok := m.Get(NeedleId(i)) + if i%3 == 0 { + if !ok { + t.Fatal("key", i, "missing!") + } + if v.Size != Size(i+5) { + t.Fatal("key", i, "size", v.Size) + } + } else if i%37 == 0 { + if ok && v.Size.IsValid() { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v.Size != Size(i) { + t.Fatal("key", i, "size", v.Size) + } + } + } + + for i := uint32(10 * MaxSectionBucketSize); i < 100*MaxSectionBucketSize; i++ { + v, ok := m.Get(NeedleId(i)) + if i%37 == 0 { + if ok && v.Size.IsValid() { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v == nil { + t.Fatal("key", i, "missing") + } + if v.Size != Size(i) { + t.Fatal("key", i, "size", v.Size) + } + } + } + +} + +func TestOverflow(t *testing.T) { + cs := NewCompactSection(1) + + cs.setOverflowEntry(1, ToOffset(12), 12) + cs.setOverflowEntry(2, ToOffset(12), 12) + cs.setOverflowEntry(3, ToOffset(12), 12) + cs.setOverflowEntry(4, ToOffset(12), 12) + cs.setOverflowEntry(5, ToOffset(12), 12) + + if cs.overflow[2].Key != 3 { + t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key) + } + + cs.setOverflowEntry(3, ToOffset(24), 24) + + if cs.overflow[2].Key != 3 { + t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key) + } + + if cs.overflow[2].Size != 24 { + t.Fatalf("expecting o[2] has size 24: %+v", cs.overflow[2].Size) + } + + cs.deleteOverflowEntry(4) + + if len(cs.overflow) != 5 { + t.Fatalf("expecting 5 entries now: %+v", cs.overflow) + } + + x, _ := cs.findOverflowEntry(5) + if x.Key != 5 { + t.Fatalf("expecting entry 5 now: %+v", x) + } + + for i, x := range cs.overflow { + println("overflow[", i, "]:", x.Key) + } + println() + + cs.deleteOverflowEntry(1) + + for i, x := range cs.overflow { + println("overflow[", i, "]:", x.Key, "size", x.Size) + } + println() + + cs.setOverflowEntry(4, ToOffset(44), 44) + for i, x := range cs.overflow { + println("overflow[", i, "]:", x.Key) + } + println() + + cs.setOverflowEntry(1, ToOffset(11), 11) + + for i, x := range cs.overflow { + println("overflow[", i, "]:", x.Key) + } + println() + +} + +func TestCompactSection_Get(t *testing.T) { + var maps []*CompactMap + totalRowCount := uint64(0) + indexFile, ie := os.OpenFile("../../../../test/data/sample.idx", + os.O_RDWR|os.O_RDONLY, 0644) + defer indexFile.Close() + if ie != nil { + log.Fatalln(ie) + } + + m, rowCount := loadNewNeedleMap(indexFile) + maps = append(maps, m) + totalRowCount += rowCount + m.Set(1574318345753513987, ToOffset(10002), 10002) + nv, ok := m.Get(1574318345753513987) + if ok { + t.Log(uint64(nv.Key)) + } + + nv1, ok := m.Get(1574318350048481283) + if ok { + t.Error(uint64(nv1.Key)) + } + + m.Set(1574318350048481283, ToOffset(10002), 10002) + nv2, ok1 := m.Get(1574318350048481283) + if ok1 { + t.Log(uint64(nv2.Key)) + } + + m.Delete(nv2.Key) + nv3, has := m.Get(nv2.Key) + if has && nv3.Size > 0 { + t.Error(uint64(nv3.Size)) + } +} + +// Test after putting 1 ~ LookBackWindowSize*3 items in sequential order, but missing item LookBackWindowSize +// insert the item LookBackWindowSize in the middle of the sequence +func TestCompactSection_PutOutOfOrderItemBeyondLookBackWindow(t *testing.T) { + m := NewCompactMap() + + // put 1 ~ 10 + for i := 1; i <= LookBackWindowSize*3; i++ { + if i != LookBackWindowSize { + m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) + } + } + + m.Set(NeedleId(LookBackWindowSize), ToOffset(int64(LookBackWindowSize)), Size(LookBackWindowSize)) + + // check if 8 is in the right place + if v, ok := m.Get(NeedleId(LookBackWindowSize)); !ok || v.Offset != ToOffset(LookBackWindowSize) || v.Size != Size(LookBackWindowSize) { + t.Fatalf("expected to find LookBackWindowSize at offset %d with size %d, but got %v", LookBackWindowSize, LookBackWindowSize, v) + } +}