Browse Source

updated and added assignments

pull/5637/head
chrislu 12 months ago
parent
commit
428fea45f3
  1. 6
      weed/mq/broker/broker_topic_conf_read_write.go
  2. 17
      weed/mq/pub_balancer/allocate.go

6
weed/mq/broker/broker_topic_conf_read_write.go

@ -55,9 +55,9 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) { func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
// also fix assignee broker if invalid // also fix assignee broker if invalid
changedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)
if len(changedAssignments) > 0 {
glog.V(0).Infof("topic %v partition assignments changed: %v", t, changedAssignments)
addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)
if len(addedAssignments) > 0 || len(updatedAssignments) > 0 {
glog.V(0).Infof("topic %v partition assignments added: %v updated: %v", t, addedAssignments, updatedAssignments)
if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil { if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
return err return err
} }

17
weed/mq/pub_balancer/allocate.go

@ -56,22 +56,27 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32)
return pickedBrokers return pickedBrokers
} }
func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (changedAssignments []*mq_pb.BrokerPartitionAssignment) {
func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (addedAssignments, updatedAssignments []*mq_pb.BrokerPartitionAssignment) {
for _, assignment := range assignments { for _, assignment := range assignments {
if assignment.LeaderBroker == "" { if assignment.LeaderBroker == "" {
changedAssignments = append(changedAssignments, assignment)
addedAssignments = append(addedAssignments, assignment)
continue continue
} }
if _, found := activeBrokers.Get(assignment.LeaderBroker); !found { if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
changedAssignments = append(changedAssignments, assignment)
updatedAssignments = append(updatedAssignments, assignment)
continue continue
} }
} }
// pick the brokers with the least number of partitions // pick the brokers with the least number of partitions
pickedBrokers := pickBrokers(activeBrokers, int32(len(changedAssignments)))
for i, assignment := range changedAssignments {
pickedBrokers := pickBrokers(activeBrokers, int32(len(addedAssignments)))
for i, assignment := range addedAssignments {
assignment.LeaderBroker = pickedBrokers[i] assignment.LeaderBroker = pickedBrokers[i]
} }
return changedAssignments
pickedBrokers = pickBrokers(activeBrokers, int32(len(updatedAssignments)))
for i, assignment := range updatedAssignments {
assignment.LeaderBroker = pickedBrokers[i]
}
return
} }
Loading…
Cancel
Save