You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

109 lines
3.1 KiB

2 years ago
2 years ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. )
  8. func (p *TopicPublisher) doLookup(brokerAddress string) error {
  9. err := pb.WithBrokerGrpcClient(true,
  10. brokerAddress,
  11. p.grpcDialOption,
  12. func(client mq_pb.SeaweedMessagingClient) error {
  13. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  14. &mq_pb.LookupTopicBrokersRequest{
  15. Topic: &mq_pb.Topic{
  16. Namespace: p.namespace,
  17. Name: p.topic,
  18. },
  19. IsForPublish: true,
  20. })
  21. if err != nil {
  22. return err
  23. }
  24. for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
  25. // partition => publishClient
  26. publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
  27. if err != nil {
  28. return err
  29. }
  30. for redirectTo != "" {
  31. publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo)
  32. if err != nil {
  33. return err
  34. }
  35. }
  36. p.partition2Broker.Insert(
  37. brokerPartitionAssignment.Partition.RangeStart,
  38. brokerPartitionAssignment.Partition.RangeStop,
  39. publishClient)
  40. }
  41. return nil
  42. })
  43. if err != nil {
  44. return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
  45. }
  46. return nil
  47. }
  48. // broker => publish client
  49. // send init message
  50. // save the publishing client
  51. func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) {
  52. grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
  53. if err != nil {
  54. return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
  55. }
  56. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  57. stream, err := brokerClient.Publish(context.Background())
  58. if err != nil {
  59. return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err)
  60. }
  61. publishClient = &PublishClient{
  62. SeaweedMessaging_PublishClient: stream,
  63. Broker: brokerAddress,
  64. }
  65. if err = publishClient.Send(&mq_pb.PublishRequest{
  66. Message: &mq_pb.PublishRequest_Init{
  67. Init: &mq_pb.PublishRequest_InitMessage{
  68. Topic: &mq_pb.Topic{
  69. Namespace: p.namespace,
  70. Name: p.topic,
  71. },
  72. Partition: &mq_pb.Partition{
  73. RingSize: partition.RingSize,
  74. RangeStart: partition.RangeStart,
  75. RangeStop: partition.RangeStop,
  76. },
  77. },
  78. },
  79. }); err != nil {
  80. return publishClient, redirectTo, fmt.Errorf("send init message: %v", err)
  81. }
  82. resp, err := stream.Recv()
  83. if err != nil {
  84. return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err)
  85. }
  86. if resp.Error != "" {
  87. return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error)
  88. }
  89. if resp.RedirectToBroker != "" {
  90. redirectTo = resp.RedirectToBroker
  91. return publishClient, redirectTo, nil
  92. }
  93. go func() {
  94. for {
  95. _, err := publishClient.Recv()
  96. if err != nil {
  97. publishClient.Err = err
  98. fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
  99. return
  100. }
  101. }
  102. }()
  103. return publishClient, redirectTo, nil
  104. }