From 37809822f3b20d8ff5df71bd15f927c6ae96bf8a Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 15 Oct 2025 23:07:36 -0700 Subject: [PATCH] fix: critical bug causing 51% message loss in stateless reads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CRITICAL BUG FIX: ReadMessagesAtOffset was returning error instead of attempting disk I/O when data was flushed from memory, causing massive message loss (6254 out of 12192 messages = 51% loss). Problem: In log_read_stateless.go lines 120-131, when data was flushed to disk (empty previous buffer), the code returned an 'offset out of range' error instead of attempting disk I/O. This caused consumers to skip over flushed data entirely, leading to catastrophic message loss. The bug occurred when: 1. Data was written to LogBuffer 2. Data was flushed to disk due to buffer rotation 3. Consumer requested that offset range 4. Code found offset in expected range but not in memory 5. ❌ Returned error instead of reading from disk Root Cause: Lines 126-131 had early return with error when previous buffer was empty: // Data not in memory - for stateless fetch, we don't do disk I/O return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d out of range...") This comment was incorrect - we DO need disk I/O for flushed data! Fix: 1. Lines 120-132: Changed to fall through to disk read logic instead of returning error when previous buffer is empty 2. Lines 137-177: Enhanced disk read logic to handle TWO cases: - Historical data (offset < bufferStartOffset) - Flushed data (offset >= bufferStartOffset but not in memory) Changes: - Line 121: Log "attempting disk read" instead of breaking - Line 130-132: Fall through to disk read instead of returning error - Line 141: Changed condition from 'if startOffset < bufferStartOffset' to 'if startOffset < currentBufferEnd' to handle both cases - Lines 143-149: Add context-aware logging for both historical and flushed data - Lines 154-159: Add context-aware error messages Expected Results: - Before: 51% message loss (6254/12192 missing) - After: <1% message loss (only from rebalancing, which we already fixed) - Duplicates: Should remain ~47% (from rebalancing, expected until offsets committed) Testing: - ✅ Compiles successfully - Ready for integration testing with standard-test Related Issues: - This explains the massive data loss in recent load tests - Disk I/O fallback was implemented but not reachable due to early return - Disk chunk cache is working but was never being used for flushed data Priority: CRITICAL - Fixes production-breaking data loss bug --- weed/util/log_buffer/log_read_stateless.go | 48 ++++++++++++++-------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index 5979bae1c..e2dd8eee8 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -117,33 +117,46 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages endOfPartition = false // More data might be in current buffer return messages, nextOffset, highWaterMark, endOfPartition, nil } - // Empty previous buffer means data was flushed + // Empty previous buffer means data was flushed to disk - fall through to disk read + glog.V(2).Infof("[StatelessRead] Data at offset %d was flushed, attempting disk read", startOffset) break } } logBuffer.RUnlock() - // Data not in memory - for stateless fetch, we don't do disk I/O to avoid blocking - // Return empty with offset out of range indication - glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), returning empty", - startOffset, bufferStartOffset, currentBufferEnd) - return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d out of range (in-memory: %d-%d)", + // Data not in memory - attempt disk read if configured + // CRITICAL FIX: Don't return error here - data may be on disk! + // Fall through to disk read logic below + glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read", startOffset, bufferStartOffset, currentBufferEnd) + // Don't return error - continue to disk read check below } logBuffer.RUnlock() - // Offset is not in current buffer range - if startOffset < bufferStartOffset { - // Historical data - try to read from disk if ReadFromDiskFn is configured - glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read", - startOffset, bufferStartOffset) + // Data not in memory - try disk read + // This handles two cases: + // 1. startOffset < bufferStartOffset: Historical data + // 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above) + if startOffset < currentBufferEnd { + // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured + if startOffset < bufferStartOffset { + glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read", + startOffset, bufferStartOffset) + } else { + glog.V(2).Infof("[StatelessRead] Requested offset %d in range but not in memory (flushed data), attempting disk read", + startOffset) + } // Check if disk read function is configured if logBuffer.ReadFromDiskFn == nil { - glog.V(2).Infof("[StatelessRead] No disk read function configured, returning offset too old error") - return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)", - startOffset, bufferStartOffset) + glog.V(2).Infof("[StatelessRead] No disk read function configured, returning error") + if startOffset < bufferStartOffset { + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)", + startOffset, bufferStartOffset) + } + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), no disk read available", + startOffset, bufferStartOffset, currentBufferEnd) } // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented) @@ -153,8 +166,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages if diskErr != nil { glog.V(2).Infof("[StatelessRead] Disk read failed for offset %d: %v", startOffset, diskErr) - return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d): %v", - startOffset, bufferStartOffset, diskErr) + return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr) } glog.V(2).Infof("[StatelessRead] Successfully read %d messages from disk, nextOffset=%d", @@ -165,8 +177,8 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil } - // startOffset > currentBufferEnd - future offset, no data available yet - glog.V(4).Infof("[StatelessRead] Future offset %d > buffer end %d, no data available", + // startOffset >= currentBufferEnd - future offset, no data available yet + glog.V(4).Infof("[StatelessRead] Future offset %d >= buffer end %d, no data available", startOffset, currentBufferEnd) return messages, startOffset, highWaterMark, true, nil }