Browse Source

refactor

pull/5637/head
chrislu 12 months ago
parent
commit
9ce7c482b3
  1. 7
      weed/mq/pub_balancer/balancer.go
  2. 8
      weed/mq/pub_balancer/partition_list_broker.go

7
weed/mq/pub_balancer/balancer.go

@ -67,7 +67,12 @@ func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
if !found {
continue
}
partitionSlotToBrokerList.RemoveBroker(broker)
pickedBroker := pickBrokers(balancer.Brokers, 1)
if len(pickedBroker) == 0 {
partitionSlotToBrokerList.RemoveBroker(broker)
} else {
partitionSlotToBrokerList.ReplaceBroker(broker, pickedBroker[0])
}
}
balancer.onPubRemoveBroker(broker, stats)
balancer.OnRemoveBroker(broker, stats)

8
weed/mq/pub_balancer/partition_list_broker.go

@ -44,9 +44,13 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broke
})
}
func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
ps.ReplaceBroker(broker, "")
}
func (ps *PartitionSlotToBrokerList) ReplaceBroker(oldBroker string, newBroker string) {
for _, partitionSlot := range ps.PartitionSlots {
if partitionSlot.AssignedBroker == broker {
partitionSlot.AssignedBroker = ""
if partitionSlot.AssignedBroker == oldBroker {
partitionSlot.AssignedBroker = newBroker
}
}
}
Loading…
Cancel
Save