@ -88,7 +88,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
func ( sub * TopicSubscriber ) onEachPartition ( assigned * mq_pb . BrokerPartitionAssignment ) error {
func ( sub * TopicSubscriber ) onEachPartition ( assigned * mq_pb . BrokerPartitionAssignment ) error {
// connect to the partition broker
// connect to the partition broker
return pb . WithBrokerGrpcClient ( true , assigned . LeaderBroker , sub . SubscriberConfig . GrpcDialOption , func ( client mq_pb . SeaweedMessagingClient ) error {
return pb . WithBrokerGrpcClient ( true , assigned . LeaderBroker , sub . SubscriberConfig . GrpcDialOption , func ( client mq_pb . SeaweedMessagingClient ) error {
subscribeClient , err := client . SubscribeMessage ( context . Background ( ) , & mq_pb . SubscribeMessageRequest {
subscribeClient , err := client . SubscribeMessage ( context . Background ( ) )
if err != nil {
return fmt . Errorf ( "create subscribe client: %v" , err )
}
if err = subscribeClient . Send ( & mq_pb . SubscribeMessageRequest {
Message : & mq_pb . SubscribeMessageRequest_Init {
Message : & mq_pb . SubscribeMessageRequest_Init {
Init : & mq_pb . SubscribeMessageRequest_InitMessage {
Init : & mq_pb . SubscribeMessageRequest_InitMessage {
ConsumerGroup : sub . SubscriberConfig . ConsumerGroup ,
ConsumerGroup : sub . SubscriberConfig . ConsumerGroup ,
@ -103,25 +109,32 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
FollowerBrokers : assigned . FollowerBrokers ,
FollowerBrokers : assigned . FollowerBrokers ,
} ,
} ,
} ,
} ,
} )
if err != nil {
return fmt . Errorf ( "create subscribe client: %v" , err )
} ) ; err != nil {
glog . V ( 0 ) . Infof ( "subscriber %s connected to partition %+v at %v: %v" , sub . ContentConfig . Topic , assigned . Partition , assigned . LeaderBroker , err )
}
}
glog . V ( 0 ) . Infof ( "subscriber %s/%s connected to partition %+v at %v" , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , assigned . Partition , assigned . LeaderBroker )
glog . V ( 0 ) . Infof ( "subscriber %s connected to partition %+v at %v" , sub . ContentConfig . Topic , assigned . Partition , assigned . LeaderBroker )
if sub . OnCompletionFunc != nil {
if sub . OnCompletionFunc != nil {
defer sub . OnCompletionFunc ( )
defer sub . OnCompletionFunc ( )
}
}
partitionOffsetChan := make ( chan int64 , 1024 )
defer func ( ) {
defer func ( ) {
subscribeClient . SendMsg ( & mq_pb . SubscribeMessageRequest {
Message : & mq_pb . SubscribeMessageRequest_Ack {
Ack : & mq_pb . SubscribeMessageRequest_AckMessage {
Sequence : 0 ,
close ( partitionOffsetChan )
} ( )
go func ( ) {
for ack := range partitionOffsetChan {
subscribeClient . SendMsg ( & mq_pb . SubscribeMessageRequest {
Message : & mq_pb . SubscribeMessageRequest_Ack {
Ack : & mq_pb . SubscribeMessageRequest_AckMessage {
Sequence : ack ,
} ,
} ,
} ,
} ,
} )
} )
}
subscribeClient . CloseSend ( )
subscribeClient . CloseSend ( )
} ( )
} ( )
@ -142,6 +155,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
return fmt . Errorf ( "process error: %v" , processErr )
return fmt . Errorf ( "process error: %v" , processErr )
}
}
sub . alreadyProcessedTsNs = m . Data . TsNs
sub . alreadyProcessedTsNs = m . Data . TsNs
partitionOffsetChan <- m . Data . TsNs
if ! shouldContinue {
if ! shouldContinue {
return nil
return nil
}
}