You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

116 lines
3.3 KiB

  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "google.golang.org/grpc/codes"
  8. "google.golang.org/grpc/status"
  9. )
  10. func (p *TopicPublisher) doLookup(brokerAddress string) error {
  11. err := pb.WithBrokerGrpcClient(true,
  12. brokerAddress,
  13. p.grpcDialOption,
  14. func(client mq_pb.SeaweedMessagingClient) error {
  15. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  16. &mq_pb.LookupTopicBrokersRequest{
  17. Topic: &mq_pb.Topic{
  18. Namespace: p.namespace,
  19. Name: p.topic,
  20. },
  21. IsForPublish: true,
  22. })
  23. if err != nil {
  24. return err
  25. }
  26. for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
  27. // partition => publishClient
  28. publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
  29. if err != nil {
  30. return err
  31. }
  32. for redirectTo != "" {
  33. publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo)
  34. if err != nil {
  35. return err
  36. }
  37. }
  38. p.partition2Broker.Insert(
  39. brokerPartitionAssignment.Partition.RangeStart,
  40. brokerPartitionAssignment.Partition.RangeStop,
  41. publishClient)
  42. }
  43. return nil
  44. })
  45. if err != nil {
  46. return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
  47. }
  48. return nil
  49. }
  50. // broker => publish client
  51. // send init message
  52. // save the publishing client
  53. func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) {
  54. grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
  55. if err != nil {
  56. return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
  57. }
  58. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  59. stream, err := brokerClient.Publish(context.Background())
  60. if err != nil {
  61. return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err)
  62. }
  63. publishClient = &PublishClient{
  64. SeaweedMessaging_PublishClient: stream,
  65. Broker: brokerAddress,
  66. }
  67. if err = publishClient.Send(&mq_pb.PublishRequest{
  68. Message: &mq_pb.PublishRequest_Init{
  69. Init: &mq_pb.PublishRequest_InitMessage{
  70. Topic: &mq_pb.Topic{
  71. Namespace: p.namespace,
  72. Name: p.topic,
  73. },
  74. Partition: &mq_pb.Partition{
  75. RingSize: partition.RingSize,
  76. RangeStart: partition.RangeStart,
  77. RangeStop: partition.RangeStop,
  78. },
  79. AckInterval: 128,
  80. },
  81. },
  82. }); err != nil {
  83. return publishClient, redirectTo, fmt.Errorf("send init message: %v", err)
  84. }
  85. resp, err := stream.Recv()
  86. if err != nil {
  87. return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err)
  88. }
  89. if resp.Error != "" {
  90. return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error)
  91. }
  92. if resp.RedirectToBroker != "" {
  93. redirectTo = resp.RedirectToBroker
  94. return publishClient, redirectTo, nil
  95. }
  96. go func() {
  97. for {
  98. _, err := publishClient.Recv()
  99. if err != nil {
  100. e, ok := status.FromError(err)
  101. if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
  102. return
  103. }
  104. publishClient.Err = err
  105. fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
  106. return
  107. }
  108. }
  109. }()
  110. return publishClient, redirectTo, nil
  111. }