You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

177 lines
6.0 KiB

3 years ago
3 years ago
3 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "io"
  11. )
  12. func init() {
  13. Commands = append(Commands, &commandRemoteMetaSync{})
  14. }
  15. type commandRemoteMetaSync struct {
  16. }
  17. func (c *commandRemoteMetaSync) Name() string {
  18. return "remote.meta.sync"
  19. }
  20. func (c *commandRemoteMetaSync) Help() string {
  21. return `synchronize the local file meta data with the remote file metadata
  22. # assume a remote storage is configured to name "cloud1"
  23. remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
  24. # mount and pull one bucket
  25. remote.mount -dir=/xxx -remote=cloud1/bucket
  26. After mount, if the remote file can be changed,
  27. run this command to synchronize the metadata of the mounted folder or any sub folder
  28. remote.meta.sync -dir=/xxx
  29. remote.meta.sync -dir=/xxx/some/subdir
  30. This is designed to run regularly. So you can add it to some cronjob.
  31. If there are no other operations changing remote files, this operation is not needed.
  32. `
  33. }
  34. func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  35. remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  36. dir := remoteMetaSyncCommand.String("dir", "", "a directory in filer")
  37. if err = remoteMetaSyncCommand.Parse(args); err != nil {
  38. return nil
  39. }
  40. mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
  41. if detectErr != nil{
  42. jsonPrintln(writer, mappings)
  43. return detectErr
  44. }
  45. // pull metadata from remote
  46. if err = pullMetadata(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil {
  47. return fmt.Errorf("cache content data: %v", err)
  48. }
  49. return nil
  50. }
  51. func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (*filer_pb.RemoteStorageMapping, string, *filer_pb.RemoteStorageLocation, *filer_pb.RemoteConf, error) {
  52. return filer.DetectMountInfo(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, dir)
  53. }
  54. /*
  55. This function update entry.RemoteEntry if the remote has any changes.
  56. To pull remote updates, or created for the first time, the criteria is:
  57. entry == nil or (entry.RemoteEntry != nil and entry.RemoteEntry.RemoteTag != remote.RemoteTag)
  58. After the meta pull, the entry.RemoteEntry will have:
  59. remoteEntry.LastLocalSyncTsNs == 0
  60. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  61. Attributes.Mtime = remoteEntry.RemoteMtime
  62. remoteEntry.RemoteTag = actual remote tag
  63. chunks = nil
  64. When reading the file content or pulling the file content in "remote.cache", the criteria is:
  65. Attributes.FileSize > 0 and len(chunks) == 0
  66. After caching the file content, the entry.RemoteEntry will be
  67. remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano()
  68. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  69. Attributes.Mtime = remoteEntry.RemoteMtime
  70. chunks = non-emtpy
  71. When "weed filer.remote.sync" to upload local changes to remote, the criteria is:
  72. Attributes.Mtime > remoteEntry.RemoteMtime
  73. Right after "weed filer.remote.sync", the entry.RemoteEntry will be
  74. remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano()
  75. remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize
  76. remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime
  77. remoteEntry.RemoteTag = actual remote tag
  78. If entry does not exists, need to pull meta
  79. If entry.RemoteEntry == nil, this is a new local change and should not be overwritten
  80. If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag {
  81. the remote version is updated, need to pull meta
  82. }
  83. */
  84. func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error {
  85. // visit remote storage
  86. remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
  87. if err != nil {
  88. return err
  89. }
  90. remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache)
  91. err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  92. ctx := context.Background()
  93. err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
  94. localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
  95. fmt.Fprint(writer, localDir.Child(name))
  96. lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
  97. Directory: string(localDir),
  98. Name: name,
  99. })
  100. var existingEntry *filer_pb.Entry
  101. if lookupErr != nil {
  102. if lookupErr != filer_pb.ErrNotFound {
  103. return lookupErr
  104. }
  105. } else {
  106. existingEntry = lookupResponse.Entry
  107. }
  108. if existingEntry == nil {
  109. _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
  110. Directory: string(localDir),
  111. Entry: &filer_pb.Entry{
  112. Name: name,
  113. IsDirectory: isDirectory,
  114. Attributes: &filer_pb.FuseAttributes{
  115. FileSize: uint64(remoteEntry.RemoteSize),
  116. Mtime: remoteEntry.RemoteMtime,
  117. FileMode: uint32(0644),
  118. },
  119. RemoteEntry: remoteEntry,
  120. },
  121. })
  122. fmt.Fprintln(writer, " (create)")
  123. return createErr
  124. } else {
  125. if existingEntry.RemoteEntry == nil {
  126. // this is a new local change and should not be overwritten
  127. fmt.Fprintln(writer, " (skip)")
  128. return nil
  129. }
  130. if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag {
  131. // the remote version is updated, need to pull meta
  132. fmt.Fprintln(writer, " (update)")
  133. return doSaveRemoteEntry(client, string(localDir), existingEntry, remoteEntry)
  134. }
  135. }
  136. fmt.Fprintln(writer, " (skip)")
  137. return nil
  138. })
  139. return err
  140. })
  141. if err != nil {
  142. return err
  143. }
  144. return nil
  145. }