You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

113 lines
3.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "google.golang.org/grpc/reflection"
  7. "github.com/chrislusf/seaweedfs/weed/util/grace"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  10. "github.com/chrislusf/seaweedfs/weed/pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. messageBrokerStandaloneOptions MessageBrokerOptions
  18. )
  19. type MessageBrokerOptions struct {
  20. filer *string
  21. ip *string
  22. port *int
  23. cpuprofile *string
  24. memprofile *string
  25. }
  26. func init() {
  27. cmdMsgBroker.Run = runMsgBroker // break init cycle
  28. messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
  29. messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
  30. messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port")
  31. messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file")
  32. messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file")
  33. }
  34. var cmdMsgBroker = &Command{
  35. UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
  36. Short: "start a message queue broker",
  37. Long: `start a message queue broker
  38. The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
  39. The brokers are stateless. To scale up, just add more brokers.
  40. `,
  41. }
  42. func runMsgBroker(cmd *Command, args []string) bool {
  43. util.LoadConfiguration("security", false)
  44. return messageBrokerStandaloneOptions.startQueueServer()
  45. }
  46. func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
  47. grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
  48. filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer)
  49. if err != nil {
  50. glog.Fatal(err)
  51. return false
  52. }
  53. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
  54. cipher := false
  55. for {
  56. err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  57. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  58. if err != nil {
  59. return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
  60. }
  61. cipher = resp.Cipher
  62. return nil
  63. })
  64. if err != nil {
  65. glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
  66. time.Sleep(time.Second)
  67. } else {
  68. glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
  69. break
  70. }
  71. }
  72. qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
  73. Filers: []string{*msgBrokerOpt.filer},
  74. DefaultReplication: "",
  75. MaxMB: 0,
  76. Ip: *msgBrokerOpt.ip,
  77. Port: *msgBrokerOpt.port,
  78. Cipher: cipher,
  79. }, grpcDialOption)
  80. // start grpc listener
  81. grpcL, err := util.NewListener(util.JoinHostPort("", *msgBrokerOpt.port), 0)
  82. if err != nil {
  83. glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
  84. }
  85. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
  86. messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
  87. reflection.Register(grpcS)
  88. grpcS.Serve(grpcL)
  89. return true
  90. }