|
|
@ -89,12 +89,16 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
if processedPosition.Time.UnixNano() != 0 { |
|
|
|
startPosition = processedPosition |
|
|
|
} |
|
|
|
processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn) |
|
|
|
if isDone { |
|
|
|
return nil |
|
|
|
} |
|
|
|
if processedPosition.Time.UnixNano() != 0 { |
|
|
|
startPosition = processedPosition |
|
|
|
} |
|
|
|
|
|
|
|
if readInMemoryLogErr == log_buffer.ResumeFromDiskError { |
|
|
|
continue |
|
|
|