You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

265 lines
6.6 KiB

7 years ago
6 years ago
  1. package needle
  2. import (
  3. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  4. "sort"
  5. "sync"
  6. )
  // batch is the number of slots allocated for the values slice of each
  // CompactSection.
  const batch = 100000

  // SectionalNeedleId is a needle key stored as a 32-bit offset from the start
  // key of the section that holds it.
  type SectionalNeedleId uint32

  // SectionalNeedleIdLimit is the largest offset a SectionalNeedleId can hold.
  const SectionalNeedleIdLimit = 1<<32 - 1
  12. type SectionalNeedleValue struct {
  13. Key SectionalNeedleId
  14. Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
  15. Size uint32 `comment:"Size of the data portion"`
  16. }
  17. type CompactSection struct {
  18. sync.RWMutex
  19. values []SectionalNeedleValue
  20. overflow Overflow
  21. start NeedleId
  22. end NeedleId
  23. counter int
  24. }
  25. type Overflow []SectionalNeedleValue
  26. func NewCompactSection(start NeedleId) *CompactSection {
  27. return &CompactSection{
  28. values: make([]SectionalNeedleValue, batch),
  29. overflow: Overflow(make([]SectionalNeedleValue, 0)),
  30. start: start,
  31. }
  32. }
  33. //return old entry size
  34. func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  35. cs.Lock()
  36. if key > cs.end {
  37. cs.end = key
  38. }
  39. skey := SectionalNeedleId(key - cs.start)
  40. if i := cs.binarySearchValues(skey); i >= 0 {
  41. oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size
  42. //println("key", key, "old size", ret)
  43. cs.values[i].Offset, cs.values[i].Size = offset, size
  44. } else {
  45. needOverflow := cs.counter >= batch
  46. needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
  47. if needOverflow {
  48. //println("start", cs.start, "counter", cs.counter, "key", key)
  49. if oldValue, found := cs.overflow.findOverflowEntry(skey); found {
  50. oldOffset, oldSize = oldValue.Offset, oldValue.Size
  51. }
  52. cs.overflow = cs.overflow.setOverflowEntry(SectionalNeedleValue{Key: skey, Offset: offset, Size: size})
  53. } else {
  54. p := &cs.values[cs.counter]
  55. p.Key, p.Offset, p.Size = skey, offset, size
  56. //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
  57. cs.counter++
  58. }
  59. }
  60. cs.Unlock()
  61. return
  62. }
  63. //return old entry size
  64. func (cs *CompactSection) Delete(key NeedleId) uint32 {
  65. skey := SectionalNeedleId(key - cs.start)
  66. cs.Lock()
  67. ret := uint32(0)
  68. if i := cs.binarySearchValues(skey); i >= 0 {
  69. if cs.values[i].Size > 0 {
  70. ret = cs.values[i].Size
  71. cs.values[i].Size = 0
  72. }
  73. }
  74. if v, found := cs.overflow.findOverflowEntry(skey); found {
  75. cs.overflow = cs.overflow.deleteOverflowEntry(skey)
  76. ret = v.Size
  77. }
  78. cs.Unlock()
  79. return ret
  80. }
  81. func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
  82. cs.RLock()
  83. skey := SectionalNeedleId(key - cs.start)
  84. if v, ok := cs.overflow.findOverflowEntry(skey); ok {
  85. cs.RUnlock()
  86. nv := v.toNeedleValue(cs)
  87. return &nv, true
  88. }
  89. if i := cs.binarySearchValues(skey); i >= 0 {
  90. cs.RUnlock()
  91. nv := cs.values[i].toNeedleValue(cs)
  92. return &nv, true
  93. }
  94. cs.RUnlock()
  95. return nil, false
  96. }
  97. func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int {
  98. x := sort.Search(cs.counter, func(i int) bool {
  99. return cs.values[i].Key >= key
  100. })
  101. if x == cs.counter {
  102. return -1
  103. }
  104. if cs.values[x].Key > key {
  105. return -2
  106. }
  107. return x
  108. }
  109. //This map assumes mostly inserting increasing keys
  110. //This map assumes mostly inserting increasing keys
  111. type CompactMap struct {
  112. list []*CompactSection
  113. }
  114. func NewCompactMap() *CompactMap {
  115. return &CompactMap{}
  116. }
  117. func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  118. x := cm.binarySearchCompactSection(key)
  119. if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
  120. // println(x, "adding to existing", len(cm.list), "sections, starting", key)
  121. cs := NewCompactSection(key)
  122. cm.list = append(cm.list, cs)
  123. x = len(cm.list) - 1
  124. //keep compact section sorted by start
  125. for x >= 0 {
  126. if x > 0 && cm.list[x-1].start > key {
  127. cm.list[x] = cm.list[x-1]
  128. // println("shift", x, "start", cs.start, "to", x-1)
  129. x = x - 1
  130. } else {
  131. cm.list[x] = cs
  132. // println("cs", x, "start", cs.start)
  133. break
  134. }
  135. }
  136. }
  137. // println(key, "set to section[", x, "].start", cm.list[x].start)
  138. return cm.list[x].Set(key, offset, size)
  139. }
  140. func (cm *CompactMap) Delete(key NeedleId) uint32 {
  141. x := cm.binarySearchCompactSection(key)
  142. if x < 0 {
  143. return uint32(0)
  144. }
  145. return cm.list[x].Delete(key)
  146. }
  147. func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
  148. x := cm.binarySearchCompactSection(key)
  149. if x < 0 {
  150. return nil, false
  151. }
  152. return cm.list[x].Get(key)
  153. }
  154. func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
  155. l, h := 0, len(cm.list)-1
  156. if h < 0 {
  157. return -5
  158. }
  159. if cm.list[h].start <= key {
  160. if cm.list[h].counter < batch || key <= cm.list[h].end {
  161. return h
  162. }
  163. return -4
  164. }
  165. for l <= h {
  166. m := (l + h) / 2
  167. if key < cm.list[m].start {
  168. h = m - 1
  169. } else { // cm.list[m].start <= key
  170. if cm.list[m+1].start <= key {
  171. l = m + 1
  172. } else {
  173. return m
  174. }
  175. }
  176. }
  177. return -3
  178. }
  179. // Visit visits all entries or stop if any error when visiting
  180. func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
  181. for _, cs := range cm.list {
  182. cs.RLock()
  183. for _, v := range cs.overflow {
  184. if err := visit(v.toNeedleValue(cs)); err != nil {
  185. cs.RUnlock()
  186. return err
  187. }
  188. }
  189. for i, v := range cs.values {
  190. if i >= cs.counter {
  191. break
  192. }
  193. if _, found := cs.overflow.findOverflowEntry(v.Key); !found {
  194. if err := visit(v.toNeedleValue(cs)); err != nil {
  195. cs.RUnlock()
  196. return err
  197. }
  198. }
  199. }
  200. cs.RUnlock()
  201. }
  202. return nil
  203. }
  204. func (o Overflow) deleteOverflowEntry(key SectionalNeedleId) Overflow {
  205. length := len(o)
  206. deleteCandidate := sort.Search(length, func(i int) bool {
  207. return o[i].Key >= key
  208. })
  209. if deleteCandidate != length && o[deleteCandidate].Key == key {
  210. for i := deleteCandidate; i < length-1; i++ {
  211. o[i] = o[i+1]
  212. }
  213. o = o[0 : length-1]
  214. }
  215. return o
  216. }
  217. func (o Overflow) setOverflowEntry(needleValue SectionalNeedleValue) Overflow {
  218. insertCandidate := sort.Search(len(o), func(i int) bool {
  219. return o[i].Key >= needleValue.Key
  220. })
  221. if insertCandidate != len(o) && o[insertCandidate].Key == needleValue.Key {
  222. o[insertCandidate] = needleValue
  223. } else {
  224. o = append(o, needleValue)
  225. for i := len(o) - 1; i > insertCandidate; i-- {
  226. o[i] = o[i-1]
  227. }
  228. o[insertCandidate] = needleValue
  229. }
  230. return o
  231. }
  232. func (o Overflow) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) {
  233. foundCandidate := sort.Search(len(o), func(i int) bool {
  234. return o[i].Key >= key
  235. })
  236. if foundCandidate != len(o) && o[foundCandidate].Key == key {
  237. return o[foundCandidate], true
  238. }
  239. return nv, false
  240. }
  241. func (snv SectionalNeedleValue) toNeedleValue(cs *CompactSection) NeedleValue {
  242. return NeedleValue{NeedleId(snv.Key) + cs.start, snv.Offset, snv.Size}
  243. }
  244. func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) SectionalNeedleValue {
  245. return SectionalNeedleValue{SectionalNeedleId(nv.Key - cs.start), nv.Offset, nv.Size}
  246. }