|
|
@ -99,9 +99,9 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo |
|
|
|
func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error { |
|
|
|
// connect to the partition broker
|
|
|
|
return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { |
|
|
|
subscribeClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ |
|
|
|
Message: &mq_pb.SubscribeRequest_Init{ |
|
|
|
Init: &mq_pb.SubscribeRequest_InitMessage{ |
|
|
|
subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{ |
|
|
|
Message: &mq_pb.SubscribeMessageRequest_Init{ |
|
|
|
Init: &mq_pb.SubscribeMessageRequest_InitMessage{ |
|
|
|
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, |
|
|
|
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, |
|
|
|
Topic: &mq_pb.Topic{ |
|
|
@ -114,7 +114,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s |
|
|
|
RangeStop: partition.RangeStop, |
|
|
|
}, |
|
|
|
Filter: sub.ContentConfig.Filter, |
|
|
|
Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{ |
|
|
|
Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{ |
|
|
|
StartTimestampNs: sub.alreadyProcessedTsNs, |
|
|
|
}, |
|
|
|
}, |
|
|
|