You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

136 lines
5.4 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. )
  10. // For a new or re-configured topic, or one of the broker went offline,
  11. // the pub clients ask one broker what are the brokers for all the topic partitions.
  12. // The broker will lock the topic on write.
  13. // 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
  14. // 2. if the topic is found, return the brokers for the topic partitions
  15. // For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
  16. // The broker will lock the topic on read.
  17. // 1. if the topic is not found, return error
  18. // 2. if the topic is found, return the brokers for the topic partitions
  19. //
  20. // If the topic needs to be re-balanced, the admin client will lock the topic,
  21. // 1. collect throughput information for all the brokers
  22. // 2. adjust the topic partitions to the brokers
  23. // 3. notify the brokers to add/remove partitions to host
  24. // 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
  25. // 4. the brokers will stop process incoming messages if not the right partition
  26. // 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3
  27. // 4.2 the sub clients will need to change the brokers to read from
  28. //
  29. // The following is from each individual component's perspective:
  30. // For a pub client
  31. // For current topic/partition, ask one broker for the brokers for the topic partitions
  32. // 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
  33. // For a sub client
  34. // For current topic/partition, ask one broker for the brokers for the topic partitions
  35. // 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
  36. // For a broker
  37. // Upon a pub client lookup:
  38. // 1. lock the topic
  39. // 2. if already has topic partition assignment, check all brokers are healthy
  40. // 3. if not, create topic partition assignment
  41. // 2. return the brokers for the topic partitions
  42. // 3. unlock the topic
  43. // Upon a sub client lookup:
  44. // 1. lock the topic
  45. // 2. if already has topic partition assignment, check all brokers are healthy
  46. // 3. if not, return error
  47. // 2. return the brokers for the topic partitions
  48. // 3. unlock the topic
  49. // For an admin tool
  50. // 0. collect stats from all the brokers, and find the topic worth moving
  51. // 1. lock the topic
  52. // 2. collect throughput information for all the brokers
  53. // 3. adjust the topic partitions to the brokers
  54. // 4. notify the brokers to add/remove partitions to host
  55. // 5. the brokers will stop process incoming messages if not the right partition
  56. // 6. unlock the topic
  57. /*
  58. The messages are buffered in memory, and saved to filer under
  59. /topics/<topic>/<date>/<hour>/<segment>/*.msg
  60. /topics/<topic>/<date>/<hour>/segment
  61. /topics/<topic>/info/segment_<id>.meta
  62. */
  63. func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
  64. // 1. write to the volume server
  65. // 2. find the topic metadata owning filer
  66. // 3. write to the filer
  67. var localTopicPartition *topic.LocalPartition
  68. for {
  69. req, err := stream.Recv()
  70. if err != nil {
  71. return err
  72. }
  73. // Process the received message
  74. sequence := req.GetSequence()
  75. response := &mq_pb.PublishResponse{
  76. AckSequence: sequence,
  77. }
  78. if dataMessage := req.GetData(); dataMessage != nil {
  79. if localTopicPartition == nil {
  80. response.Error = "topic partition not initialized"
  81. glog.Errorf("topic partition not found")
  82. } else {
  83. localTopicPartition.Publish(dataMessage)
  84. }
  85. } else if initMessage := req.GetInit(); initMessage != nil {
  86. localTopicPartition = broker.localTopicManager.GetTopicPartition(
  87. topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic),
  88. topic.FromPbPartition(initMessage.Segment.Partition),
  89. )
  90. if localTopicPartition == nil {
  91. response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment)
  92. glog.Errorf("topic partition %v not found", initMessage.Segment)
  93. }
  94. }
  95. if err := stream.Send(response); err != nil {
  96. glog.Errorf("Error sending setup response: %v", err)
  97. }
  98. }
  99. return nil
  100. }
  101. // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
  102. func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
  103. ret := &mq_pb.AssignTopicPartitionsResponse{}
  104. self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
  105. for _, partition := range request.TopicPartitionsAssignment.BrokerPartitions {
  106. localPartiton := topic.FromPbBrokerPartitionsAssignment(self, partition)
  107. broker.localTopicManager.AddTopicPartition(
  108. topic.FromPbTopic(request.Topic),
  109. localPartiton)
  110. if request.IsLeader {
  111. for _, follower := range localPartiton.FollowerBrokers {
  112. err := pb.WithBrokerClient(false, follower, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  113. _, err := client.AssignTopicPartitions(context.Background(), request)
  114. return err
  115. })
  116. if err != nil {
  117. return ret, err
  118. }
  119. }
  120. }
  121. }
  122. return ret, nil
  123. }