You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

142 lines
5.5 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. )
  10. // For a new or re-configured topic, or one of the broker went offline,
  11. // the pub clients ask one broker what are the brokers for all the topic partitions.
  12. // The broker will lock the topic on write.
  13. // 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
  14. // 2. if the topic is found, return the brokers for the topic partitions
  15. // For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
  16. // The broker will lock the topic on read.
  17. // 1. if the topic is not found, return error
  18. // 2. if the topic is found, return the brokers for the topic partitions
  19. //
  20. // If the topic needs to be re-balanced, the admin client will lock the topic,
  21. // 1. collect throughput information for all the brokers
  22. // 2. adjust the topic partitions to the brokers
  23. // 3. notify the brokers to add/remove partitions to host
  24. // 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
  25. // 4. the brokers will stop process incoming messages if not the right partition
  26. // 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3
  27. // 4.2 the sub clients will need to change the brokers to read from
  28. //
  29. // The following is from each individual component's perspective:
  30. // For a pub client
  31. // For current topic/partition, ask one broker for the brokers for the topic partitions
  32. // 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
  33. // For a sub client
  34. // For current topic/partition, ask one broker for the brokers for the topic partitions
  35. // 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
  36. // For a broker
  37. // Upon a pub client lookup:
  38. // 1. lock the topic
  39. // 2. if already has topic partition assignment, check all brokers are healthy
  40. // 3. if not, create topic partition assignment
  41. // 2. return the brokers for the topic partitions
  42. // 3. unlock the topic
  43. // Upon a sub client lookup:
  44. // 1. lock the topic
  45. // 2. if already has topic partition assignment, check all brokers are healthy
  46. // 3. if not, return error
  47. // 2. return the brokers for the topic partitions
  48. // 3. unlock the topic
  49. // For an admin tool
  50. // 0. collect stats from all the brokers, and find the topic worth moving
  51. // 1. lock the topic
  52. // 2. collect throughput information for all the brokers
  53. // 3. adjust the topic partitions to the brokers
  54. // 4. notify the brokers to add/remove partitions to host
  55. // 5. the brokers will stop process incoming messages if not the right partition
  56. // 6. unlock the topic
  57. /*
  58. The messages are buffered in memory, and saved to filer under
  59. /topics/<topic>/<date>/<hour>/<segment>/*.msg
  60. /topics/<topic>/<date>/<hour>/segment
  61. /topics/<topic>/info/segment_<id>.meta
  62. */
  63. func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
  64. // 1. write to the volume server
  65. // 2. find the topic metadata owning filer
  66. // 3. write to the filer
  67. var localTopicPartition *topic.LocalPartition
  68. req, err := stream.Recv()
  69. if err != nil {
  70. return err
  71. }
  72. response := &mq_pb.PublishResponse{}
  73. // TODO check whether current broker should be the leader for the topic partition
  74. if initMessage := req.GetInit(); initMessage != nil {
  75. localTopicPartition = broker.localTopicManager.GetTopicPartition(
  76. topic.FromPbTopic(initMessage.Topic),
  77. topic.FromPbPartition(initMessage.Partition),
  78. )
  79. if localTopicPartition == nil {
  80. response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
  81. glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
  82. return stream.Send(response)
  83. }
  84. }
  85. // process each published messages
  86. for {
  87. req, err := stream.Recv()
  88. if err != nil {
  89. return err
  90. }
  91. // Process the received message
  92. sequence := req.GetSequence()
  93. response := &mq_pb.PublishResponse{
  94. AckSequence: sequence,
  95. }
  96. if dataMessage := req.GetData(); dataMessage != nil {
  97. print('+')
  98. localTopicPartition.Publish(dataMessage)
  99. }
  100. if err := stream.Send(response); err != nil {
  101. glog.Errorf("Error sending setup response: %v", err)
  102. }
  103. }
  104. return nil
  105. }
  106. // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
  107. func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
  108. ret := &mq_pb.AssignTopicPartitionsResponse{}
  109. self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
  110. for _, brokerPartition := range request.BrokerPartitionAssignments {
  111. localPartiton := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
  112. broker.localTopicManager.AddTopicPartition(
  113. topic.FromPbTopic(request.Topic),
  114. localPartiton)
  115. if request.IsLeader {
  116. for _, follower := range localPartiton.FollowerBrokers {
  117. err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  118. _, err := client.AssignTopicPartitions(context.Background(), request)
  119. return err
  120. })
  121. if err != nil {
  122. return ret, err
  123. }
  124. }
  125. }
  126. }
  127. return ret, nil
  128. }