You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

122 lines
3.7 KiB

5 years ago
5 years ago
5 years ago
3 years ago
3 years ago
5 years ago
5 years ago
5 years ago
3 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
3 years ago
5 years ago
3 years ago
5 years ago
3 years ago
5 years ago
5 years ago
  1. package broker
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/cluster"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  9. "google.golang.org/grpc"
  10. "github.com/seaweedfs/seaweedfs/weed/pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  13. )
  14. type MessageQueueBrokerOption struct {
  15. Masters map[string]pb.ServerAddress
  16. FilerGroup string
  17. DataCenter string
  18. Rack string
  19. DefaultReplication string
  20. MaxMB int
  21. Ip string
  22. Port int
  23. Cipher bool
  24. }
  25. type MessageQueueBroker struct {
  26. mq_pb.UnimplementedSeaweedMessagingServer
  27. option *MessageQueueBrokerOption
  28. grpcDialOption grpc.DialOption
  29. MasterClient *wdclient.MasterClient
  30. filers map[pb.ServerAddress]struct{}
  31. currentFiler pb.ServerAddress
  32. localTopicManager *topic.LocalTopicManager
  33. Balancer *balancer.Balancer
  34. }
  35. func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
  36. mqBroker = &MessageQueueBroker{
  37. option: option,
  38. grpcDialOption: grpcDialOption,
  39. MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
  40. filers: make(map[pb.ServerAddress]struct{}),
  41. localTopicManager: topic.NewLocalTopicManager(),
  42. Balancer: balancer.NewBalancer(),
  43. }
  44. mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
  45. go mqBroker.MasterClient.KeepConnectedToMaster()
  46. existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
  47. for _, newNode := range existingNodes {
  48. mqBroker.OnBrokerUpdate(newNode, time.Now())
  49. }
  50. return mqBroker, nil
  51. }
  52. func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
  53. if update.NodeType != cluster.FilerType {
  54. return
  55. }
  56. address := pb.ServerAddress(update.Address)
  57. if update.IsAdd {
  58. broker.filers[address] = struct{}{}
  59. if broker.currentFiler == "" {
  60. broker.currentFiler = address
  61. }
  62. } else {
  63. delete(broker.filers, address)
  64. if broker.currentFiler == address {
  65. for filer := range broker.filers {
  66. broker.currentFiler = filer
  67. break
  68. }
  69. }
  70. }
  71. }
  72. func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
  73. return broker.currentFiler
  74. }
  75. func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  76. return pb.WithFilerClient(streamingMode, 0, broker.GetFiler(), broker.grpcDialOption, fn)
  77. }
  78. func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
  79. return location.Url
  80. }
  81. func (broker *MessageQueueBroker) GetDataCenter() string {
  82. return ""
  83. }
  84. func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
  85. return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  86. return fn(client)
  87. })
  88. }
  89. func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
  90. return pb.WithBrokerGrpcClient(streamingMode, server.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  91. return fn(client)
  92. })
  93. }