You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
6.4 KiB

3 years ago
2 years ago
2 years ago
2 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  7. "github.com/seaweedfs/seaweedfs/weed/security"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "google.golang.org/grpc"
  10. "regexp"
  11. "strings"
  12. "time"
  13. )
  14. type FilerBackupOptions struct {
  15. isActivePassive *bool
  16. filer *string
  17. path *string
  18. excludePaths *string
  19. excludeFileName *string
  20. debug *bool
  21. proxyByFiler *bool
  22. doDeleteFiles *bool
  23. timeAgo *time.Duration
  24. retentionDays *int
  25. }
  26. var (
  27. filerBackupOptions FilerBackupOptions
  28. )
  29. func init() {
  30. cmdFilerBackup.Run = runFilerBackup // break init cycle
  31. filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
  32. filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
  33. filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
  34. filerBackupOptions.excludeFileName = cmdFilerBackup.Flag.String("filerExcludeFileName", "", "exclude file names that match the regexp to sync on filer")
  35. filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
  36. filerBackupOptions.doDeleteFiles = cmdFilerBackup.Flag.Bool("doDeleteFiles", false, "delete files on the destination")
  37. filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
  38. filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  39. filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
  40. }
  41. var cmdFilerBackup = &Command{
  42. UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
  43. Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
  44. Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml
  45. filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
  46. and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.
  47. If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
  48. A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.
  49. `,
  50. }
  51. func runFilerBackup(cmd *Command, args []string) bool {
  52. util.LoadConfiguration("security", false)
  53. util.LoadConfiguration("replication", true)
  54. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  55. clientId := util.RandomInt32()
  56. var clientEpoch int32
  57. for {
  58. clientEpoch++
  59. err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId, clientEpoch)
  60. if err != nil {
  61. glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
  62. time.Sleep(1747 * time.Millisecond)
  63. }
  64. }
  65. return true
  66. }
  67. const (
  68. BackupKeyPrefix = "backup."
  69. )
  70. func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error {
  71. // find data sink
  72. dataSink := findSink(util.GetViper())
  73. if dataSink == nil {
  74. return fmt.Errorf("no data sink configured in replication.toml")
  75. }
  76. sourceFiler := pb.ServerAddress(*backupOption.filer)
  77. sourcePath := *backupOption.path
  78. excludePaths := util.StringSplit(*backupOption.excludePaths, ",")
  79. var reExcludeFileName *regexp.Regexp
  80. if *backupOption.excludeFileName != "" {
  81. var err error
  82. if reExcludeFileName, err = regexp.Compile(*backupOption.excludeFileName); err != nil {
  83. return fmt.Errorf("error compile regexp %v for exclude file name: %+v", *backupOption.excludeFileName, err)
  84. }
  85. }
  86. timeAgo := *backupOption.timeAgo
  87. targetPath := dataSink.GetSinkToDirectory()
  88. debug := *backupOption.debug
  89. // get start time for the data sink
  90. startFrom := time.Unix(0, 0)
  91. sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
  92. if timeAgo.Milliseconds() == 0 {
  93. lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
  94. if err != nil {
  95. glog.V(0).Infof("starting from %v", startFrom)
  96. } else {
  97. startFrom = time.Unix(0, lastOffsetTsNs)
  98. glog.V(0).Infof("resuming from %v", startFrom)
  99. }
  100. } else {
  101. startFrom = time.Now().Add(-timeAgo)
  102. glog.V(0).Infof("start time is set to %v", startFrom)
  103. }
  104. // create filer sink
  105. filerSource := &source.FilerSource{}
  106. filerSource.DoInitialize(
  107. sourceFiler.ToHttpAddress(),
  108. sourceFiler.ToGrpcAddress(),
  109. sourcePath,
  110. *backupOption.proxyByFiler)
  111. dataSink.SetSourceFiler(filerSource)
  112. processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
  113. processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
  114. glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
  115. return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
  116. })
  117. if dataSink.IsIncremental() && *filerBackupOptions.retentionDays > 0 {
  118. go func() {
  119. for {
  120. now := time.Now()
  121. time.Sleep(time.Hour * 24)
  122. key := util.Join(targetPath, now.Add(-1*time.Hour*24*time.Duration(*filerBackupOptions.retentionDays)).Format("2006-01-02"))
  123. _ = dataSink.DeleteEntry(util.Join(targetPath, key), true, true, nil)
  124. glog.V(0).Infof("incremental backup delete directory:%s", key)
  125. }
  126. }()
  127. }
  128. prefix := sourcePath
  129. if !strings.HasSuffix(prefix, "/") {
  130. prefix = prefix + "/"
  131. }
  132. metadataFollowOption := &pb.MetadataFollowOption{
  133. ClientName: "backup_" + dataSink.GetName(),
  134. ClientId: clientId,
  135. ClientEpoch: clientEpoch,
  136. SelfSignature: 0,
  137. PathPrefix: prefix,
  138. AdditionalPathPrefixes: nil,
  139. DirectoriesToWatch: nil,
  140. StartTsNs: startFrom.UnixNano(),
  141. StopTsNs: 0,
  142. EventErrorType: pb.RetryForeverOnError,
  143. }
  144. return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  145. }