You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

295 lines
9.0 KiB

1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
12 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  10. "sync/atomic"
  11. "time"
  12. )
  13. func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) {
  14. ctx := stream.Context()
  15. clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
  16. t := topic.FromPbTopic(req.GetInit().Topic)
  17. partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
  18. glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
  19. waitIntervalCount := 0
  20. var localTopicPartition *topic.LocalPartition
  21. for localTopicPartition == nil {
  22. localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
  23. if err != nil {
  24. glog.V(1).Infof("topic %v partition %v not setup", t, partition)
  25. }
  26. if localTopicPartition != nil {
  27. break
  28. }
  29. waitIntervalCount++
  30. if waitIntervalCount > 10 {
  31. waitIntervalCount = 10
  32. }
  33. time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond)
  34. // Check if the client has disconnected by monitoring the context
  35. select {
  36. case <-ctx.Done():
  37. err := ctx.Err()
  38. if err == context.Canceled {
  39. // Client disconnected
  40. return nil
  41. }
  42. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  43. return nil
  44. default:
  45. // Continue processing the request
  46. }
  47. }
  48. localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
  49. glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
  50. isConnected := true
  51. sleepIntervalCount := 0
  52. var counter int64
  53. defer func() {
  54. isConnected = false
  55. localTopicPartition.Subscribers.RemoveSubscriber(clientName)
  56. glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
  57. if localTopicPartition.MaybeShutdownLocalPartition() {
  58. b.localTopicManager.RemoveTopicPartition(t, partition)
  59. }
  60. }()
  61. var startPosition log_buffer.MessagePosition
  62. if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
  63. startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
  64. }
  65. return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
  66. if !isConnected {
  67. return false
  68. }
  69. sleepIntervalCount++
  70. if sleepIntervalCount > 32 {
  71. sleepIntervalCount = 32
  72. }
  73. time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
  74. // Check if the client has disconnected by monitoring the context
  75. select {
  76. case <-ctx.Done():
  77. err := ctx.Err()
  78. if err == context.Canceled {
  79. // Client disconnected
  80. return false
  81. }
  82. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  83. return false
  84. default:
  85. // Continue processing the request
  86. }
  87. return true
  88. }, func(logEntry *filer_pb.LogEntry) (bool, error) {
  89. // reset the sleep interval count
  90. sleepIntervalCount = 0
  91. if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
  92. Data: &mq_pb.DataMessage{
  93. Key: logEntry.Key,
  94. Value: logEntry.Data,
  95. TsNs: logEntry.TsNs,
  96. },
  97. }}); err != nil {
  98. glog.Errorf("Error sending data: %v", err)
  99. return false, err
  100. }
  101. counter++
  102. return false, nil
  103. })
  104. }
  105. func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
  106. if offset.StartTsNs != 0 {
  107. startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
  108. }
  109. if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
  110. startPosition = log_buffer.NewMessagePosition(1, -3)
  111. } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
  112. startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
  113. }
  114. return
  115. }
  116. func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
  117. ctx := stream.Context()
  118. clientName := req.GetInit().ConsumerId
  119. t := topic.FromPbTopic(req.GetInit().Topic)
  120. partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
  121. glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
  122. waitIntervalCount := 0
  123. var localTopicPartition *topic.LocalPartition
  124. for localTopicPartition == nil {
  125. localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
  126. if err != nil {
  127. glog.V(1).Infof("topic %v partition %v not setup", t, partition)
  128. }
  129. if localTopicPartition != nil {
  130. break
  131. }
  132. waitIntervalCount++
  133. if waitIntervalCount > 32 {
  134. waitIntervalCount = 32
  135. }
  136. time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
  137. // Check if the client has disconnected by monitoring the context
  138. select {
  139. case <-ctx.Done():
  140. err := ctx.Err()
  141. if err == context.Canceled {
  142. // Client disconnected
  143. return nil
  144. }
  145. glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
  146. return nil
  147. default:
  148. // Continue processing the request
  149. }
  150. }
  151. // set the current follower id
  152. followerId := req.GetInit().FollowerId
  153. atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
  154. glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
  155. var counter int64
  156. defer func() {
  157. glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
  158. }()
  159. // send first hello message
  160. // to indicate the follower is connected
  161. stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
  162. Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
  163. Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
  164. },
  165. })
  166. var startPosition log_buffer.MessagePosition
  167. if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
  168. startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
  169. }
  170. var prevFlushTsNs int64
  171. _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
  172. // wait for the log buffer to be ready
  173. localTopicPartition.ListenersLock.Lock()
  174. atomic.AddInt64(&localTopicPartition.ListenersWaits, 1)
  175. localTopicPartition.ListenersCond.Wait()
  176. atomic.AddInt64(&localTopicPartition.ListenersWaits, -1)
  177. localTopicPartition.ListenersLock.Unlock()
  178. if localTopicPartition.LogBuffer.IsStopping() {
  179. newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
  180. glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
  181. stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
  182. Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
  183. Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
  184. FollowerChangedToId: newFollowerId,
  185. },
  186. },
  187. })
  188. return false
  189. }
  190. // Check if the client has disconnected by monitoring the context
  191. select {
  192. case <-ctx.Done():
  193. err := ctx.Err()
  194. if err == context.Canceled {
  195. // Client disconnected
  196. return false
  197. }
  198. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  199. return false
  200. default:
  201. // Continue processing the request
  202. }
  203. // send the last flushed sequence
  204. flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
  205. if flushTsNs != prevFlushTsNs {
  206. prevFlushTsNs = flushTsNs
  207. stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
  208. Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
  209. Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
  210. FlushedSequence: flushTsNs,
  211. },
  212. },
  213. })
  214. }
  215. return true
  216. }, func(logEntry *filer_pb.LogEntry) (bool, error) {
  217. // check the follower id
  218. newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
  219. if newFollowerId != followerId {
  220. glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
  221. stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
  222. Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
  223. Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
  224. FollowerChangedToId: newFollowerId,
  225. },
  226. },
  227. })
  228. return true, nil
  229. }
  230. // send the last flushed sequence
  231. flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
  232. if flushTsNs != prevFlushTsNs {
  233. prevFlushTsNs = flushTsNs
  234. stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
  235. Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
  236. Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
  237. FlushedSequence: flushTsNs,
  238. },
  239. },
  240. })
  241. }
  242. // send the log entry
  243. if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
  244. Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
  245. Data: &mq_pb.DataMessage{
  246. Key: logEntry.Key,
  247. Value: logEntry.Data,
  248. TsNs: logEntry.TsNs,
  249. },
  250. }}); err != nil {
  251. glog.Errorf("Error sending setup response: %v", err)
  252. return false, err
  253. }
  254. counter++
  255. return false, nil
  256. })
  257. return err
  258. }