diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index 3fd97f1c2..6925baa9e 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -24,13 +24,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess // process init message initMessage := req.GetInit() if initMessage != nil { - cgi = b.Coordinator.AddSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) + cgi = b.Coordinator.AddSubscriber(initMessage) glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) } else { return status.Errorf(codes.InvalidArgument, "subscriber init message is empty") } defer func() { - b.Coordinator.RemoveSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) + b.Coordinator.RemoveSubscriber(initMessage) glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) }() diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index bb50991ab..d128310d1 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -51,42 +51,42 @@ func toTopicName(topic *mq_pb.Topic) string { return topicName } -func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance { - tcg := c.GetTopicConsumerGroups(topic, true) - cg, _ := tcg.ConsumerGroups.Get(consumerGroup) +func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance { + tcg := c.GetTopicConsumerGroups(initMessage.Topic, true) + cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) if cg == nil { - cg = NewConsumerGroup(topic, c.balancer) - if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg) { - cg, _ = tcg.ConsumerGroups.Get(consumerGroup) + cg = NewConsumerGroup(initMessage.Topic, c.balancer) + if !tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg) { + cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) } } - cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance) + cgi, _ := cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId) if cgi == nil { - cgi = NewConsumerGroupInstance(consumerGroupInstance) - if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi) { - cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance) + cgi = NewConsumerGroupInstance(initMessage.ConsumerGroupInstanceId) + if !cg.ConsumerGroupInstances.SetIfAbsent(initMessage.ConsumerGroupInstanceId, cgi) { + cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId) } } - cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic) + cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic) return cgi } -func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) { - tcg := c.GetTopicConsumerGroups(topic, false) +func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) { + tcg := c.GetTopicConsumerGroups(initMessage.Topic, false) if tcg == nil { return } - cg, _ := tcg.ConsumerGroups.Get(consumerGroup) + cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) if cg == nil { return } - cg.ConsumerGroupInstances.Remove(consumerGroupInstance) - cg.OnRemoveConsumerGroupInstance(consumerGroupInstance, topic) + cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId) + cg.OnRemoveConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic) if cg.ConsumerGroupInstances.Count() == 0 { - tcg.ConsumerGroups.Remove(consumerGroup) + tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup) } if tcg.ConsumerGroups.Count() == 0 { - c.RemoveTopic(topic) + c.RemoveTopic(initMessage.Topic) } }