|
@ -11,25 +11,25 @@ type TopicConsumerGroups struct { |
|
|
ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup] |
|
|
ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup] |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Coordinator coordinates the instances in the consumer group for one topic.
|
|
|
|
|
|
|
|
|
// SubCoordinator coordinates the instances in the consumer group for one topic.
|
|
|
// It is responsible for:
|
|
|
// It is responsible for:
|
|
|
// 1. (Maybe) assigning partitions when a consumer instance is up/down.
|
|
|
// 1. (Maybe) assigning partitions when a consumer instance is up/down.
|
|
|
|
|
|
|
|
|
type Coordinator struct { |
|
|
|
|
|
|
|
|
type SubCoordinator struct { |
|
|
// map topic name to consumer groups
|
|
|
// map topic name to consumer groups
|
|
|
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] |
|
|
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] |
|
|
balancer *pub_balancer.PubBalancer |
|
|
balancer *pub_balancer.PubBalancer |
|
|
FilerClientAccessor *FilerClientAccessor |
|
|
FilerClientAccessor *FilerClientAccessor |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func NewCoordinator(balancer *pub_balancer.PubBalancer) *Coordinator { |
|
|
|
|
|
return &Coordinator{ |
|
|
|
|
|
|
|
|
func NewCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator { |
|
|
|
|
|
return &SubCoordinator{ |
|
|
TopicSubscribers: cmap.New[*TopicConsumerGroups](), |
|
|
TopicSubscribers: cmap.New[*TopicConsumerGroups](), |
|
|
balancer: balancer, |
|
|
balancer: balancer, |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups { |
|
|
|
|
|
|
|
|
func (c *SubCoordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups { |
|
|
topicName := toTopicName(topic) |
|
|
topicName := toTopicName(topic) |
|
|
tcg, _ := c.TopicSubscribers.Get(topicName) |
|
|
tcg, _ := c.TopicSubscribers.Get(topicName) |
|
|
if tcg == nil && createIfMissing { |
|
|
if tcg == nil && createIfMissing { |
|
@ -42,7 +42,7 @@ func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing |
|
|
} |
|
|
} |
|
|
return tcg |
|
|
return tcg |
|
|
} |
|
|
} |
|
|
func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic) { |
|
|
|
|
|
|
|
|
func (c *SubCoordinator) RemoveTopic(topic *mq_pb.Topic) { |
|
|
topicName := toTopicName(topic) |
|
|
topicName := toTopicName(topic) |
|
|
c.TopicSubscribers.Remove(topicName) |
|
|
c.TopicSubscribers.Remove(topicName) |
|
|
} |
|
|
} |
|
@ -52,7 +52,7 @@ func toTopicName(topic *mq_pb.Topic) string { |
|
|
return topicName |
|
|
return topicName |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance { |
|
|
|
|
|
|
|
|
func (c *SubCoordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance { |
|
|
tcg := c.GetTopicConsumerGroups(initMessage.Topic, true) |
|
|
tcg := c.GetTopicConsumerGroups(initMessage.Topic, true) |
|
|
cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) |
|
|
cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) |
|
|
if cg == nil { |
|
|
if cg == nil { |
|
@ -73,7 +73,7 @@ func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinato |
|
|
return cgi |
|
|
return cgi |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) { |
|
|
|
|
|
|
|
|
func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) { |
|
|
tcg := c.GetTopicConsumerGroups(initMessage.Topic, false) |
|
|
tcg := c.GetTopicConsumerGroups(initMessage.Topic, false) |
|
|
if tcg == nil { |
|
|
if tcg == nil { |
|
|
return |
|
|
return |
|
@ -92,7 +92,7 @@ func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordin |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) { |
|
|
|
|
|
|
|
|
func (c *SubCoordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) { |
|
|
tcg, _ := c.TopicSubscribers.Get(toTopicName(topic)) |
|
|
tcg, _ := c.TopicSubscribers.Get(toTopicName(topic)) |
|
|
if tcg == nil { |
|
|
if tcg == nil { |
|
|
return |
|
|
return |
|
@ -103,11 +103,11 @@ func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// OnSubAddBroker is called when a broker is added to the balancer
|
|
|
// OnSubAddBroker is called when a broker is added to the balancer
|
|
|
func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) { |
|
|
|
|
|
|
|
|
func (c *SubCoordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) { |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// OnSubRemoveBroker is called when a broker is removed from the balancer
|
|
|
// OnSubRemoveBroker is called when a broker is removed from the balancer
|
|
|
func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) { |
|
|
|
|
|
|
|
|
func (c *SubCoordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) { |
|
|
|
|
|
|
|
|
} |
|
|
} |