You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

124 lines
4.3 KiB

5 years ago
5 years ago
3 years ago
5 years ago
3 years ago
5 years ago
5 years ago
3 years ago
5 years ago
3 years ago
5 years ago
3 years ago
5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
5 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "google.golang.org/grpc/reflection"
  7. "github.com/chrislusf/seaweedfs/weed/util/grace"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/mq/broker"
  10. "github.com/chrislusf/seaweedfs/weed/pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. mqBrokerStandaloneOptions MessageQueueBrokerOptions
  18. )
  19. type MessageQueueBrokerOptions struct {
  20. masters map[string]pb.ServerAddress
  21. mastersString *string
  22. filerGroup *string
  23. filer *string
  24. ip *string
  25. port *int
  26. dataCenter *string
  27. rack *string
  28. cpuprofile *string
  29. memprofile *string
  30. }
  31. func init() {
  32. cmdMqBroker.Run = runMqBroker // break init cycle
  33. mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers")
  34. mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address")
  35. mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
  36. mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
  37. mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
  38. mqBrokerStandaloneOptions.dataCenter = cmdMqBroker.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
  39. mqBrokerStandaloneOptions.rack = cmdMqBroker.Flag.String("rack", "", "prefer to write to volumes in this rack")
  40. mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file")
  41. mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file")
  42. }
  43. var cmdMqBroker = &Command{
  44. UsageLine: "mq.broker [-port=17777] [-master=<ip:port>]",
  45. Short: "start a message queue broker",
  46. Long: `start a message queue broker
  47. The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
  48. The brokers are stateless. To scale up, just add more brokers.
  49. `,
  50. }
  51. func runMqBroker(cmd *Command, args []string) bool {
  52. util.LoadConfiguration("security", false)
  53. mqBrokerStandaloneOptions.masters = pb.ServerAddresses(*mqBrokerStandaloneOptions.mastersString).ToAddressMap()
  54. return mqBrokerStandaloneOptions.startQueueServer()
  55. }
  56. func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
  57. grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile)
  58. filerAddress := pb.ServerAddress(*mqBrokerOpt.filer)
  59. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
  60. cipher := false
  61. for {
  62. err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  63. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  64. if err != nil {
  65. return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
  66. }
  67. cipher = resp.Cipher
  68. return nil
  69. })
  70. if err != nil {
  71. glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
  72. time.Sleep(time.Second)
  73. } else {
  74. glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
  75. break
  76. }
  77. }
  78. qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{
  79. Masters: mqBrokerOpt.masters,
  80. FilerGroup: *mqBrokerOpt.filerGroup,
  81. DataCenter: *mqBrokerOpt.dataCenter,
  82. Rack: *mqBrokerOpt.rack,
  83. Filers: []pb.ServerAddress{filerAddress},
  84. DefaultReplication: "",
  85. MaxMB: 0,
  86. Ip: *mqBrokerOpt.ip,
  87. Port: *mqBrokerOpt.port,
  88. Cipher: cipher,
  89. }, grpcDialOption)
  90. // start grpc listener
  91. grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0)
  92. if err != nil {
  93. glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
  94. }
  95. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
  96. mq_pb.RegisterSeaweedMessagingServer(grpcS, qs)
  97. reflection.Register(grpcS)
  98. grpcS.Serve(grpcL)
  99. return true
  100. }