You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

137 lines
4.2 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
7 months ago
10 months ago
  1. package sub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "io"
  10. "reflect"
  11. )
  12. func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
  13. // connect to the partition broker
  14. return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  15. subscribeClient, err := client.SubscribeMessage(context.Background())
  16. if err != nil {
  17. return fmt.Errorf("create subscribe client: %v", err)
  18. }
  19. perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
  20. if perPartitionConcurrency <= 0 {
  21. perPartitionConcurrency = 1
  22. }
  23. var stopTsNs int64
  24. if !sub.ContentConfig.StopTime.IsZero() {
  25. stopTsNs = sub.ContentConfig.StopTime.UnixNano()
  26. }
  27. if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
  28. Message: &mq_pb.SubscribeMessageRequest_Init{
  29. Init: &mq_pb.SubscribeMessageRequest_InitMessage{
  30. ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
  31. ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
  32. Topic: sub.ContentConfig.Topic.ToPbTopic(),
  33. PartitionOffset: &mq_pb.PartitionOffset{
  34. Partition: assigned.Partition,
  35. StartTsNs: sub.ContentConfig.StartTime.UnixNano(),
  36. StopTsNs: stopTsNs,
  37. StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
  38. },
  39. Filter: sub.ContentConfig.Filter,
  40. FollowerBroker: assigned.FollowerBroker,
  41. Concurrency: perPartitionConcurrency,
  42. },
  43. },
  44. }); err != nil {
  45. glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
  46. }
  47. glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
  48. if sub.OnCompletionFunc != nil {
  49. defer sub.OnCompletionFunc()
  50. }
  51. type KeyedOffset struct {
  52. Key []byte
  53. Offset int64
  54. }
  55. partitionOffsetChan := make(chan KeyedOffset, 1024)
  56. defer func() {
  57. close(partitionOffsetChan)
  58. }()
  59. executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
  60. go func() {
  61. for {
  62. select {
  63. case <-stopCh:
  64. subscribeClient.CloseSend()
  65. return
  66. case ack, ok := <-partitionOffsetChan:
  67. if !ok {
  68. subscribeClient.CloseSend()
  69. return
  70. }
  71. subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
  72. Message: &mq_pb.SubscribeMessageRequest_Ack{
  73. Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
  74. Key: ack.Key,
  75. Sequence: ack.Offset,
  76. },
  77. },
  78. })
  79. }
  80. }
  81. }()
  82. var lastErr error
  83. for lastErr == nil {
  84. // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  85. resp, err := subscribeClient.Recv()
  86. if err != nil {
  87. return fmt.Errorf("subscribe recv: %v", err)
  88. }
  89. if resp.Message == nil {
  90. glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  91. continue
  92. }
  93. switch m := resp.Message.(type) {
  94. case *mq_pb.SubscribeMessageResponse_Data:
  95. if m.Data.Ctrl != nil {
  96. glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
  97. continue
  98. }
  99. if len(m.Data.Key) == 0 {
  100. fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
  101. continue
  102. }
  103. executors.Execute(func() {
  104. processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
  105. if processErr == nil {
  106. partitionOffsetChan <- KeyedOffset{
  107. Key: m.Data.Key,
  108. Offset: m.Data.TsNs,
  109. }
  110. } else {
  111. lastErr = processErr
  112. }
  113. })
  114. case *mq_pb.SubscribeMessageResponse_Ctrl:
  115. // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
  116. if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
  117. return io.EOF
  118. }
  119. }
  120. }
  121. return lastErr
  122. })
  123. }