You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

499 lines
12 KiB

3 years ago
3 years ago
3 years ago
  1. package redis3
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/util/skiplist"
  7. "github.com/go-redis/redis/v8"
  8. )
  9. type ItemList struct {
  10. skipList *skiplist.SkipList
  11. batchSize int
  12. client redis.UniversalClient
  13. prefix string
  14. }
  15. func newItemList(client redis.UniversalClient, prefix string, store skiplist.ListStore, batchSize int) *ItemList {
  16. return &ItemList{
  17. skipList: skiplist.New(store),
  18. batchSize: batchSize,
  19. client: client,
  20. prefix: prefix,
  21. }
  22. }
  23. /*
  24. Be reluctant to create new nodes. Try to fit into either previous node or next node.
  25. Prefer to add to previous node.
  26. There are multiple cases after finding the name for greater or equal node
  27. 1. found and node.Key == name
  28. The node contains a batch with leading key the same as the name
  29. nothing to do
  30. 2. no such node found or node.Key > name
  31. if no such node found
  32. prevNode = list.LargestNode
  33. // case 2.1
  34. if previousNode contains name
  35. nothing to do
  36. // prefer to add to previous node
  37. if prevNode != nil {
  38. // case 2.2
  39. if prevNode has capacity
  40. prevNode.add name, and save
  41. return
  42. // case 2.3
  43. split prevNode by name
  44. }
  45. // case 2.4
  46. // merge into next node. Avoid too many nodes if adding data in reverse order.
  47. if nextNode is not nil and nextNode has capacity
  48. delete nextNode.Key
  49. nextNode.Key = name
  50. nextNode.batch.add name
  51. insert nodeNode.Key
  52. return
  53. // case 2.5
  54. if prevNode is nil
  55. insert new node with key = name, value = batch{name}
  56. return
  57. */
  58. func (nl *ItemList) canAddMember(node *skiplist.SkipListElementReference, name string) (alreadyContains bool, nodeSize int, err error) {
  59. ctx := context.Background()
  60. pipe := nl.client.TxPipeline()
  61. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  62. countOperation := pipe.ZLexCount(ctx, key, "-", "+")
  63. scoreOperationt := pipe.ZScore(ctx, key, name)
  64. if _, err = pipe.Exec(ctx); err != nil && err != redis.Nil {
  65. return false, 0, err
  66. }
  67. if err == redis.Nil {
  68. err = nil
  69. }
  70. alreadyContains = scoreOperationt.Err() == nil
  71. nodeSize = int(countOperation.Val())
  72. return
  73. }
  74. func (nl *ItemList) WriteName(name string) error {
  75. lookupKey := []byte(name)
  76. prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
  77. if err != nil {
  78. return err
  79. }
  80. // case 1: the name already exists as one leading key in the batch
  81. if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
  82. return nil
  83. }
  84. var prevNodeReference *skiplist.SkipListElementReference
  85. if !found {
  86. prevNodeReference = nl.skipList.GetLargestNodeReference()
  87. }
  88. if nextNode != nil && prevNode == nil {
  89. prevNodeReference = nextNode.Prev
  90. }
  91. if prevNodeReference != nil {
  92. alreadyContains, nodeSize, err := nl.canAddMember(prevNodeReference, name)
  93. if err != nil {
  94. return err
  95. }
  96. if alreadyContains {
  97. // case 2.1
  98. return nil
  99. }
  100. // case 2.2
  101. if nodeSize < nl.batchSize {
  102. return nl.NodeAddMember(prevNodeReference, name)
  103. }
  104. // case 2.3
  105. x := nl.NodeInnerPosition(prevNodeReference, name)
  106. y := nodeSize - x
  107. addToX := x <= y
  108. // add to a new node
  109. if x == 0 || y == 0 {
  110. if err := nl.ItemAdd(lookupKey, 0, name); err != nil {
  111. return err
  112. }
  113. return nil
  114. }
  115. if addToX {
  116. // collect names before name, add them to X
  117. namesToX, err := nl.NodeRangeBeforeExclusive(prevNodeReference, name)
  118. if err != nil {
  119. return nil
  120. }
  121. // delete skiplist reference to old node
  122. if _, err := nl.skipList.DeleteByKey(prevNodeReference.Key); err != nil {
  123. return err
  124. }
  125. // add namesToY and name to a new X
  126. namesToX = append(namesToX, name)
  127. if err := nl.ItemAdd([]byte(namesToX[0]), 0, namesToX...); err != nil {
  128. return nil
  129. }
  130. // remove names less than name from current Y
  131. if err := nl.NodeDeleteBeforeExclusive(prevNodeReference, name); err != nil {
  132. return nil
  133. }
  134. // point skip list to current Y
  135. if err := nl.ItemAdd(lookupKey, prevNodeReference.ElementPointer); err != nil {
  136. return nil
  137. }
  138. return nil
  139. } else {
  140. // collect names after name, add them to Y
  141. namesToY, err := nl.NodeRangeAfterExclusive(prevNodeReference, name)
  142. if err != nil {
  143. return nil
  144. }
  145. // add namesToY and name to a new Y
  146. namesToY = append(namesToY, name)
  147. if err := nl.ItemAdd(lookupKey, 0, namesToY...); err != nil {
  148. return nil
  149. }
  150. // remove names after name from current X
  151. if err := nl.NodeDeleteAfterExclusive(prevNodeReference, name); err != nil {
  152. return nil
  153. }
  154. return nil
  155. }
  156. }
  157. // case 2.4
  158. if nextNode != nil {
  159. nodeSize := nl.NodeSize(nextNode.Reference())
  160. if nodeSize < nl.batchSize {
  161. if id, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
  162. return err
  163. } else {
  164. if err := nl.ItemAdd(lookupKey, id, name); err != nil {
  165. return err
  166. }
  167. }
  168. return nil
  169. }
  170. }
  171. // case 2.5
  172. // now prevNode is nil
  173. return nl.ItemAdd(lookupKey, 0, name)
  174. }
  175. /*
  176. // case 1: exists in nextNode
  177. if nextNode != nil && nextNode.Key == name {
  178. remove from nextNode, update nextNode
  179. // TODO: merge with prevNode if possible?
  180. return
  181. }
  182. if nextNode is nil
  183. prevNode = list.Largestnode
  184. if prevNode == nil and nextNode.Prev != nil
  185. prevNode = load(nextNode.Prev)
  186. // case 2: does not exist
  187. // case 2.1
  188. if prevNode == nil {
  189. return
  190. }
  191. // case 2.2
  192. if prevNameBatch does not contain name {
  193. return
  194. }
  195. // case 3
  196. delete from prevNameBatch
  197. if prevNameBatch + nextNode < capacityList
  198. // case 3.1
  199. merge
  200. else
  201. // case 3.2
  202. update prevNode
  203. */
  204. func (nl *ItemList) DeleteName(name string) error {
  205. lookupKey := []byte(name)
  206. prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
  207. if err != nil {
  208. return err
  209. }
  210. // case 1
  211. if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
  212. if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
  213. return err
  214. }
  215. if err := nl.NodeDeleteMember(nextNode.Reference(), name); err != nil {
  216. return err
  217. }
  218. minName := nl.NodeMin(nextNode.Reference())
  219. if minName == "" {
  220. return nl.NodeDelete(nextNode.Reference())
  221. }
  222. return nl.ItemAdd([]byte(minName), nextNode.Id)
  223. }
  224. if !found {
  225. prevNode, err = nl.skipList.GetLargestNode()
  226. if err != nil {
  227. return err
  228. }
  229. }
  230. if nextNode != nil && prevNode == nil {
  231. prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
  232. if err != nil {
  233. return err
  234. }
  235. }
  236. // case 2
  237. if prevNode == nil {
  238. // case 2.1
  239. return nil
  240. }
  241. if !nl.NodeContainsItem(prevNode.Reference(), name) {
  242. return nil
  243. }
  244. // case 3
  245. if err := nl.NodeDeleteMember(prevNode.Reference(), name); err != nil {
  246. return err
  247. }
  248. prevSize := nl.NodeSize(prevNode.Reference())
  249. if prevSize == 0 {
  250. if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil {
  251. return err
  252. }
  253. return nil
  254. }
  255. nextSize := nl.NodeSize(nextNode.Reference())
  256. if nextSize > 0 && prevSize+nextSize < nl.batchSize {
  257. // case 3.1 merge nextNode and prevNode
  258. if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
  259. return err
  260. }
  261. nextNames, err := nl.NodeRangeBeforeExclusive(nextNode.Reference(), "")
  262. if err != nil {
  263. return err
  264. }
  265. if err := nl.NodeAddMember(prevNode.Reference(), nextNames...); err != nil {
  266. return err
  267. }
  268. return nl.NodeDelete(nextNode.Reference())
  269. } else {
  270. // case 3.2 update prevNode
  271. // no action to take
  272. return nil
  273. }
  274. return nil
  275. }
  276. func (nl *ItemList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
  277. lookupKey := []byte(startFrom)
  278. prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
  279. if err != nil {
  280. return err
  281. }
  282. if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
  283. prevNode = nil
  284. }
  285. if !found {
  286. prevNode, err = nl.skipList.GetLargestNode()
  287. if err != nil {
  288. return err
  289. }
  290. }
  291. if prevNode != nil {
  292. if !nl.NodeScanIncluseiveAfter(prevNode.Reference(), startFrom, visitNamesFn) {
  293. return nil
  294. }
  295. }
  296. for nextNode != nil {
  297. if !nl.NodeScanIncluseiveAfter(nextNode.Reference(), startFrom, visitNamesFn) {
  298. return nil
  299. }
  300. nextNode, err = nl.skipList.LoadElement(nextNode.Next[0])
  301. if err != nil {
  302. return err
  303. }
  304. }
  305. return nil
  306. }
  307. func (nl *ItemList) RemoteAllListElement() error {
  308. t := nl.skipList
  309. nodeRef := t.StartLevels[0]
  310. for nodeRef != nil {
  311. node, err := t.LoadElement(nodeRef)
  312. if err != nil {
  313. return err
  314. }
  315. if node == nil {
  316. return nil
  317. }
  318. if err := t.DeleteElement(node); err != nil {
  319. return err
  320. }
  321. if err := nl.NodeDelete(node.Reference()); err != nil {
  322. return err
  323. }
  324. nodeRef = node.Next[0]
  325. }
  326. return nil
  327. }
  328. func (nl *ItemList) NodeContainsItem(node *skiplist.SkipListElementReference, item string) bool {
  329. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  330. _, err := nl.client.ZScore(context.Background(), key, item).Result()
  331. if err == redis.Nil {
  332. return false
  333. }
  334. if err == nil {
  335. return true
  336. }
  337. return false
  338. }
  339. func (nl *ItemList) NodeSize(node *skiplist.SkipListElementReference) int {
  340. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  341. return int(nl.client.ZLexCount(context.Background(), key, "-", "+").Val())
  342. }
  343. func (nl *ItemList) NodeAddMember(node *skiplist.SkipListElementReference, names ...string) error {
  344. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  345. var members []*redis.Z
  346. for _, name := range names {
  347. members = append(members, &redis.Z{
  348. Score: 0,
  349. Member: name,
  350. })
  351. }
  352. return nl.client.ZAddNX(context.Background(), key, members...).Err()
  353. }
  354. func (nl *ItemList) NodeDeleteMember(node *skiplist.SkipListElementReference, name string) error {
  355. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  356. return nl.client.ZRem(context.Background(), key, name).Err()
  357. }
  358. func (nl *ItemList) NodeDelete(node *skiplist.SkipListElementReference) error {
  359. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  360. return nl.client.Del(context.Background(), key).Err()
  361. }
  362. func (nl *ItemList) NodeInnerPosition(node *skiplist.SkipListElementReference, name string) int {
  363. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  364. return int(nl.client.ZLexCount(context.Background(), key, "-", "("+name).Val())
  365. }
  366. func (nl *ItemList) NodeMin(node *skiplist.SkipListElementReference) string {
  367. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  368. slice := nl.client.ZPopMin(context.Background(), key).Val()
  369. if len(slice) > 0 {
  370. s := slice[0].Member.(string)
  371. return s
  372. }
  373. return ""
  374. }
  375. func (nl *ItemList) NodeScanIncluseiveAfter(node *skiplist.SkipListElementReference, startFrom string, visitNamesFn func(name string) bool) bool {
  376. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  377. if startFrom == "" {
  378. startFrom = "-"
  379. } else {
  380. startFrom = "[" + startFrom
  381. }
  382. names := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
  383. Min: startFrom,
  384. Max: "+",
  385. }).Val()
  386. for _, n := range names {
  387. if !visitNamesFn(n) {
  388. return false
  389. }
  390. }
  391. return true
  392. }
  393. func (nl *ItemList) NodeRangeBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) ([]string, error) {
  394. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  395. if stopAt == "" {
  396. stopAt = "+"
  397. } else {
  398. stopAt = "(" + stopAt
  399. }
  400. return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
  401. Min: "-",
  402. Max: stopAt,
  403. }).Result()
  404. }
  405. func (nl *ItemList) NodeRangeAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) ([]string, error) {
  406. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  407. if startFrom == "" {
  408. startFrom = "-"
  409. } else {
  410. startFrom = "(" + startFrom
  411. }
  412. return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
  413. Min: startFrom,
  414. Max: "+",
  415. }).Result()
  416. }
  417. func (nl *ItemList) NodeDeleteBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) error {
  418. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  419. if stopAt == "" {
  420. stopAt = "+"
  421. } else {
  422. stopAt = "(" + stopAt
  423. }
  424. return nl.client.ZRemRangeByLex(context.Background(), key, "-", stopAt).Err()
  425. }
  426. func (nl *ItemList) NodeDeleteAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) error {
  427. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  428. if startFrom == "" {
  429. startFrom = "-"
  430. } else {
  431. startFrom = "(" + startFrom
  432. }
  433. return nl.client.ZRemRangeByLex(context.Background(), key, startFrom, "+").Err()
  434. }
  435. func (nl *ItemList) ItemAdd(lookupKey []byte, idIfKnown int64, names ...string) error {
  436. if id, err := nl.skipList.InsertByKey(lookupKey, idIfKnown, nil); err != nil {
  437. return err
  438. } else {
  439. if len(names) > 0 {
  440. return nl.NodeAddMember(&skiplist.SkipListElementReference{
  441. ElementPointer: id,
  442. Key: lookupKey,
  443. }, names...)
  444. }
  445. }
  446. return nil
  447. }