You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

136 lines
4.9 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  9. "github.com/chrislusf/seaweedfs/weed/replication/source"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "google.golang.org/grpc"
  13. "time"
  14. )
  15. type RemoteSyncOptions struct {
  16. filerAddress *string
  17. grpcDialOption grpc.DialOption
  18. readChunkFromFiler *bool
  19. debug *bool
  20. timeAgo *time.Duration
  21. dir *string
  22. createBucketAt *string
  23. createBucketRandomSuffix *bool
  24. mappings *remote_pb.RemoteStorageMapping
  25. remoteConfs map[string]*remote_pb.RemoteConf
  26. bucketsDir string
  27. }
  28. const (
  29. RemoteSyncKeyPrefix = "remote.sync."
  30. )
  31. var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
  32. func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  33. return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  34. return fn(client)
  35. })
  36. }
  37. func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
  38. return location.Url
  39. }
  40. var (
  41. remoteSyncOptions RemoteSyncOptions
  42. )
  43. func init() {
  44. cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
  45. remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
  46. remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
  47. remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
  48. remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", false, "add randomized suffix to bucket name to avoid conflicts")
  49. remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
  50. remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
  51. remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  52. }
  53. var cmdFilerRemoteSynchronize = &Command{
  54. UsageLine: "filer.remote.sync -dir=/mount/s3_on_cloud or -createBucketAt=clound1",
  55. Short: "resumable continuously write back updates to remote storage",
  56. Long: `resumable continuously write back updates to remote storage
  57. filer.remote.sync listens on filer update events.
  58. If any mounted remote file is updated, it will fetch the updated content,
  59. and write to the remote storage.
  60. There are two modes:
  61. 1)Write back one mounted folder to remote storage
  62. weed filer.remote.sync -dir=/mount/s3_on_cloud
  63. 2)Watch /buckets folder and write back all changes.
  64. Any new buckets will be created in this remote storage.
  65. weed filer.remote.sync -createBucketAt=cloud1
  66. `,
  67. }
  68. func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
  69. util.LoadConfiguration("security", false)
  70. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  71. remoteSyncOptions.grpcDialOption = grpcDialOption
  72. dir := *remoteSyncOptions.dir
  73. filerAddress := *remoteSyncOptions.filerAddress
  74. filerSource := &source.FilerSource{}
  75. filerSource.DoInitialize(
  76. filerAddress,
  77. pb.ServerToGrpcAddress(filerAddress),
  78. "/", // does not matter
  79. *remoteSyncOptions.readChunkFromFiler,
  80. )
  81. storageName := *remoteSyncOptions.createBucketAt
  82. if storageName != "" {
  83. remoteSyncOptions.bucketsDir = "/buckets"
  84. // check buckets again
  85. remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
  86. resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  87. if err != nil {
  88. return err
  89. }
  90. remoteSyncOptions.bucketsDir = resp.DirBuckets
  91. return nil
  92. })
  93. fmt.Printf("synchronize %s, default new bucket creation in %s ...\n", remoteSyncOptions.bucketsDir, storageName)
  94. util.RetryForever("filer.remote.sync buckets "+storageName, func() error {
  95. return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource)
  96. }, func(err error) bool {
  97. if err != nil {
  98. glog.Errorf("synchronize %s to %s: %v", remoteSyncOptions.bucketsDir, storageName, err)
  99. }
  100. return true
  101. })
  102. }
  103. if dir != "" {
  104. fmt.Printf("synchronize %s to remote storage...\n", dir)
  105. util.RetryForever("filer.remote.sync "+dir, func() error {
  106. return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
  107. }, func(err error) bool {
  108. if err != nil {
  109. glog.Errorf("synchronize %s: %v", dir, err)
  110. }
  111. return true
  112. })
  113. }
  114. return true
  115. }