You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

135 lines
4.2 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package command
  2. import (
  3. "strings"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/replication"
  6. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  7. _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
  8. _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
  9. _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
  10. _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
  11. _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
  12. "github.com/chrislusf/seaweedfs/weed/replication/sub"
  13. "github.com/chrislusf/seaweedfs/weed/server"
  14. "github.com/spf13/viper"
  15. )
  16. func init() {
  17. cmdFilerReplicate.Run = runFilerReplicate // break init cycle
  18. }
  19. var cmdFilerReplicate = &Command{
  20. UsageLine: "filer.replicate",
  21. Short: "replicate file changes to another destination",
  22. Long: `replicate file changes to another destination
  23. filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
  24. and write to the other destination.
  25. Run "weed scaffold -config replication" to generate a replication.toml file and customize the parameters.
  26. `,
  27. }
  28. func runFilerReplicate(cmd *Command, args []string) bool {
  29. weed_server.LoadConfiguration("replication", true)
  30. weed_server.LoadConfiguration("notification", true)
  31. config := viper.GetViper()
  32. var notificationInput sub.NotificationInput
  33. enabledInput := ""
  34. for _, input := range sub.NotificationInputs {
  35. if config.GetBool("notification." + input.GetName() + ".enabled") {
  36. if enabledInput == "" {
  37. enabledInput = input.GetName()
  38. } else {
  39. glog.Fatalf("Notification input is enabled for both %s and %s", enabledInput, input.GetName())
  40. }
  41. }
  42. }
  43. for _, input := range sub.NotificationInputs {
  44. if config.GetBool("notification." + input.GetName() + ".enabled") {
  45. viperSub := config.Sub("notification." + input.GetName())
  46. if err := input.Initialize(viperSub); err != nil {
  47. glog.Fatalf("Failed to initialize notification input for %s: %+v",
  48. input.GetName(), err)
  49. }
  50. glog.V(0).Infof("Configure notification input to %s", input.GetName())
  51. notificationInput = input
  52. break
  53. }
  54. }
  55. if notificationInput == nil {
  56. println("No notification is defined in notification.toml file.")
  57. println("Please follow 'weed scaffold -config=notification' to see example notification configurations.")
  58. return true
  59. }
  60. // avoid recursive replication
  61. if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
  62. sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer")
  63. if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
  64. fromDir := sourceConfig.GetString("directory")
  65. toDir := sinkConfig.GetString("directory")
  66. if strings.HasPrefix(toDir, fromDir) {
  67. glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
  68. }
  69. }
  70. }
  71. var dataSink sink.ReplicationSink
  72. for _, sk := range sink.Sinks {
  73. if config.GetBool("sink." + sk.GetName() + ".enabled") {
  74. viperSub := config.Sub("sink." + sk.GetName())
  75. if err := sk.Initialize(viperSub); err != nil {
  76. glog.Fatalf("Failed to initialize sink for %s: %+v",
  77. sk.GetName(), err)
  78. }
  79. glog.V(0).Infof("Configure sink to %s", sk.GetName())
  80. dataSink = sk
  81. break
  82. }
  83. }
  84. if dataSink == nil {
  85. println("no data sink configured in replication.toml:")
  86. for _, sk := range sink.Sinks {
  87. println(" " + sk.GetName())
  88. }
  89. return true
  90. }
  91. replicator := replication.NewReplicator(config.Sub("source.filer"), dataSink)
  92. for {
  93. key, m, err := notificationInput.ReceiveMessage()
  94. if err != nil {
  95. glog.Errorf("receive %s: %+v", key, err)
  96. continue
  97. }
  98. if key == "" {
  99. // long poll received no messages
  100. continue
  101. }
  102. if m.OldEntry != nil && m.NewEntry == nil {
  103. glog.V(1).Infof("delete: %s", key)
  104. } else if m.OldEntry == nil && m.NewEntry != nil {
  105. glog.V(1).Infof(" add: %s", key)
  106. } else {
  107. glog.V(1).Infof("modify: %s", key)
  108. }
  109. if err = replicator.Replicate(key, m); err != nil {
  110. glog.Errorf("replicate %s: %+v", key, err)
  111. } else {
  112. glog.V(1).Infof("replicated %s", key)
  113. }
  114. }
  115. return true
  116. }