You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

127 lines
4.0 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
7 months ago
10 months ago
  1. package sub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  9. "io"
  10. "reflect"
  11. "time"
  12. )
  13. type KeyedOffset struct {
  14. Key []byte
  15. Offset int64
  16. }
  17. func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error {
  18. // connect to the partition broker
  19. return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  20. subscribeClient, err := client.SubscribeMessage(context.Background())
  21. if err != nil {
  22. return fmt.Errorf("create subscribe client: %v", err)
  23. }
  24. slidingWindowSize := sub.SubscriberConfig.SlidingWindowSize
  25. if slidingWindowSize <= 0 {
  26. slidingWindowSize = 1
  27. }
  28. po := findPartitionOffset(sub.ContentConfig.PartitionOffsets, assigned.Partition)
  29. if po == nil {
  30. po = &schema_pb.PartitionOffset{
  31. Partition: assigned.Partition,
  32. StartTsNs: time.Now().UnixNano(),
  33. StartType: schema_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
  34. }
  35. }
  36. if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
  37. Message: &mq_pb.SubscribeMessageRequest_Init{
  38. Init: &mq_pb.SubscribeMessageRequest_InitMessage{
  39. ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
  40. ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
  41. Topic: sub.ContentConfig.Topic.ToPbTopic(),
  42. PartitionOffset: po,
  43. Filter: sub.ContentConfig.Filter,
  44. FollowerBroker: assigned.FollowerBroker,
  45. SlidingWindowSize: slidingWindowSize,
  46. },
  47. },
  48. }); err != nil {
  49. glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
  50. }
  51. glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
  52. if sub.OnCompletionFunc != nil {
  53. defer sub.OnCompletionFunc()
  54. }
  55. go func() {
  56. for {
  57. select {
  58. case <-stopCh:
  59. subscribeClient.CloseSend()
  60. return
  61. case ack, ok := <-sub.PartitionOffsetChan:
  62. if !ok {
  63. subscribeClient.CloseSend()
  64. return
  65. }
  66. subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
  67. Message: &mq_pb.SubscribeMessageRequest_Ack{
  68. Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
  69. Key: ack.Key,
  70. Sequence: ack.Offset,
  71. },
  72. },
  73. })
  74. }
  75. }
  76. }()
  77. for {
  78. // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  79. resp, err := subscribeClient.Recv()
  80. if err != nil {
  81. return fmt.Errorf("subscribe recv: %v", err)
  82. }
  83. if resp.Message == nil {
  84. glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  85. continue
  86. }
  87. switch m := resp.Message.(type) {
  88. case *mq_pb.SubscribeMessageResponse_Data:
  89. if m.Data.Ctrl != nil {
  90. glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
  91. continue
  92. }
  93. if len(m.Data.Key) == 0 {
  94. fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
  95. continue
  96. }
  97. onDataMessageFn(m)
  98. case *mq_pb.SubscribeMessageResponse_Ctrl:
  99. // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
  100. if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
  101. return io.EOF
  102. }
  103. }
  104. }
  105. })
  106. }
  107. func findPartitionOffset(partitionOffsets []*schema_pb.PartitionOffset, partition *schema_pb.Partition) *schema_pb.PartitionOffset {
  108. for _, po := range partitionOffsets {
  109. if po.Partition == partition {
  110. return po
  111. }
  112. }
  113. return nil
  114. }