You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

191 lines
5.1 KiB

  1. package redis_lua
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/go-redis/redis/v8"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. "github.com/seaweedfs/seaweedfs/weed/filer/redis_lua/stored_procedure"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. )
  // DIR_LIST_MARKER is the suffix genDirectoryListKey appends to a directory
  // path to build the Redis key under which that directory's listing is kept.
  const DIR_LIST_MARKER = "\x00"
  16. type UniversalRedisLuaStore struct {
  17. Client redis.UniversalClient
  18. superLargeDirectoryHash map[string]bool
  19. }
  20. func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
  21. _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
  22. return
  23. }
  24. func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) {
  25. // set directory hash
  26. store.superLargeDirectoryHash = make(map[string]bool)
  27. for _, dir := range superLargeDirectories {
  28. store.superLargeDirectoryHash[dir] = true
  29. }
  30. }
  31. func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  32. return ctx, nil
  33. }
  34. func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error {
  35. return nil
  36. }
  37. func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error {
  38. return nil
  39. }
  40. func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  41. value, err := entry.EncodeAttributesAndChunks()
  42. if err != nil {
  43. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  44. }
  45. if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
  46. value = util.MaybeGzipData(value)
  47. }
  48. dir, name := entry.FullPath.DirAndName()
  49. err = stored_procedure.InsertEntryScript.Run(ctx, store.Client,
  50. []string{string(entry.FullPath), genDirectoryListKey(dir)},
  51. value, entry.TtlSec,
  52. store.isSuperLargeDirectory(dir), 0, name).Err()
  53. if err != nil {
  54. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  55. }
  56. return nil
  57. }
  58. func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  59. return store.InsertEntry(ctx, entry)
  60. }
  61. func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  62. data, err := store.Client.Get(ctx, string(fullpath)).Result()
  63. if err == redis.Nil {
  64. return nil, filer_pb.ErrNotFound
  65. }
  66. if err != nil {
  67. return nil, fmt.Errorf("get %s : %v", fullpath, err)
  68. }
  69. entry = &filer.Entry{
  70. FullPath: fullpath,
  71. }
  72. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
  73. if err != nil {
  74. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  75. }
  76. return entry, nil
  77. }
  78. func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
  79. dir, name := fullpath.DirAndName()
  80. err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client,
  81. []string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)},
  82. store.isSuperLargeDirectory(dir), name).Err()
  83. if err != nil {
  84. return fmt.Errorf("DeleteEntry %s : %v", fullpath, err)
  85. }
  86. return nil
  87. }
  88. func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
  89. if store.isSuperLargeDirectory(string(fullpath)) {
  90. return nil
  91. }
  92. err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client,
  93. []string{string(fullpath)}).Err()
  94. if err != nil {
  95. return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
  96. }
  97. return nil
  98. }
  99. func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  100. return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
  101. }
  102. func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  103. dirListKey := genDirectoryListKey(string(dirPath))
  104. min := "-"
  105. if startFileName != "" {
  106. if includeStartFile {
  107. min = "[" + startFileName
  108. } else {
  109. min = "(" + startFileName
  110. }
  111. }
  112. members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{
  113. Min: min,
  114. Max: "+",
  115. Offset: 0,
  116. Count: limit,
  117. }).Result()
  118. if err != nil {
  119. return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
  120. }
  121. // fetch entry meta
  122. for _, fileName := range members {
  123. path := util.NewFullPath(string(dirPath), fileName)
  124. entry, err := store.FindEntry(ctx, path)
  125. lastFileName = fileName
  126. if err != nil {
  127. glog.V(0).Infof("list %s : %v", path, err)
  128. if err == filer_pb.ErrNotFound {
  129. continue
  130. }
  131. } else {
  132. if entry.TtlSec > 0 {
  133. if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
  134. store.DeleteEntry(ctx, path)
  135. continue
  136. }
  137. }
  138. if !eachEntryFunc(entry) {
  139. break
  140. }
  141. }
  142. }
  143. return lastFileName, err
  144. }
  145. func genDirectoryListKey(dir string) (dirList string) {
  146. return dir + DIR_LIST_MARKER
  147. }
  148. func (store *UniversalRedisLuaStore) Shutdown() {
  149. store.Client.Close()
  150. }