package pub_balancer

import (
	cmap "github.com/orcaman/concurrent-map/v2"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"math/rand"
	"modernc.org/mathutil"
	"sort"
)

// RepairTopics returns the single balancing action that
// BalanceTopicPartitionOnBrokers computes for the current set of brokers.
func (balancer *PubBalancer) RepairTopics() []BalanceAction {
	action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
	return []BalanceAction{action}
}

// TopicPartitionInfo records which broker hosts a topic partition.
type TopicPartitionInfo struct {
	Broker string
}

// RepairMissingTopicPartitions checks the stats of all brokers
// and repairs the missing topic partitions on the brokers.
func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats]) (actions []BalanceAction) {
	// find all topic partitions: topic -> partition -> hosting broker
	topicToTopicPartitions := make(map[topic.Topic]map[topic.Partition]*TopicPartitionInfo)
	for brokerStatsItem := range brokers.IterBuffered() {
		broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
		for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
			topicPartitionStat := topicPartitionStatsItem.Val
			topicPartitionToInfo, found := topicToTopicPartitions[topicPartitionStat.Topic]
			if !found {
				topicPartitionToInfo = make(map[topic.Partition]*TopicPartitionInfo)
				topicToTopicPartitions[topicPartitionStat.Topic] = topicPartitionToInfo
			}
			tpi, found := topicPartitionToInfo[topicPartitionStat.Partition]
			if !found {
				tpi = &TopicPartitionInfo{}
				topicPartitionToInfo[topicPartitionStat.Partition] = tpi
			}
			// if several brokers report the same partition, the last one iterated wins
			tpi.Broker = broker
		}
	}

	// collect all brokers as candidates
	candidates := make([]string, 0, brokers.Count())
	for brokerStatsItem := range brokers.IterBuffered() {
		candidates = append(candidates, brokerStatsItem.Key)
	}
	if len(candidates) == 0 {
		// no broker can host a missing partition; rand.Intn(0) would panic
		return nil
	}

	// create each missing topic partition on a randomly chosen broker
	for t, topicPartitionToInfo := range topicToTopicPartitions {
		missingPartitions := EachTopicRepairMissingTopicPartitions(t, topicPartitionToInfo)
		for _, partition := range missingPartitions {
			actions = append(actions, BalanceActionCreate{
				TopicPartition: topic.TopicPartition{
					Topic:     t,
					Partition: partition,
				},
				TargetBroker: candidates[rand.Intn(len(candidates))],
			})
		}
	}
	return actions
}
// EachTopicRepairMissingTopicPartitions returns the ranges of the partition
// ring that no known partition of topic t covers.
func EachTopicRepairMissingTopicPartitions(t topic.Topic, info map[topic.Partition]*TopicPartitionInfo) (missingPartitions []topic.Partition) {
	// collect the known partitions of this topic
	var partitions []topic.Partition
	for partition := range info {
		partitions = append(partitions, partition)
	}
	return findMissingPartitions(partitions, MaxPartitionCount)
}

// findMissingPartitions finds the ranges of [0, ringSize) not covered by the given
// partitions and splits each one into chunks no larger than the average partition size.
func findMissingPartitions(partitions []topic.Partition, ringSize int32) (missingPartitions []topic.Partition) {
	if len(partitions) == 0 {
		// no partition size to derive chunks from; also avoids dividing by zero below
		return nil
	}

	// sort the partitions by range start
	sort.Slice(partitions, func(i, j int) bool {
		return partitions[i].RangeStart < partitions[j].RangeStart
	})

	// calculate the average partition size, used as the size of each new partition
	var covered int32
	for _, partition := range partitions {
		covered += partition.RangeStop - partition.RangeStart
	}
	averagePartitionSize := covered / int32(len(partitions))

	// find the missing partitions: coveredWatermark marks how far the ring is covered
	var coveredWatermark int32
	i := 0
	for i < len(partitions) {
		partition := partitions[i]
		if partition.RangeStart > coveredWatermark {
			// gap before this partition: fill it one chunk at a time
			upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, partition.RangeStart)
			missingPartitions = append(missingPartitions, topic.Partition{
				RangeStart: coveredWatermark,
				RangeStop:  upperBound,
				RingSize:   ringSize,
			})
			coveredWatermark = upperBound
			if coveredWatermark == partition.RangeStop {
				i++
			}
		} else {
			// this partition starts inside the covered area; never move the
			// watermark backwards when partitions overlap
			if partition.RangeStop > coveredWatermark {
				coveredWatermark = partition.RangeStop
			}
			i++
		}
	}

	// fill the tail of the ring after the last partition
	for coveredWatermark < ringSize {
		upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, ringSize)
		missingPartitions = append(missingPartitions, topic.Partition{
			RangeStart: coveredWatermark,
			RangeStop:  upperBound,
			RingSize:   ringSize,
		})
		coveredWatermark = upperBound
	}
	return missingPartitions
}