diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 3953090f2..fa58fbdff 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -12,6 +12,7 @@ import ( ) func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { + waitTime := 1 * time.Second for { for _, broker := range sub.bootstrapBrokers { // TODO find the balancer @@ -25,6 +26,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { glog.V(1).Infof("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) return err } + waitTime = 1 * time.Second // Maybe later: subscribe to multiple topics instead of just one @@ -61,8 +63,11 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { return nil }) } - print("z") - time.Sleep(3 * time.Second) + glog.V(4).Infof("subscriber %s/%s/%s waiting for more assignments", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + if waitTime < 10*time.Second { + waitTime += 1 * time.Second + } + time.Sleep(waitTime) } } @@ -80,7 +85,7 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo go func(partition *mq_pb.Partition, broker string) { defer wg.Done() defer func() { <-semaphore }() - glog.V(0).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker) + glog.V(1).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker) sub.onEachPartition(partition, broker) }(assigned.Partition, assigned.Broker) } @@ -117,7 +122,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s return fmt.Errorf("create subscribe client: %v", err) } - fmt.Printf("subscriber %s/%s/%s connected to partition %+v at %v\n", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker) + glog.V(1).Infof("subscriber %s/%s/%s connected to partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker) if sub.OnCompletionFunc != nil { defer sub.OnCompletionFunc() @@ -134,7 +139,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s }() for { - fmt.Printf("subscriber %s/%s/%s waiting for message\n", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + glog.V(3).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) resp, err := subscribeClient.Recv() if err != nil { return fmt.Errorf("subscribe error: %v", err)