You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

177 lines
4.7 KiB

4 years ago
  1. package shell
  import (
  	"context"
  	"flag"
  	"fmt"
  	"io"
  	"path/filepath"
  	"strings"
  	"time"

  	"github.com/seaweedfs/seaweedfs/weed/filer"
  	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  	"github.com/seaweedfs/seaweedfs/weed/util"
  )
  13. func init() {
  14. Commands = append(Commands, &commandRemoteUncache{})
  15. }
  // commandRemoteUncache is the receiver type for the "remote.uncache" command.
  // It has no fields; all per-invocation state lives in the flags parsed by Do.
  type commandRemoteUncache struct{}
  18. func (c *commandRemoteUncache) Name() string {
  19. return "remote.uncache"
  20. }
  21. func (c *commandRemoteUncache) Help() string {
  22. return `keep the metadata but remote cache the file content for mounted directories or files
  23. This is designed to run regularly. So you can add it to some cronjob.
  24. If a file is not synchronized with the remote copy, the file will be skipped to avoid loss of data.
  25. remote.uncache -dir=/xxx
  26. remote.uncache -dir=/xxx/some/sub/dir
  27. remote.uncache -dir=/xxx/some/sub/dir -include=*.pdf
  28. remote.uncache -dir=/xxx/some/sub/dir -exclude=*.txt
  29. remote.uncache -minSize=1024000 # uncache files larger than 100K
  30. remote.uncache -minAge=3600 # uncache files older than 1 hour
  31. `
  32. }
  33. func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  34. remoteUncacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  35. dir := remoteUncacheCommand.String("dir", "", "a directory in filer")
  36. fileFiler := newFileFilter(remoteUncacheCommand)
  37. if err = remoteUncacheCommand.Parse(args); err != nil {
  38. return nil
  39. }
  40. mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
  41. if listErr != nil {
  42. return listErr
  43. }
  44. if *dir != "" {
  45. var localMountedDir string
  46. for k := range mappings.Mappings {
  47. if strings.HasPrefix(*dir, k) {
  48. localMountedDir = k
  49. }
  50. }
  51. if localMountedDir == "" {
  52. jsonPrintln(writer, mappings)
  53. fmt.Fprintf(writer, "%s is not mounted\n", *dir)
  54. return nil
  55. }
  56. // pull content from remote
  57. if err = c.uncacheContentData(commandEnv, writer, util.FullPath(*dir), fileFiler); err != nil {
  58. return fmt.Errorf("uncache content data: %v", err)
  59. }
  60. return nil
  61. }
  62. for key, _ := range mappings.Mappings {
  63. if err := c.uncacheContentData(commandEnv, writer, util.FullPath(key), fileFiler); err != nil {
  64. return err
  65. }
  66. }
  67. return nil
  68. }
  69. func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer io.Writer, dirToCache util.FullPath, fileFilter *FileFilter) error {
  70. return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
  71. if !mayHaveCachedToLocal(entry) {
  72. return true // true means recursive traversal should continue
  73. }
  74. if !fileFilter.matches(entry) {
  75. return true
  76. }
  77. if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
  78. return true // should not uncache an entry that is not synchronized with remote
  79. }
  80. entry.RemoteEntry.LastLocalSyncTsNs = 0
  81. entry.Chunks = nil
  82. fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name))
  83. err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  84. _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  85. Directory: string(dir),
  86. Entry: entry,
  87. })
  88. return updateErr
  89. })
  90. if err != nil {
  91. fmt.Fprintf(writer, "uncache %+v: %v\n", dir.Child(entry.Name), err)
  92. return false
  93. }
  94. fmt.Fprintf(writer, "Done\n")
  95. return true
  96. })
  97. }
  // FileFilter holds the flag values used to select entries. Each field points
  // at a value owned by the flag.FlagSet passed to newFileFilter, so the
  // fields are only meaningful after that FlagSet has been parsed.
  type FileFilter struct {
  	include *string // file name pattern an entry must match; "" disables the check
  	exclude *string // file name pattern an entry must not match; "" disables the check
  	minSize *int64  // minimum file size in bytes; -1 disables the check
  	maxSize *int64  // maximum file size in bytes; -1 disables the check
  	minAge  *int64  // minimum file age in seconds; -1 disables the check
  	maxAge  *int64  // maximum file age in seconds; -1 disables the check
  }

  // newFileFilter registers the include, exclude, minSize, maxSize, minAge and
  // maxAge flags on remoteMountCommand and returns a FileFilter bound to them.
  func newFileFilter(remoteMountCommand *flag.FlagSet) (ff *FileFilter) {
  	return &FileFilter{
  		include: remoteMountCommand.String("include", "", "patterns of file names, e.g., *.pdf, *.html, ab?d.txt"),
  		exclude: remoteMountCommand.String("exclude", "", "patterns of file names, e.g., *.pdf, *.html, ab?d.txt"),
  		minSize: remoteMountCommand.Int64("minSize", -1, "minimum file size in bytes"),
  		maxSize: remoteMountCommand.Int64("maxSize", -1, "maximum file size in bytes"),
  		minAge:  remoteMountCommand.Int64("minAge", -1, "minimum file age in seconds"),
  		maxAge:  remoteMountCommand.Int64("maxAge", -1, "maximum file age in seconds"),
  	}
  }
  116. func (ff *FileFilter) matches(entry *filer_pb.Entry) bool {
  117. if *ff.include != "" {
  118. if ok, _ := filepath.Match(*ff.include, entry.Name); !ok {
  119. return false
  120. }
  121. }
  122. if *ff.exclude != "" {
  123. if ok, _ := filepath.Match(*ff.exclude, entry.Name); ok {
  124. return false
  125. }
  126. }
  127. if *ff.minSize != -1 {
  128. if int64(entry.Attributes.FileSize) < *ff.minSize {
  129. return false
  130. }
  131. }
  132. if *ff.maxSize != -1 {
  133. if int64(entry.Attributes.FileSize) > *ff.maxSize {
  134. return false
  135. }
  136. }
  137. if *ff.minAge != -1 {
  138. if entry.Attributes.Crtime < *ff.minAge {
  139. return false
  140. }
  141. }
  142. if *ff.maxAge != -1 {
  143. if entry.Attributes.Crtime > *ff.maxAge {
  144. return false
  145. }
  146. }
  147. return true
  148. }