diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 0a2b8e89a..e47034d87 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -75,8 +75,36 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition } bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition) if err == ResumeFromDiskError { - time.Sleep(1127 * time.Millisecond) - return lastReadPosition, isDone, ResumeFromDiskError + // Try to read from disk if readFromDiskFn is available + if logBuffer.ReadFromDiskFn != nil { + lastReadPosition, isDone, err = logBuffer.ReadFromDiskFn(lastReadPosition, stopTsNs, eachLogDataFn) + if err != nil { + return lastReadPosition, isDone, err + } + if isDone { + return lastReadPosition, isDone, nil + } + } + + // CRITICAL: Check if client is still connected + if !waitForDataFn() { + // Client disconnected - exit cleanly + glog.V(4).Infof("%s: Client disconnected after disk read attempt", readerName) + return lastReadPosition, true, nil + } + + // Wait for notification or timeout (instant wake-up when data arrives) + select { + case <-notifyChan: + // New data available, retry immediately + glog.V(3).Infof("%s: Woke up from notification after ResumeFromDiskError", readerName) + case <-time.After(10 * time.Millisecond): + // Timeout, retry anyway (fallback for edge cases) + glog.V(4).Infof("%s: Notification timeout after ResumeFromDiskError, polling", readerName) + } + + // Continue to next iteration (don't return ResumeFromDiskError) + continue } if err != nil { // Check for buffer corruption error