diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 397e70fac..710e95b38 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -55,9 +55,9 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) { // also fix assignee broker if invalid - changedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments) - if len(changedAssignments) > 0 { - glog.V(0).Infof("topic %v partition assignments changed: %v", t, changedAssignments) + addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments) + if len(addedAssignments) > 0 || len(updatedAssignments) > 0 { + glog.V(0).Infof("topic %v partition assignments added: %v updated: %v", t, addedAssignments, updatedAssignments) if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil { return err } diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 7822f5ed9..249280cb7 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -56,22 +56,27 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) return pickedBrokers } -func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (changedAssignments []*mq_pb.BrokerPartitionAssignment) { +func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (addedAssignments, updatedAssignments []*mq_pb.BrokerPartitionAssignment) { for _, assignment := range assignments { if assignment.LeaderBroker == "" { - changedAssignments = append(changedAssignments, assignment) + addedAssignments = append(addedAssignments, assignment) continue } if _, found := activeBrokers.Get(assignment.LeaderBroker); !found { - changedAssignments = append(changedAssignments, assignment) + updatedAssignments = append(updatedAssignments, assignment) continue } } // pick the brokers with the least number of partitions - pickedBrokers := pickBrokers(activeBrokers, int32(len(changedAssignments))) - for i, assignment := range changedAssignments { + pickedBrokers := pickBrokers(activeBrokers, int32(len(addedAssignments))) + for i, assignment := range addedAssignments { assignment.LeaderBroker = pickedBrokers[i] } - return changedAssignments + pickedBrokers = pickBrokers(activeBrokers, int32(len(updatedAssignments))) + for i, assignment := range updatedAssignments { + assignment.LeaderBroker = pickedBrokers[i] + } + + return }