3 changed files with 566 additions and 443 deletions
- weed/storage/needle_map/compact_map.go (426 changed lines)
- weed/storage/needle_map/compact_map_perf_test.go (1 changed line)
- weed/storage/needle_map/compact_map_test.go (582 changed lines)
--- a/weed/storage/needle_map/compact_map.go
+++ b/weed/storage/needle_map/compact_map.go
@@ -1,330 +1,222 @@
 package needle_map
 
 import (
+	"fmt"
 	"sort"
 	"sync"
 
-	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 const (
-	MaxSectionBucketSize = 1024 * 8
-	LookBackWindowSize   = 1024 // how many entries to look back when inserting into a section
+	SegmentChunkSize = 25000
 )
 
-type SectionalNeedleId uint32
+type CompactMapSegment struct {
+	// TODO: maybe a compact-er structure for needle values?
+	list     []NeedleValue
+	firstKey types.NeedleId
+	lastKey  types.NeedleId
+}
 
-const SectionalNeedleIdLimit = 1<<32 - 1
+type CompactMap struct {
+	sync.RWMutex
 
-type SectionalNeedleValue struct {
-	Key          SectionalNeedleId
-	OffsetLower  OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
-	Size         Size        `comment:"Size of the data portion"`
-	OffsetHigher OffsetHigher
+	segments map[int]*CompactMapSegment
 }
 
-type CompactSection struct {
-	sync.RWMutex
-	values   []SectionalNeedleValue
-	overflow Overflow
-	start    NeedleId
-	end      NeedleId
+func newCompactMapSegment(chunk int) *CompactMapSegment {
+	startKey := types.NeedleId(chunk * SegmentChunkSize)
+	return &CompactMapSegment{
+		list:     []NeedleValue{},
+		firstKey: startKey + SegmentChunkSize - 1,
+		lastKey:  startKey,
+	}
 }
 
-type Overflow []SectionalNeedleValue
+func (cs *CompactMapSegment) len() int {
+	return len(cs.list)
+}
 
-func NewCompactSection(start NeedleId) *CompactSection {
-	return &CompactSection{
-		values:   make([]SectionalNeedleValue, 0),
-		overflow: Overflow(make([]SectionalNeedleValue, 0)),
-		start:    start,
-	}
+func (cs *CompactMapSegment) cap() int {
+	return cap(cs.list)
 }
 
-// return old entry size
-func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
-	cs.Lock()
-	defer cs.Unlock()
+// bsearchKey returns the NeedleValue index for a given ID key.
+// If the key is not found, it returns the index where it should be inserted instead.
+func (cs *CompactMapSegment) bsearchKey(key types.NeedleId) (int, bool) {
+	switch {
+	case len(cs.list) == 0:
+		return 0, false
+	case key == cs.firstKey:
+		return 0, true
+	case key <= cs.firstKey:
+		return 0, false
+	case key == cs.lastKey:
+		return len(cs.list) - 1, true
+	case key > cs.lastKey:
+		return len(cs.list), false
+	}
+
+	i := sort.Search(len(cs.list), func(i int) bool {
+		return cs.list[i].Key >= key
+	})
+	return i, cs.list[i].Key == key
+}
 
-	if key > cs.end {
-		cs.end = key
-	}
-	skey := SectionalNeedleId(key - cs.start)
-	if i := cs.binarySearchValues(skey); i >= 0 {
+// set inserts/updates a NeedleValue.
+// If the operation is an update, returns the overwritten value's previous offset and size.
+func (cs *CompactMapSegment) set(key types.NeedleId, offset types.Offset, size types.Size) (oldOffset types.Offset, oldSize types.Size) {
+	i, found := cs.bsearchKey(key)
+	if found {
 		// update
-		oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size
-		cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size
+		oldOffset.OffsetLower = cs.list[i].Offset.OffsetLower
+		oldOffset.OffsetHigher = cs.list[i].Offset.OffsetHigher
+		oldSize = cs.list[i].Size
+
+		cs.list[i].Size = size
+		cs.list[i].Offset.OffsetLower = offset.OffsetLower
+		cs.list[i].Offset.OffsetHigher = offset.OffsetHigher
 		return
 	}
 
-	var lkey SectionalNeedleId
-	if len(cs.values) > 0 {
-		lkey = cs.values[len(cs.values)-1].Key
+	// insert
+	if len(cs.list) >= SegmentChunkSize {
+		panic(fmt.Sprintf("attempted to write more than %d entries on CompactMapSegment %p!!!", SegmentChunkSize, cs))
 	}
-
-	hasAdded := false
-	switch {
-	case len(cs.values) < MaxSectionBucketSize && lkey <= skey:
-		// non-overflow insert
-		cs.values = append(cs.values, SectionalNeedleValue{
-			Key:          skey,
-			OffsetLower:  offset.OffsetLower,
-			Size:         size,
-			OffsetHigher: offset.OffsetHigher,
-		})
-		hasAdded = true
-	case len(cs.values) < MaxSectionBucketSize:
-		// still has capacity and only partially out of order
-		lookBackIndex := len(cs.values) - LookBackWindowSize
-		if lookBackIndex < 0 {
-			lookBackIndex = 0
-		}
-		if cs.values[lookBackIndex].Key <= skey {
-			for ; lookBackIndex < len(cs.values); lookBackIndex++ {
-				if cs.values[lookBackIndex].Key >= skey {
-					break
-				}
-			}
-			cs.values = append(cs.values, SectionalNeedleValue{})
-			copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:])
-			cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size
-			cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher
-			hasAdded = true
-		}
+	if len(cs.list) == SegmentChunkSize-1 {
+		// if we max out our segment storage, pin its capacity to minimize memory usage
+		nl := make([]NeedleValue, SegmentChunkSize, SegmentChunkSize)
+		copy(nl, cs.list[:i])
+		copy(nl[i+1:], cs.list[i:])
+		cs.list = nl
+	} else {
+		cs.list = append(cs.list, NeedleValue{})
+		copy(cs.list[i+1:], cs.list[i:])
 	}
 
-	// overflow insert
-	if !hasAdded {
-		if oldValue, found := cs.findOverflowEntry(skey); found {
-			oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size
-		}
-		cs.setOverflowEntry(skey, offset, size)
-	} else {
-		// if we maxed out our values bucket, pin its capacity to minimize memory usage
-		if len(cs.values) == MaxSectionBucketSize {
-			bucket := make([]SectionalNeedleValue, len(cs.values))
-			copy(bucket, cs.values)
-			cs.values = bucket
-		}
+	cs.list[i] = NeedleValue{
+		Key:    key,
+		Offset: offset,
+		Size:   size,
 	}
+	if key < cs.firstKey {
+		cs.firstKey = key
+	}
+	if key > cs.lastKey {
+		cs.lastKey = key
+	}
 
 	return
 }
 
-func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size Size) {
-	needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size, OffsetHigher: offset.OffsetHigher}
-	insertCandidate := sort.Search(len(cs.overflow), func(i int) bool {
-		return cs.overflow[i].Key >= needleValue.Key
-	})
-
-	if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key {
-		cs.overflow[insertCandidate] = needleValue
-		return
+// get seeks a map entry by key. Returns an entry pointer, with a boolean specifiying if the entry was found.
+func (cs *CompactMapSegment) get(key types.NeedleId) (*NeedleValue, bool) {
+	if i, found := cs.bsearchKey(key); found {
+		return &cs.list[i], true
 	}
 
-	cs.overflow = append(cs.overflow, SectionalNeedleValue{})
-	copy(cs.overflow[insertCandidate+1:], cs.overflow[insertCandidate:])
-	cs.overflow[insertCandidate] = needleValue
-}
-
-func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) {
-	foundCandidate := sort.Search(len(cs.overflow), func(i int) bool {
-		return cs.overflow[i].Key >= key
-	})
-	if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key {
-		return cs.overflow[foundCandidate], true
-	}
-	return nv, false
+	return nil, false
 }
 
-func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
-	length := len(cs.overflow)
-	deleteCandidate := sort.Search(length, func(i int) bool {
-		return cs.overflow[i].Key >= key
-	})
-	if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
-		if cs.overflow[deleteCandidate].Size.IsValid() {
-			cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size
+// delete deletes a map entry by key. Returns the entries' previous Size, if available.
+func (cs *CompactMapSegment) delete(key types.NeedleId) types.Size {
+	if i, found := cs.bsearchKey(key); found {
+		if cs.list[i].Size > 0 && cs.list[i].Size.IsValid() {
+			ret := cs.list[i].Size
+			cs.list[i].Size = -cs.list[i].Size
+			return ret
 		}
 	}
-}
 
-// return old entry size
-func (cs *CompactSection) Delete(key NeedleId) Size {
-	cs.Lock()
-	defer cs.Unlock()
-	ret := Size(0)
-	if key > cs.end {
-		return ret
-	}
-	skey := SectionalNeedleId(key - cs.start)
-	if i := cs.binarySearchValues(skey); i >= 0 {
-		if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() {
-			ret = cs.values[i].Size
-			cs.values[i].Size = -cs.values[i].Size
-		}
-	}
-	if v, found := cs.findOverflowEntry(skey); found {
-		cs.deleteOverflowEntry(skey)
-		ret = v.Size
-	}
-	return ret
+	return types.Size(0)
 }
-func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
-	cs.RLock()
-	defer cs.RUnlock()
-	if key > cs.end {
-		return nil, false
-	}
-	skey := SectionalNeedleId(key - cs.start)
-	if v, ok := cs.findOverflowEntry(skey); ok {
-		nv := toNeedleValue(v, cs)
-		return &nv, true
-	}
-	if i := cs.binarySearchValues(skey); i >= 0 {
-		nv := toNeedleValue(cs.values[i], cs)
-		return &nv, true
+
+func NewCompactMap() *CompactMap {
+	return &CompactMap{
+		segments: map[int]*CompactMapSegment{},
 	}
-	return nil, false
 }
-func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int {
-	x := sort.Search(len(cs.values), func(i int) bool {
-		return cs.values[i].Key >= key
-	})
-	if x >= len(cs.values) {
-		return -1
-	}
-	if cs.values[x].Key > key {
-		return -2
+
+func (cm *CompactMap) Len() int {
+	l := 0
+	for _, s := range cm.segments {
+		l += s.len()
 	}
-	return x
+	return l
 }
 
-// This map assumes mostly inserting increasing keys
-// This map assumes mostly inserting increasing keys
-type CompactMap struct {
-	list []*CompactSection
+func (cm *CompactMap) Cap() int {
+	c := 0
+	for _, s := range cm.segments {
+		c += s.cap()
+	}
+	return c
 }
 
-func NewCompactMap() *CompactMap {
-	return &CompactMap{}
+func (cm *CompactMap) String() string {
+	return fmt.Sprintf(
+		"%d/%d elements on %d segments, %.02f%% efficiency",
+		cm.Len(), cm.Cap(), len(cm.segments),
+		float64(100)*float64(cm.Len())/float64(cm.Cap()))
 }
 
-func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
-	x := cm.binarySearchCompactSection(key)
-	if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
-		// println(x, "adding to existing", len(cm.list), "sections, starting", key)
-		cs := NewCompactSection(key)
-		cm.list = append(cm.list, cs)
-		x = len(cm.list) - 1
-		//keep compact section sorted by start
-		for x >= 0 {
-			if x > 0 && cm.list[x-1].start > key {
-				cm.list[x] = cm.list[x-1]
-				// println("shift", x, "start", cs.start, "to", x-1)
-				x = x - 1
-			} else {
-				cm.list[x] = cs
-				// println("cs", x, "start", cs.start)
-				break
-			}
-		}
+func (cm *CompactMap) segmentForKey(key types.NeedleId) *CompactMapSegment {
+	chunk := int(key / SegmentChunkSize)
+	if cs, ok := cm.segments[chunk]; ok {
+		return cs
 	}
-	// println(key, "set to section[", x, "].start", cm.list[x].start)
-	return cm.list[x].Set(key, offset, size)
+
+	cs := newCompactMapSegment(chunk)
+	cm.segments[chunk] = cs
+	return cs
 }
-func (cm *CompactMap) Delete(key NeedleId) Size {
-	x := cm.binarySearchCompactSection(key)
-	if x < 0 {
-		return Size(0)
-	}
-	return cm.list[x].Delete(key)
+
+// Set inserts/updates a NeedleValue.
+// If the operation is an update, returns the overwritten value's previous offset and size.
+func (cm *CompactMap) Set(key types.NeedleId, offset types.Offset, size types.Size) (oldOffset types.Offset, oldSize types.Size) {
+	cm.RLock()
+	defer cm.RUnlock()
+
+	cs := cm.segmentForKey(key)
+	return cs.set(key, offset, size)
 }
-func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
-	x := cm.binarySearchCompactSection(key)
-	if x < 0 {
-		return nil, false
-	}
-	return cm.list[x].Get(key)
+
+// Get seeks a map entry by key. Returns an entry pointer, with a boolean specifiying if the entry was found.
+func (cm *CompactMap) Get(key types.NeedleId) (*NeedleValue, bool) {
+	cm.RLock()
+	defer cm.RUnlock()
+
+	cs := cm.segmentForKey(key)
+	return cs.get(key)
 }
-func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
-	l, h := 0, len(cm.list)-1
-	if h < 0 {
-		return -5
-	}
-	if cm.list[h].start <= key {
-		if len(cm.list[h].values) < MaxSectionBucketSize || key <= cm.list[h].end {
-			return h
-		}
-		return -4
-	}
-	for l <= h {
-		m := (l + h) / 2
-		if key < cm.list[m].start {
-			h = m - 1
-		} else { // cm.list[m].start <= key
-			if cm.list[m+1].start <= key {
-				l = m + 1
-			} else {
-				return m
-			}
-		}
-	}
-	return -3
+
+// Delete deletes a map entry by key. Returns the entries' previous Size, if available.
+func (cm *CompactMap) Delete(key types.NeedleId) types.Size {
+	cm.RLock()
+	defer cm.RUnlock()
+
+	cs := cm.segmentForKey(key)
+	return cs.delete(key)
 }
 
-// Visit visits all entries or stop if any error when visiting
+// AscendingVisit runs a function on all entries, in ascending key order. Returns any errors hit while visiting.
 func (cm *CompactMap) AscendingVisit(visit func(NeedleValue) error) error {
-	for _, cs := range cm.list {
-		cs.RLock()
-		var i, j int
-		for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values); {
-			if cs.overflow[i].Key < cs.values[j].Key {
-				if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil {
-					cs.RUnlock()
-					return err
-				}
-				i++
-			} else if cs.overflow[i].Key == cs.values[j].Key {
-				j++
-			} else {
-				if err := visit(toNeedleValue(cs.values[j], cs)); err != nil {
-					cs.RUnlock()
-					return err
-				}
-				j++
-			}
-		}
-		for ; i < len(cs.overflow); i++ {
-			if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil {
-				cs.RUnlock()
-				return err
-			}
-		}
-		for ; j < len(cs.values); j++ {
-			if err := visit(toNeedleValue(cs.values[j], cs)); err != nil {
-				cs.RUnlock()
+	cm.RLock()
+	defer cm.RUnlock()
+
+	chunks := []int{}
+	for c := range cm.segments {
+		chunks = append(chunks, c)
+	}
+	sort.Ints(chunks)
+
+	for _, c := range chunks {
+		for _, nv := range cm.segments[c].list {
+			if err := visit(nv); err != nil {
 				return err
 			}
 		}
-		cs.RUnlock()
 	}
 	return nil
 }
-
-func toNeedleValue(snv SectionalNeedleValue, cs *CompactSection) NeedleValue {
-	offset := Offset{
-		OffsetHigher: snv.OffsetHigher,
-		OffsetLower:  snv.OffsetLower,
-	}
-	return NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size}
-}
-
-func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) SectionalNeedleValue {
-	return SectionalNeedleValue{
-		Key:          SectionalNeedleId(nv.Key - cs.start),
-		OffsetLower:  nv.Offset.OffsetLower,
-		Size:         nv.Size,
-		OffsetHigher: nv.Offset.OffsetHigher,
-	}
-}
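For reference, here is a minimal, hypothetical usage sketch of the segment-based CompactMap API introduced above; it is not part of this change. NewCompactMap, Set, Get, Delete, AscendingVisit and String are taken from the new code in the diff; the types.ToOffset helper used to build a types.Offset from a byte offset is an assumption and may need to be swapped for whatever offset constructor the codebase actually provides.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

func main() {
	cm := needle_map.NewCompactMap()

	// Keys are bucketed by key / SegmentChunkSize (25000), so 1, 2 and 24999
	// share a segment while 25000 and 100000 land in other segments.
	for _, key := range []types.NeedleId{1, 2, 24999, 25000, 100000} {
		// types.ToOffset is assumed here; it packs a byte offset into the
		// Offset representation stored by the needle map.
		cm.Set(key, types.ToOffset(int64(key)*8), types.Size(100))
	}

	// Overwriting an existing key reports the previous offset and size.
	oldOffset, oldSize := cm.Set(25000, types.ToOffset(4096), types.Size(200))
	fmt.Println("overwritten:", oldOffset, oldSize)

	// Point lookup; within a segment this is the bsearchKey binary search.
	if nv, ok := cm.Get(24999); ok {
		fmt.Println("found:", nv.Key, nv.Size)
	}

	// Delete negates the stored size and returns the size it replaced.
	fmt.Println("deleted size:", cm.Delete(100000))

	// Entries are visited in ascending key order across all segments.
	_ = cm.AscendingVisit(func(nv needle_map.NeedleValue) error {
		fmt.Println(nv.Key, nv.Size)
		return nil
	})

	fmt.Println(cm.String())
}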