From b1856dfc9ff303a2fb0a4a98abc8d9430326ea5f Mon Sep 17 00:00:00 2001
From: chrislu
Date: Fri, 17 Oct 2025 15:01:58 -0700
Subject: [PATCH] debug: Added detailed parseMessages logging to identify root cause

Phase 9: Root Cause Identified - Disk Cache Not Updated on Flush

Analysis:
- Consumer stops at offset 600/601 (pattern repeats at multiples of ~600)
- Buffer state shows: startOffset=601, bufferStart=602 (data flushed!)
- Disk read attempts to read offset 601
- Disk cache contains ONLY offsets 0-100 (the range written by the first two flushes listed below)
- Subsequent flushes (101-150, 151-200, ..., 551-601) NOT in cache

Flush logs confirm regular flushes:
- offset 51: First flush (0-50)
- offset 101: Second flush (51-100)
- offset 151, 201, 251, ..., 602: Subsequent flushes
- ALL flushes succeed, but cache not updated!

ROOT CAUSE:
The disk cache (diskChunkCache) is populated only by the earliest flushes
(it holds offsets 0-100 and nothing later). Subsequent flushes write to disk
successfully, but the cache is never updated with the new chunk boundaries.

When a consumer requests offset 601:
1. Buffer has flushed, so bufferStart=602
2. Code correctly tries disk read
3. Cache has chunk 0-100, returns 'data not on disk'
4. Code returns empty, consumer stalls

FIX NEEDED:
Update diskChunkCache after EVERY flush, not just the first one.
OR invalidate cache more aggressively to force fresh reads.

Next: Fix diskChunkCache update in flush logic --- weed/util/log_buffer/log_read_stateless.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index 071df3ad3..9c3672215 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -428,6 +428,7 @@ func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, max totalBytes = 0 foundStart := false + messagesInBuffer := 0 for pos := 0; pos+4 < len(buf) && len(messages) < maxMessages && totalBytes < maxBytes; { // Read message size size := util.BytesToUint32(buf[pos : pos+4]) @@ -447,14 +448,18 @@ func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, max continue } + messagesInBuffer++ + // Initialize foundStart from first message if !foundStart { // Find the first message at or after startOffset if logEntry.Offset >= startOffset { + glog.Infof("[parseMessages] Found first message at/after startOffset %d: logEntry.Offset=%d", startOffset, logEntry.Offset) foundStart = true nextOffset = logEntry.Offset } else { // Skip messages before startOffset + glog.V(3).Infof("[parseMessages] Skipping message at offset %d (before startOffset %d)", logEntry.Offset, startOffset) pos += 4 + int(size) continue } @@ -462,6 +467,7 @@ func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, max // Check if this message matches expected offset if foundStart && logEntry.Offset >= startOffset { + glog.V(3).Infof("[parseMessages] Adding message at offset %d (count=%d)", logEntry.Offset, len(messages)+1) messages = append(messages, logEntry) totalBytes += 4 + int(size) nextOffset = logEntry.Offset + 1 @@ -470,6 +476,9 @@ func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, max pos += 4 + int(size) } + glog.Infof("[parseMessages] Parsed buffer: requested startOffset=%d, messagesInBuffer=%d, messagesReturned=%d, 
nextOffset=%d", + startOffset, messagesInBuffer, len(messages), nextOffset) + glog.V(4).Infof("[parseMessages] Parsed %d messages, nextOffset=%d, totalBytes=%d", len(messages), nextOffset, totalBytes)