120 lines
4.4 KiB

3 years ago
3 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/replication/source"
  7. "github.com/chrislusf/seaweedfs/weed/security"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. "google.golang.org/grpc"
  10. "time"
  11. )
  // FilerBackupOptions groups the settings of the filer.backup command.
  // All fields are pointers; init fills most of them from registered flags.
  type FilerBackupOptions struct {
  	// isActivePassive is not bound to a flag in the visible part of this file.
  	isActivePassive *bool

  	// filer is bound to the -filer flag: the source filer address.
  	filer *string

  	// path is bound to the -filerPath flag: the directory to sync on the filer.
  	path *string

  	// debug is bound to the -debug flag.
  	debug *bool

  	// proxyByFiler is bound to the -filerProxy flag.
  	proxyByFiler *bool

  	// timeAgo is bound to the -timeAgo flag: how far before now to start.
  	timeAgo *time.Duration
  }
  20. var (
  21. filerBackupOptions FilerBackupOptions
  22. )
  23. func init() {
  24. cmdFilerBackup.Run = runFilerBackup // break init cycle
  25. filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
  26. filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
  27. filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
  28. filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
  29. filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  30. }
  31. var cmdFilerBackup = &Command{
  32. UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
  33. Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
  34. Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml
  35. filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
  36. and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.
  37. If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
  38. A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.
  39. `,
  40. }
  41. func runFilerBackup(cmd *Command, args []string) bool {
  42. util.LoadConfiguration("security", false)
  43. util.LoadConfiguration("replication", true)
  44. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  45. clientId := util.RandomInt32()
  46. for {
  47. err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId)
  48. if err != nil {
  49. glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
  50. time.Sleep(1747 * time.Millisecond)
  51. }
  52. }
  53. return true
  54. }
  // BackupKeyPrefix is the key prefix doFilerBackup passes to getOffset and
  // setOffset when reading and saving the backup offset.
  const BackupKeyPrefix = "backup."
  58. func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32) error {
  59. // find data sink
  60. config := util.GetViper()
  61. dataSink := findSink(config)
  62. if dataSink == nil {
  63. return fmt.Errorf("no data sink configured in replication.toml")
  64. }
  65. sourceFiler := pb.ServerAddress(*backupOption.filer)
  66. sourcePath := *backupOption.path
  67. timeAgo := *backupOption.timeAgo
  68. targetPath := dataSink.GetSinkToDirectory()
  69. debug := *backupOption.debug
  70. // get start time for the data sink
  71. startFrom := time.Unix(0, 0)
  72. sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
  73. if timeAgo.Milliseconds() == 0 {
  74. lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
  75. if err != nil {
  76. glog.V(0).Infof("starting from %v", startFrom)
  77. } else {
  78. startFrom = time.Unix(0, lastOffsetTsNs)
  79. glog.V(0).Infof("resuming from %v", startFrom)
  80. }
  81. } else {
  82. startFrom = time.Now().Add(-timeAgo)
  83. glog.V(0).Infof("start time is set to %v", startFrom)
  84. }
  85. // create filer sink
  86. filerSource := &source.FilerSource{}
  87. filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler)
  88. dataSink.SetSourceFiler(filerSource)
  89. processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
  90. processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
  91. glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
  92. return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
  93. })
  94. return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId,
  95. sourcePath, nil, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
  96. }