You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

504 lines
12 KiB

  1. package redis3
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/util/skiplist"
  7. "github.com/go-redis/redis/v8"
  8. )
  9. type ItemList struct {
  10. skipList *skiplist.SkipList
  11. batchSize int
  12. client redis.UniversalClient
  13. prefix string
  14. }
  15. func newItemList(client redis.UniversalClient, prefix string, store skiplist.ListStore, batchSize int) *ItemList {
  16. return &ItemList{
  17. skipList: skiplist.New(store),
  18. batchSize: batchSize,
  19. client: client,
  20. prefix: prefix,
  21. }
  22. }
  23. /*
  24. Be reluctant to create new nodes. Try to fit into either previous node or next node.
  25. Prefer to add to previous node.
  26. There are multiple cases after finding the name for greater or equal node
  27. 1. found and node.Key == name
  28. The node contains a batch with leading key the same as the name
  29. nothing to do
  30. 2. no such node found or node.Key > name
  31. if no such node found
  32. prevNode = list.LargestNode
  33. // case 2.1
  34. if previousNode contains name
  35. nothing to do
  36. // prefer to add to previous node
  37. if prevNode != nil {
  38. // case 2.2
  39. if prevNode has capacity
  40. prevNode.add name, and save
  41. return
  42. // case 2.3
  43. split prevNode by name
  44. }
  45. // case 2.4
  46. // merge into next node. Avoid too many nodes if adding data in reverse order.
  47. if nextNode is not nil and nextNode has capacity
  48. delete nextNode.Key
  49. nextNode.Key = name
  50. nextNode.batch.add name
  51. insert nextNode.Key
  52. return
  53. // case 2.5
  54. if prevNode is nil
  55. insert new node with key = name, value = batch{name}
  56. return
  57. */
  58. func (nl *ItemList) canAddMember(node *skiplist.SkipListElementReference, name string) (alreadyContains bool, nodeSize int, err error) {
  59. ctx := context.Background()
  60. pipe := nl.client.TxPipeline()
  61. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  62. countOperation := pipe.ZLexCount(ctx, key, "-", "+")
  63. scoreOperationt := pipe.ZScore(ctx, key, name)
  64. if _, err = pipe.Exec(ctx); err != nil && err != redis.Nil{
  65. return false, 0, err
  66. }
  67. if err == redis.Nil {
  68. err = nil
  69. }
  70. alreadyContains = scoreOperationt.Err() == nil
  71. nodeSize = int(countOperation.Val())
  72. return
  73. }
  74. func (nl *ItemList) WriteName(name string) error {
  75. lookupKey := []byte(name)
  76. prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
  77. if err != nil {
  78. return err
  79. }
  80. // case 1: the name already exists as one leading key in the batch
  81. if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
  82. return nil
  83. }
  84. if !found {
  85. prevNode, err = nl.skipList.GetLargestNode()
  86. if err != nil {
  87. return err
  88. }
  89. }
  90. if nextNode != nil && prevNode == nil {
  91. prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
  92. if err != nil {
  93. return err
  94. }
  95. }
  96. if prevNode != nil {
  97. alreadyContains, nodeSize, err := nl.canAddMember(prevNode.Reference(), name)
  98. if err != nil {
  99. return err
  100. }
  101. if alreadyContains {
  102. // case 2.1
  103. return nil
  104. }
  105. // case 2.2
  106. if nodeSize < nl.batchSize {
  107. return nl.NodeAddMember(prevNode.Reference(), name)
  108. }
  109. // case 2.3
  110. x := nl.NodeInnerPosition(prevNode.Reference(), name)
  111. y := nodeSize - x
  112. addToX := x <= y
  113. // add to a new node
  114. if x == 0 || y == 0 {
  115. if err := nl.ItemAdd(lookupKey, 0, name); err != nil {
  116. return err
  117. }
  118. return nil
  119. }
  120. if addToX {
  121. // collect names before name, add them to X
  122. namesToX, err := nl.NodeRangeBeforeExclusive(prevNode.Reference(), name)
  123. if err != nil {
  124. return nil
  125. }
  126. // delete skiplist reference to old node
  127. if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil {
  128. return err
  129. }
  130. // add namesToY and name to a new X
  131. namesToX = append(namesToX, name)
  132. if err := nl.ItemAdd([]byte(namesToX[0]), 0, namesToX...); err != nil {
  133. return nil
  134. }
  135. // remove names less than name from current Y
  136. if err := nl.NodeDeleteBeforeExclusive(prevNode.Reference(), name); err != nil {
  137. return nil
  138. }
  139. // point skip list to current Y
  140. if err := nl.ItemAdd(lookupKey, prevNode.Id); err != nil {
  141. return nil
  142. }
  143. return nil
  144. } else {
  145. // collect names after name, add them to Y
  146. namesToY, err := nl.NodeRangeAfterExclusive(prevNode.Reference(), name)
  147. if err != nil {
  148. return nil
  149. }
  150. // add namesToY and name to a new Y
  151. namesToY = append(namesToY, name)
  152. if err := nl.ItemAdd(lookupKey, 0, namesToY...); err != nil {
  153. return nil
  154. }
  155. // remove names after name from current X
  156. if err := nl.NodeDeleteAfterExclusive(prevNode.Reference(), name); err != nil {
  157. return nil
  158. }
  159. return nil
  160. }
  161. }
  162. // case 2.4
  163. if nextNode != nil {
  164. nodeSize := nl.NodeSize(nextNode.Reference())
  165. if nodeSize < nl.batchSize {
  166. if id, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
  167. return err
  168. } else {
  169. if err := nl.ItemAdd(lookupKey, id, name); err != nil {
  170. return err
  171. }
  172. }
  173. return nil
  174. }
  175. }
  176. // case 2.5
  177. // now prevNode is nil
  178. return nl.ItemAdd(lookupKey, 0, name)
  179. }
  180. /*
  181. // case 1: exists in nextNode
  182. if nextNode != nil && nextNode.Key == name {
  183. remove from nextNode, update nextNode
  184. // TODO: merge with prevNode if possible?
  185. return
  186. }
  187. if nextNode is nil
  188. prevNode = list.LargestNode
  189. if prevNode == nil and nextNode.Prev != nil
  190. prevNode = load(nextNode.Prev)
  191. // case 2: does not exist
  192. // case 2.1
  193. if prevNode == nil {
  194. return
  195. }
  196. // case 2.2
  197. if prevNameBatch does not contain name {
  198. return
  199. }
  200. // case 3
  201. delete from prevNameBatch
  202. if prevNameBatch + nextNode < capacityList
  203. // case 3.1
  204. merge
  205. else
  206. // case 3.2
  207. update prevNode
  208. */
  209. func (nl *ItemList) DeleteName(name string) error {
  210. lookupKey := []byte(name)
  211. prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
  212. if err != nil {
  213. return err
  214. }
  215. // case 1
  216. if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
  217. if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
  218. return err
  219. }
  220. if err := nl.NodeDeleteMember(nextNode.Reference(), name); err != nil {
  221. return err
  222. }
  223. minName := nl.NodeMin(nextNode.Reference())
  224. if minName == "" {
  225. return nl.NodeDelete(nextNode.Reference())
  226. }
  227. return nl.ItemAdd([]byte(minName), nextNode.Id)
  228. }
  229. if !found {
  230. prevNode, err = nl.skipList.GetLargestNode()
  231. if err != nil {
  232. return err
  233. }
  234. }
  235. if nextNode != nil && prevNode == nil {
  236. prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
  237. if err != nil {
  238. return err
  239. }
  240. }
  241. // case 2
  242. if prevNode == nil {
  243. // case 2.1
  244. return nil
  245. }
  246. if !nl.NodeContainsItem(prevNode.Reference(), name) {
  247. return nil
  248. }
  249. // case 3
  250. if err := nl.NodeDeleteMember(prevNode.Reference(), name); err != nil {
  251. return err
  252. }
  253. prevSize := nl.NodeSize(prevNode.Reference())
  254. if prevSize == 0 {
  255. if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil {
  256. return err
  257. }
  258. return nil
  259. }
  260. nextSize := nl.NodeSize(nextNode.Reference())
  261. if nextSize > 0 && prevSize + nextSize < nl.batchSize {
  262. // case 3.1 merge nextNode and prevNode
  263. if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
  264. return err
  265. }
  266. nextNames, err := nl.NodeRangeBeforeExclusive(nextNode.Reference(), "")
  267. if err != nil {
  268. return err
  269. }
  270. if err := nl.NodeAddMember(prevNode.Reference(), nextNames...); err != nil {
  271. return err
  272. }
  273. return nl.NodeDelete(nextNode.Reference())
  274. } else {
  275. // case 3.2 update prevNode
  276. // no action to take
  277. return nil
  278. }
  279. return nil
  280. }
  281. func (nl *ItemList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
  282. lookupKey := []byte(startFrom)
  283. prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
  284. if err != nil {
  285. return err
  286. }
  287. if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
  288. prevNode = nil
  289. }
  290. if !found {
  291. prevNode, err = nl.skipList.GetLargestNode()
  292. if err != nil {
  293. return err
  294. }
  295. }
  296. if prevNode != nil {
  297. if !nl.NodeScanIncluseiveAfter(prevNode.Reference(), startFrom, visitNamesFn) {
  298. return nil
  299. }
  300. }
  301. for nextNode != nil {
  302. if !nl.NodeScanIncluseiveAfter(nextNode.Reference(), startFrom, visitNamesFn) {
  303. return nil
  304. }
  305. nextNode, err = nl.skipList.LoadElement(nextNode.Next[0])
  306. if err != nil {
  307. return err
  308. }
  309. }
  310. return nil
  311. }
  312. func (nl *ItemList) RemoteAllListElement() error {
  313. t := nl.skipList
  314. nodeRef := t.StartLevels[0]
  315. for nodeRef != nil {
  316. node, err := t.LoadElement(nodeRef)
  317. if err != nil {
  318. return err
  319. }
  320. if node == nil {
  321. return nil
  322. }
  323. if err := t.DeleteElement(node); err != nil {
  324. return err
  325. }
  326. if err := nl.NodeDelete(node.Reference()); err != nil {
  327. return err
  328. }
  329. nodeRef = node.Next[0]
  330. }
  331. return nil
  332. }
  333. func (nl *ItemList) NodeContainsItem(node *skiplist.SkipListElementReference, item string) bool {
  334. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  335. _, err := nl.client.ZScore(context.Background(), key, item).Result()
  336. if err == redis.Nil {
  337. return false
  338. }
  339. if err == nil {
  340. return true
  341. }
  342. return false
  343. }
  344. func (nl *ItemList) NodeSize(node *skiplist.SkipListElementReference) int {
  345. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  346. return int(nl.client.ZLexCount(context.Background(), key, "-", "+").Val())
  347. }
  348. func (nl *ItemList) NodeAddMember(node *skiplist.SkipListElementReference, names ...string) error {
  349. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  350. var members []*redis.Z
  351. for _, name := range names {
  352. members = append(members, &redis.Z{
  353. Score: 0,
  354. Member: name,
  355. })
  356. }
  357. return nl.client.ZAddNX(context.Background(), key, members...).Err()
  358. }
  359. func (nl *ItemList) NodeDeleteMember(node *skiplist.SkipListElementReference, name string) error {
  360. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  361. return nl.client.ZRem(context.Background(), key, name).Err()
  362. }
  363. func (nl *ItemList) NodeDelete(node *skiplist.SkipListElementReference) error {
  364. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  365. return nl.client.Del(context.Background(), key).Err()
  366. }
  367. func (nl *ItemList) NodeInnerPosition(node *skiplist.SkipListElementReference, name string) int {
  368. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  369. return int(nl.client.ZLexCount(context.Background(), key, "-", "("+name).Val())
  370. }
  371. func (nl *ItemList) NodeMin(node *skiplist.SkipListElementReference) string {
  372. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  373. slice := nl.client.ZPopMin(context.Background(), key).Val()
  374. if len(slice)>0{
  375. s := slice[0].Member.(string)
  376. return s
  377. }
  378. return ""
  379. }
  380. func (nl *ItemList) NodeScanIncluseiveAfter(node *skiplist.SkipListElementReference, startFrom string, visitNamesFn func(name string) bool) bool {
  381. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  382. if startFrom == "" {
  383. startFrom = "-"
  384. } else {
  385. startFrom = "[" + startFrom
  386. }
  387. names := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
  388. Min: startFrom,
  389. Max: "+",
  390. }).Val()
  391. for _, n := range names {
  392. if !visitNamesFn(n) {
  393. return false
  394. }
  395. }
  396. return true
  397. }
  398. func (nl *ItemList) NodeRangeBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) ([]string, error) {
  399. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  400. if stopAt == "" {
  401. stopAt = "+"
  402. } else {
  403. stopAt = "(" + stopAt
  404. }
  405. return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
  406. Min: "-",
  407. Max: stopAt,
  408. }).Result()
  409. }
  410. func (nl *ItemList) NodeRangeAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) ([]string, error) {
  411. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  412. if startFrom == "" {
  413. startFrom = "-"
  414. } else {
  415. startFrom = "(" + startFrom
  416. }
  417. return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
  418. Min: startFrom,
  419. Max: "+",
  420. }).Result()
  421. }
  422. func (nl *ItemList) NodeDeleteBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) error {
  423. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  424. if stopAt == "" {
  425. stopAt = "+"
  426. } else {
  427. stopAt = "(" + stopAt
  428. }
  429. return nl.client.ZRemRangeByLex(context.Background(), key, "-", stopAt).Err()
  430. }
  431. func (nl *ItemList) NodeDeleteAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) error {
  432. key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
  433. if startFrom == "" {
  434. startFrom = "-"
  435. } else {
  436. startFrom = "(" + startFrom
  437. }
  438. return nl.client.ZRemRangeByLex(context.Background(), key, startFrom, "+").Err()
  439. }
  440. func (nl *ItemList) ItemAdd(lookupKey []byte, idIfKnown int64, names ...string) error {
  441. if id, err := nl.skipList.InsertByKey(lookupKey, idIfKnown, nil); err != nil {
  442. return err
  443. } else {
  444. if len(names) > 0 {
  445. return nl.NodeAddMember(&skiplist.SkipListElementReference{
  446. ElementPointer: id,
  447. Key: lookupKey,
  448. }, names...)
  449. }
  450. }
  451. return nil
  452. }