|
|
@ -86,16 +86,19 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo |
|
|
|
defer wg.Done() |
|
|
|
defer func() { <-semaphore }() |
|
|
|
glog.V(1).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker) |
|
|
|
sub.onEachPartition(partition, broker) |
|
|
|
err := sub.onEachPartition(partition, broker) |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infof("subscriber %s/%s/%s partition %+v at %v: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker, err) |
|
|
|
} |
|
|
|
}(assigned.Partition, assigned.Broker) |
|
|
|
} |
|
|
|
|
|
|
|
wg.Wait() |
|
|
|
} |
|
|
|
|
|
|
|
func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) { |
|
|
|
func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error { |
|
|
|
// connect to the partition broker
|
|
|
|
pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { |
|
|
|
return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { |
|
|
|
subscribeClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ |
|
|
|
Message: &mq_pb.SubscribeRequest_Init{ |
|
|
|
Init: &mq_pb.SubscribeRequest_InitMessage{ |
|
|
|