From 972e9faaa2f6fac677b79bcec1ba17321c267b37 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 May 2024 09:09:36 -0700 Subject: [PATCH] move initial assignment to rebalance logic --- weed/mq/broker/broker_grpc_sub_coordinator.go | 13 ------------- weed/mq/broker/broker_server.go | 9 +++++++-- weed/mq/sub_coordinator/consumer_group.go | 11 +++++++---- 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index 6925baa9e..a1b29f45c 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -4,7 +4,6 @@ import ( "context" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -37,18 +36,6 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess ctx := stream.Context() go func() { - // try to load the partition assignment from filer - if conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(initMessage.Topic)); err == nil { - // send partition assignment to subscriber - cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{ - Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{ - Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{ - PartitionAssignments: conf.BrokerPartitionAssignments, - }, - }, - } - } - // process ack messages for { _, err := stream.Recv() diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 510c03558..b3d86a401 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -65,8 +65,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial Coordinator: coordinator, } fca := &sub_coordinator.FilerClientAccessor{ - GetFilerFn: mqBroker.GetFiler, - GrpcDialOption: grpcDialOption, + GetFiler: mqBroker.GetFiler, + GetGrpcDialOption: mqBroker.GetGrpcDialOption, } mqBroker.fca = fca coordinator.FilerClientAccessor = fca @@ -126,6 +126,11 @@ func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, } +func (b *MessageQueueBroker) GetGrpcDialOption() grpc.DialOption { + return b.grpcDialOption +} + + func (b *MessageQueueBroker) GetFiler() pb.ServerAddress { return b.currentFiler } diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index be87b4105..d298c0c41 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -77,10 +77,13 @@ func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBroke // collect current topic partitions partitionSlotToBrokerList := knownPartitionSlotToBrokerList if partitionSlotToBrokerList == nil { - var found bool - partitionSlotToBrokerList, found = cg.pubBalancer.TopicToBrokers.Get(cg.topic.String()) - if !found { - glog.V(0).Infof("topic %s not found in balancer", cg.topic.String()) + if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil { + partitionSlotToBrokerList = pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount) + for _, assignment := range conf.BrokerPartitionAssignments { + partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker) + } + } else { + glog.V(0).Infof("fail to read topic conf from filer: %v", err) return } }