You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
4.8 KiB

3 years ago
3 years ago
4 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. "io"
  10. "sync"
  11. )
  12. func init() {
  13. Commands = append(Commands, &commandRemoteCache{})
  14. }
  15. type commandRemoteCache struct {
  16. }
  17. func (c *commandRemoteCache) Name() string {
  18. return "remote.cache"
  19. }
  20. func (c *commandRemoteCache) Help() string {
  21. return `cache the file content for mounted directories or files
  22. # assume a remote storage is configured to name "cloud1"
  23. remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
  24. # mount and pull one bucket
  25. remote.mount -dir=/xxx -remote=cloud1/bucket
  26. # after mount, run one of these command to cache the content of the files
  27. remote.cache -dir=/xxx
  28. remote.cache -dir=/xxx/some/sub/dir
  29. remote.cache -dir=/xxx/some/sub/dir -include=*.pdf
  30. remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt
  31. remote.cache -maxSize=1024000 # cache files smaller than 100K
  32. remote.cache -maxAge=3600 # cache files less than 1 hour old
  33. This is designed to run regularly. So you can add it to some cronjob.
  34. If a file is already synchronized with the remote copy, the file will be skipped to avoid unnecessary copy.
  35. The actual data copying goes through volume severs in parallel.
  36. `
  37. }
  38. func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  39. remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  40. dir := remoteMountCommand.String("dir", "", "a directory in filer")
  41. concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading")
  42. fileFiler := newFileFilter(remoteMountCommand)
  43. if err = remoteMountCommand.Parse(args); err != nil {
  44. return nil
  45. }
  46. mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
  47. if detectErr != nil{
  48. jsonPrintln(writer, mappings)
  49. return detectErr
  50. }
  51. // pull content from remote
  52. if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf, *concurrency); err != nil {
  53. return fmt.Errorf("cache content data: %v", err)
  54. }
  55. return nil
  56. }
  57. func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
  58. err = filer_pb.ReadDirAllEntries(filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error {
  59. if entry.IsDirectory {
  60. if !visitEntry(dirPath, entry) {
  61. return nil
  62. }
  63. subDir := dirPath.Child(entry.Name)
  64. if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil {
  65. return err
  66. }
  67. } else {
  68. if !visitEntry(dirPath, entry) {
  69. return nil
  70. }
  71. }
  72. return nil
  73. })
  74. return
  75. }
  76. func shouldCacheToLocal(entry *filer_pb.Entry) bool {
  77. if entry.IsDirectory {
  78. return false
  79. }
  80. if entry.RemoteEntry == nil {
  81. return false
  82. }
  83. if entry.RemoteEntry.LastLocalSyncTsNs == 0 && entry.RemoteEntry.RemoteSize > 0 {
  84. return true
  85. }
  86. return false
  87. }
  88. func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
  89. if entry.IsDirectory {
  90. return false
  91. }
  92. if entry.RemoteEntry == nil {
  93. return false // should not uncache an entry that is not in remote
  94. }
  95. if entry.RemoteEntry.LastLocalSyncTsNs > 0 {
  96. return true
  97. }
  98. return false
  99. }
  100. func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error {
  101. var wg sync.WaitGroup
  102. limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
  103. var executionErr error
  104. traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
  105. if !shouldCacheToLocal(entry) {
  106. return true // true means recursive traversal should continue
  107. }
  108. if fileFilter.matches(entry) {
  109. return true
  110. }
  111. wg.Add(1)
  112. limitedConcurrentExecutor.Execute(func() {
  113. defer wg.Done()
  114. fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name))
  115. remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
  116. if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
  117. fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err)
  118. if executionErr == nil {
  119. executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err)
  120. }
  121. return
  122. }
  123. fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name))
  124. })
  125. return true
  126. })
  127. wg.Wait()
  128. if traverseErr != nil {
  129. return traverseErr
  130. }
  131. if executionErr != nil {
  132. return executionErr
  133. }
  134. return nil
  135. }