You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

244 lines
5.7 KiB

7 years ago
6 years ago
  1. package needle
  2. import (
  3. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  4. "sort"
  5. "sync"
  6. )
  7. const (
  8. batch = 100000
  9. )
  10. type CompactSection struct {
  11. sync.RWMutex
  12. values []NeedleValue
  13. overflow Overflow
  14. start NeedleId
  15. end NeedleId
  16. counter int
  17. }
  18. type Overflow []NeedleValue
  19. func NewCompactSection(start NeedleId) *CompactSection {
  20. return &CompactSection{
  21. values: make([]NeedleValue, batch),
  22. overflow: Overflow(make([]NeedleValue, 0)),
  23. start: start,
  24. }
  25. }
  26. //return old entry size
  27. func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  28. cs.Lock()
  29. if key > cs.end {
  30. cs.end = key
  31. }
  32. if i := cs.binarySearchValues(key); i >= 0 {
  33. oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size
  34. //println("key", key, "old size", ret)
  35. cs.values[i].Offset, cs.values[i].Size = offset, size
  36. } else {
  37. needOverflow := cs.counter >= batch
  38. needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key
  39. if needOverflow {
  40. //println("start", cs.start, "counter", cs.counter, "key", key)
  41. if oldValue, found := cs.overflow.findOverflowEntry(key); found {
  42. oldOffset, oldSize = oldValue.Offset, oldValue.Size
  43. }
  44. cs.overflow = cs.overflow.setOverflowEntry(NeedleValue{Key: key, Offset: offset, Size: size})
  45. } else {
  46. p := &cs.values[cs.counter]
  47. p.Key, p.Offset, p.Size = key, offset, size
  48. //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
  49. cs.counter++
  50. }
  51. }
  52. cs.Unlock()
  53. return
  54. }
  55. //return old entry size
  56. func (cs *CompactSection) Delete(key NeedleId) uint32 {
  57. cs.Lock()
  58. ret := uint32(0)
  59. if i := cs.binarySearchValues(key); i >= 0 {
  60. if cs.values[i].Size > 0 {
  61. ret = cs.values[i].Size
  62. cs.values[i].Size = 0
  63. }
  64. }
  65. if v, found := cs.overflow.findOverflowEntry(key); found {
  66. cs.overflow = cs.overflow.deleteOverflowEntry(key)
  67. ret = v.Size
  68. }
  69. cs.Unlock()
  70. return ret
  71. }
  72. func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
  73. cs.RLock()
  74. if v, ok := cs.overflow.findOverflowEntry(key); ok {
  75. cs.RUnlock()
  76. return &v, true
  77. }
  78. if i := cs.binarySearchValues(key); i >= 0 {
  79. cs.RUnlock()
  80. return &cs.values[i], true
  81. }
  82. cs.RUnlock()
  83. return nil, false
  84. }
  85. func (cs *CompactSection) binarySearchValues(key NeedleId) int {
  86. l, h := 0, cs.counter-1
  87. if h >= 0 && cs.values[h].Key < key {
  88. return -2
  89. }
  90. //println("looking for key", key)
  91. for l <= h {
  92. m := (l + h) / 2
  93. //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size)
  94. if cs.values[m].Key < key {
  95. l = m + 1
  96. } else if key < cs.values[m].Key {
  97. h = m - 1
  98. } else {
  99. //println("found", m)
  100. return m
  101. }
  102. }
  103. return -1
  104. }
  105. //This map assumes mostly inserting increasing keys
  106. //This map assumes mostly inserting increasing keys
  107. type CompactMap struct {
  108. list []*CompactSection
  109. }
  110. func NewCompactMap() *CompactMap {
  111. return &CompactMap{}
  112. }
  113. func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  114. x := cm.binarySearchCompactSection(key)
  115. if x < 0 {
  116. //println(x, "creating", len(cm.list), "section, starting", key)
  117. cs := NewCompactSection(key)
  118. cm.list = append(cm.list, cs)
  119. x = len(cm.list) - 1
  120. //keep compact section sorted by start
  121. for x > 0 {
  122. if cm.list[x-1].start > key {
  123. cm.list[x] = cm.list[x-1]
  124. x = x - 1
  125. } else {
  126. cm.list[x] = cs
  127. break
  128. }
  129. }
  130. }
  131. return cm.list[x].Set(key, offset, size)
  132. }
  133. func (cm *CompactMap) Delete(key NeedleId) uint32 {
  134. x := cm.binarySearchCompactSection(key)
  135. if x < 0 {
  136. return uint32(0)
  137. }
  138. return cm.list[x].Delete(key)
  139. }
  140. func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
  141. x := cm.binarySearchCompactSection(key)
  142. if x < 0 {
  143. return nil, false
  144. }
  145. return cm.list[x].Get(key)
  146. }
  147. func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
  148. l, h := 0, len(cm.list)-1
  149. if h < 0 {
  150. return -5
  151. }
  152. if cm.list[h].start <= key {
  153. if cm.list[h].counter < batch || key <= cm.list[h].end {
  154. return h
  155. }
  156. return -4
  157. }
  158. for l <= h {
  159. m := (l + h) / 2
  160. if key < cm.list[m].start {
  161. h = m - 1
  162. } else { // cm.list[m].start <= key
  163. if cm.list[m+1].start <= key {
  164. l = m + 1
  165. } else {
  166. return m
  167. }
  168. }
  169. }
  170. return -3
  171. }
  172. // Visit visits all entries or stop if any error when visiting
  173. func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
  174. for _, cs := range cm.list {
  175. cs.RLock()
  176. for _, v := range cs.overflow {
  177. if err := visit(v); err != nil {
  178. cs.RUnlock()
  179. return err
  180. }
  181. }
  182. for _, v := range cs.values {
  183. if _, found := cs.overflow.findOverflowEntry(v.Key); !found {
  184. if err := visit(v); err != nil {
  185. cs.RUnlock()
  186. return err
  187. }
  188. }
  189. }
  190. cs.RUnlock()
  191. }
  192. return nil
  193. }
  194. func (o Overflow) deleteOverflowEntry(key NeedleId) Overflow {
  195. length := len(o)
  196. deleteCandidate := sort.Search(length, func(i int) bool {
  197. return o[i].Key >= key
  198. })
  199. if deleteCandidate != length && o[deleteCandidate].Key == key {
  200. for i := deleteCandidate; i < length-1; i++ {
  201. o[i] = o[i+1]
  202. }
  203. o = o[0 : length-1]
  204. }
  205. return o
  206. }
  207. func (o Overflow) setOverflowEntry(needleValue NeedleValue) Overflow {
  208. insertCandidate := sort.Search(len(o), func(i int) bool {
  209. return o[i].Key >= needleValue.Key
  210. })
  211. if insertCandidate != len(o) && o[insertCandidate].Key == needleValue.Key {
  212. o[insertCandidate] = needleValue
  213. } else {
  214. o = append(o, needleValue)
  215. for i := len(o) - 1; i > insertCandidate; i-- {
  216. o[i] = o[i-1]
  217. }
  218. o[insertCandidate] = needleValue
  219. }
  220. return o
  221. }
  222. func (o Overflow) findOverflowEntry(key NeedleId) (nv NeedleValue, found bool) {
  223. foundCandidate := sort.Search(len(o), func(i int) bool {
  224. return o[i].Key >= key
  225. })
  226. if foundCandidate != len(o) && o[foundCandidate].Key == key {
  227. return o[foundCandidate], true
  228. }
  229. return nv, false
  230. }