You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

204 lines
6.8 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "io"
  11. "strings"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandRemoteMetaSync{})
  15. }
  // commandRemoteMetaSync is the receiver type for the "remote.meta.sync"
  // command methods (Name, Help, Do). It has no fields.
  type commandRemoteMetaSync struct{}
  18. func (c *commandRemoteMetaSync) Name() string {
  19. return "remote.meta.sync"
  20. }
  21. func (c *commandRemoteMetaSync) Help() string {
  22. return `synchronize the local file meta data with the remote file metadata
  23. # assume a remote storage is configured to name "cloud1"
  24. remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
  25. # mount and pull one bucket
  26. remote.mount -dir=/xxx -remote=cloud1/bucket
  27. After mount, if the remote file can be changed,
  28. run this command to synchronize the metadata of the mounted folder or any sub folder
  29. remote.meta.sync -dir=/xxx
  30. remote.meta.sync -dir=/xxx/some/subdir
  31. This is designed to run regularly. So you can add it to some cronjob.
  32. If there are no other operations changing remote files, this operation is not needed.
  33. `
  34. }
  35. func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  36. remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  37. dir := remoteMetaSyncCommand.String("dir", "", "a directory in filer")
  38. if err = remoteMetaSyncCommand.Parse(args); err != nil {
  39. return nil
  40. }
  41. localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
  42. if detectErr != nil{
  43. return detectErr
  44. }
  45. // pull metadata from remote
  46. if err = pullMetadata(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil {
  47. return fmt.Errorf("cache content data: %v", err)
  48. }
  49. return nil
  50. }
  51. func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (string, *filer_pb.RemoteStorageLocation, *filer_pb.RemoteConf, error) {
  52. mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
  53. if listErr != nil {
  54. return "", nil, nil, listErr
  55. }
  56. if dir == "" {
  57. jsonPrintln(writer, mappings)
  58. return "", nil, nil, fmt.Errorf("need to specify '-dir' option")
  59. }
  60. var localMountedDir string
  61. var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
  62. for k, loc := range mappings.Mappings {
  63. if strings.HasPrefix(dir, k) {
  64. localMountedDir, remoteStorageMountedLocation = k, loc
  65. }
  66. }
  67. if localMountedDir == "" {
  68. jsonPrintln(writer, mappings)
  69. return "", nil, nil, fmt.Errorf("%s is not mounted", dir)
  70. }
  71. // find remote storage configuration
  72. remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name)
  73. if err != nil {
  74. return "", nil, nil, err
  75. }
  76. return localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil
  77. }
  78. /*
  79. This function update entry.RemoteEntry if the remote has any changes.
  80. To pull remote updates, or created for the first time, the criteria is:
  81. entry == nil or (entry.RemoteEntry != nil and entry.RemoteEntry.RemoteTag != remote.RemoteTag)
  82. After the meta pull, the entry.RemoteEntry will have:
  83. remoteEntry.LastLocalSyncTsNs == 0
  84. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  85. Attributes.Mtime = remoteEntry.RemoteMtime
  86. remoteEntry.RemoteTag = actual remote tag
  87. chunks = nil
  88. When reading the file content or pulling the file content in "remote.cache", the criteria is:
  89. Attributes.FileSize > 0 and len(chunks) == 0
  90. After caching the file content, the entry.RemoteEntry will be
  91. remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano()
  92. Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  93. Attributes.Mtime = remoteEntry.RemoteMtime
  94. chunks = non-emtpy
  95. When "weed filer.remote.sync" to upload local changes to remote, the criteria is:
  96. Attributes.Mtime > remoteEntry.RemoteMtime
  97. Right after "weed filer.remote.sync", the entry.RemoteEntry will be
  98. remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano()
  99. remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize
  100. remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime
  101. remoteEntry.RemoteTag = actual remote tag
  102. If entry does not exists, need to pull meta
  103. If entry.RemoteEntry == nil, this is a new local change and should not be overwritten
  104. If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag {
  105. the remote version is updated, need to pull meta
  106. }
  107. */
  108. func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error {
  109. // visit remote storage
  110. remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
  111. if err != nil {
  112. return err
  113. }
  114. remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache)
  115. err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  116. ctx := context.Background()
  117. err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
  118. localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
  119. fmt.Fprint(writer, localDir.Child(name))
  120. lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
  121. Directory: string(localDir),
  122. Name: name,
  123. })
  124. var existingEntry *filer_pb.Entry
  125. if lookupErr != nil {
  126. if lookupErr != filer_pb.ErrNotFound {
  127. return lookupErr
  128. }
  129. } else {
  130. existingEntry = lookupResponse.Entry
  131. }
  132. if existingEntry == nil {
  133. _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
  134. Directory: string(localDir),
  135. Entry: &filer_pb.Entry{
  136. Name: name,
  137. IsDirectory: isDirectory,
  138. Attributes: &filer_pb.FuseAttributes{
  139. FileSize: uint64(remoteEntry.RemoteSize),
  140. Mtime: remoteEntry.RemoteMtime,
  141. FileMode: uint32(0644),
  142. },
  143. RemoteEntry: remoteEntry,
  144. },
  145. })
  146. fmt.Fprintln(writer, " (create)")
  147. return createErr
  148. } else {
  149. if existingEntry.RemoteEntry == nil {
  150. // this is a new local change and should not be overwritten
  151. fmt.Fprintln(writer, " (skip)")
  152. return nil
  153. }
  154. if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag {
  155. // the remote version is updated, need to pull meta
  156. fmt.Fprintln(writer, " (update)")
  157. return doSaveRemoteEntry(client, string(localDir), existingEntry, remoteEntry)
  158. }
  159. }
  160. fmt.Fprintln(writer, " (skip)")
  161. return nil
  162. })
  163. return err
  164. })
  165. if err != nil {
  166. return err
  167. }
  168. return nil
  169. }