You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

81 lines
2.5 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package command
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "github.com/chrislusf/seaweedfs/weed/replication"
  5. "github.com/chrislusf/seaweedfs/weed/server"
  6. "github.com/spf13/viper"
  7. "strings"
  8. )
  9. func init() {
  10. cmdFilerReplicate.Run = runFilerReplicate // break init cycle
  11. }
  12. var cmdFilerReplicate = &Command{
  13. UsageLine: "filer.replicate",
  14. Short: "replicate file changes to another destination",
  15. Long: `replicate file changes to another destination
  16. filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
  17. and write to the other destination.
  18. Run "weed scaffold -config replication" to generate a replication.toml file and customize the parameters.
  19. `,
  20. }
  21. func runFilerReplicate(cmd *Command, args []string) bool {
  22. weed_server.LoadConfiguration("replication", true)
  23. config := viper.GetViper()
  24. var notificationInput replication.NotificationInput
  25. for _, input := range replication.NotificationInputs {
  26. if config.GetBool("notification." + input.GetName() + ".enabled") {
  27. viperSub := config.Sub("notification." + input.GetName())
  28. if err := input.Initialize(viperSub); err != nil {
  29. glog.Fatalf("Failed to initialize notification input for %s: %+v",
  30. input.GetName(), err)
  31. }
  32. glog.V(0).Infof("Configure notification input to %s", input.GetName())
  33. notificationInput = input
  34. break
  35. }
  36. }
  37. // avoid recursive replication
  38. if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
  39. sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer")
  40. if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
  41. fromDir := sourceConfig.GetString("directory")
  42. toDir := sinkConfig.GetString("directory")
  43. if strings.HasPrefix(toDir, fromDir) {
  44. glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
  45. }
  46. }
  47. }
  48. replicator := replication.NewReplicator(config.Sub("source.filer"), config.Sub("sink.filer"))
  49. for {
  50. key, m, err := notificationInput.ReceiveMessage()
  51. if err != nil {
  52. glog.Errorf("receive %s: %+v", key, err)
  53. continue
  54. }
  55. if m.OldEntry != nil && m.NewEntry == nil {
  56. glog.V(1).Infof("delete: %s", key)
  57. } else if m.OldEntry == nil && m.NewEntry != nil {
  58. glog.V(1).Infof(" add: %s", key)
  59. } else {
  60. glog.V(1).Infof("modify: %s", key)
  61. }
  62. if err = replicator.Replicate(key, m); err != nil {
  63. glog.Errorf("replicate %s: %+v", key, err)
  64. }
  65. }
  66. return true
  67. }