You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

255 lines
8.2 KiB

2 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
7 days ago
2 years ago
10 months ago
2 years ago
2 years ago
2 years ago
2 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
12 months ago
2 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
11 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
9 months ago
10 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
9 months ago
10 months ago
10 months ago
10 months ago
9 months ago
7 days ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
7 days ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
2 years ago
10 months ago
2 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
2 years ago
10 months ago
9 months ago
  1. package broker
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
  8. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  14. "io"
  15. "time"
  16. )
  17. func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
  18. req, err := stream.Recv()
  19. if err != nil {
  20. return err
  21. }
  22. if req.GetInit() == nil {
  23. glog.Errorf("missing init message")
  24. return fmt.Errorf("missing init message")
  25. }
  26. ctx := stream.Context()
  27. clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
  28. t := topic.FromPbTopic(req.GetInit().Topic)
  29. partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
  30. glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
  31. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
  32. if getOrGenErr != nil {
  33. return getOrGenErr
  34. }
  35. localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
  36. glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
  37. isConnected := true
  38. sleepIntervalCount := 0
  39. var counter int64
  40. defer func() {
  41. isConnected = false
  42. localTopicPartition.Subscribers.RemoveSubscriber(clientName)
  43. glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
  44. if localTopicPartition.MaybeShutdownLocalPartition() {
  45. b.localTopicManager.RemoveLocalPartition(t, partition)
  46. }
  47. }()
  48. startPosition := b.getRequestPosition(req.GetInit())
  49. imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().SlidingWindowSize))
  50. // connect to the follower
  51. var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
  52. glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
  53. if req.GetInit().FollowerBroker != "" {
  54. follower := req.GetInit().FollowerBroker
  55. if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
  56. return fmt.Errorf("fail to dial %s: %v", follower, err)
  57. } else {
  58. defer func() {
  59. println("closing SubscribeFollowMe connection", follower)
  60. if subscribeFollowMeStream != nil {
  61. subscribeFollowMeStream.CloseSend()
  62. }
  63. // followerGrpcConnection.Close()
  64. }()
  65. followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
  66. if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
  67. return fmt.Errorf("fail to subscribe to %s: %v", follower, err)
  68. } else {
  69. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  70. Message: &mq_pb.SubscribeFollowMeRequest_Init{
  71. Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
  72. Topic: req.GetInit().Topic,
  73. Partition: req.GetInit().GetPartitionOffset().Partition,
  74. ConsumerGroup: req.GetInit().ConsumerGroup,
  75. },
  76. },
  77. }); err != nil {
  78. return fmt.Errorf("fail to send init to %s: %v", follower, err)
  79. }
  80. }
  81. }
  82. glog.V(0).Infof("follower %s connected", follower)
  83. }
  84. go func() {
  85. var lastOffset int64
  86. for {
  87. ack, err := stream.Recv()
  88. if err != nil {
  89. if err == io.EOF {
  90. // the client has called CloseSend(). This is to ack the close.
  91. stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
  92. Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
  93. IsEndOfStream: true,
  94. },
  95. }})
  96. break
  97. }
  98. glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
  99. break
  100. }
  101. if ack.GetAck().Key == nil {
  102. // skip ack for control messages
  103. continue
  104. }
  105. imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
  106. currentLastOffset := imt.GetOldestAckedTimestamp()
  107. // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
  108. if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
  109. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  110. Message: &mq_pb.SubscribeFollowMeRequest_Ack{
  111. Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
  112. TsNs: currentLastOffset,
  113. },
  114. },
  115. }); err != nil {
  116. glog.Errorf("Error sending ack to follower: %v", err)
  117. break
  118. }
  119. lastOffset = currentLastOffset
  120. // fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset)
  121. }
  122. }
  123. if lastOffset > 0 {
  124. glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
  125. if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
  126. glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
  127. }
  128. }
  129. if subscribeFollowMeStream != nil {
  130. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  131. Message: &mq_pb.SubscribeFollowMeRequest_Close{
  132. Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
  133. },
  134. }); err != nil {
  135. if err != io.EOF {
  136. glog.Errorf("Error sending close to follower: %v", err)
  137. }
  138. }
  139. }
  140. }()
  141. return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
  142. if !isConnected {
  143. return false
  144. }
  145. sleepIntervalCount++
  146. if sleepIntervalCount > 32 {
  147. sleepIntervalCount = 32
  148. }
  149. time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
  150. // Check if the client has disconnected by monitoring the context
  151. select {
  152. case <-ctx.Done():
  153. err := ctx.Err()
  154. if errors.Is(err, context.Canceled) {
  155. // Client disconnected
  156. return false
  157. }
  158. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  159. return false
  160. default:
  161. // Continue processing the request
  162. }
  163. return true
  164. }, func(logEntry *filer_pb.LogEntry) (bool, error) {
  165. // reset the sleep interval count
  166. sleepIntervalCount = 0
  167. for imt.IsInflight(logEntry.Key) {
  168. time.Sleep(137 * time.Millisecond)
  169. // Check if the client has disconnected by monitoring the context
  170. select {
  171. case <-ctx.Done():
  172. err := ctx.Err()
  173. if err == context.Canceled {
  174. // Client disconnected
  175. return false, nil
  176. }
  177. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  178. return false, nil
  179. default:
  180. // Continue processing the request
  181. }
  182. }
  183. if logEntry.Key != nil {
  184. imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
  185. }
  186. if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
  187. Data: &mq_pb.DataMessage{
  188. Key: logEntry.Key,
  189. Value: logEntry.Data,
  190. TsNs: logEntry.TsNs,
  191. },
  192. }}); err != nil {
  193. glog.Errorf("Error sending data: %v", err)
  194. return false, err
  195. }
  196. counter++
  197. return false, nil
  198. })
  199. }
  200. func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
  201. if initMessage == nil {
  202. return
  203. }
  204. offset := initMessage.GetPartitionOffset()
  205. offsetType := initMessage.OffsetType
  206. // reset to earliest or latest
  207. if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST {
  208. startPosition = log_buffer.NewMessagePosition(1, -3)
  209. return
  210. }
  211. if offsetType == schema_pb.OffsetType_RESET_TO_LATEST {
  212. startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
  213. return
  214. }
  215. // use the exact timestamp
  216. if offsetType == schema_pb.OffsetType_EXACT_TS_NS {
  217. startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
  218. return
  219. }
  220. // try to resume
  221. if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
  222. glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
  223. startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
  224. return
  225. }
  226. if offsetType == schema_pb.OffsetType_RESUME_OR_EARLIEST {
  227. startPosition = log_buffer.NewMessagePosition(1, -5)
  228. } else if offsetType == schema_pb.OffsetType_RESUME_OR_LATEST {
  229. startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -6)
  230. }
  231. return
  232. }