You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

125 lines
3.9 KiB

5 years ago
3 years ago
5 years ago
5 years ago
3 years ago
3 years ago
5 years ago
5 years ago
5 years ago
3 years ago
3 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
  1. package broker
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/cluster"
  5. "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
  6. "github.com/chrislusf/seaweedfs/weed/wdclient"
  7. "time"
  8. "google.golang.org/grpc"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  13. )
  14. type MessageQueueBrokerOption struct {
  15. Masters map[string]pb.ServerAddress
  16. FilerGroup string
  17. DataCenter string
  18. Rack string
  19. Filers []pb.ServerAddress
  20. DefaultReplication string
  21. MaxMB int
  22. Ip string
  23. Port int
  24. Cipher bool
  25. }
  26. type MessageQueueBroker struct {
  27. mq_pb.UnimplementedSeaweedMessagingServer
  28. option *MessageQueueBrokerOption
  29. grpcDialOption grpc.DialOption
  30. MasterClient *wdclient.MasterClient
  31. topicManager *TopicManager
  32. }
  33. func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
  34. mqBroker = &MessageQueueBroker{
  35. option: option,
  36. grpcDialOption: grpcDialOption,
  37. MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
  38. }
  39. mqBroker.topicManager = NewTopicManager(mqBroker)
  40. mqBroker.checkFilers()
  41. go mqBroker.keepConnectedToOneFiler()
  42. go mqBroker.MasterClient.KeepConnectedToMaster()
  43. return mqBroker, nil
  44. }
  45. func (broker *MessageQueueBroker) keepConnectedToOneFiler() {
  46. for {
  47. for _, filer := range broker.option.Filers {
  48. broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
  49. ctx, cancel := context.WithCancel(context.Background())
  50. defer cancel()
  51. stream, err := client.KeepConnected(ctx)
  52. if err != nil {
  53. glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
  54. return err
  55. }
  56. initRequest := &filer_pb.KeepConnectedRequest{
  57. Name: broker.option.Ip,
  58. GrpcPort: uint32(broker.option.Port),
  59. }
  60. for _, tp := range broker.topicManager.ListTopicPartitions() {
  61. initRequest.Resources = append(initRequest.Resources, tp.String())
  62. }
  63. if err := stream.Send(&filer_pb.KeepConnectedRequest{
  64. Name: broker.option.Ip,
  65. GrpcPort: uint32(broker.option.Port),
  66. }); err != nil {
  67. glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
  68. return err
  69. }
  70. // TODO send events of adding/removing topics
  71. glog.V(0).Infof("conntected with filer: %v", filer)
  72. for {
  73. if err := stream.Send(&filer_pb.KeepConnectedRequest{
  74. Name: broker.option.Ip,
  75. GrpcPort: uint32(broker.option.Port),
  76. }); err != nil {
  77. glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
  78. return err
  79. }
  80. // println("send heartbeat")
  81. if _, err := stream.Recv(); err != nil {
  82. glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
  83. return err
  84. }
  85. // println("received reply")
  86. time.Sleep(11 * time.Second)
  87. // println("woke up")
  88. }
  89. return nil
  90. })
  91. time.Sleep(3 * time.Second)
  92. }
  93. }
  94. }
  95. func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
  96. return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
  97. }
  98. func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
  99. return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
  100. return fn(client)
  101. })
  102. }