package broker

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
// For a new or re-configured topic, or when one of the brokers goes offline,
// the pub clients ask one broker which brokers host each of the topic's partitions.
// The broker locks the topic on write.
// 1. if the topic is not found, create the topic and allocate the topic partitions to the brokers
// 2. if the topic is found, return the brokers for the topic partitions
// For a topic to read from, the sub clients ask one broker which brokers host each of the topic's partitions.
// The broker locks the topic on read.
// 1. if the topic is not found, return an error
// 2. if the topic is found, return the brokers for the topic partitions
//
// If the topic needs to be re-balanced, the admin client locks the topic, then:
// 1. collect throughput information from all the brokers
// 2. adjust the topic partition assignments to the brokers
// 3. notify the brokers to add/remove the partitions they host
// 3.1 when locking the topic, the partitions and brokers should be remembered in the lock
// 4. the brokers stop processing incoming messages that are not for a partition they host
// 4.1 the pub clients re-partition the messages and publish to the right broker for each partition
// 4.2 the sub clients switch to the new brokers to read from
//
// The following is from each individual component's perspective:
// For a pub client
// For the current topic/partition, ask one broker for the brokers hosting the topic partitions
// 1. connect to the brokers and keep sending, until a broker returns an error or the broker leader is moved
// For a sub client
// For the current topic/partition, ask one broker for the brokers hosting the topic partitions
// 1. connect to the brokers and keep reading, until a broker returns an error or the broker leader is moved
// For a broker
// Upon a pub client lookup:
// 1. lock the topic
// 2. if a topic partition assignment already exists, check that all the assigned brokers are healthy
// 3. if not, create the topic partition assignment
// 4. return the brokers for the topic partitions
// 5. unlock the topic
// Upon a sub client lookup:
// 1. lock the topic
// 2. if a topic partition assignment already exists, check that all the assigned brokers are healthy
// 3. if not, return an error
// 4. return the brokers for the topic partitions
// 5. unlock the topic
// For an admin tool
// 0. collect stats from all the brokers, and find the topics worth moving
// 1. lock the topic
// 2. collect throughput information from all the brokers
// 3. adjust the topic partition assignments to the brokers
// 4. notify the brokers to add/remove the partitions they host
// 5. the brokers stop processing incoming messages that are not for a partition they host
// 6. unlock the topic
/*
The messages are buffered in memory, and saved to the filer under
	/topics/<topic>/<date>/<hour>/<segment>/*.msg
	/topics/<topic>/<date>/<hour>/segment
	/topics/<topic>/info/segment_<id>.meta
*/
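
// Publish handles one bi-directional publish stream from a pub client: the
// first request must be an init message naming the topic partition, and each
// following data message is appended to the local partition, with acks sent
// back every ackInterval messages.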
func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
	// 1. write to the volume server
	// 2. find the topic metadata owning filer
	// 3. write to the filer

	var localTopicPartition *topic.LocalPartition
	req, err := stream.Recv()
	if err != nil {
		return err
	}
	response := &mq_pb.PublishResponse{}
	// TODO check whether the current broker should be the leader for the topic partition
	ackInterval := 1
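	// the init message, if present, overrides the default of acking every message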
	initMessage := req.GetInit()
	if initMessage != nil {
		t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
		localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
		if localTopicPartition == nil {
			localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
			broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
		}
		ackInterval = int(initMessage.AckInterval)
		if err := stream.Send(response); err != nil {
			return err
		}
	} else {
		// initMessage is nil on this branch, so it must not be dereferenced
		// when reporting the error
		response.Error = "missing init message in the first request"
		glog.Errorf("missing init message in the first request")
		return stream.Send(response)
	}
	ackCounter := 0
	var ackSequence int64
	var isStopping int32
	respChan := make(chan *mq_pb.PublishResponse, 128)
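	// a single goroutine (started below) owns stream.Send for acks; the deferred
	// cleanup flags it to stop and closes respChan so it can drain and exit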
	defer func() {
		atomic.StoreInt32(&isStopping, 1)
		close(respChan)
	}()
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop() // release the ticker when the sender goroutine exits
		for {
			select {
			case resp := <-respChan:
				if resp != nil {
					if err := stream.Send(resp); err != nil {
						glog.Errorf("Error sending response %v: %v", resp, err)
					}
				} else {
					// respChan was closed by the deferred cleanup; stop sending
					return
				}
			case <-ticker.C:
				if atomic.LoadInt32(&isStopping) == 0 {
					// periodically report the latest ack sequence between acks
					response := &mq_pb.PublishResponse{
						AckSequence: atomic.LoadInt64(&ackSequence),
					}
					respChan <- response
				} else {
					return
				}
			}
		}
	}()
	// process each published message
	for {
		// receive a message
		req, err := stream.Recv()
		if err != nil {
			return err
		}

		// process the received message
		if dataMessage := req.GetData(); dataMessage != nil {
			localTopicPartition.Publish(dataMessage)
		}

		ackCounter++
		// ackSequence is read concurrently by the ack sender goroutine,
		// so update and read it atomically
		atomic.AddInt64(&ackSequence, 1)
		if ackCounter >= ackInterval {
			ackCounter = 0
			// send back the ack
			response := &mq_pb.PublishResponse{
				AckSequence: atomic.LoadInt64(&ackSequence),
			}
			respChan <- response
		}
	}
}
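
// For reference, a minimal sketch of the pub client side of this stream
// (illustrative only: the oneof wrapper types below are assumed to follow the
// generated mq_pb naming, and error handling is elided):
//
//	stream, _ := client.Publish(ctx) // client is a mq_pb.SeaweedMessagingClient
//	stream.Send(&mq_pb.PublishRequest{Message: &mq_pb.PublishRequest_Init{Init: initMessage}})
//	stream.Send(&mq_pb.PublishRequest{Message: &mq_pb.PublishRequest_Data{Data: dataMessage}})
//	resp, _ := stream.Recv()         // resp.AckSequence acknowledges receipt
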
// AssignTopicPartitions runs on the assigned broker, to execute the topic partition assignment
func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
	ret := &mq_pb.AssignTopicPartitionsResponse{}
	self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
	for _, brokerPartition := range request.BrokerPartitionAssignments {
		localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
		broker.localTopicManager.AddTopicPartition(
			topic.FromPbTopic(request.Topic),
			localPartition)
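		// the leader forwards the same assignment request to its follower brokers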
		if request.IsLeader {
			for _, follower := range localPartition.FollowerBrokers {
				err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
					_, err := client.AssignTopicPartitions(context.Background(), request)
					return err
				})
				if err != nil {
					return ret, err
				}
			}
		}
	}
	return ret, nil
}
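
// For reference, a minimal sketch of how an admin client might invoke this RPC
// (illustrative only: the field names match the request fields used above,
// topicPb and assignments are hypothetical variables, and error handling is
// elided):
//
//	_, err := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
//		Topic:                      topicPb,     // hypothetical topic proto
//		BrokerPartitionAssignments: assignments, // hypothetical assignment slice
//		IsLeader:                   true,
//	})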