You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

144 lines
4.3 KiB

5 years ago
1 year ago
5 years ago
5 years ago
3 years ago
3 years ago
5 years ago
5 years ago
5 years ago
3 years ago
3 years ago
1 year ago
5 years ago
5 years ago
5 years ago
5 years ago
1 year ago
5 years ago
3 years ago
5 years ago
3 years ago
5 years ago
3 years ago
5 years ago
5 years ago
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/cluster"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  15. )
  16. type MessageQueueBrokerOption struct {
  17. Masters map[string]pb.ServerAddress
  18. FilerGroup string
  19. DataCenter string
  20. Rack string
  21. DefaultReplication string
  22. MaxMB int
  23. Ip string
  24. Port int
  25. Cipher bool
  26. }
  27. type MessageQueueBroker struct {
  28. mq_pb.UnimplementedSeaweedMessagingServer
  29. option *MessageQueueBrokerOption
  30. grpcDialOption grpc.DialOption
  31. MasterClient *wdclient.MasterClient
  32. filers map[pb.ServerAddress]struct{}
  33. currentFiler pb.ServerAddress
  34. localTopicManager *topic.LocalTopicManager
  35. Balancer *balancer.Balancer
  36. lockAsBalancer *cluster.LiveLock
  37. }
  38. func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
  39. mqBroker = &MessageQueueBroker{
  40. option: option,
  41. grpcDialOption: grpcDialOption,
  42. MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
  43. filers: make(map[pb.ServerAddress]struct{}),
  44. localTopicManager: topic.NewLocalTopicManager(),
  45. Balancer: balancer.NewBalancer(),
  46. }
  47. mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
  48. go mqBroker.MasterClient.KeepConnectedToMaster()
  49. existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
  50. for _, newNode := range existingNodes {
  51. mqBroker.OnBrokerUpdate(newNode, time.Now())
  52. }
  53. // keep connecting to balancer
  54. go func() {
  55. for mqBroker.currentFiler == "" {
  56. time.Sleep(time.Millisecond * 237)
  57. }
  58. self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
  59. glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
  60. lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
  61. mqBroker.lockAsBalancer = lockClient.StartLock(LockBrokerBalancer, self)
  62. for {
  63. err := mqBroker.BrokerConnectToBalancer(self)
  64. if err != nil {
  65. fmt.Printf("BrokerConnectToBalancer: %v\n", err)
  66. }
  67. time.Sleep(time.Second)
  68. }
  69. }()
  70. return mqBroker, nil
  71. }
  72. func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
  73. if update.NodeType != cluster.FilerType {
  74. return
  75. }
  76. address := pb.ServerAddress(update.Address)
  77. if update.IsAdd {
  78. broker.filers[address] = struct{}{}
  79. if broker.currentFiler == "" {
  80. broker.currentFiler = address
  81. }
  82. } else {
  83. delete(broker.filers, address)
  84. if broker.currentFiler == address {
  85. for filer := range broker.filers {
  86. broker.currentFiler = filer
  87. break
  88. }
  89. }
  90. }
  91. }
  92. func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
  93. return broker.currentFiler
  94. }
  95. func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  96. return pb.WithFilerClient(streamingMode, 0, broker.GetFiler(), broker.grpcDialOption, fn)
  97. }
  98. func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
  99. return location.Url
  100. }
  101. func (broker *MessageQueueBroker) GetDataCenter() string {
  102. return ""
  103. }
  104. func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
  105. return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  106. return fn(client)
  107. })
  108. }
  109. func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
  110. return pb.WithBrokerGrpcClient(streamingMode, server.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  111. return fn(client)
  112. })
  113. }