diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index c8d1f119d..7644b3604 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -109,11 +109,17 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M return eachMessageFn(logEntry) } + // Wrap eachMessageFn for disk reads to also update activity + eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + p.UpdateActivity() // Track disk read activity for idle cleanup + return eachMessageFn(logEntry) + } + // Always attempt initial disk read for historical data // This is fast if no data on disk, and ensures we don't miss old data // The memory read loop below handles new data with instant notifications glog.V(2).Infof("%s reading historical data from disk starting at offset %d", clientName, startPosition.Offset) - processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) + processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn) if readPersistedLogErr != nil { glog.V(2).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) return readPersistedLogErr @@ -147,7 +153,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M // Read from disk ONCE to catch up, then continue with in-memory buffer if readInMemoryLogErr == log_buffer.ResumeFromDiskError { glog.V(4).Infof("SUBSCRIBE: ResumeFromDiskError - reading flushed data from disk for %s at offset %d", clientName, startPosition.Offset) - processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) + processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn) if readPersistedLogErr != nil { glog.V(2).Infof("%s read %v persisted log after flush: %v", clientName, p.Partition, readPersistedLogErr) return readPersistedLogErr