You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
122 lines
3.9 KiB
122 lines
3.9 KiB
package pub_balancer
|
|
|
|
import (
|
|
cmap "github.com/orcaman/concurrent-map/v2"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"math/rand"
|
|
"modernc.org/mathutil"
|
|
"sort"
|
|
)
|
|
|
|
func (balancer *PubBalancer) RepairTopics() []BalanceAction {
|
|
action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
|
|
return []BalanceAction{action}
|
|
}
|
|
|
|
// TopicPartitionInfo is the per-partition record built while scanning broker
// stats for missing topic partitions.
type TopicPartitionInfo struct {
	// Broker is the key of a broker whose stats reported the partition.
	Broker string
}
|
|
|
|
// RepairMissingTopicPartitions check the stats of all brokers,
|
|
// and repair the missing topic partitions on the brokers.
|
|
func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats]) (actions []BalanceAction) {
|
|
|
|
// find all topic partitions
|
|
topicToTopicPartitions := make(map[topic.Topic]map[topic.Partition]*TopicPartitionInfo)
|
|
for brokerStatsItem := range brokers.IterBuffered() {
|
|
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
|
|
for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
|
|
topicPartitionStat := topicPartitionStatsItem.Val
|
|
topicPartitionToInfo, found := topicToTopicPartitions[topicPartitionStat.Topic]
|
|
if !found {
|
|
topicPartitionToInfo = make(map[topic.Partition]*TopicPartitionInfo)
|
|
topicToTopicPartitions[topicPartitionStat.Topic] = topicPartitionToInfo
|
|
}
|
|
tpi, found := topicPartitionToInfo[topicPartitionStat.Partition]
|
|
if !found {
|
|
tpi = &TopicPartitionInfo{}
|
|
topicPartitionToInfo[topicPartitionStat.Partition] = tpi
|
|
}
|
|
tpi.Broker = broker
|
|
}
|
|
}
|
|
|
|
// collect all brokers as candidates
|
|
candidates := make([]string, 0, brokers.Count())
|
|
for brokerStatsItem := range brokers.IterBuffered() {
|
|
candidates = append(candidates, brokerStatsItem.Key)
|
|
}
|
|
|
|
// find the missing topic partitions
|
|
for t, topicPartitionToInfo := range topicToTopicPartitions {
|
|
missingPartitions := EachTopicRepairMissingTopicPartitions(t, topicPartitionToInfo)
|
|
for _, partition := range missingPartitions {
|
|
actions = append(actions, BalanceActionCreate{
|
|
TopicPartition: topic.TopicPartition{
|
|
Topic: t,
|
|
Partition: partition,
|
|
},
|
|
TargetBroker: candidates[rand.Intn(len(candidates))],
|
|
})
|
|
}
|
|
}
|
|
|
|
return actions
|
|
}
|
|
|
|
func EachTopicRepairMissingTopicPartitions(t topic.Topic, info map[topic.Partition]*TopicPartitionInfo) (missingPartitions []topic.Partition) {
|
|
|
|
// find the missing topic partitions
|
|
var partitions []topic.Partition
|
|
for partition := range info {
|
|
partitions = append(partitions, partition)
|
|
}
|
|
return findMissingPartitions(partitions, MaxPartitionCount)
|
|
}
|
|
|
|
// findMissingPartitions find the missing partitions
|
|
func findMissingPartitions(partitions []topic.Partition, ringSize int32) (missingPartitions []topic.Partition) {
|
|
// sort the partitions by range start
|
|
sort.Slice(partitions, func(i, j int) bool {
|
|
return partitions[i].RangeStart < partitions[j].RangeStart
|
|
})
|
|
|
|
// calculate the average partition size
|
|
var covered int32
|
|
for _, partition := range partitions {
|
|
covered += partition.RangeStop - partition.RangeStart
|
|
}
|
|
averagePartitionSize := covered / int32(len(partitions))
|
|
|
|
// find the missing partitions
|
|
var coveredWatermark int32
|
|
i := 0
|
|
for i < len(partitions) {
|
|
partition := partitions[i]
|
|
if partition.RangeStart > coveredWatermark {
|
|
upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, partition.RangeStart)
|
|
missingPartitions = append(missingPartitions, topic.Partition{
|
|
RangeStart: coveredWatermark,
|
|
RangeStop: upperBound,
|
|
RingSize: ringSize,
|
|
})
|
|
coveredWatermark = upperBound
|
|
if coveredWatermark == partition.RangeStop {
|
|
i++
|
|
}
|
|
} else {
|
|
coveredWatermark = partition.RangeStop
|
|
i++
|
|
}
|
|
}
|
|
for coveredWatermark < ringSize {
|
|
upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, ringSize)
|
|
missingPartitions = append(missingPartitions, topic.Partition{
|
|
RangeStart: coveredWatermark,
|
|
RangeStop: upperBound,
|
|
RingSize: ringSize,
|
|
})
|
|
coveredWatermark = upperBound
|
|
}
|
|
return missingPartitions
|
|
}
|