You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

103 lines
2.9 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "google.golang.org/grpc/reflection"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  10. "github.com/chrislusf/seaweedfs/weed/pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. messageBrokerStandaloneOptions QueueOptions
  18. )
  // QueueOptions carries the settings of the message queue broker.
  // The fields are pointers so they can be bound directly to flag values.
  type QueueOptions struct {
  	// filer is the filer server address (the -filer flag).
  	filer *string
  	// port is the gRPC listen port of the queue server (the -port flag).
  	port *int
  }
  23. func init() {
  24. cmdMsgBroker.Run = runMsgBroker // break init cycle
  25. messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
  26. messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port")
  27. }
  // cmdMsgBroker describes the "msg.broker" subcommand: its usage line and
  // its short and long help text. The Run field is not set in this literal;
  // init assigns runMsgBroker to it (noted there as breaking an init cycle).
  28. var cmdMsgBroker = &Command{
  29. UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]",
  30. Short: "<WIP> start a message queue broker",
  31. Long: `start a message queue broker
  32. The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
  33. The brokers are stateless. To scale up, just add more brokers.
  34. `,
  35. }
  36. func runMsgBroker(cmd *Command, args []string) bool {
  37. util.LoadConfiguration("security", false)
  38. return messageBrokerStandaloneOptions.startQueueServer()
  39. }
  40. func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
  41. filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
  42. if err != nil {
  43. glog.Fatal(err)
  44. return false
  45. }
  46. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
  47. cipher := false
  48. for {
  49. err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  50. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  51. if err != nil {
  52. return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
  53. }
  54. cipher = resp.Cipher
  55. return nil
  56. })
  57. if err != nil {
  58. glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
  59. time.Sleep(time.Second)
  60. } else {
  61. glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
  62. break
  63. }
  64. }
  65. qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
  66. Filers: []string{*msgBrokerOpt.filer},
  67. DefaultReplication: "",
  68. MaxMB: 0,
  69. Port: *msgBrokerOpt.port,
  70. Cipher: cipher,
  71. }, grpcDialOption)
  72. // start grpc listener
  73. grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
  74. if err != nil {
  75. glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
  76. }
  77. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
  78. messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
  79. reflection.Register(grpcS)
  80. grpcS.Serve(grpcL)
  81. return true
  82. }