Browse Source

Activity Tracking for Disk Reads

pull/7481/head
chrislu 2 weeks ago
parent
commit
f5c9427b7a
  1. 10
      weed/mq/topic/local_partition.go

10
weed/mq/topic/local_partition.go

@ -109,11 +109,17 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
return eachMessageFn(logEntry) return eachMessageFn(logEntry)
} }
// Wrap eachMessageFn for disk reads to also update activity
eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
p.UpdateActivity() // Track disk read activity for idle cleanup
return eachMessageFn(logEntry)
}
// Always attempt initial disk read for historical data // Always attempt initial disk read for historical data
// This is fast if no data on disk, and ensures we don't miss old data // This is fast if no data on disk, and ensures we don't miss old data
// The memory read loop below handles new data with instant notifications // The memory read loop below handles new data with instant notifications
glog.V(2).Infof("%s reading historical data from disk starting at offset %d", clientName, startPosition.Offset) glog.V(2).Infof("%s reading historical data from disk starting at offset %d", clientName, startPosition.Offset)
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn)
if readPersistedLogErr != nil { if readPersistedLogErr != nil {
glog.V(2).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) glog.V(2).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr return readPersistedLogErr
@ -147,7 +153,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
// Read from disk ONCE to catch up, then continue with in-memory buffer // Read from disk ONCE to catch up, then continue with in-memory buffer
if readInMemoryLogErr == log_buffer.ResumeFromDiskError { if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
glog.V(4).Infof("SUBSCRIBE: ResumeFromDiskError - reading flushed data from disk for %s at offset %d", clientName, startPosition.Offset) glog.V(4).Infof("SUBSCRIBE: ResumeFromDiskError - reading flushed data from disk for %s at offset %d", clientName, startPosition.Offset)
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn)
if readPersistedLogErr != nil { if readPersistedLogErr != nil {
glog.V(2).Infof("%s read %v persisted log after flush: %v", clientName, p.Partition, readPersistedLogErr) glog.V(2).Infof("%s read %v persisted log after flush: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr return readPersistedLogErr

Loading…
Cancel
Save