diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 323c0055c..264565b7b 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sync" ) // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment @@ -50,3 +52,49 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) return ret, nil } + +// called by broker leader to drain existing partitions. +// new/updated partitions will be detected by broker from the filer +func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error { + // notify the brokers to create the topic partitions in parallel + var wg sync.WaitGroup + for _, bpa := range assignments { + wg.Add(1) + go func(bpa *mq_pb.BrokerPartitionAssignment) { + defer wg.Done() + if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { + _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{ + Topic: t, + BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + Partition: bpa.Partition, + }, + }, + IsLeader: true, + IsDraining: !isAdd, + }) + if doCreateErr != nil { + if !isAdd { + return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr) + } else { + return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr) + } + } + brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker) + if !found { + brokerStats = pub_balancer.NewBrokerStats() + if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { + brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker) + } + } + brokerStats.RegisterAssignment(t, bpa.Partition, isAdd) + return nil + }); doCreateErr != nil { + glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr) + } + }(bpa) + } + wg.Wait() + + return nil +} diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index c1984c05e..f5bcceb44 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -6,11 +6,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "sync" ) // ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer @@ -62,47 +60,3 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return resp, err } - -func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error { - // notify the brokers to create the topic partitions in parallel - var wg sync.WaitGroup - for _, bpa := range assignments { - wg.Add(1) - go func(bpa *mq_pb.BrokerPartitionAssignment) { - defer wg.Done() - if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { - _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{ - Topic: t, - BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ - { - Partition: bpa.Partition, - }, - }, - IsLeader: true, - IsDraining: !isAdd, - }) - if doCreateErr != nil { - if !isAdd { - return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr) - } else { - return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr) - } - } - brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker) - if !found { - brokerStats = pub_balancer.NewBrokerStats() - if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { - brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker) - } - } - brokerStats.RegisterAssignment(t, bpa.Partition, isAdd) - return nil - }); doCreateErr != nil { - glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr) - } - }(bpa) - } - wg.Wait() - - return nil -}