You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

182 lines
4.8 KiB

4 months ago
3 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "path/filepath"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandRemoteUncache{})
  16. }
  17. type commandRemoteUncache struct {
  18. }
  19. func (c *commandRemoteUncache) Name() string {
  20. return "remote.uncache"
  21. }
  22. func (c *commandRemoteUncache) Help() string {
  23. return `keep the metadata but remote cache the file content for mounted directories or files
  24. This is designed to run regularly. So you can add it to some cronjob.
  25. If a file is not synchronized with the remote copy, the file will be skipped to avoid loss of data.
  26. remote.uncache -dir=/xxx
  27. remote.uncache -dir=/xxx/some/sub/dir
  28. remote.uncache -dir=/xxx/some/sub/dir -include=*.pdf
  29. remote.uncache -dir=/xxx/some/sub/dir -exclude=*.txt
  30. remote.uncache -minSize=1024000 # uncache files larger than 100K
  31. remote.uncache -minAge=3600 # uncache files older than 1 hour
  32. `
  33. }
  34. func (c *commandRemoteUncache) HasTag(CommandTag) bool {
  35. return false
  36. }
  37. func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  38. remoteUncacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  39. dir := remoteUncacheCommand.String("dir", "", "a directory in filer")
  40. fileFiler := newFileFilter(remoteUncacheCommand)
  41. if err = remoteUncacheCommand.Parse(args); err != nil {
  42. return nil
  43. }
  44. mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
  45. if listErr != nil {
  46. return listErr
  47. }
  48. if *dir != "" {
  49. var localMountedDir string
  50. for k := range mappings.Mappings {
  51. if strings.HasPrefix(*dir, k) {
  52. localMountedDir = k
  53. }
  54. }
  55. if localMountedDir == "" {
  56. jsonPrintln(writer, mappings)
  57. fmt.Fprintf(writer, "%s is not mounted\n", *dir)
  58. return nil
  59. }
  60. // pull content from remote
  61. if err = c.uncacheContentData(commandEnv, writer, util.FullPath(*dir), fileFiler); err != nil {
  62. return fmt.Errorf("uncache content data: %v", err)
  63. }
  64. return nil
  65. }
  66. for key, _ := range mappings.Mappings {
  67. if err := c.uncacheContentData(commandEnv, writer, util.FullPath(key), fileFiler); err != nil {
  68. return err
  69. }
  70. }
  71. return nil
  72. }
  73. func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer io.Writer, dirToCache util.FullPath, fileFilter *FileFilter) error {
  74. return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
  75. if !mayHaveCachedToLocal(entry) {
  76. return true // true means recursive traversal should continue
  77. }
  78. if !fileFilter.matches(entry) {
  79. return true
  80. }
  81. if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
  82. return true // should not uncache an entry that is not synchronized with remote
  83. }
  84. entry.RemoteEntry.LastLocalSyncTsNs = 0
  85. entry.Chunks = nil
  86. fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name))
  87. err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  88. _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  89. Directory: string(dir),
  90. Entry: entry,
  91. })
  92. return updateErr
  93. })
  94. if err != nil {
  95. fmt.Fprintf(writer, "uncache %+v: %v\n", dir.Child(entry.Name), err)
  96. return false
  97. }
  98. fmt.Fprintf(writer, "Done\n")
  99. return true
  100. })
  101. }
  102. type FileFilter struct {
  103. include *string
  104. exclude *string
  105. minSize *int64
  106. maxSize *int64
  107. minAge *int64
  108. maxAge *int64
  109. }
  110. func newFileFilter(remoteMountCommand *flag.FlagSet) (ff *FileFilter) {
  111. ff = &FileFilter{}
  112. ff.include = remoteMountCommand.String("include", "", "pattens of file names, e.g., *.pdf, *.html, ab?d.txt")
  113. ff.exclude = remoteMountCommand.String("exclude", "", "pattens of file names, e.g., *.pdf, *.html, ab?d.txt")
  114. ff.minSize = remoteMountCommand.Int64("minSize", -1, "minimum file size in bytes")
  115. ff.maxSize = remoteMountCommand.Int64("maxSize", -1, "maximum file size in bytes")
  116. ff.minAge = remoteMountCommand.Int64("minAge", -1, "minimum file age in seconds")
  117. ff.maxAge = remoteMountCommand.Int64("maxAge", -1, "maximum file age in seconds")
  118. return
  119. }
  120. func (ff *FileFilter) matches(entry *filer_pb.Entry) bool {
  121. if *ff.include != "" {
  122. if ok, _ := filepath.Match(*ff.include, entry.Name); !ok {
  123. return false
  124. }
  125. }
  126. if *ff.exclude != "" {
  127. if ok, _ := filepath.Match(*ff.exclude, entry.Name); ok {
  128. return false
  129. }
  130. }
  131. if *ff.minSize != -1 {
  132. if int64(entry.Attributes.FileSize) < *ff.minSize {
  133. return false
  134. }
  135. }
  136. if *ff.maxSize != -1 {
  137. if int64(entry.Attributes.FileSize) > *ff.maxSize {
  138. return false
  139. }
  140. }
  141. if *ff.minAge != -1 {
  142. if entry.Attributes.Crtime+*ff.minAge > time.Now().Unix() {
  143. return false
  144. }
  145. }
  146. if *ff.maxAge != -1 {
  147. if entry.Attributes.Crtime+*ff.maxAge < time.Now().Unix() {
  148. return false
  149. }
  150. }
  151. return true
  152. }