You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

267 lines
6.6 KiB

7 years ago
6 years ago
  1. package needle
  2. import (
  3. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  4. "sort"
  5. "sync"
  6. )
  7. const (
  8. batch = 100000
  9. )
  10. type SectionalNeedleId uint32
  11. const SectionalNeedleIdLimit = 1<<32 - 1
  12. type SectionalNeedleValue struct {
  13. Key SectionalNeedleId
  14. Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
  15. Size uint32 `comment:"Size of the data portion"`
  16. }
  17. type CompactSection struct {
  18. sync.RWMutex
  19. values []SectionalNeedleValue
  20. overflow Overflow
  21. start NeedleId
  22. end NeedleId
  23. counter int
  24. }
  25. type Overflow []SectionalNeedleValue
  26. func NewCompactSection(start NeedleId) *CompactSection {
  27. return &CompactSection{
  28. values: make([]SectionalNeedleValue, batch),
  29. overflow: Overflow(make([]SectionalNeedleValue, 0)),
  30. start: start,
  31. }
  32. }
  33. //return old entry size
  34. func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  35. cs.Lock()
  36. if key > cs.end {
  37. cs.end = key
  38. }
  39. skey := SectionalNeedleId(key - cs.start)
  40. if i := cs.binarySearchValues(skey); i >= 0 {
  41. oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size
  42. //println("key", key, "old size", ret)
  43. cs.values[i].Offset, cs.values[i].Size = offset, size
  44. } else {
  45. needOverflow := cs.counter >= batch
  46. needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
  47. if needOverflow {
  48. //println("start", cs.start, "counter", cs.counter, "key", key)
  49. if oldValue, found := cs.overflow.findOverflowEntry(skey); found {
  50. oldOffset, oldSize = oldValue.Offset, oldValue.Size
  51. }
  52. cs.overflow = cs.overflow.setOverflowEntry(SectionalNeedleValue{Key: skey, Offset: offset, Size: size})
  53. } else {
  54. p := &cs.values[cs.counter]
  55. p.Key, p.Offset, p.Size = skey, offset, size
  56. //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
  57. cs.counter++
  58. }
  59. }
  60. cs.Unlock()
  61. return
  62. }
  63. //return old entry size
  64. func (cs *CompactSection) Delete(key NeedleId) uint32 {
  65. skey := SectionalNeedleId(key - cs.start)
  66. cs.Lock()
  67. ret := uint32(0)
  68. if i := cs.binarySearchValues(skey); i >= 0 {
  69. if cs.values[i].Size > 0 {
  70. ret = cs.values[i].Size
  71. cs.values[i].Size = 0
  72. }
  73. }
  74. if v, found := cs.overflow.findOverflowEntry(skey); found {
  75. cs.overflow = cs.overflow.deleteOverflowEntry(skey)
  76. ret = v.Size
  77. }
  78. cs.Unlock()
  79. return ret
  80. }
  81. func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
  82. cs.RLock()
  83. skey := SectionalNeedleId(key - cs.start)
  84. if v, ok := cs.overflow.findOverflowEntry(skey); ok {
  85. cs.RUnlock()
  86. nv := v.toNeedleValue(cs)
  87. return &nv, true
  88. }
  89. if i := cs.binarySearchValues(skey); i >= 0 {
  90. cs.RUnlock()
  91. nv := cs.values[i].toNeedleValue(cs)
  92. return &nv, true
  93. }
  94. cs.RUnlock()
  95. return nil, false
  96. }
  97. func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int {
  98. l, h := 0, cs.counter-1
  99. if h >= 0 && cs.values[h].Key < key {
  100. return -2
  101. }
  102. //println("looking for key", key)
  103. for l <= h {
  104. m := (l + h) / 2
  105. //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size)
  106. if cs.values[m].Key < key {
  107. l = m + 1
  108. } else if key < cs.values[m].Key {
  109. h = m - 1
  110. } else {
  111. //println("found", m)
  112. return m
  113. }
  114. }
  115. return -1
  116. }
  117. //This map assumes mostly inserting increasing keys
  118. //This map assumes mostly inserting increasing keys
  119. type CompactMap struct {
  120. list []*CompactSection
  121. }
  122. func NewCompactMap() *CompactMap {
  123. return &CompactMap{}
  124. }
  125. func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  126. x := cm.binarySearchCompactSection(key)
  127. if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
  128. //println(x, "creating", len(cm.list), "section, starting", key)
  129. cs := NewCompactSection(key)
  130. cm.list = append(cm.list, cs)
  131. x = len(cm.list) - 1
  132. //keep compact section sorted by start
  133. for x > 0 {
  134. if cm.list[x-1].start > key {
  135. cm.list[x] = cm.list[x-1]
  136. x = x - 1
  137. } else {
  138. cm.list[x] = cs
  139. break
  140. }
  141. }
  142. }
  143. return cm.list[x].Set(key, offset, size)
  144. }
  145. func (cm *CompactMap) Delete(key NeedleId) uint32 {
  146. x := cm.binarySearchCompactSection(key)
  147. if x < 0 {
  148. return uint32(0)
  149. }
  150. return cm.list[x].Delete(key)
  151. }
  152. func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
  153. x := cm.binarySearchCompactSection(key)
  154. if x < 0 {
  155. return nil, false
  156. }
  157. return cm.list[x].Get(key)
  158. }
  159. func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
  160. l, h := 0, len(cm.list)-1
  161. if h < 0 {
  162. return -5
  163. }
  164. if cm.list[h].start <= key {
  165. if cm.list[h].counter < batch || key <= cm.list[h].end {
  166. return h
  167. }
  168. return -4
  169. }
  170. for l <= h {
  171. m := (l + h) / 2
  172. if key < cm.list[m].start {
  173. h = m - 1
  174. } else { // cm.list[m].start <= key
  175. if cm.list[m+1].start <= key {
  176. l = m + 1
  177. } else {
  178. return m
  179. }
  180. }
  181. }
  182. return -3
  183. }
  184. // Visit visits all entries or stop if any error when visiting
  185. func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
  186. for _, cs := range cm.list {
  187. cs.RLock()
  188. for _, v := range cs.overflow {
  189. if err := visit(v.toNeedleValue(cs)); err != nil {
  190. cs.RUnlock()
  191. return err
  192. }
  193. }
  194. for _, v := range cs.values {
  195. if _, found := cs.overflow.findOverflowEntry(v.Key); !found {
  196. if err := visit(v.toNeedleValue(cs)); err != nil {
  197. cs.RUnlock()
  198. return err
  199. }
  200. }
  201. }
  202. cs.RUnlock()
  203. }
  204. return nil
  205. }
  206. func (o Overflow) deleteOverflowEntry(key SectionalNeedleId) Overflow {
  207. length := len(o)
  208. deleteCandidate := sort.Search(length, func(i int) bool {
  209. return o[i].Key >= key
  210. })
  211. if deleteCandidate != length && o[deleteCandidate].Key == key {
  212. for i := deleteCandidate; i < length-1; i++ {
  213. o[i] = o[i+1]
  214. }
  215. o = o[0 : length-1]
  216. }
  217. return o
  218. }
  219. func (o Overflow) setOverflowEntry(needleValue SectionalNeedleValue) Overflow {
  220. insertCandidate := sort.Search(len(o), func(i int) bool {
  221. return o[i].Key >= needleValue.Key
  222. })
  223. if insertCandidate != len(o) && o[insertCandidate].Key == needleValue.Key {
  224. o[insertCandidate] = needleValue
  225. } else {
  226. o = append(o, needleValue)
  227. for i := len(o) - 1; i > insertCandidate; i-- {
  228. o[i] = o[i-1]
  229. }
  230. o[insertCandidate] = needleValue
  231. }
  232. return o
  233. }
  234. func (o Overflow) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) {
  235. foundCandidate := sort.Search(len(o), func(i int) bool {
  236. return o[i].Key >= key
  237. })
  238. if foundCandidate != len(o) && o[foundCandidate].Key == key {
  239. return o[foundCandidate], true
  240. }
  241. return nv, false
  242. }
  243. func (snv SectionalNeedleValue) toNeedleValue(cs *CompactSection) NeedleValue {
  244. return NeedleValue{NeedleId(snv.Key) + cs.start, snv.Offset, snv.Size}
  245. }
  246. func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) SectionalNeedleValue {
  247. return SectionalNeedleValue{SectionalNeedleId(nv.Key - cs.start), nv.Offset, nv.Size}
  248. }