@ -23,7 +23,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
stream , err := client . SubscriberToSubCoordinator ( ctx )
stream , err := client . SubscriberToSubCoordinator ( ctx )
if err != nil {
if err != nil {
glog . V ( 1 ) . Infof ( "subscriber %s/%s: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
glog . V ( 0 ) . Infof ( "subscriber %s/%s: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
return err
return err
}
}
waitTime = 1 * time . Second
waitTime = 1 * time . Second
@ -42,7 +42,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
} ,
} ,
} ,
} ,
} ) ; err != nil {
} ) ; err != nil {
glog . V ( 1 ) . Infof ( "subscriber %s/%s send init: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
glog . V ( 0 ) . Infof ( "subscriber %s/%s send init: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
return err
return err
}
}
@ -50,7 +50,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
for {
for {
resp , err := stream . Recv ( )
resp , err := stream . Recv ( )
if err != nil {
if err != nil {
glog . V ( 1 ) . Infof ( "subscriber %s/%s receive: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
glog . V ( 0 ) . Infof ( "subscriber %s/%s receive: %v" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , err )
return err
return err
}
}
assignment := resp . GetAssignment ( )
assignment := resp . GetAssignment ( )
@ -63,7 +63,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return nil
return nil
} )
} )
}
}
glog . V ( 4 ) . Infof ( "subscriber %s/%s/%s waiting for more assignments" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
glog . V ( 0 ) . Infof ( "subscriber %s/%s/%s waiting for more assignments" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
if waitTime < 10 * time . Second {
if waitTime < 10 * time . Second {
waitTime += 1 * time . Second
waitTime += 1 * time . Second
}
}
@ -109,11 +109,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
Name : sub . ContentConfig . Topic ,
Name : sub . ContentConfig . Topic ,
} ,
} ,
PartitionOffset : & mq_pb . PartitionOffset {
PartitionOffset : & mq_pb . PartitionOffset {
Partition : & mq_pb . Partition {
RingSize : partition . RingSize ,
RangeStart : partition . RangeStart ,
RangeStop : partition . RangeStop ,
} ,
Partition : partition ,
StartTsNs : sub . alreadyProcessedTsNs ,
StartTsNs : sub . alreadyProcessedTsNs ,
StartType : mq_pb . PartitionOffsetStartType_EARLIEST_IN_MEMORY ,
StartType : mq_pb . PartitionOffsetStartType_EARLIEST_IN_MEMORY ,
} ,
} ,
@ -143,7 +139,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
} ( )
} ( )
for {
for {
glog . V ( 3 ) . Infof ( "subscriber %s/%s/%s waiting for message" , sub . ContentConfig . Namespace , sub . ContentConfig . Topic , sub . SubscriberConfig . ConsumerGroup )
// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp , err := subscribeClient . Recv ( )
resp , err := subscribeClient . Recv ( )
if err != nil {
if err != nil {
return fmt . Errorf ( "subscribe recv: %v" , err )
return fmt . Errorf ( "subscribe recv: %v" , err )
@ -163,6 +159,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return nil
return nil
}
}
case * mq_pb . SubscribeMessageResponse_Ctrl :
case * mq_pb . SubscribeMessageResponse_Ctrl :
// glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m . Ctrl . IsEndOfStream || m . Ctrl . IsEndOfTopic {
if m . Ctrl . IsEndOfStream || m . Ctrl . IsEndOfTopic {
return io . EOF
return io . EOF
}
}