You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

255 lines
6.5 KiB

7 years ago
6 years ago
  1. package needle
  2. import (
  3. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  4. "sort"
  5. "sync"
  6. )
  // batch is the number of value slots preallocated for every CompactSection.
  // Once a section has used that many slots, new keys are stored in its
  // overflow list instead.
  const batch = 100000
  // SectionalNeedleId is a needle key expressed relative to the start key of
  // the CompactSection that stores it.
  type SectionalNeedleId uint32

  // SectionalNeedleIdLimit is the largest key delta that can be represented by
  // a SectionalNeedleId.
  const SectionalNeedleIdLimit = 1<<32 - 1
  // SectionalNeedleValue is a single entry as stored inside a CompactSection.
  // Its Key is relative to the owning section's start key.
  type SectionalNeedleValue struct {
  	// Key is the needle key minus the owning section's start key.
  	Key SectionalNeedleId
  	Offset Offset `comment:"Volume offset"` // since aligned to 8 bytes, range is 4G*8=32G
  	Size uint32 `comment:"Size of the data portion"`
  }
  // CompactSection stores the entries for a contiguous range of needle keys,
  // with keys kept relative to start.
  type CompactSection struct {
  	// The embedded lock is taken by Set, Delete, Get and CompactMap.Visit
  	// around accesses to values, overflow, end and counter.
  	sync.RWMutex
  	// values holds entries in ascending Key order; only the first counter
  	// slots are in use.
  	values []SectionalNeedleValue
  	// overflow holds entries that could not be appended to values, either
  	// because values is full or because the key arrived out of order.
  	overflow Overflow
  	// start is the key that every stored SectionalNeedleId is relative to.
  	start NeedleId
  	// end is the largest key that has been passed to Set.
  	end NeedleId
  	// counter is the number of slots of values currently in use.
  	counter int
  }
  // Overflow is a list of section entries kept in ascending Key order by
  // setOverflowEntry and deleteOverflowEntry.
  type Overflow []SectionalNeedleValue
  26. func NewCompactSection(start NeedleId) *CompactSection {
  27. return &CompactSection{
  28. values: make([]SectionalNeedleValue, batch),
  29. overflow: Overflow(make([]SectionalNeedleValue, 0)),
  30. start: start,
  31. }
  32. }
  33. //return old entry size
  34. func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  35. cs.Lock()
  36. if key > cs.end {
  37. cs.end = key
  38. }
  39. skey := SectionalNeedleId(key - cs.start)
  40. if i := cs.binarySearchValues(skey); i >= 0 {
  41. oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size
  42. //println("key", key, "old size", ret)
  43. cs.values[i].Offset, cs.values[i].Size = offset, size
  44. } else {
  45. needOverflow := cs.counter >= batch
  46. needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
  47. if needOverflow {
  48. //println("start", cs.start, "counter", cs.counter, "key", key)
  49. if oldValue, found := cs.overflow.findOverflowEntry(skey); found {
  50. oldOffset, oldSize = oldValue.Offset, oldValue.Size
  51. }
  52. cs.overflow = cs.overflow.setOverflowEntry(SectionalNeedleValue{Key: skey, Offset: offset, Size: size})
  53. } else {
  54. p := &cs.values[cs.counter]
  55. p.Key, p.Offset, p.Size = skey, offset, size
  56. //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
  57. cs.counter++
  58. }
  59. }
  60. cs.Unlock()
  61. return
  62. }
  63. //return old entry size
  64. func (cs *CompactSection) Delete(key NeedleId) uint32 {
  65. skey := SectionalNeedleId(key - cs.start)
  66. cs.Lock()
  67. ret := uint32(0)
  68. if i := cs.binarySearchValues(skey); i >= 0 {
  69. if cs.values[i].Size > 0 {
  70. ret = cs.values[i].Size
  71. cs.values[i].Size = 0
  72. }
  73. }
  74. if v, found := cs.overflow.findOverflowEntry(skey); found {
  75. cs.overflow = cs.overflow.deleteOverflowEntry(skey)
  76. ret = v.Size
  77. }
  78. cs.Unlock()
  79. return ret
  80. }
  81. func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
  82. cs.RLock()
  83. skey := SectionalNeedleId(key - cs.start)
  84. if v, ok := cs.overflow.findOverflowEntry(skey); ok {
  85. cs.RUnlock()
  86. nv := v.toNeedleValue(cs)
  87. return &nv, true
  88. }
  89. if i := cs.binarySearchValues(skey); i >= 0 {
  90. cs.RUnlock()
  91. nv := cs.values[i].toNeedleValue(cs)
  92. return &nv, true
  93. }
  94. cs.RUnlock()
  95. return nil, false
  96. }
  97. func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int {
  98. x := sort.Search(cs.counter, func(i int) bool {
  99. return cs.values[i].Key >= key
  100. })
  101. if x == cs.counter {
  102. return -1
  103. }
  104. if cs.values[x].Key > key {
  105. return -2
  106. }
  107. return x
  108. }
  // CompactMap maps needle keys to their offset and size using a list of
  // CompactSections kept sorted by start key.
  // This map assumes mostly inserting increasing keys.
  type CompactMap struct {
  	// list holds the sections in ascending order of their start key.
  	list []*CompactSection
  }
  114. func NewCompactMap() *CompactMap {
  115. return &CompactMap{}
  116. }
  117. func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  118. x := cm.binarySearchCompactSection(key)
  119. if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
  120. // println(x, "adding to existing", len(cm.list), "sections, starting", key)
  121. cs := NewCompactSection(key)
  122. cm.list = append(cm.list, cs)
  123. x = len(cm.list) - 1
  124. //keep compact section sorted by start
  125. for x >= 0 {
  126. if x > 0 && cm.list[x-1].start > key {
  127. cm.list[x] = cm.list[x-1]
  128. // println("shift", x, "start", cs.start, "to", x-1)
  129. x = x - 1
  130. } else {
  131. cm.list[x] = cs
  132. // println("cs", x, "start", cs.start)
  133. break
  134. }
  135. }
  136. }
  137. // println(key, "set to section[", x, "].start", cm.list[x].start)
  138. return cm.list[x].Set(key, offset, size)
  139. }
  140. func (cm *CompactMap) Delete(key NeedleId) uint32 {
  141. x := cm.binarySearchCompactSection(key)
  142. if x < 0 {
  143. return uint32(0)
  144. }
  145. return cm.list[x].Delete(key)
  146. }
  147. func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
  148. x := cm.binarySearchCompactSection(key)
  149. if x < 0 {
  150. return nil, false
  151. }
  152. return cm.list[x].Get(key)
  153. }
  154. func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
  155. if len(cm.list) == 0 {
  156. return -1
  157. }
  158. x := sort.Search(len(cm.list), func(i int) bool {
  159. return cm.list[i].start >= key
  160. })
  161. if len(cm.list) == x {
  162. return -1
  163. }
  164. if cm.list[x].start == key {
  165. return x
  166. }
  167. return x - 1
  168. }
  169. // Visit visits all entries or stop if any error when visiting
  170. func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
  171. for _, cs := range cm.list {
  172. cs.RLock()
  173. for _, v := range cs.overflow {
  174. if err := visit(v.toNeedleValue(cs)); err != nil {
  175. cs.RUnlock()
  176. return err
  177. }
  178. }
  179. for i, v := range cs.values {
  180. if i >= cs.counter {
  181. break
  182. }
  183. if _, found := cs.overflow.findOverflowEntry(v.Key); !found {
  184. if err := visit(v.toNeedleValue(cs)); err != nil {
  185. cs.RUnlock()
  186. return err
  187. }
  188. }
  189. }
  190. cs.RUnlock()
  191. }
  192. return nil
  193. }
  194. func (o Overflow) deleteOverflowEntry(key SectionalNeedleId) Overflow {
  195. length := len(o)
  196. deleteCandidate := sort.Search(length, func(i int) bool {
  197. return o[i].Key >= key
  198. })
  199. if deleteCandidate != length && o[deleteCandidate].Key == key {
  200. for i := deleteCandidate; i < length-1; i++ {
  201. o[i] = o[i+1]
  202. }
  203. o = o[0 : length-1]
  204. }
  205. return o
  206. }
  207. func (o Overflow) setOverflowEntry(needleValue SectionalNeedleValue) Overflow {
  208. insertCandidate := sort.Search(len(o), func(i int) bool {
  209. return o[i].Key >= needleValue.Key
  210. })
  211. if insertCandidate != len(o) && o[insertCandidate].Key == needleValue.Key {
  212. o[insertCandidate] = needleValue
  213. } else {
  214. o = append(o, needleValue)
  215. for i := len(o) - 1; i > insertCandidate; i-- {
  216. o[i] = o[i-1]
  217. }
  218. o[insertCandidate] = needleValue
  219. }
  220. return o
  221. }
  222. func (o Overflow) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) {
  223. foundCandidate := sort.Search(len(o), func(i int) bool {
  224. return o[i].Key >= key
  225. })
  226. if foundCandidate != len(o) && o[foundCandidate].Key == key {
  227. return o[foundCandidate], true
  228. }
  229. return nv, false
  230. }
  231. func (snv SectionalNeedleValue) toNeedleValue(cs *CompactSection) NeedleValue {
  232. return NeedleValue{NeedleId(snv.Key) + cs.start, snv.Offset, snv.Size}
  233. }
  234. func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) SectionalNeedleValue {
  235. return SectionalNeedleValue{SectionalNeedleId(nv.Key - cs.start), nv.Offset, nv.Size}
  236. }