You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

127 lines
4.0 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
  1. package pub_balancer
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "math/rand"
  6. "modernc.org/mathutil"
  7. "sort"
  8. )
  9. func (balancer *Balancer) RepairTopics() []BalanceAction {
  10. action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
  11. return []BalanceAction{action}
  12. }
  // TopicPartitionInfo records which brokers report a given topic partition.
  type TopicPartitionInfo struct {
  	// Leader is the broker whose stats flag it as leader of the partition.
  	Leader string
  	// Followers lists the brokers that report the partition without
  	// being its leader.
  	Followers []string
  }
  17. // RepairMissingTopicPartitions check the stats of all brokers,
  18. // and repair the missing topic partitions on the brokers.
  19. func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats]) (actions []BalanceAction) {
  20. // find all topic partitions
  21. topicToTopicPartitions := make(map[topic.Topic]map[topic.Partition]*TopicPartitionInfo)
  22. for brokerStatsItem := range brokers.IterBuffered() {
  23. broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  24. for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
  25. topicPartitionStat := topicPartitionStatsItem.Val
  26. topicPartitionToInfo, found := topicToTopicPartitions[topicPartitionStat.Topic]
  27. if !found {
  28. topicPartitionToInfo = make(map[topic.Partition]*TopicPartitionInfo)
  29. topicToTopicPartitions[topicPartitionStat.Topic] = topicPartitionToInfo
  30. }
  31. tpi, found := topicPartitionToInfo[topicPartitionStat.Partition]
  32. if !found {
  33. tpi = &TopicPartitionInfo{}
  34. topicPartitionToInfo[topicPartitionStat.Partition] = tpi
  35. }
  36. if topicPartitionStat.IsLeader {
  37. tpi.Leader = broker
  38. } else {
  39. tpi.Followers = append(tpi.Followers, broker)
  40. }
  41. }
  42. }
  43. // collect all brokers as candidates
  44. candidates := make([]string, 0, brokers.Count())
  45. for brokerStatsItem := range brokers.IterBuffered() {
  46. candidates = append(candidates, brokerStatsItem.Key)
  47. }
  48. // find the missing topic partitions
  49. for t, topicPartitionToInfo := range topicToTopicPartitions {
  50. missingPartitions := EachTopicRepairMissingTopicPartitions(t, topicPartitionToInfo)
  51. for _, partition := range missingPartitions {
  52. actions = append(actions, BalanceActionCreate{
  53. TopicPartition: topic.TopicPartition{
  54. Topic: t,
  55. Partition: partition,
  56. },
  57. TargetBroker: candidates[rand.Intn(len(candidates))],
  58. })
  59. }
  60. }
  61. return actions
  62. }
  63. func EachTopicRepairMissingTopicPartitions(t topic.Topic, info map[topic.Partition]*TopicPartitionInfo) (missingPartitions []topic.Partition) {
  64. // find the missing topic partitions
  65. var partitions []topic.Partition
  66. for partition := range info {
  67. partitions = append(partitions, partition)
  68. }
  69. return findMissingPartitions(partitions, MaxPartitionCount)
  70. }
  71. // findMissingPartitions find the missing partitions
  72. func findMissingPartitions(partitions []topic.Partition, ringSize int32) (missingPartitions []topic.Partition) {
  73. // sort the partitions by range start
  74. sort.Slice(partitions, func(i, j int) bool {
  75. return partitions[i].RangeStart < partitions[j].RangeStart
  76. })
  77. // calculate the average partition size
  78. var covered int32
  79. for _, partition := range partitions {
  80. covered += partition.RangeStop - partition.RangeStart
  81. }
  82. averagePartitionSize := covered / int32(len(partitions))
  83. // find the missing partitions
  84. var coveredWatermark int32
  85. i := 0
  86. for i < len(partitions) {
  87. partition := partitions[i]
  88. if partition.RangeStart > coveredWatermark {
  89. upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, partition.RangeStart)
  90. missingPartitions = append(missingPartitions, topic.Partition{
  91. RangeStart: coveredWatermark,
  92. RangeStop: upperBound,
  93. RingSize: ringSize,
  94. })
  95. coveredWatermark = upperBound
  96. if coveredWatermark == partition.RangeStop {
  97. i++
  98. }
  99. } else {
  100. coveredWatermark = partition.RangeStop
  101. i++
  102. }
  103. }
  104. for coveredWatermark < ringSize {
  105. upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, ringSize)
  106. missingPartitions = append(missingPartitions, topic.Partition{
  107. RangeStart: coveredWatermark,
  108. RangeStop: upperBound,
  109. RingSize: ringSize,
  110. })
  111. coveredWatermark = upperBound
  112. }
  113. return missingPartitions
  114. }