|
@ -175,12 +175,10 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes |
|
|
atomic.StoreInt32(&localTopicPartition.FollowerId, followerId) |
|
|
atomic.StoreInt32(&localTopicPartition.FollowerId, followerId) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition) |
|
|
glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition) |
|
|
isConnected := true |
|
|
|
|
|
sleepIntervalCount := 0 |
|
|
sleepIntervalCount := 0 |
|
|
|
|
|
|
|
|
var counter int64 |
|
|
var counter int64 |
|
|
defer func() { |
|
|
defer func() { |
|
|
isConnected = false |
|
|
|
|
|
glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter) |
|
|
glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter) |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
@ -200,9 +198,6 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes |
|
|
var prevFlushTsNs int64 |
|
|
var prevFlushTsNs int64 |
|
|
|
|
|
|
|
|
_, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { |
|
|
_, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { |
|
|
if !isConnected { |
|
|
|
|
|
return false |
|
|
|
|
|
} |
|
|
|
|
|
sleepIntervalCount++ |
|
|
sleepIntervalCount++ |
|
|
if sleepIntervalCount > 32 { |
|
|
if sleepIntervalCount > 32 { |
|
|
sleepIntervalCount = 32 |
|
|
sleepIntervalCount = 32 |
|
|