You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

251 lines
7.8 KiB

3 years ago
3 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
3 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
11 months ago
11 months ago
11 months ago
11 months ago
1 year ago
1 year ago
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
3 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "google.golang.org/grpc/peer"
  10. "io"
  11. "math/rand"
  12. "net"
  13. )
  14. // PUB
  15. // 1. gRPC API to configure a topic
  16. // 1.1 create a topic with existing partition count
  17. // 1.2 assign partitions to brokers
  18. // 2. gRPC API to lookup topic partitions
  19. // 3. gRPC API to publish by topic partitions
  20. // SUB
  21. // 1. gRPC API to lookup topic partitions
  22. // Re-balance topic partitions for publishing
  23. // 1. collect stats from all the brokers
  24. // 2. Rebalance and configure new generation of partitions on brokers
  25. // 3. Tell brokers to close the current generation of publishing.
  26. // Publishers need to lookup again and publish to the new generation of partitions.
  27. // Re-balance topic partitions for subscribing
  28. // 1. collect stats from all the brokers
  29. // Subscribers need to listen for new partitions and connect to the brokers.
  30. // Each subscription may not get data. It can act as a backup.
  31. func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
  32. // 1. write to the volume server
  33. // 2. find the topic metadata owning filer
  34. // 3. write to the filer
  35. req, err := stream.Recv()
  36. if err != nil {
  37. return err
  38. }
  39. response := &mq_pb.PublishMessageResponse{}
  40. // TODO check whether current broker should be the leader for the topic partition
  41. ackInterval := 1
  42. initMessage := req.GetInit()
  43. if initMessage == nil {
  44. response.Error = fmt.Sprintf("missing init message")
  45. glog.Errorf("missing init message")
  46. return stream.Send(response)
  47. }
  48. // get or generate a local partition
  49. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  50. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
  51. if getOrGenErr != nil {
  52. response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
  53. glog.Errorf("topic %v not found: %v", t, getOrGenErr)
  54. return stream.Send(response)
  55. }
  56. ackInterval = int(initMessage.AckInterval)
  57. // connect to follower brokers
  58. if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 {
  59. follower := initMessage.FollowerBrokers[0]
  60. ctx := stream.Context()
  61. localTopicPartition.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, b.grpcDialOption)
  62. if err != nil {
  63. response.Error = fmt.Sprintf("fail to dial %s: %v", follower, err)
  64. glog.Errorf("fail to dial %s: %v", follower, err)
  65. return stream.Send(response)
  66. }
  67. followerClient := mq_pb.NewSeaweedMessagingClient(localTopicPartition.FollowerGrpcConnection)
  68. localTopicPartition.FollowerStream, err = followerClient.PublishFollowMe(ctx)
  69. if err != nil {
  70. response.Error = fmt.Sprintf("fail to create publish client: %v", err)
  71. glog.Errorf("fail to create publish client: %v", err)
  72. return stream.Send(response)
  73. }
  74. if err = localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
  75. Message: &mq_pb.PublishFollowMeRequest_Init{
  76. Init: &mq_pb.PublishFollowMeRequest_InitMessage{
  77. Topic: initMessage.Topic,
  78. Partition: initMessage.Partition,
  79. },
  80. },
  81. }); err != nil {
  82. return err
  83. }
  84. // start receiving ack from follower
  85. go func() {
  86. defer func() {
  87. println("stop receiving ack from follower")
  88. }()
  89. for {
  90. ack, err := localTopicPartition.FollowerStream.Recv()
  91. if err != nil {
  92. glog.Errorf("Error receiving follower ack: %v", err)
  93. return
  94. }
  95. println("recv ack", ack.AckTsNs)
  96. if err := stream.Send(&mq_pb.PublishMessageResponse{
  97. AckSequence: ack.AckTsNs,
  98. }); err != nil {
  99. glog.Errorf("Error sending publisher ack %v: %v", ack, err)
  100. return
  101. }
  102. }
  103. }()
  104. }
  105. // process each published messages
  106. clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
  107. localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
  108. ackCounter := 0
  109. var ackSequence int64
  110. defer func() {
  111. // remove the publisher
  112. localTopicPartition.Publishers.RemovePublisher(clientName)
  113. glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
  114. if localTopicPartition.MaybeShutdownLocalPartition() {
  115. if localTopicPartition.FollowerStream != nil {
  116. // send close to the follower
  117. if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
  118. Message: &mq_pb.PublishFollowMeRequest_Close{
  119. Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
  120. },
  121. }); followErr != nil {
  122. glog.Errorf("Error closing follower stream: %v", followErr)
  123. }
  124. println("closing grpcConnection to follower")
  125. localTopicPartition.FollowerGrpcConnection.Close()
  126. }
  127. b.localTopicManager.RemoveTopicPartition(t, p)
  128. glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
  129. }
  130. }()
  131. // send a hello message
  132. stream.Send(&mq_pb.PublishMessageResponse{})
  133. var receivedSequence, acknowledgedSequence int64
  134. defer func() {
  135. if localTopicPartition.FollowerStream != nil {
  136. //if err := followerStream.CloseSend(); err != nil {
  137. // glog.Errorf("Error closing follower stream: %v", err)
  138. //}
  139. } else {
  140. if acknowledgedSequence < receivedSequence {
  141. acknowledgedSequence = receivedSequence
  142. response := &mq_pb.PublishMessageResponse{
  143. AckSequence: acknowledgedSequence,
  144. }
  145. if err := stream.Send(response); err != nil {
  146. glog.Errorf("Error sending response %v: %v", response, err)
  147. }
  148. }
  149. }
  150. }()
  151. // process each published messages
  152. for {
  153. // receive a message
  154. req, err := stream.Recv()
  155. if err != nil {
  156. if err == io.EOF {
  157. break
  158. }
  159. glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
  160. return err
  161. }
  162. // Process the received message
  163. dataMessage := req.GetData()
  164. if dataMessage == nil {
  165. continue
  166. }
  167. // send to the local partition
  168. localTopicPartition.Publish(dataMessage)
  169. receivedSequence = dataMessage.TsNs
  170. // maybe send to the follower
  171. if localTopicPartition.FollowerStream != nil {
  172. println("recv", string(dataMessage.Key), dataMessage.TsNs)
  173. if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
  174. Message: &mq_pb.PublishFollowMeRequest_Data{
  175. Data: dataMessage,
  176. },
  177. }); followErr != nil {
  178. return followErr
  179. }
  180. } else {
  181. ackCounter++
  182. if ackCounter >= ackInterval {
  183. ackCounter = 0
  184. // send back the ack directly
  185. acknowledgedSequence = receivedSequence
  186. response := &mq_pb.PublishMessageResponse{
  187. AckSequence: acknowledgedSequence,
  188. }
  189. if err := stream.Send(response); err != nil {
  190. glog.Errorf("Error sending response %v: %v", response, err)
  191. }
  192. }
  193. }
  194. }
  195. if localTopicPartition.FollowerStream != nil {
  196. // send close to the follower
  197. if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
  198. Message: &mq_pb.PublishFollowMeRequest_Close{
  199. Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
  200. },
  201. }); followErr != nil {
  202. return followErr
  203. }
  204. println("closing follower stream")
  205. //if err := followerStream.CloseSend(); err != nil {
  206. // glog.Errorf("Error closing follower stream: %v", err)
  207. //}
  208. }
  209. glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
  210. return nil
  211. }
  212. // duplicated from master_grpc_server.go
  213. func findClientAddress(ctx context.Context) string {
  214. // fmt.Printf("FromContext %+v\n", ctx)
  215. pr, ok := peer.FromContext(ctx)
  216. if !ok {
  217. glog.Error("failed to get peer from ctx")
  218. return ""
  219. }
  220. if pr.Addr == net.Addr(nil) {
  221. glog.Error("failed to get peer address")
  222. return ""
  223. }
  224. return pr.Addr.String()
  225. }