1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
package broker

import (
	"context"
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"sync/atomic"
	"time"
)

// SubscribeMessage streams the messages of one topic partition to a subscriber.
// It waits until the partition is available on this broker, registers the
// subscriber, and sends log entries until the client disconnects or a send fails.
func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) {

	ctx := stream.Context()
	clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)

	t := topic.FromPbTopic(req.GetInit().Topic)
	partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())

	glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)

	// Wait for the local partition, sleeping waitIntervalCount * 337ms
	// between attempts, with waitIntervalCount capped at 10.
	waitIntervalCount := 0

	var localTopicPartition *topic.LocalPartition
	for localTopicPartition == nil {
		localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
		if err != nil {
			glog.V(1).Infof("topic %v partition %v not setup", t, partition)
		}
		if localTopicPartition != nil {
			break
		}
		waitIntervalCount++
		if waitIntervalCount > 10 {
			waitIntervalCount = 10
		}
		time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond)
		// Check if the client has disconnected by monitoring the context
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err == context.Canceled {
				// Client disconnected
				return nil
			}
			glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
			return nil
		default:
			// Continue processing the request
		}
	}

	localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
	glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
	isConnected := true
	sleepIntervalCount := 0

	var counter int64
	defer func() {
		isConnected = false
		localTopicPartition.Subscribers.RemoveSubscriber(clientName)
		glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
		if localTopicPartition.MaybeShutdownLocalPartition() {
			b.localTopicManager.RemoveTopicPartition(t, partition)
		}
	}()

	var startPosition log_buffer.MessagePosition
	if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
		startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
	}

	// The first callback is invoked while waiting for more messages and reports
	// whether to keep waiting; the second is invoked for each log entry.
	return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
		if !isConnected {
			return false
		}
		// Sleep sleepIntervalCount * 137ms, with sleepIntervalCount capped at 32.
		sleepIntervalCount++
		if sleepIntervalCount > 32 {
			sleepIntervalCount = 32
		}
		time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)

		// Check if the client has disconnected by monitoring the context
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err == context.Canceled {
				// Client disconnected
				return false
			}
			glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
			return false
		default:
			// Continue processing the request
		}

		return true
	}, func(logEntry *filer_pb.LogEntry) (bool, error) {
		// reset the sleep interval count
		sleepIntervalCount = 0

		if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
			Data: &mq_pb.DataMessage{
				Key:   logEntry.Key,
				Value: logEntry.Data,
				TsNs:  logEntry.TsNs,
			},
		}}); err != nil {
			glog.Errorf("Error sending data: %v", err)
			return false, err
		}

		counter++
		return false, nil
	})
}

// getRequestPosition converts the requested partition offset into a log buffer
// start position. A non-zero StartTsNs is applied first; a StartType of
// EARLIEST or LATEST then overrides it.
func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
	if offset.StartTsNs != 0 {
		startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
	}
	if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
		startPosition = log_buffer.NewMessagePosition(1, -3)
	} else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
		startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
	}
	return
}

// FollowInMemoryMessages streams the in-memory log entries of one topic
// partition to a follower, interleaved with control messages that report the
// last flushed timestamp and any change of the follower id.
func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
	ctx := stream.Context()
	clientName := req.GetInit().ConsumerId

	t := topic.FromPbTopic(req.GetInit().Topic)
	partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())

	glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)

	// Wait for the local partition, sleeping waitIntervalCount * 137ms
	// between attempts, with waitIntervalCount capped at 32.
	waitIntervalCount := 0

	var localTopicPartition *topic.LocalPartition
	for localTopicPartition == nil {
		localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
		if err != nil {
			glog.V(1).Infof("topic %v partition %v not setup", t, partition)
		}
		if localTopicPartition != nil {
			break
		}
		waitIntervalCount++
		if waitIntervalCount > 32 {
			waitIntervalCount = 32
		}
		time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
		// Check if the client has disconnected by monitoring the context
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err == context.Canceled {
				// Client disconnected
				return nil
			}
			glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
			return nil
		default:
			// Continue processing the request
		}
	}

	// set the current follower id
	followerId := req.GetInit().FollowerId
	atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)

	glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
	isConnected := true
	sleepIntervalCount := 0

	var counter int64
	defer func() {
		isConnected = false
		glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
	}()

	// send first hello message
	// to indicate the follower is connected
	stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
		Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
			Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
		},
	})

	var startPosition log_buffer.MessagePosition
	if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
		startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
	}

	var prevFlushTsNs int64

	_, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
		if !isConnected {
			return false
		}
		// Sleep sleepIntervalCount * 137ms, with sleepIntervalCount capped at 32.
		sleepIntervalCount++
		if sleepIntervalCount > 32 {
			sleepIntervalCount = 32
		}
		time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)

		if localTopicPartition.LogBuffer.IsStopping() {
			newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
			glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
			stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
				Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
					Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
						FollowerChangedToId: newFollowerId,
					},
				},
			})
			return false
		}

		// Check if the client has disconnected by monitoring the context
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err == context.Canceled {
				// Client disconnected
				return false
			}
			glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
			return false
		default:
			// Continue processing the request
		}

		// send the last flushed sequence
		flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
		if flushTsNs != prevFlushTsNs {
			prevFlushTsNs = flushTsNs
			stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
				Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
					Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
						FlushedSequence: flushTsNs,
					},
				},
			})
		}

		return true
	}, func(logEntry *filer_pb.LogEntry) (bool, error) {
		// reset the sleep interval count
		sleepIntervalCount = 0

		// check the follower id
		newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
		if newFollowerId != followerId {
			glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
			stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
				Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
					Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
						FollowerChangedToId: newFollowerId,
					},
				},
			})
			return true, nil
		}

		// send the last flushed sequence
		flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
		if flushTsNs != prevFlushTsNs {
			prevFlushTsNs = flushTsNs
			stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
				Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
					Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
						FlushedSequence: flushTsNs,
					},
				},
			})
		}

		// send the log entry
		if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
			Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
				Data: &mq_pb.DataMessage{
					Key:   logEntry.Key,
					Value: logEntry.Data,
					TsNs:  logEntry.TsNs,
				},
			}}); err != nil {
			glog.Errorf("Error sending data: %v", err)
			return false, err
		}

		counter++
		return false, nil
	})

	return err
}