From 221b3525934e7eb4846d5e91f529ba4424e78ffd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 14 Dec 2025 21:52:50 -0800 Subject: [PATCH] fix: handle ResumeFromDiskError gracefully in LoopProcessLogData (#7753) When ReadFromBuffer returns ResumeFromDiskError, the function now: - Attempts to read from disk if ReadFromDiskFn is available - Checks if the client is still connected via waitForDataFn - Waits for notification or short timeout before retrying - Continues the loop instead of immediately returning the error This fixes TestNewLogBufferFirstBuffer which was failing because the function returned too early before data was available in the buffer. --- weed/util/log_buffer/log_read.go | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 0a2b8e89a..e47034d87 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -75,8 +75,36 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition } bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition) if err == ResumeFromDiskError { - time.Sleep(1127 * time.Millisecond) - return lastReadPosition, isDone, ResumeFromDiskError + // Try to read from disk if readFromDiskFn is available + if logBuffer.ReadFromDiskFn != nil { + lastReadPosition, isDone, err = logBuffer.ReadFromDiskFn(lastReadPosition, stopTsNs, eachLogDataFn) + if err != nil { + return lastReadPosition, isDone, err + } + if isDone { + return lastReadPosition, isDone, nil + } + } + + // CRITICAL: Check if client is still connected + if !waitForDataFn() { + // Client disconnected - exit cleanly + glog.V(4).Infof("%s: Client disconnected after disk read attempt", readerName) + return lastReadPosition, true, nil + } + + // Wait for notification or timeout (instant wake-up when data arrives) + select { + case <-notifyChan: + // New data available, retry immediately + glog.V(3).Infof("%s: Woke up from notification after ResumeFromDiskError", readerName) + case <-time.After(10 * time.Millisecond): + // Timeout, retry anyway (fallback for edge cases) + glog.V(4).Infof("%s: Notification timeout after ResumeFromDiskError, polling", readerName) + } + + // Continue to next iteration (don't return ResumeFromDiskError) + continue } if err != nil { // Check for buffer corruption error