You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

178 lines
6.1 KiB

3 years ago
3 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  9. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "io"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandRemoteMetaSync{})
  15. }
  // commandRemoteMetaSync is the receiver type for the "remote.meta.sync"
  // command. It carries no fields.
  type commandRemoteMetaSync struct{}
  18. func (c *commandRemoteMetaSync) Name() string {
  19. return "remote.meta.sync"
  20. }
  21. func (c *commandRemoteMetaSync) Help() string {
  22. return `synchronize the local file meta data with the remote file metadata
  23. # assume a remote storage is configured to name "cloud1"
  24. remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
  25. # mount and pull one bucket
  26. remote.mount -dir=/xxx -remote=cloud1/bucket
  27. After mount, if the remote file can be changed,
  28. run this command to synchronize the metadata of the mounted folder or any sub folder
  29. remote.meta.sync -dir=/xxx
  30. remote.meta.sync -dir=/xxx/some/subdir
  31. This is designed to run regularly. So you can add it to some cronjob.
  32. If there are no other operations changing remote files, this operation is not needed.
  33. `
  34. }
  35. func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  36. remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  37. dir := remoteMetaSyncCommand.String("dir", "", "a directory in filer")
  38. if err = remoteMetaSyncCommand.Parse(args); err != nil {
  39. return nil
  40. }
  41. mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
  42. if detectErr != nil{
  43. jsonPrintln(writer, mappings)
  44. return detectErr
  45. }
  46. // pull metadata from remote
  47. if err = pullMetadata(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil {
  48. return fmt.Errorf("cache content data: %v", err)
  49. }
  50. return nil
  51. }
  52. func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {
  53. return filer.DetectMountInfo(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, dir)
  54. }
  55. /*
  56. This function update entry.RemoteEntry if the remote has any changes.
  57. To pull remote updates, or created for the first time, the criteria is:
  58. entry == nil or (entry.RemoteEntry != nil and entry.RemoteEntry.RemoteTag != remote.RemoteTag)
  59. After the meta pull, the entry.RemoteEntry will have:
  60. remoteEntry.LastLocalSyncTsNs == 0
  61. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  62. Attributes.Mtime = remoteEntry.RemoteMtime
  63. remoteEntry.RemoteTag = actual remote tag
  64. chunks = nil
  65. When reading the file content or pulling the file content in "remote.cache", the criteria is:
  66. Attributes.FileSize > 0 and len(chunks) == 0
  67. After caching the file content, the entry.RemoteEntry will be
  68. remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano()
  69. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  70. Attributes.Mtime = remoteEntry.RemoteMtime
  71. chunks = non-emtpy
  72. When "weed filer.remote.sync" to upload local changes to remote, the criteria is:
  73. Attributes.Mtime > remoteEntry.RemoteMtime
  74. Right after "weed filer.remote.sync", the entry.RemoteEntry will be
  75. remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano()
  76. remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize
  77. remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime
  78. remoteEntry.RemoteTag = actual remote tag
  79. If entry does not exists, need to pull meta
  80. If entry.RemoteEntry == nil, this is a new local change and should not be overwritten
  81. If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag {
  82. the remote version is updated, need to pull meta
  83. }
  84. */
  85. func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *remote_pb.RemoteConf) error {
  86. // visit remote storage
  87. remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
  88. if err != nil {
  89. return err
  90. }
  91. remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache)
  92. err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  93. ctx := context.Background()
  94. err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
  95. localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
  96. fmt.Fprint(writer, localDir.Child(name))
  97. lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
  98. Directory: string(localDir),
  99. Name: name,
  100. })
  101. var existingEntry *filer_pb.Entry
  102. if lookupErr != nil {
  103. if lookupErr != filer_pb.ErrNotFound {
  104. return lookupErr
  105. }
  106. } else {
  107. existingEntry = lookupResponse.Entry
  108. }
  109. if existingEntry == nil {
  110. _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
  111. Directory: string(localDir),
  112. Entry: &filer_pb.Entry{
  113. Name: name,
  114. IsDirectory: isDirectory,
  115. Attributes: &filer_pb.FuseAttributes{
  116. FileSize: uint64(remoteEntry.RemoteSize),
  117. Mtime: remoteEntry.RemoteMtime,
  118. FileMode: uint32(0644),
  119. },
  120. RemoteEntry: remoteEntry,
  121. },
  122. })
  123. fmt.Fprintln(writer, " (create)")
  124. return createErr
  125. } else {
  126. if existingEntry.RemoteEntry == nil {
  127. // this is a new local change and should not be overwritten
  128. fmt.Fprintln(writer, " (skip)")
  129. return nil
  130. }
  131. if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag {
  132. // the remote version is updated, need to pull meta
  133. fmt.Fprintln(writer, " (update)")
  134. return doSaveRemoteEntry(client, string(localDir), existingEntry, remoteEntry)
  135. }
  136. }
  137. fmt.Fprintln(writer, " (skip)")
  138. return nil
  139. })
  140. return err
  141. })
  142. if err != nil {
  143. return err
  144. }
  145. return nil
  146. }