From 554ae09f8236206e121c2b229eaa96e0858832ed Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 21 May 2024 10:02:07 -0700 Subject: [PATCH] rename --- weed/mq/broker/broker_grpc_assign.go | 6 ++--- weed/mq/broker/broker_grpc_balance.go | 4 ++-- weed/mq/broker/broker_grpc_configure.go | 6 ++--- weed/mq/broker/broker_grpc_lookup.go | 2 +- weed/mq/broker/broker_grpc_pub_balancer.go | 6 ++--- weed/mq/broker/broker_grpc_sub_coordinator.go | 4 ++-- weed/mq/broker/broker_server.go | 24 +++++++++---------- .../mq/broker/broker_topic_conf_read_write.go | 2 +- weed/mq/pub_balancer/pub_balancer.go | 2 +- weed/mq/sub_coordinator/sub_coordinator.go | 2 +- 10 files changed, 29 insertions(+), 29 deletions(-) diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 352a6f69b..48ec0d5bd 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -79,11 +79,11 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr) } } - brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker) + brokerStats, found := b.PubBalancer.Brokers.Get(bpa.LeaderBroker) if !found { brokerStats = pub_balancer.NewBrokerStats() - if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { - brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker) + if !b.PubBalancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { + brokerStats, _ = b.PubBalancer.Brokers.Get(bpa.LeaderBroker) } } brokerStats.RegisterAssignment(t, bpa.Partition, isAdd) diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go index 412407211..54634c9d1 100644 --- a/weed/mq/broker/broker_grpc_balance.go +++ b/weed/mq/broker/broker_grpc_balance.go @@ -20,8 +20,8 @@ func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.B ret := &mq_pb.BalanceTopicsResponse{} - actions := b.Balancer.BalancePublishers() - err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption) + actions := b.PubBalancer.BalancePublishers() + err = b.PubBalancer.ExecuteBalanceAction(actions, b.grpcDialOption) return ret, err } diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 7f8afab03..7222c8359 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -60,10 +60,10 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } } resp = &mq_pb.ConfigureTopicResponse{} - if b.Balancer.Brokers.IsEmpty() { + if b.PubBalancer.Brokers.IsEmpty() { return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error()) } - resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount) + resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.PubBalancer.Brokers, request.PartitionCount) resp.RecordType = request.RecordType // save the topic configuration on filer @@ -71,7 +71,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return nil, fmt.Errorf("configure topic: %v", err) } - b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) + b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index da2c64dfc..db62fd88a 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -50,7 +50,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List ret := &mq_pb.ListTopicsResponse{} knownTopics := make(map[string]struct{}) - for brokerStatsItem := range b.Balancer.Brokers.IterBuffered() { + for brokerStatsItem := range b.PubBalancer.Brokers.IterBuffered() { _, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() { topicPartitionStat := topicPartitionStatsItem.Val diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go index b8675caca..5978d2173 100644 --- a/weed/mq/broker/broker_grpc_pub_balancer.go +++ b/weed/mq/broker/broker_grpc_pub_balancer.go @@ -22,12 +22,12 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin initMessage := req.GetInit() var brokerStats *pub_balancer.BrokerStats if initMessage != nil { - brokerStats = b.Balancer.AddBroker(initMessage.Broker) + brokerStats = b.PubBalancer.AddBroker(initMessage.Broker) } else { return status.Errorf(codes.InvalidArgument, "balancer init message is empty") } defer func() { - b.Balancer.RemoveBroker(initMessage.Broker, brokerStats) + b.PubBalancer.RemoveBroker(initMessage.Broker, brokerStats) }() // process stats message @@ -40,7 +40,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin return status.Errorf(codes.Unavailable, "not current broker balancer") } if receivedStats := req.GetStats(); receivedStats != nil { - b.Balancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats) + b.PubBalancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats) // glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats) } } diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index a1b29f45c..266859c24 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -23,13 +23,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess // process init message initMessage := req.GetInit() if initMessage != nil { - cgi = b.Coordinator.AddSubscriber(initMessage) + cgi = b.SubCoordinator.AddSubscriber(initMessage) glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) } else { return status.Errorf(codes.InvalidArgument, "subscriber init message is empty") } defer func() { - b.Coordinator.RemoveSubscriber(initMessage) + b.SubCoordinator.RemoveSubscriber(initMessage) glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) }() diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 2e449083a..cdf652294 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -43,17 +43,17 @@ type MessageQueueBroker struct { filers map[pb.ServerAddress]struct{} currentFiler pb.ServerAddress localTopicManager *topic.LocalTopicManager - Balancer *pub_balancer.PubBalancer - lockAsBalancer *cluster.LiveLock - Coordinator *sub_coordinator.SubCoordinator - accessLock sync.Mutex + PubBalancer *pub_balancer.PubBalancer + lockAsBalancer *cluster.LiveLock + SubCoordinator *sub_coordinator.SubCoordinator + accessLock sync.Mutex fca *sub_coordinator.FilerClientAccessor } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { - pub_broker_balancer := pub_balancer.NewBalancer() - coordinator := sub_coordinator.NewCoordinator(pub_broker_balancer) + pubBalancer := pub_balancer.NewPubBalancer() + subCoordinator := sub_coordinator.NewSubCoordinator(pubBalancer) mqBroker = &MessageQueueBroker{ option: option, @@ -61,20 +61,20 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)), filers: make(map[pb.ServerAddress]struct{}), localTopicManager: topic.NewLocalTopicManager(), - Balancer: pub_broker_balancer, - Coordinator: coordinator, + PubBalancer: pubBalancer, + SubCoordinator: subCoordinator, } fca := &sub_coordinator.FilerClientAccessor{ GetFiler: mqBroker.GetFiler, GetGrpcDialOption: mqBroker.GetGrpcDialOption, } mqBroker.fca = fca - coordinator.FilerClientAccessor = fca + subCoordinator.FilerClientAccessor = fca mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) - pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange - pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker - pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker + pubBalancer.OnPartitionChange = mqBroker.SubCoordinator.OnPartitionChange + pubBalancer.OnAddBroker = mqBroker.SubCoordinator.OnSubAddBroker + pubBalancer.OnRemoveBroker = mqBroker.SubCoordinator.OnSubRemoveBroker go mqBroker.MasterClient.KeepConnectedToMaster() diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 211473dad..ea5cb71b9 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -52,7 +52,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) { // also fix assignee broker if invalid - hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments) + hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments) if hasChanges { glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments) if err = b.fca.SaveTopicConfToFiler(t.ToPbTopic(), conf); err != nil { diff --git a/weed/mq/pub_balancer/pub_balancer.go b/weed/mq/pub_balancer/pub_balancer.go index 1c8e31558..3499d2233 100644 --- a/weed/mq/pub_balancer/pub_balancer.go +++ b/weed/mq/pub_balancer/pub_balancer.go @@ -37,7 +37,7 @@ type PubBalancer struct { OnRemoveBroker func(broker string, brokerStats *BrokerStats) } -func NewBalancer() *PubBalancer { +func NewPubBalancer() *PubBalancer { return &PubBalancer{ Brokers: cmap.New[*BrokerStats](), TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](), diff --git a/weed/mq/sub_coordinator/sub_coordinator.go b/weed/mq/sub_coordinator/sub_coordinator.go index f78e8c849..b03397ad8 100644 --- a/weed/mq/sub_coordinator/sub_coordinator.go +++ b/weed/mq/sub_coordinator/sub_coordinator.go @@ -22,7 +22,7 @@ type SubCoordinator struct { FilerClientAccessor *FilerClientAccessor } -func NewCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator { +func NewSubCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator { return &SubCoordinator{ TopicSubscribers: cmap.New[*TopicConsumerGroups](), balancer: balancer,