You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
6.2 KiB

4 months ago
3 years ago
3 years ago
2 years ago
2 years ago
5 months ago
2 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. "io"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandRemoteMetaSync{})
  15. }
  16. type commandRemoteMetaSync struct {
  17. }
  18. func (c *commandRemoteMetaSync) Name() string {
  19. return "remote.meta.sync"
  20. }
  21. func (c *commandRemoteMetaSync) Help() string {
  22. return `synchronize the local file meta data with the remote file metadata
  23. # assume a remote storage is configured to name "cloud1"
  24. remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
  25. # mount and pull one bucket
  26. remote.mount -dir=/xxx -remote=cloud1/bucket
  27. After mount, if the remote file can be changed,
  28. run this command to synchronize the metadata of the mounted folder or any sub folder
  29. remote.meta.sync -dir=/xxx
  30. remote.meta.sync -dir=/xxx/some/subdir
  31. This is designed to run regularly. So you can add it to some cronjob.
  32. If there are no other operations changing remote files, this operation is not needed.
  33. `
  34. }
  35. func (c *commandRemoteMetaSync) HasTag(CommandTag) bool {
  36. return false
  37. }
  38. func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  39. remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  40. dir := remoteMetaSyncCommand.String("dir", "", "a directory in filer")
  41. if err = remoteMetaSyncCommand.Parse(args); err != nil {
  42. return nil
  43. }
  44. mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
  45. if detectErr != nil {
  46. jsonPrintln(writer, mappings)
  47. return detectErr
  48. }
  49. // pull metadata from remote
  50. if err = pullMetadata(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil {
  51. return fmt.Errorf("cache meta data: %v", err)
  52. }
  53. return nil
  54. }
  55. func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {
  56. return filer.DetectMountInfo(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, dir)
  57. }
  58. /*
  59. This function update entry.RemoteEntry if the remote has any changes.
  60. To pull remote updates, or created for the first time, the criteria is:
  61. entry == nil or (entry.RemoteEntry != nil and (entry.RemoteEntry.RemoteTag != remote.RemoteTag or entry.RemoteEntry.RemoteMTime < remote.RemoteMTime ))
  62. After the meta pull, the entry.RemoteEntry will have:
  63. remoteEntry.LastLocalSyncTsNs == 0
  64. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  65. Attributes.Mtime = remoteEntry.RemoteMtime
  66. remoteEntry.RemoteTag = actual remote tag
  67. chunks = nil
  68. When reading the file content or pulling the file content in "remote.cache", the criteria is:
  69. Attributes.FileSize > 0 and len(chunks) == 0
  70. After caching the file content, the entry.RemoteEntry will be
  71. remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano()
  72. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  73. Attributes.Mtime = remoteEntry.RemoteMtime
  74. chunks = non-empty
  75. When "weed filer.remote.sync" to upload local changes to remote, the criteria is:
  76. Attributes.Mtime > remoteEntry.RemoteMtime
  77. Right after "weed filer.remote.sync", the entry.RemoteEntry will be
  78. remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano()
  79. remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize
  80. remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime
  81. remoteEntry.RemoteTag = actual remote tag
  82. If entry does not exist, need to pull meta
  83. If entry.RemoteEntry == nil, this is a new local change and should not be overwritten
  84. If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag {
  85. the remote version is updated, need to pull meta
  86. }
  87. */
  88. func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *remote_pb.RemoteConf) error {
  89. // visit remote storage
  90. remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
  91. if err != nil {
  92. return err
  93. }
  94. remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache)
  95. err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  96. ctx := context.Background()
  97. err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
  98. localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
  99. fmt.Fprint(writer, localDir.Child(name))
  100. lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
  101. Directory: string(localDir),
  102. Name: name,
  103. })
  104. var existingEntry *filer_pb.Entry
  105. if lookupErr != nil {
  106. if lookupErr != filer_pb.ErrNotFound {
  107. return lookupErr
  108. }
  109. } else {
  110. existingEntry = lookupResponse.Entry
  111. }
  112. if existingEntry == nil {
  113. _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
  114. Directory: string(localDir),
  115. Entry: &filer_pb.Entry{
  116. Name: name,
  117. IsDirectory: isDirectory,
  118. Attributes: &filer_pb.FuseAttributes{
  119. FileSize: uint64(remoteEntry.RemoteSize),
  120. Mtime: remoteEntry.RemoteMtime,
  121. FileMode: uint32(0644),
  122. },
  123. RemoteEntry: remoteEntry,
  124. },
  125. })
  126. fmt.Fprintln(writer, " (create)")
  127. return createErr
  128. } else {
  129. if existingEntry.RemoteEntry == nil {
  130. // this is a new local change and should not be overwritten
  131. fmt.Fprintln(writer, " (skip)")
  132. return nil
  133. }
  134. if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag || existingEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
  135. // the remote version is updated, need to pull meta
  136. fmt.Fprintln(writer, " (update)")
  137. return doSaveRemoteEntry(client, string(localDir), existingEntry, remoteEntry)
  138. }
  139. }
  140. fmt.Fprintln(writer, " (skip)")
  141. return nil
  142. })
  143. return err
  144. })
  145. if err != nil {
  146. return err
  147. }
  148. return nil
  149. }