You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
6.9 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "io"
  11. "strings"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandRemoteMetaSync{})
  15. }
  16. type commandRemoteMetaSync struct {
  17. }
  18. func (c *commandRemoteMetaSync) Name() string {
  19. return "remote.meta.sync"
  20. }
  21. func (c *commandRemoteMetaSync) Help() string {
  22. return `synchronize the local file meta data with the remote file metadata
  23. # assume a remote storage is configured to name "cloud1"
  24. remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
  25. # mount and pull one bucket
  26. remote.mount -dir=/xxx -remote=cloud1/bucket
  27. After mount, if the remote file can be changed,
  28. run this command to synchronize the metadata of the mounted folder or any sub folder
  29. remote.meta.sync -dir=/xxx
  30. remote.meta.sync -dir=/xxx/some/subdir
  31. This is designed to run regularly. So you can add it to some cronjob.
  32. If there are no other operations changing remote files, this operation is not needed.
  33. `
  34. }
  35. func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  36. remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  37. dir := remoteMetaSyncCommand.String("dir", "", "a directory in filer")
  38. if err = remoteMetaSyncCommand.Parse(args); err != nil {
  39. return nil
  40. }
  41. mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
  42. if detectErr != nil{
  43. jsonPrintln(writer, mappings)
  44. return detectErr
  45. }
  46. // pull metadata from remote
  47. if err = pullMetadata(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil {
  48. return fmt.Errorf("cache content data: %v", err)
  49. }
  50. return nil
  51. }
  52. func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (*filer_pb.RemoteStorageMapping, string, *filer_pb.RemoteStorageLocation, *filer_pb.RemoteConf, error) {
  53. mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
  54. if listErr != nil {
  55. return nil, "", nil, nil, listErr
  56. }
  57. if dir == "" {
  58. return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option")
  59. }
  60. var localMountedDir string
  61. var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
  62. for k, loc := range mappings.Mappings {
  63. if strings.HasPrefix(dir, k) {
  64. localMountedDir, remoteStorageMountedLocation = k, loc
  65. }
  66. }
  67. if localMountedDir == "" {
  68. return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir)
  69. }
  70. // find remote storage configuration
  71. remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name)
  72. if err != nil {
  73. return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err
  74. }
  75. return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil
  76. }
  77. /*
  78. This function update entry.RemoteEntry if the remote has any changes.
  79. To pull remote updates, or created for the first time, the criteria is:
  80. entry == nil or (entry.RemoteEntry != nil and entry.RemoteEntry.RemoteTag != remote.RemoteTag)
  81. After the meta pull, the entry.RemoteEntry will have:
  82. remoteEntry.LastLocalSyncTsNs == 0
  83. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  84. Attributes.Mtime = remoteEntry.RemoteMtime
  85. remoteEntry.RemoteTag = actual remote tag
  86. chunks = nil
  87. When reading the file content or pulling the file content in "remote.cache", the criteria is:
  88. Attributes.FileSize > 0 and len(chunks) == 0
  89. After caching the file content, the entry.RemoteEntry will be
  90. remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano()
  91. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  92. Attributes.Mtime = remoteEntry.RemoteMtime
  93. chunks = non-emtpy
  94. When "weed filer.remote.sync" to upload local changes to remote, the criteria is:
  95. Attributes.Mtime > remoteEntry.RemoteMtime
  96. Right after "weed filer.remote.sync", the entry.RemoteEntry will be
  97. remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano()
  98. remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize
  99. remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime
  100. remoteEntry.RemoteTag = actual remote tag
  101. If entry does not exists, need to pull meta
  102. If entry.RemoteEntry == nil, this is a new local change and should not be overwritten
  103. If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag {
  104. the remote version is updated, need to pull meta
  105. }
  106. */
  107. func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error {
  108. // visit remote storage
  109. remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
  110. if err != nil {
  111. return err
  112. }
  113. remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache)
  114. err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  115. ctx := context.Background()
  116. err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
  117. localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
  118. fmt.Fprint(writer, localDir.Child(name))
  119. lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
  120. Directory: string(localDir),
  121. Name: name,
  122. })
  123. var existingEntry *filer_pb.Entry
  124. if lookupErr != nil {
  125. if lookupErr != filer_pb.ErrNotFound {
  126. return lookupErr
  127. }
  128. } else {
  129. existingEntry = lookupResponse.Entry
  130. }
  131. if existingEntry == nil {
  132. _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
  133. Directory: string(localDir),
  134. Entry: &filer_pb.Entry{
  135. Name: name,
  136. IsDirectory: isDirectory,
  137. Attributes: &filer_pb.FuseAttributes{
  138. FileSize: uint64(remoteEntry.RemoteSize),
  139. Mtime: remoteEntry.RemoteMtime,
  140. FileMode: uint32(0644),
  141. },
  142. RemoteEntry: remoteEntry,
  143. },
  144. })
  145. fmt.Fprintln(writer, " (create)")
  146. return createErr
  147. } else {
  148. if existingEntry.RemoteEntry == nil {
  149. // this is a new local change and should not be overwritten
  150. fmt.Fprintln(writer, " (skip)")
  151. return nil
  152. }
  153. if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag {
  154. // the remote version is updated, need to pull meta
  155. fmt.Fprintln(writer, " (update)")
  156. return doSaveRemoteEntry(client, string(localDir), existingEntry, remoteEntry)
  157. }
  158. }
  159. fmt.Fprintln(writer, " (skip)")
  160. return nil
  161. })
  162. return err
  163. })
  164. if err != nil {
  165. return err
  166. }
  167. return nil
  168. }