|
|
@ -11,8 +11,8 @@ import ( |
|
|
|
|
|
|
|
func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error { |
|
|
|
|
|
|
|
localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic), |
|
|
|
topic.FromPbPartition(req.Cursor.Partition)) |
|
|
|
localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.GetConsumer().Topic), |
|
|
|
topic.FromPbPartition(req.GetConsumer().Partition)) |
|
|
|
if localTopicPartition == nil { |
|
|
|
stream.Send(&mq_pb.SubscribeResponse{ |
|
|
|
Message: &mq_pb.SubscribeResponse_Ctrl{ |
|
|
@ -24,7 +24,7 @@ func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
clientName := fmt.Sprintf("%s/%s-%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId, req.Consumer.ClientId) |
|
|
|
clientName := fmt.Sprintf("%s/%s-%s", req.GetConsumer().ConsumerGroup, req.GetConsumer().ConsumerId, req.GetConsumer().ClientId) |
|
|
|
|
|
|
|
localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error { |
|
|
|
value := logEntry.GetData() |
|
|
|