|
@ -21,6 +21,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest |
|
|
|
|
|
|
|
|
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) |
|
|
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) |
|
|
|
|
|
|
|
|
|
|
|
waitIntervalCount := 0 |
|
|
var localTopicPartition *topic.LocalPartition |
|
|
var localTopicPartition *topic.LocalPartition |
|
|
for localTopicPartition == nil { |
|
|
for localTopicPartition == nil { |
|
|
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) |
|
|
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) |
|
@ -32,7 +33,11 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest |
|
|
if localTopicPartition != nil { |
|
|
if localTopicPartition != nil { |
|
|
break |
|
|
break |
|
|
} |
|
|
} |
|
|
time.Sleep(337 * time.Millisecond) |
|
|
|
|
|
|
|
|
waitIntervalCount++ |
|
|
|
|
|
if waitIntervalCount > 10 { |
|
|
|
|
|
waitIntervalCount = 10 |
|
|
|
|
|
} |
|
|
|
|
|
time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond) |
|
|
// Check if the client has disconnected by monitoring the context
|
|
|
// Check if the client has disconnected by monitoring the context
|
|
|
select { |
|
|
select { |
|
|
case <-ctx.Done(): |
|
|
case <-ctx.Done(): |
|
|