You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

138 lines
3.3 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package redis3
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/go-redis/redis/v8"
  7. )
  8. const maxNameBatchSizeLimit = 1000000
  9. func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
  10. // lock and unlock
  11. mutex := redisStore.redsync.NewMutex(key + "lock")
  12. if err := mutex.Lock(); err != nil {
  13. return fmt.Errorf("lock %s: %v", key, err)
  14. }
  15. defer func() {
  16. mutex.Unlock()
  17. }()
  18. client := redisStore.Client
  19. data, err := client.Get(ctx, key).Result()
  20. if err != nil {
  21. if err != redis.Nil {
  22. return fmt.Errorf("read %s: %v", key, err)
  23. }
  24. }
  25. store := newSkipListElementStore(key, client)
  26. nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
  27. if err := nameList.WriteName(name); err != nil {
  28. glog.Errorf("add %s %s: %v", key, name, err)
  29. return err
  30. }
  31. if !nameList.HasChanges() {
  32. return nil
  33. }
  34. if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
  35. return err
  36. }
  37. return nil
  38. }
  39. func removeChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
  40. // lock and unlock
  41. mutex := redisStore.redsync.NewMutex(key + "lock")
  42. if err := mutex.Lock(); err != nil {
  43. return fmt.Errorf("lock %s: %v", key, err)
  44. }
  45. defer mutex.Unlock()
  46. client := redisStore.Client
  47. data, err := client.Get(ctx, key).Result()
  48. if err != nil {
  49. if err != redis.Nil {
  50. return fmt.Errorf("read %s: %v", key, err)
  51. }
  52. }
  53. store := newSkipListElementStore(key, client)
  54. nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
  55. if err := nameList.DeleteName(name); err != nil {
  56. return err
  57. }
  58. if !nameList.HasChanges() {
  59. return nil
  60. }
  61. if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
  62. return err
  63. }
  64. return nil
  65. }
  66. func removeChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, onDeleteFn func(name string) error) error {
  67. // lock and unlock
  68. mutex := redisStore.redsync.NewMutex(key + "lock")
  69. if err := mutex.Lock(); err != nil {
  70. return fmt.Errorf("lock %s: %v", key, err)
  71. }
  72. defer mutex.Unlock()
  73. client := redisStore.Client
  74. data, err := client.Get(ctx, key).Result()
  75. if err != nil {
  76. if err != redis.Nil {
  77. return fmt.Errorf("read %s: %v", key, err)
  78. }
  79. }
  80. store := newSkipListElementStore(key, client)
  81. nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
  82. if err = nameList.ListNames("", func(name string) bool {
  83. if err := onDeleteFn(name); err != nil {
  84. glog.Errorf("delete %s child %s: %v", key, name, err)
  85. return false
  86. }
  87. return true
  88. }); err != nil {
  89. return err
  90. }
  91. if err = nameList.RemoteAllListElement(); err != nil {
  92. return err
  93. }
  94. return nil
  95. }
  96. func listChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, startFileName string, eachFn func(name string) bool) error {
  97. client := redisStore.Client
  98. data, err := client.Get(ctx, key).Result()
  99. if err != nil {
  100. if err != redis.Nil {
  101. return fmt.Errorf("read %s: %v", key, err)
  102. }
  103. }
  104. store := newSkipListElementStore(key, client)
  105. nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
  106. if err = nameList.ListNames(startFileName, func(name string) bool {
  107. return eachFn(name)
  108. }); err != nil {
  109. return err
  110. }
  111. return nil
  112. }