You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

251 lines
7.9 KiB

3 years ago
3 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
3 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
11 months ago
11 months ago
11 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
3 years ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "google.golang.org/grpc/peer"
  10. "io"
  11. "math/rand"
  12. "net"
  13. )
  14. // PUB
  15. // 1. gRPC API to configure a topic
  16. // 1.1 create a topic with existing partition count
  17. // 1.2 assign partitions to brokers
  18. // 2. gRPC API to lookup topic partitions
  19. // 3. gRPC API to publish by topic partitions
  20. // SUB
  21. // 1. gRPC API to lookup a topic partitions
  22. // Re-balance topic partitions for publishing
  23. // 1. collect stats from all the brokers
  24. // 2. Rebalance and configure new generation of partitions on brokers
  25. // 3. Tell brokers to close current generation of publishing.
  26. // Publishers need to look up again and publish to the new generation of partitions.
  27. // Re-balance topic partitions for subscribing
  28. // 1. collect stats from all the brokers
  29. // Subscribers need to listen for new partitions and connect to the brokers.
  30. // Each subscription may not get data. It can act as a backup.
  31. func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
  32. // 1. write to the volume server
  33. // 2. find the topic metadata owning filer
  34. // 3. write to the filer
  35. req, err := stream.Recv()
  36. if err != nil {
  37. return err
  38. }
  39. response := &mq_pb.PublishMessageResponse{}
  40. // TODO check whether current broker should be the leader for the topic partition
  41. ackInterval := 1
  42. initMessage := req.GetInit()
  43. if initMessage == nil {
  44. response.Error = fmt.Sprintf("missing init message")
  45. glog.Errorf("missing init message")
  46. return stream.Send(response)
  47. }
  48. // get or generate a local partition
  49. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  50. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
  51. if getOrGenErr != nil {
  52. response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
  53. glog.Errorf("topic %v not found: %v", t, getOrGenErr)
  54. return stream.Send(response)
  55. }
  56. ackInterval = int(initMessage.AckInterval)
  57. // connect to follower brokers
  58. if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 {
  59. follower := initMessage.FollowerBrokers[0]
  60. ctx := stream.Context()
  61. localTopicPartition.GrpcConnection, err = pb.GrpcDial(ctx, follower, true, b.grpcDialOption)
  62. if err != nil {
  63. response.Error = fmt.Sprintf("fail to dial %s: %v", follower, err)
  64. glog.Errorf("fail to dial %s: %v", follower, err)
  65. return stream.Send(response)
  66. }
  67. followerClient := mq_pb.NewSeaweedMessagingClient(localTopicPartition.GrpcConnection)
  68. localTopicPartition.FollowerStream, err = followerClient.PublishFollowMe(ctx)
  69. if err != nil {
  70. response.Error = fmt.Sprintf("fail to create publish client: %v", err)
  71. glog.Errorf("fail to create publish client: %v", err)
  72. return stream.Send(response)
  73. }
  74. if err = localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
  75. Message: &mq_pb.PublishFollowMeRequest_Init{
  76. Init: &mq_pb.PublishFollowMeRequest_InitMessage{
  77. Topic: initMessage.Topic,
  78. Partition: initMessage.Partition,
  79. },
  80. },
  81. }); err != nil {
  82. return err
  83. }
  84. }
  85. // process each published messages
  86. clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
  87. localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
  88. ackCounter := 0
  89. var ackSequence int64
  90. defer func() {
  91. if localTopicPartition.FollowerStream == nil {
  92. // remove the publisher
  93. localTopicPartition.Publishers.RemovePublisher(clientName)
  94. glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
  95. if localTopicPartition.MaybeShutdownLocalPartition() {
  96. b.localTopicManager.RemoveTopicPartition(t, p)
  97. }
  98. }
  99. }()
  100. if localTopicPartition.FollowerStream != nil {
  101. go func() {
  102. defer func() {
  103. println("stop receiving ack from follower")
  104. // remove the publisher
  105. localTopicPartition.Publishers.RemovePublisher(clientName)
  106. glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
  107. if localTopicPartition.MaybeShutdownLocalPartition() {
  108. b.localTopicManager.RemoveTopicPartition(t, p)
  109. }
  110. println("closing grpcConnection to follower")
  111. localTopicPartition.GrpcConnection.Close()
  112. }()
  113. for {
  114. ack, err := localTopicPartition.FollowerStream.Recv()
  115. if err != nil {
  116. glog.Errorf("Error receiving response: %v", err)
  117. return
  118. }
  119. ackSequence = ack.AckTsNs
  120. println("recv ack", ack.AckTsNs)
  121. if err := stream.Send(&mq_pb.PublishMessageResponse{
  122. AckSequence: ack.AckTsNs,
  123. }); err != nil {
  124. glog.Errorf("Error sending response %v: %v", ack, err)
  125. return
  126. }
  127. }
  128. }()
  129. }
  130. // send a hello message
  131. stream.Send(&mq_pb.PublishMessageResponse{})
  132. var receivedSequence, acknowledgedSequence int64
  133. defer func() {
  134. if localTopicPartition.FollowerStream != nil {
  135. //if err := followerStream.CloseSend(); err != nil {
  136. // glog.Errorf("Error closing follower stream: %v", err)
  137. //}
  138. } else {
  139. if acknowledgedSequence < receivedSequence {
  140. acknowledgedSequence = receivedSequence
  141. response := &mq_pb.PublishMessageResponse{
  142. AckSequence: acknowledgedSequence,
  143. }
  144. if err := stream.Send(response); err != nil {
  145. glog.Errorf("Error sending response %v: %v", response, err)
  146. }
  147. }
  148. }
  149. }()
  150. // process each published messages
  151. for {
  152. // receive a message
  153. req, err := stream.Recv()
  154. if err != nil {
  155. if err == io.EOF {
  156. break
  157. }
  158. glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
  159. return err
  160. }
  161. // Process the received message
  162. dataMessage := req.GetData()
  163. if dataMessage == nil {
  164. continue
  165. }
  166. // send to the local partition
  167. localTopicPartition.Publish(dataMessage)
  168. receivedSequence = dataMessage.TsNs
  169. // maybe send to the follower
  170. if localTopicPartition.FollowerStream != nil {
  171. println("recv", string(dataMessage.Key), dataMessage.TsNs)
  172. if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
  173. Message: &mq_pb.PublishFollowMeRequest_Data{
  174. Data: dataMessage,
  175. },
  176. }); followErr != nil {
  177. return followErr
  178. }
  179. } else {
  180. ackCounter++
  181. if ackCounter >= ackInterval {
  182. ackCounter = 0
  183. // send back the ack directly
  184. acknowledgedSequence = receivedSequence
  185. response := &mq_pb.PublishMessageResponse{
  186. AckSequence: acknowledgedSequence,
  187. }
  188. if err := stream.Send(response); err != nil {
  189. glog.Errorf("Error sending response %v: %v", response, err)
  190. }
  191. }
  192. }
  193. }
  194. if localTopicPartition.FollowerStream != nil {
  195. // send close to the follower
  196. if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
  197. Message: &mq_pb.PublishFollowMeRequest_Close{
  198. Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
  199. },
  200. }); followErr != nil {
  201. return followErr
  202. }
  203. println("closing follower stream")
  204. //if err := followerStream.CloseSend(); err != nil {
  205. // glog.Errorf("Error closing follower stream: %v", err)
  206. //}
  207. }
  208. glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
  209. return nil
  210. }
  211. // duplicated from master_grpc_server.go
  212. func findClientAddress(ctx context.Context) string {
  213. // fmt.Printf("FromContext %+v\n", ctx)
  214. pr, ok := peer.FromContext(ctx)
  215. if !ok {
  216. glog.Error("failed to get peer from ctx")
  217. return ""
  218. }
  219. if pr.Addr == net.Addr(nil) {
  220. glog.Error("failed to get peer address")
  221. return ""
  222. }
  223. return pr.Addr.String()
  224. }