@ -15,15 +15,30 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
waitTime := 1 * time . Second
waitTime := 1 * time . Second
for {
for {
for _ , broker := range sub . bootstrapBrokers {
for _ , broker := range sub . bootstrapBrokers {
// TODO find the balancer
// lookup topic brokers
var brokerLeader string
err := pb . WithBrokerGrpcClient ( false , broker , sub . SubscriberConfig . GrpcDialOption , func ( client mq_pb . SeaweedMessagingClient ) error {
resp , err := client . FindBrokerLeader ( context . Background ( ) , & mq_pb . FindBrokerLeaderRequest { } )
if err != nil {
return err
}
brokerLeader = resp . Broker
return nil
} )
if err != nil {
glog . V ( 0 ) . Infof ( "broker coordinator on %s: %v" , broker , err )
continue
}
glog . V ( 0 ) . Infof ( "found broker coordinator: %v" , brokerLeader )
// connect to the balancer
// connect to the balancer
pb . WithBrokerGrpcClient ( true , broker , sub . SubscriberConfig . GrpcDialOption , func ( client mq_pb . SeaweedMessagingClient ) error {
pb . WithBrokerGrpcClient ( true , brokerLeader , sub . SubscriberConfig . GrpcDialOption , func ( client mq_pb . SeaweedMessagingClient ) error {
ctx , cancel := context . WithCancel ( context . Background ( ) )
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
defer cancel ( )
stream , err := client . SubscriberToSubCoordinator ( ctx )
stream , err := client . SubscriberToSubCoordinator ( ctx )
if err != nil {
if err != nil {
glog . V ( 0 ) . Infof ( "subscriber %s/%s: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
glog . V ( 0 ) . Infof ( "subscriber %s: %v" , sub . ContentConfig . Topic , err )
return err
return err
}
}
waitTime = 1 * time . Second
waitTime = 1 * time . Second
@ -35,14 +50,11 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
Init : & mq_pb . SubscriberToSubCoordinatorRequest_InitMessage {
Init : & mq_pb . SubscriberToSubCoordinatorRequest_InitMessage {
ConsumerGroup : sub . SubscriberConfig . ConsumerGroup ,
ConsumerGroup : sub . SubscriberConfig . ConsumerGroup ,
ConsumerGroupInstanceId : sub . SubscriberConfig . ConsumerGroupInstanceId ,
ConsumerGroupInstanceId : sub . SubscriberConfig . ConsumerGroupInstanceId ,
Topic : & mq_pb . Topic {
Namespace : sub . ContentConfig . Namespace ,
Name : sub . ContentConfig . Topic ,
} ,
Topic : sub . ContentConfig . Topic . ToPbTopic ( ) ,
} ,
} ,
} ,
} ,
} ) ; err != nil {
} ) ; err != nil {
glog . V ( 0 ) . Infof ( "subscriber %s/%s send init: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
glog . V ( 0 ) . Infof ( "subscriber %s send init: %v" , sub . ContentConfig . Topic , err )
return err
return err
}
}
@ -50,12 +62,12 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
for {
for {
resp , err := stream . Recv ( )
resp , err := stream . Recv ( )
if err != nil {
if err != nil {
glog . V ( 0 ) . Infof ( "subscriber %s/%s receive: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
glog . V ( 0 ) . Infof ( "subscriber %s receive: %v" , sub . ContentConfig . Topic , err )
return err
return err
}
}
assignment := resp . GetAssignment ( )
assignment := resp . GetAssignment ( )
if assignment != nil {
if assignment != nil {
glog . V ( 0 ) . Infof ( "subscriber %s/%s receive assignment: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , assignment )
glog . V ( 0 ) . Infof ( "subscriber %s receive assignment: %v" , sub . ContentConfig . Topic , assignment )
}
}
sub . onEachAssignment ( assignment )
sub . onEachAssignment ( assignment )
}
}
@ -63,7 +75,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return nil
return nil
} )
} )
}
}
glog . V ( 0 ) . Infof ( "subscriber %s/%s/%s waiting for more assignments" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
glog . V ( 0 ) . Infof ( "subscriber %s/%s waiting for more assignments" , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
if waitTime < 10 * time . Second {
if waitTime < 10 * time . Second {
waitTime += 1 * time . Second
waitTime += 1 * time . Second
}
}
@ -85,10 +97,10 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
go func ( partition * mq_pb . Partition , broker string ) {
go func ( partition * mq_pb . Partition , broker string ) {
defer wg . Done ( )
defer wg . Done ( )
defer func ( ) { <- semaphore } ( )
defer func ( ) { <- semaphore } ( )
glog . V ( 0 ) . Infof ( "subscriber %s/%s/%s assigned partition %+v at %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker )
glog . V ( 0 ) . Infof ( "subscriber %s/%s assigned partition %+v at %v" , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker )
err := sub . onEachPartition ( partition , broker )
err := sub . onEachPartition ( partition , broker )
if err != nil {
if err != nil {
glog . V ( 0 ) . Infof ( "subscriber %s/%s/%s partition %+v at %v: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker , err )
glog . V ( 0 ) . Infof ( "subscriber %s/%s partition %+v at %v: %v" , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker , err )
}
}
} ( assigned . Partition , assigned . Broker )
} ( assigned . Partition , assigned . Broker )
}
}
@ -104,10 +116,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
Init : & mq_pb . SubscribeMessageRequest_InitMessage {
Init : & mq_pb . SubscribeMessageRequest_InitMessage {
ConsumerGroup : sub . SubscriberConfig . ConsumerGroup ,
ConsumerGroup : sub . SubscriberConfig . ConsumerGroup ,
ConsumerId : sub . SubscriberConfig . ConsumerGroupInstanceId ,
ConsumerId : sub . SubscriberConfig . ConsumerGroupInstanceId ,
Topic : & mq_pb . Topic {
Namespace : sub . ContentConfig . Namespace ,
Name : sub . ContentConfig . Topic ,
} ,
Topic : sub . ContentConfig . Topic . ToPbTopic ( ) ,
PartitionOffset : & mq_pb . PartitionOffset {
PartitionOffset : & mq_pb . PartitionOffset {
Partition : partition ,
Partition : partition ,
StartTsNs : sub . alreadyProcessedTsNs ,
StartTsNs : sub . alreadyProcessedTsNs ,
@ -122,7 +131,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt . Errorf ( "create subscribe client: %v" , err )
return fmt . Errorf ( "create subscribe client: %v" , err )
}
}
glog . V ( 0 ) . Infof ( "subscriber %s/%s/%s connected to partition %+v at %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker )
glog . V ( 0 ) . Infof ( "subscriber %s/%s connected to partition %+v at %v" , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker )
if sub . OnCompletionFunc != nil {
if sub . OnCompletionFunc != nil {
defer sub . OnCompletionFunc ( )
defer sub . OnCompletionFunc ( )
@ -145,7 +154,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt . Errorf ( "subscribe recv: %v" , err )
return fmt . Errorf ( "subscribe recv: %v" , err )
}
}
if resp . Message == nil {
if resp . Message == nil {
glog . V ( 0 ) . Infof ( "subscriber %s/%s/%s received nil message" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
glog . V ( 0 ) . Infof ( "subscriber %s/%s received nil message" , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
continue
continue
}
}
switch m := resp . Message . ( type ) {
switch m := resp . Message . ( type ) {