From 67e6051585875736f24bd3d509549db775c3d44f Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 21 May 2024 09:57:45 -0700 Subject: [PATCH] rename Coordinator to SubCoordinator --- weed/mq/broker/broker_server.go | 2 +- weed/mq/sub_coordinator/coordinator.go | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index e381fa84c..2e449083a 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -45,7 +45,7 @@ type MessageQueueBroker struct { localTopicManager *topic.LocalTopicManager Balancer *pub_balancer.PubBalancer lockAsBalancer *cluster.LiveLock - Coordinator *sub_coordinator.Coordinator + Coordinator *sub_coordinator.SubCoordinator accessLock sync.Mutex fca *sub_coordinator.FilerClientAccessor } diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index 6ab7166d4..f78e8c849 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -11,25 +11,25 @@ type TopicConsumerGroups struct { ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup] } -// Coordinator coordinates the instances in the consumer group for one topic. +// SubCoordinator coordinates the instances in the consumer group for one topic. // It is responsible for: // 1. (Maybe) assigning partitions when a consumer instance is up/down. -type Coordinator struct { +type SubCoordinator struct { // map topic name to consumer groups TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] balancer *pub_balancer.PubBalancer FilerClientAccessor *FilerClientAccessor } -func NewCoordinator(balancer *pub_balancer.PubBalancer) *Coordinator { - return &Coordinator{ +func NewCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator { + return &SubCoordinator{ TopicSubscribers: cmap.New[*TopicConsumerGroups](), balancer: balancer, } } -func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups { +func (c *SubCoordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups { topicName := toTopicName(topic) tcg, _ := c.TopicSubscribers.Get(topicName) if tcg == nil && createIfMissing { @@ -42,7 +42,7 @@ func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing } return tcg } -func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic) { +func (c *SubCoordinator) RemoveTopic(topic *mq_pb.Topic) { topicName := toTopicName(topic) c.TopicSubscribers.Remove(topicName) } @@ -52,7 +52,7 @@ func toTopicName(topic *mq_pb.Topic) string { return topicName } -func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance { +func (c *SubCoordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance { tcg := c.GetTopicConsumerGroups(initMessage.Topic, true) cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) if cg == nil { @@ -73,7 +73,7 @@ func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinato return cgi } -func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) { +func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) { tcg := c.GetTopicConsumerGroups(initMessage.Topic, false) if tcg == nil { return @@ -92,7 +92,7 @@ func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordin } } -func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) { +func (c *SubCoordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) { tcg, _ := c.TopicSubscribers.Get(toTopicName(topic)) if tcg == nil { return @@ -103,11 +103,11 @@ func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb } // OnSubAddBroker is called when a broker is added to the balancer -func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) { +func (c *SubCoordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) { } // OnSubRemoveBroker is called when a broker is removed from the balancer -func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) { +func (c *SubCoordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) { }