@ -12,6 +12,7 @@ import (
)
)
func ( sub * TopicSubscriber ) doKeepConnectedToSubCoordinator ( ) {
func ( sub * TopicSubscriber ) doKeepConnectedToSubCoordinator ( ) {
waitTime := 1 * time . Second
for {
for {
for _ , broker := range sub . bootstrapBrokers {
for _ , broker := range sub . bootstrapBrokers {
// TODO find the balancer
// TODO find the balancer
@ -25,6 +26,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
glog . V ( 1 ) . Infof ( "subscriber %s/%s: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
glog . V ( 1 ) . Infof ( "subscriber %s/%s: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
return err
return err
}
}
waitTime = 1 * time . Second
// Maybe later: subscribe to multiple topics instead of just one
// Maybe later: subscribe to multiple topics instead of just one
@ -61,8 +63,11 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return nil
return nil
} )
} )
}
}
print ( "z" )
time . Sleep ( 3 * time . Second )
glog . V ( 4 ) . Infof ( "subscriber %s/%s/%s waiting for more assignments" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
if waitTime < 10 * time . Second {
waitTime += 1 * time . Second
}
time . Sleep ( waitTime )
}
}
}
}
@ -80,7 +85,7 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
go func ( partition * mq_pb . Partition , broker string ) {
go func ( partition * mq_pb . Partition , broker string ) {
defer wg . Done ( )
defer wg . Done ( )
defer func ( ) { <- semaphore } ( )
defer func ( ) { <- semaphore } ( )
glog . V ( 0 ) . Infof ( "subscriber %s/%s/%s assigned partition %+v at %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker )
glog . V ( 1 ) . Infof ( "subscriber %s/%s/%s assigned partition %+v at %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker )
sub . onEachPartition ( partition , broker )
sub . onEachPartition ( partition , broker )
} ( assigned . Partition , assigned . Broker )
} ( assigned . Partition , assigned . Broker )
}
}
@ -117,7 +122,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt . Errorf ( "create subscribe client: %v" , err )
return fmt . Errorf ( "create subscribe client: %v" , err )
}
}
fmt . Print f( "subscriber %s/%s/%s connected to partition %+v at %v\n " , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker )
glog . V ( 1 ) . Info f( "subscriber %s/%s/%s connected to partition %+v at %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup , partition , broker )
if sub . OnCompletionFunc != nil {
if sub . OnCompletionFunc != nil {
defer sub . OnCompletionFunc ( )
defer sub . OnCompletionFunc ( )
@ -134,7 +139,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
} ( )
} ( )
for {
for {
fmt . Print f( "subscriber %s/%s/%s waiting for message\n " , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
glog . V ( 3 ) . Info f( "subscriber %s/%s/%s waiting for message" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
resp , err := subscribeClient . Recv ( )
resp , err := subscribeClient . Recv ( )
if err != nil {
if err != nil {
return fmt . Errorf ( "subscribe error: %v" , err )
return fmt . Errorf ( "subscribe error: %v" , err )