Browse Source

refactor

pull/5890/head
chrislu 8 months ago
parent
commit
2142842f82
  1. 4
      weed/mq/broker/broker_grpc_sub_coordinator.go
  2. 36
      weed/mq/sub_coordinator/coordinator.go

4
weed/mq/broker/broker_grpc_sub_coordinator.go

@ -24,13 +24,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
// process init message
initMessage := req.GetInit()
if initMessage != nil {
cgi = b.Coordinator.AddSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
cgi = b.Coordinator.AddSubscriber(initMessage)
glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
} else {
return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
}
defer func() {
b.Coordinator.RemoveSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
b.Coordinator.RemoveSubscriber(initMessage)
glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}()

36
weed/mq/sub_coordinator/coordinator.go

@ -51,42 +51,42 @@ func toTopicName(topic *mq_pb.Topic) string {
return topicName
}
func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
tcg := c.GetTopicConsumerGroups(topic, true)
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance {
tcg := c.GetTopicConsumerGroups(initMessage.Topic, true)
cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
if cg == nil {
cg = NewConsumerGroup(topic, c.balancer)
if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg) {
cg, _ = tcg.ConsumerGroups.Get(consumerGroup)
cg = NewConsumerGroup(initMessage.Topic, c.balancer)
if !tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg) {
cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
}
}
cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
cgi, _ := cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
if cgi == nil {
cgi = NewConsumerGroupInstance(consumerGroupInstance)
if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi) {
cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance)
cgi = NewConsumerGroupInstance(initMessage.ConsumerGroupInstanceId)
if !cg.ConsumerGroupInstances.SetIfAbsent(initMessage.ConsumerGroupInstanceId, cgi) {
cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
}
}
cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic)
cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic)
return cgi
}
func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) {
tcg := c.GetTopicConsumerGroups(topic, false)
func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) {
tcg := c.GetTopicConsumerGroups(initMessage.Topic, false)
if tcg == nil {
return
}
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
if cg == nil {
return
}
cg.ConsumerGroupInstances.Remove(consumerGroupInstance)
cg.OnRemoveConsumerGroupInstance(consumerGroupInstance, topic)
cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId)
cg.OnRemoveConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic)
if cg.ConsumerGroupInstances.Count() == 0 {
tcg.ConsumerGroups.Remove(consumerGroup)
tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup)
}
if tcg.ConsumerGroups.Count() == 0 {
c.RemoveTopic(topic)
c.RemoveTopic(initMessage.Topic)
}
}

Loading…
Cancel
Save