You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

288 lines
8.2 KiB

6 years ago
7 years ago
6 years ago
6 years ago
  1. package needle_map
  2. import (
  3. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  4. "sort"
  5. "sync"
  6. )
  7. const (
  8. batch = 100000
  9. )
  10. type SectionalNeedleId uint32
  11. const SectionalNeedleIdLimit = 1<<32 - 1
  12. type SectionalNeedleValue struct {
  13. Key SectionalNeedleId
  14. OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
  15. Size uint32 `comment:"Size of the data portion"`
  16. }
  17. type SectionalNeedleValueExtra struct {
  18. OffsetHigher OffsetHigher
  19. }
  20. type CompactSection struct {
  21. sync.RWMutex
  22. values []SectionalNeedleValue
  23. valuesExtra []SectionalNeedleValueExtra
  24. overflow Overflow
  25. overflowExtra OverflowExtra
  26. start NeedleId
  27. end NeedleId
  28. counter int
  29. }
  30. type Overflow []SectionalNeedleValue
  31. type OverflowExtra []SectionalNeedleValueExtra
  32. func NewCompactSection(start NeedleId) *CompactSection {
  33. return &CompactSection{
  34. values: make([]SectionalNeedleValue, batch),
  35. valuesExtra: make([]SectionalNeedleValueExtra, batch),
  36. overflow: Overflow(make([]SectionalNeedleValue, 0)),
  37. overflowExtra: OverflowExtra(make([]SectionalNeedleValueExtra, 0)),
  38. start: start,
  39. }
  40. }
  41. //return old entry size
  42. func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  43. cs.Lock()
  44. if key > cs.end {
  45. cs.end = key
  46. }
  47. skey := SectionalNeedleId(key - cs.start)
  48. if i := cs.binarySearchValues(skey); i >= 0 {
  49. oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.valuesExtra[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size
  50. //println("key", key, "old size", ret)
  51. cs.valuesExtra[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size
  52. } else {
  53. needOverflow := cs.counter >= batch
  54. needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
  55. if needOverflow {
  56. //println("start", cs.start, "counter", cs.counter, "key", key)
  57. if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found {
  58. oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size
  59. }
  60. cs.setOverflowEntry(skey, offset, size)
  61. } else {
  62. p := &cs.values[cs.counter]
  63. p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
  64. //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
  65. cs.counter++
  66. }
  67. }
  68. cs.Unlock()
  69. return
  70. }
  71. func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size uint32) {
  72. needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size}
  73. needleValueExtra := SectionalNeedleValueExtra{OffsetHigher: OffsetHigher{}}
  74. insertCandidate := sort.Search(len(cs.overflow), func(i int) bool {
  75. return cs.overflow[i].Key >= needleValue.Key
  76. })
  77. if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key {
  78. cs.overflow[insertCandidate] = needleValue
  79. } else {
  80. cs.overflow = append(cs.overflow, needleValue)
  81. cs.overflowExtra = append(cs.overflowExtra, needleValueExtra)
  82. for i := len(cs.overflow) - 1; i > insertCandidate; i-- {
  83. cs.overflow[i] = cs.overflow[i-1]
  84. cs.overflowExtra[i] = cs.overflowExtra[i-1]
  85. }
  86. cs.overflow[insertCandidate] = needleValue
  87. }
  88. }
  89. func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nve SectionalNeedleValueExtra, nv SectionalNeedleValue, found bool) {
  90. foundCandidate := sort.Search(len(cs.overflow), func(i int) bool {
  91. return cs.overflow[i].Key >= key
  92. })
  93. if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key {
  94. return cs.overflowExtra[foundCandidate], cs.overflow[foundCandidate], true
  95. }
  96. return nve, nv, false
  97. }
  98. func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
  99. length := len(cs.overflow)
  100. deleteCandidate := sort.Search(length, func(i int) bool {
  101. return cs.overflow[i].Key >= key
  102. })
  103. if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
  104. for i := deleteCandidate; i < length-1; i++ {
  105. cs.overflow[i] = cs.overflow[i+1]
  106. cs.overflowExtra[i] = cs.overflowExtra[i+1]
  107. }
  108. cs.overflow = cs.overflow[0 : length-1]
  109. cs.overflowExtra = cs.overflowExtra[0 : length-1]
  110. }
  111. }
  112. //return old entry size
  113. func (cs *CompactSection) Delete(key NeedleId) uint32 {
  114. skey := SectionalNeedleId(key - cs.start)
  115. cs.Lock()
  116. ret := uint32(0)
  117. if i := cs.binarySearchValues(skey); i >= 0 {
  118. if cs.values[i].Size > 0 && cs.values[i].Size != TombstoneFileSize {
  119. ret = cs.values[i].Size
  120. cs.values[i].Size = TombstoneFileSize
  121. }
  122. }
  123. if _, v, found := cs.findOverflowEntry(skey); found {
  124. cs.deleteOverflowEntry(skey)
  125. ret = v.Size
  126. }
  127. cs.Unlock()
  128. return ret
  129. }
  130. func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
  131. cs.RLock()
  132. skey := SectionalNeedleId(key - cs.start)
  133. if ve, v, ok := cs.findOverflowEntry(skey); ok {
  134. cs.RUnlock()
  135. nv := toNeedleValue(ve, v, cs)
  136. return &nv, true
  137. }
  138. if i := cs.binarySearchValues(skey); i >= 0 {
  139. cs.RUnlock()
  140. nv := toNeedleValue(cs.valuesExtra[i], cs.values[i], cs)
  141. return &nv, true
  142. }
  143. cs.RUnlock()
  144. return nil, false
  145. }
  146. func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int {
  147. x := sort.Search(cs.counter, func(i int) bool {
  148. return cs.values[i].Key >= key
  149. })
  150. if x == cs.counter {
  151. return -1
  152. }
  153. if cs.values[x].Key > key {
  154. return -2
  155. }
  156. return x
  157. }
  158. //This map assumes mostly inserting increasing keys
  159. //This map assumes mostly inserting increasing keys
  160. type CompactMap struct {
  161. list []*CompactSection
  162. }
  163. func NewCompactMap() *CompactMap {
  164. return &CompactMap{}
  165. }
  166. func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  167. x := cm.binarySearchCompactSection(key)
  168. if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
  169. // println(x, "adding to existing", len(cm.list), "sections, starting", key)
  170. cs := NewCompactSection(key)
  171. cm.list = append(cm.list, cs)
  172. x = len(cm.list) - 1
  173. //keep compact section sorted by start
  174. for x >= 0 {
  175. if x > 0 && cm.list[x-1].start > key {
  176. cm.list[x] = cm.list[x-1]
  177. // println("shift", x, "start", cs.start, "to", x-1)
  178. x = x - 1
  179. } else {
  180. cm.list[x] = cs
  181. // println("cs", x, "start", cs.start)
  182. break
  183. }
  184. }
  185. }
  186. // println(key, "set to section[", x, "].start", cm.list[x].start)
  187. return cm.list[x].Set(key, offset, size)
  188. }
  189. func (cm *CompactMap) Delete(key NeedleId) uint32 {
  190. x := cm.binarySearchCompactSection(key)
  191. if x < 0 {
  192. return uint32(0)
  193. }
  194. return cm.list[x].Delete(key)
  195. }
  196. func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
  197. x := cm.binarySearchCompactSection(key)
  198. if x < 0 {
  199. return nil, false
  200. }
  201. return cm.list[x].Get(key)
  202. }
  203. func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
  204. l, h := 0, len(cm.list)-1
  205. if h < 0 {
  206. return -5
  207. }
  208. if cm.list[h].start <= key {
  209. if cm.list[h].counter < batch || key <= cm.list[h].end {
  210. return h
  211. }
  212. return -4
  213. }
  214. for l <= h {
  215. m := (l + h) / 2
  216. if key < cm.list[m].start {
  217. h = m - 1
  218. } else { // cm.list[m].start <= key
  219. if cm.list[m+1].start <= key {
  220. l = m + 1
  221. } else {
  222. return m
  223. }
  224. }
  225. }
  226. return -3
  227. }
  228. // Visit visits all entries or stop if any error when visiting
  229. func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
  230. for _, cs := range cm.list {
  231. cs.RLock()
  232. for i, v := range cs.overflow {
  233. if err := visit(toNeedleValue(cs.overflowExtra[i], v, cs)); err != nil {
  234. cs.RUnlock()
  235. return err
  236. }
  237. }
  238. for i, v := range cs.values {
  239. if i >= cs.counter {
  240. break
  241. }
  242. if _, _, found := cs.findOverflowEntry(v.Key); !found {
  243. if err := visit(toNeedleValue(cs.valuesExtra[i], v, cs)); err != nil {
  244. cs.RUnlock()
  245. return err
  246. }
  247. }
  248. }
  249. cs.RUnlock()
  250. }
  251. return nil
  252. }
  253. func toNeedleValue(snve SectionalNeedleValueExtra, snv SectionalNeedleValue, cs *CompactSection) NeedleValue {
  254. offset := Offset{
  255. OffsetHigher: snve.OffsetHigher,
  256. OffsetLower: snv.OffsetLower,
  257. }
  258. return NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size}
  259. }
  260. func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) (SectionalNeedleValue, SectionalNeedleValueExtra) {
  261. return SectionalNeedleValue{
  262. SectionalNeedleId(nv.Key - cs.start),
  263. nv.Offset.OffsetLower,
  264. nv.Size,
  265. }, SectionalNeedleValueExtra{
  266. nv.Offset.OffsetHigher,
  267. }
  268. }