You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

73 lines
2.0 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "google.golang.org/grpc/codes"
  8. "google.golang.org/grpc/status"
  9. "log"
  10. )
  11. // broker => publish client
  12. // send init message
  13. // save the publishing client
  14. func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) {
  15. log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition)
  16. grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
  17. if err != nil {
  18. return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
  19. }
  20. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  21. stream, err := brokerClient.Publish(context.Background())
  22. if err != nil {
  23. return publishClient, fmt.Errorf("create publish client: %v", err)
  24. }
  25. publishClient = &PublishClient{
  26. SeaweedMessaging_PublishClient: stream,
  27. Broker: brokerAddress,
  28. }
  29. if err = publishClient.Send(&mq_pb.PublishRequest{
  30. Message: &mq_pb.PublishRequest_Init{
  31. Init: &mq_pb.PublishRequest_InitMessage{
  32. Topic: &mq_pb.Topic{
  33. Namespace: p.namespace,
  34. Name: p.topic,
  35. },
  36. Partition: &mq_pb.Partition{
  37. RingSize: partition.RingSize,
  38. RangeStart: partition.RangeStart,
  39. RangeStop: partition.RangeStop,
  40. },
  41. AckInterval: 128,
  42. },
  43. },
  44. }); err != nil {
  45. return publishClient, fmt.Errorf("send init message: %v", err)
  46. }
  47. resp, err := stream.Recv()
  48. if err != nil {
  49. return publishClient, fmt.Errorf("recv init response: %v", err)
  50. }
  51. if resp.Error != "" {
  52. return publishClient, fmt.Errorf("init response error: %v", resp.Error)
  53. }
  54. go func() {
  55. for {
  56. _, err := publishClient.Recv()
  57. if err != nil {
  58. e, ok := status.FromError(err)
  59. if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
  60. return
  61. }
  62. publishClient.Err = err
  63. fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
  64. return
  65. }
  66. }
  67. }()
  68. return publishClient, nil
  69. }