|
|
|
@ -183,8 +183,14 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M |
|
|
|
} |
|
|
|
|
|
|
|
// Original timestamp-based subscription logic
|
|
|
|
// Wrap eachMessageFn for disk reads to also update activity
|
|
|
|
eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) { |
|
|
|
p.UpdateActivity() // Track disk read activity for idle cleanup
|
|
|
|
return eachMessageFn(logEntry) |
|
|
|
} |
|
|
|
|
|
|
|
for { |
|
|
|
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) |
|
|
|
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn) |
|
|
|
if readPersistedLogErr != nil { |
|
|
|
glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) |
|
|
|
return readPersistedLogErr |
|
|
|
|