You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

201 lines
4.5 KiB

7 years ago
6 years ago
  1. package needle
  2. import (
  3. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  4. "sync"
  5. )
  6. const (
  7. batch = 100000
  8. )
  9. type CompactSection struct {
  10. sync.RWMutex
  11. values []NeedleValue
  12. overflow map[NeedleId]NeedleValue
  13. start NeedleId
  14. end NeedleId
  15. counter int
  16. }
  17. func NewCompactSection(start NeedleId) *CompactSection {
  18. return &CompactSection{
  19. values: make([]NeedleValue, batch),
  20. overflow: make(map[NeedleId]NeedleValue),
  21. start: start,
  22. }
  23. }
  24. //return old entry size
  25. func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  26. cs.Lock()
  27. if key > cs.end {
  28. cs.end = key
  29. }
  30. if i := cs.binarySearchValues(key); i >= 0 {
  31. oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size
  32. //println("key", key, "old size", ret)
  33. cs.values[i].Offset, cs.values[i].Size = offset, size
  34. } else {
  35. needOverflow := cs.counter >= batch
  36. needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key
  37. if needOverflow {
  38. //println("start", cs.start, "counter", cs.counter, "key", key)
  39. if oldValue, found := cs.overflow[key]; found {
  40. oldOffset, oldSize = oldValue.Offset, oldValue.Size
  41. }
  42. cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size}
  43. } else {
  44. p := &cs.values[cs.counter]
  45. p.Key, p.Offset, p.Size = key, offset, size
  46. //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
  47. cs.counter++
  48. }
  49. }
  50. cs.Unlock()
  51. return
  52. }
  53. //return old entry size
  54. func (cs *CompactSection) Delete(key NeedleId) uint32 {
  55. cs.Lock()
  56. ret := uint32(0)
  57. if i := cs.binarySearchValues(key); i >= 0 {
  58. if cs.values[i].Size > 0 {
  59. ret = cs.values[i].Size
  60. cs.values[i].Size = 0
  61. }
  62. }
  63. if v, found := cs.overflow[key]; found {
  64. delete(cs.overflow, key)
  65. ret = v.Size
  66. }
  67. cs.Unlock()
  68. return ret
  69. }
  70. func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
  71. cs.RLock()
  72. if v, ok := cs.overflow[key]; ok {
  73. cs.RUnlock()
  74. return &v, true
  75. }
  76. if i := cs.binarySearchValues(key); i >= 0 {
  77. cs.RUnlock()
  78. return &cs.values[i], true
  79. }
  80. cs.RUnlock()
  81. return nil, false
  82. }
  83. func (cs *CompactSection) binarySearchValues(key NeedleId) int {
  84. l, h := 0, cs.counter-1
  85. if h >= 0 && cs.values[h].Key < key {
  86. return -2
  87. }
  88. //println("looking for key", key)
  89. for l <= h {
  90. m := (l + h) / 2
  91. //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size)
  92. if cs.values[m].Key < key {
  93. l = m + 1
  94. } else if key < cs.values[m].Key {
  95. h = m - 1
  96. } else {
  97. //println("found", m)
  98. return m
  99. }
  100. }
  101. return -1
  102. }
  103. //This map assumes mostly inserting increasing keys
  104. //This map assumes mostly inserting increasing keys
  105. type CompactMap struct {
  106. list []*CompactSection
  107. }
  108. func NewCompactMap() *CompactMap {
  109. return &CompactMap{}
  110. }
  111. func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
  112. x := cm.binarySearchCompactSection(key)
  113. if x < 0 {
  114. //println(x, "creating", len(cm.list), "section, starting", key)
  115. cs := NewCompactSection(key)
  116. cm.list = append(cm.list, cs)
  117. x = len(cm.list) - 1
  118. //keep compact section sorted by start
  119. for x > 0 {
  120. if cm.list[x-1].start > key {
  121. cm.list[x] = cm.list[x-1]
  122. x = x - 1
  123. } else {
  124. cm.list[x] = cs
  125. break
  126. }
  127. }
  128. }
  129. return cm.list[x].Set(key, offset, size)
  130. }
  131. func (cm *CompactMap) Delete(key NeedleId) uint32 {
  132. x := cm.binarySearchCompactSection(key)
  133. if x < 0 {
  134. return uint32(0)
  135. }
  136. return cm.list[x].Delete(key)
  137. }
  138. func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
  139. x := cm.binarySearchCompactSection(key)
  140. if x < 0 {
  141. return nil, false
  142. }
  143. return cm.list[x].Get(key)
  144. }
  145. func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
  146. l, h := 0, len(cm.list)-1
  147. if h < 0 {
  148. return -5
  149. }
  150. if cm.list[h].start <= key {
  151. if cm.list[h].counter < batch || key <= cm.list[h].end {
  152. return h
  153. }
  154. return -4
  155. }
  156. for l <= h {
  157. m := (l + h) / 2
  158. if key < cm.list[m].start {
  159. h = m - 1
  160. } else { // cm.list[m].start <= key
  161. if cm.list[m+1].start <= key {
  162. l = m + 1
  163. } else {
  164. return m
  165. }
  166. }
  167. }
  168. return -3
  169. }
  170. // Visit visits all entries or stop if any error when visiting
  171. func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
  172. for _, cs := range cm.list {
  173. cs.RLock()
  174. for _, v := range cs.overflow {
  175. if err := visit(v); err != nil {
  176. cs.RUnlock()
  177. return err
  178. }
  179. }
  180. for _, v := range cs.values {
  181. if _, found := cs.overflow[v.Key]; !found {
  182. if err := visit(v); err != nil {
  183. cs.RUnlock()
  184. return err
  185. }
  186. }
  187. }
  188. cs.RUnlock()
  189. }
  190. return nil
  191. }