diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 35da6a9d0..e0f9319a4 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -54,11 +54,21 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) } - for _, bpa := range resp.BrokerPartitionAssignments { - fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker) + if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments); assignErr != nil { + return nil, assignErr + } + + glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) + + return resp, err +} + +func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) error { + for _, bpa := range assignments { + fmt.Printf("create topic %s partition %+v on %s\n", t, bpa.Partition, bpa.LeaderBroker) if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{ - Topic: request.Topic, + Topic: t, BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ { Partition: bpa.Partition, @@ -68,7 +78,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. IsDraining: false, }) if doCreateErr != nil { - return fmt.Errorf("do create topic %s on %s: %v", request.Topic, bpa.LeaderBroker, doCreateErr) + return fmt.Errorf("do create topic %s on %s: %v", t, bpa.LeaderBroker, doCreateErr) } brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker) if !found { @@ -77,16 +87,13 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker) } } - brokerStats.RegisterAssignment(request.Topic, bpa.Partition) + brokerStats.RegisterAssignment(t, bpa.Partition) return nil }); doCreateErr != nil { - return nil, doCreateErr + return doCreateErr } } - - glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) - - return resp, err + return nil } // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment