diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 8a6acadb9..1a34d49e7 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -21,6 +21,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) + waitIntervalCount := 0 var localTopicPartition *topic.LocalPartition for localTopicPartition == nil { localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) @@ -32,7 +33,11 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest if localTopicPartition != nil { break } - time.Sleep(337 * time.Millisecond) + waitIntervalCount++ + if waitIntervalCount > 10 { + waitIntervalCount = 10 + } + time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond) // Check if the client has disconnected by monitoring the context select { case <-ctx.Done():