diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 249280cb7..520f6bcf2 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -56,7 +56,7 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) return pickedBrokers } -func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (addedAssignments, updatedAssignments []*mq_pb.BrokerPartitionAssignment) { +func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (addedAssignments, updatedAssignments []*mq_pb.BrokerPartitionAssignment) { for _, assignment := range assignments { if assignment.LeaderBroker == "" { addedAssignments = append(addedAssignments, assignment) @@ -69,13 +69,17 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*B } // pick the brokers with the least number of partitions - pickedBrokers := pickBrokers(activeBrokers, int32(len(addedAssignments))) - for i, assignment := range addedAssignments { - assignment.LeaderBroker = pickedBrokers[i] + if len(addedAssignments) > 0 { + pickedBrokers := pickBrokers(activeBrokers, int32(len(addedAssignments))) + for i, assignment := range addedAssignments { + assignment.LeaderBroker = pickedBrokers[i] + } } - pickedBrokers = pickBrokers(activeBrokers, int32(len(updatedAssignments))) - for i, assignment := range updatedAssignments { - assignment.LeaderBroker = pickedBrokers[i] + if len(updatedAssignments) == 0 { + pickedBrokers := pickBrokers(activeBrokers, int32(len(updatedAssignments))) + for i, assignment := range updatedAssignments { + assignment.LeaderBroker = pickedBrokers[i] + } } return