You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

179 lines
5.0 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package redis3
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-redsync/redsync/v4"
  6. "time"
  7. "github.com/go-redis/redis/v8"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. const (
  14. DIR_LIST_MARKER = "\x00"
  15. )
  16. type UniversalRedis3Store struct {
  17. Client redis.UniversalClient
  18. redsync *redsync.Redsync
  19. }
  20. func (store *UniversalRedis3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
  21. return ctx, nil
  22. }
  23. func (store *UniversalRedis3Store) CommitTransaction(ctx context.Context) error {
  24. return nil
  25. }
  26. func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) error {
  27. return nil
  28. }
  29. func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  30. value, err := entry.EncodeAttributesAndChunks()
  31. if err != nil {
  32. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  33. }
  34. if len(entry.Chunks) > filer.CountEntryChunksForGzip {
  35. value = util.MaybeGzipData(value)
  36. }
  37. if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
  38. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  39. }
  40. dir, name := entry.FullPath.DirAndName()
  41. if name != "" {
  42. if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
  43. return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
  44. }
  45. }
  46. return nil
  47. }
  48. func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  49. return store.InsertEntry(ctx, entry)
  50. }
  51. func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  52. data, err := store.Client.Get(ctx, string(fullpath)).Result()
  53. if err == redis.Nil {
  54. return nil, filer_pb.ErrNotFound
  55. }
  56. if err != nil {
  57. return nil, fmt.Errorf("get %s : %v", fullpath, err)
  58. }
  59. entry = &filer.Entry{
  60. FullPath: fullpath,
  61. }
  62. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
  63. if err != nil {
  64. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  65. }
  66. return entry, nil
  67. }
  68. func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
  69. _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
  70. if err != nil {
  71. return fmt.Errorf("delete dir list %s : %v", fullpath, err)
  72. }
  73. _, err = store.Client.Del(ctx, string(fullpath)).Result()
  74. if err != nil {
  75. return fmt.Errorf("delete %s : %v", fullpath, err)
  76. }
  77. dir, name := fullpath.DirAndName()
  78. if name != "" {
  79. if err = removeChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
  80. return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
  81. }
  82. }
  83. return nil
  84. }
  85. func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
  86. return removeChildren(ctx, store, genDirectoryListKey(string(fullpath)), func(name string) error {
  87. path := util.NewFullPath(string(fullpath), name)
  88. _, err = store.Client.Del(ctx, string(path)).Result()
  89. if err != nil {
  90. return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
  91. }
  92. // not efficient, but need to remove if it is a directory
  93. store.Client.Del(ctx, genDirectoryListKey(string(path)))
  94. return nil
  95. })
  96. }
  97. func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  98. return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
  99. }
  100. func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  101. dirListKey := genDirectoryListKey(string(dirPath))
  102. counter := int64(0)
  103. err = listChildren(ctx, store, dirListKey, startFileName, func(fileName string) bool {
  104. if startFileName != "" {
  105. if !includeStartFile && startFileName == fileName {
  106. return true
  107. }
  108. }
  109. path := util.NewFullPath(string(dirPath), fileName)
  110. entry, err := store.FindEntry(ctx, path)
  111. lastFileName = fileName
  112. if err != nil {
  113. glog.V(0).Infof("list %s : %v", path, err)
  114. if err == filer_pb.ErrNotFound {
  115. return true
  116. }
  117. } else {
  118. if entry.TtlSec > 0 {
  119. if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
  120. store.Client.Del(ctx, string(path)).Result()
  121. store.Client.ZRem(ctx, dirListKey, fileName).Result()
  122. return true
  123. }
  124. }
  125. counter++
  126. if !eachEntryFunc(entry) {
  127. return false
  128. }
  129. if counter >= limit {
  130. return false
  131. }
  132. }
  133. return true
  134. })
  135. return lastFileName, err
  136. }
  137. func genDirectoryListKey(dir string) (dirList string) {
  138. return dir + DIR_LIST_MARKER
  139. }
  140. func (store *UniversalRedis3Store) Shutdown() {
  141. store.Client.Close()
  142. }