You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

324 lines
9.4 KiB

6 years ago
7 years ago
6 years ago
6 years ago
4 years ago
2 years ago
4 years ago
2 years ago
4 years ago
2 years ago
  1. package needle_map
  2. import (
  3. "sort"
  4. "sync"
  5. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  6. )
  7. const (
  8. batch = 10000
  9. )
  10. type SectionalNeedleId uint32
  11. const SectionalNeedleIdLimit = 1<<32 - 1
  12. type SectionalNeedleValue struct {
  13. Key SectionalNeedleId
  14. OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
  15. Size Size `comment:"Size of the data portion"`
  16. }
  17. type SectionalNeedleValueExtra struct {
  18. OffsetHigher OffsetHigher
  19. }
  20. type CompactSection struct {
  21. sync.RWMutex
  22. values []SectionalNeedleValue
  23. valuesExtra []SectionalNeedleValueExtra
  24. overflow Overflow
  25. overflowExtra OverflowExtra
  26. start NeedleId
  27. end NeedleId
  28. counter int
  29. }
  30. type Overflow []SectionalNeedleValue
  31. type OverflowExtra []SectionalNeedleValueExtra
  32. func NewCompactSection(start NeedleId) *CompactSection {
  33. return &CompactSection{
  34. values: make([]SectionalNeedleValue, batch),
  35. valuesExtra: make([]SectionalNeedleValueExtra, batch),
  36. overflow: Overflow(make([]SectionalNeedleValue, 0)),
  37. overflowExtra: OverflowExtra(make([]SectionalNeedleValueExtra, 0)),
  38. start: start,
  39. }
  40. }
  41. // return old entry size
  42. func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
  43. cs.Lock()
  44. if key > cs.end {
  45. cs.end = key
  46. }
  47. skey := SectionalNeedleId(key - cs.start)
  48. if i := cs.binarySearchValues(skey); i >= 0 {
  49. oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.valuesExtra[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size
  50. //println("key", key, "old size", ret)
  51. cs.valuesExtra[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size
  52. } else {
  53. needOverflow := cs.counter >= batch
  54. needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
  55. if needOverflow {
  56. lookBackIndex := cs.counter - 128
  57. if lookBackIndex < 0 {
  58. lookBackIndex = 0
  59. }
  60. if cs.counter < batch && cs.values[lookBackIndex].Key < skey {
  61. // still has capacity and only partially out of order
  62. p := &cs.values[cs.counter]
  63. p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
  64. //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
  65. for x := cs.counter - 1; x >= lookBackIndex; x-- {
  66. if cs.values[x].Key > cs.values[x+1].Key {
  67. cs.values[x], cs.values[x+1] = cs.values[x+1], cs.values[x]
  68. cs.valuesExtra[x], cs.valuesExtra[x+1] = cs.valuesExtra[x+1], cs.valuesExtra[x]
  69. } else {
  70. break
  71. }
  72. }
  73. cs.counter++
  74. } else {
  75. //println("start", cs.start, "counter", cs.counter, "key", key)
  76. if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found {
  77. oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size
  78. }
  79. cs.setOverflowEntry(skey, offset, size)
  80. }
  81. } else {
  82. p := &cs.values[cs.counter]
  83. p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
  84. //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
  85. cs.counter++
  86. }
  87. }
  88. cs.Unlock()
  89. return
  90. }
  91. func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size Size) {
  92. needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size}
  93. needleValueExtra := SectionalNeedleValueExtra{OffsetHigher: offset.OffsetHigher}
  94. insertCandidate := sort.Search(len(cs.overflow), func(i int) bool {
  95. return cs.overflow[i].Key >= needleValue.Key
  96. })
  97. if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key {
  98. cs.overflow[insertCandidate] = needleValue
  99. } else {
  100. cs.overflow = append(cs.overflow, needleValue)
  101. cs.overflowExtra = append(cs.overflowExtra, needleValueExtra)
  102. for i := len(cs.overflow) - 1; i > insertCandidate; i-- {
  103. cs.overflow[i] = cs.overflow[i-1]
  104. cs.overflowExtra[i] = cs.overflowExtra[i-1]
  105. }
  106. cs.overflow[insertCandidate] = needleValue
  107. cs.overflowExtra[insertCandidate] = needleValueExtra
  108. }
  109. }
  110. func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nve SectionalNeedleValueExtra, nv SectionalNeedleValue, found bool) {
  111. foundCandidate := sort.Search(len(cs.overflow), func(i int) bool {
  112. return cs.overflow[i].Key >= key
  113. })
  114. if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key {
  115. return cs.overflowExtra[foundCandidate], cs.overflow[foundCandidate], true
  116. }
  117. return nve, nv, false
  118. }
  119. func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
  120. length := len(cs.overflow)
  121. deleteCandidate := sort.Search(length, func(i int) bool {
  122. return cs.overflow[i].Key >= key
  123. })
  124. if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
  125. if cs.overflow[deleteCandidate].Size.IsValid() {
  126. cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size
  127. }
  128. }
  129. }
  130. // return old entry size
  131. func (cs *CompactSection) Delete(key NeedleId) Size {
  132. cs.Lock()
  133. defer cs.Unlock()
  134. ret := Size(0)
  135. if key > cs.end {
  136. return ret
  137. }
  138. skey := SectionalNeedleId(key - cs.start)
  139. if i := cs.binarySearchValues(skey); i >= 0 {
  140. if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() {
  141. ret = cs.values[i].Size
  142. cs.values[i].Size = -cs.values[i].Size
  143. }
  144. }
  145. if _, v, found := cs.findOverflowEntry(skey); found {
  146. cs.deleteOverflowEntry(skey)
  147. ret = v.Size
  148. }
  149. return ret
  150. }
  151. func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
  152. cs.RLock()
  153. defer cs.RUnlock()
  154. if key > cs.end {
  155. return nil, false
  156. }
  157. skey := SectionalNeedleId(key - cs.start)
  158. if ve, v, ok := cs.findOverflowEntry(skey); ok {
  159. nv := toNeedleValue(ve, v, cs)
  160. return &nv, true
  161. }
  162. if i := cs.binarySearchValues(skey); i >= 0 {
  163. nv := toNeedleValue(cs.valuesExtra[i], cs.values[i], cs)
  164. return &nv, true
  165. }
  166. return nil, false
  167. }
  168. func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int {
  169. x := sort.Search(cs.counter, func(i int) bool {
  170. return cs.values[i].Key >= key
  171. })
  172. if x == cs.counter {
  173. return -1
  174. }
  175. if cs.values[x].Key > key {
  176. return -2
  177. }
  178. return x
  179. }
  180. // This map assumes mostly inserting increasing keys
  181. // This map assumes mostly inserting increasing keys
  182. type CompactMap struct {
  183. list []*CompactSection
  184. }
  185. func NewCompactMap() *CompactMap {
  186. return &CompactMap{}
  187. }
  188. func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
  189. x := cm.binarySearchCompactSection(key)
  190. if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
  191. // println(x, "adding to existing", len(cm.list), "sections, starting", key)
  192. cs := NewCompactSection(key)
  193. cm.list = append(cm.list, cs)
  194. x = len(cm.list) - 1
  195. //keep compact section sorted by start
  196. for x >= 0 {
  197. if x > 0 && cm.list[x-1].start > key {
  198. cm.list[x] = cm.list[x-1]
  199. // println("shift", x, "start", cs.start, "to", x-1)
  200. x = x - 1
  201. } else {
  202. cm.list[x] = cs
  203. // println("cs", x, "start", cs.start)
  204. break
  205. }
  206. }
  207. }
  208. // println(key, "set to section[", x, "].start", cm.list[x].start)
  209. return cm.list[x].Set(key, offset, size)
  210. }
  211. func (cm *CompactMap) Delete(key NeedleId) Size {
  212. x := cm.binarySearchCompactSection(key)
  213. if x < 0 {
  214. return Size(0)
  215. }
  216. return cm.list[x].Delete(key)
  217. }
  218. func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
  219. x := cm.binarySearchCompactSection(key)
  220. if x < 0 {
  221. return nil, false
  222. }
  223. return cm.list[x].Get(key)
  224. }
  225. func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
  226. l, h := 0, len(cm.list)-1
  227. if h < 0 {
  228. return -5
  229. }
  230. if cm.list[h].start <= key {
  231. if cm.list[h].counter < batch || key <= cm.list[h].end {
  232. return h
  233. }
  234. return -4
  235. }
  236. for l <= h {
  237. m := (l + h) / 2
  238. if key < cm.list[m].start {
  239. h = m - 1
  240. } else { // cm.list[m].start <= key
  241. if cm.list[m+1].start <= key {
  242. l = m + 1
  243. } else {
  244. return m
  245. }
  246. }
  247. }
  248. return -3
  249. }
  250. // Visit visits all entries or stop if any error when visiting
  251. func (cm *CompactMap) AscendingVisit(visit func(NeedleValue) error) error {
  252. for _, cs := range cm.list {
  253. cs.RLock()
  254. var i, j int
  255. for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values) && j < cs.counter; {
  256. if cs.overflow[i].Key < cs.values[j].Key {
  257. if err := visit(toNeedleValue(cs.overflowExtra[i], cs.overflow[i], cs)); err != nil {
  258. cs.RUnlock()
  259. return err
  260. }
  261. i++
  262. } else if cs.overflow[i].Key == cs.values[j].Key {
  263. j++
  264. } else {
  265. if err := visit(toNeedleValue(cs.valuesExtra[j], cs.values[j], cs)); err != nil {
  266. cs.RUnlock()
  267. return err
  268. }
  269. j++
  270. }
  271. }
  272. for ; i < len(cs.overflow); i++ {
  273. if err := visit(toNeedleValue(cs.overflowExtra[i], cs.overflow[i], cs)); err != nil {
  274. cs.RUnlock()
  275. return err
  276. }
  277. }
  278. for ; j < len(cs.values) && j < cs.counter; j++ {
  279. if err := visit(toNeedleValue(cs.valuesExtra[j], cs.values[j], cs)); err != nil {
  280. cs.RUnlock()
  281. return err
  282. }
  283. }
  284. cs.RUnlock()
  285. }
  286. return nil
  287. }
  288. func toNeedleValue(snve SectionalNeedleValueExtra, snv SectionalNeedleValue, cs *CompactSection) NeedleValue {
  289. offset := Offset{
  290. OffsetHigher: snve.OffsetHigher,
  291. OffsetLower: snv.OffsetLower,
  292. }
  293. return NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size}
  294. }
  295. func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) (SectionalNeedleValue, SectionalNeedleValueExtra) {
  296. return SectionalNeedleValue{
  297. SectionalNeedleId(nv.Key - cs.start),
  298. nv.Offset.OffsetLower,
  299. nv.Size,
  300. }, SectionalNeedleValueExtra{
  301. nv.Offset.OffsetHigher,
  302. }
  303. }