From 6af1dfcb619de2960d8a4feec83e9d21a10f1c79 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 15:04:20 -0700 Subject: [PATCH] fix: Invalidate disk cache after buffer flush to prevent stale data Phase 9: ROOT CAUSE FIXED - Stale Disk Cache After Flush Problem: Consumer stops at offset 600/601 because disk cache contains stale data from the first disk read (only offsets 0-100). Timeline of the Bug: 1. Producer starts, flushes messages 0-50, then 51-100 to disk 2. Consumer requests offset 601 (not yet produced) 3. Code aligns to chunk 0, reads from disk 4. Disk has 0-100 (only 2 files flushed so far) 5. Cache stores chunk 0 = [0-100] (101 messages) 6. Producer continues, flushes 101-150, 151-200, ..., up to 600+ 7. Consumer retries offset 601 8. Cache HIT on chunk 0, returns [0-100] 9. extractMessagesFromCache says 'offset 601 beyond chunk' 10. Returns empty, consumer stalls forever! Root Cause: DiskChunkCache is populated on first read and NEVER invalidated. Even after new data is flushed to disk, the cache still contains old data from the initial read. The cache has no TTL, no invalidation on flush, nothing! Fix: Added invalidateAllDiskCacheChunks() in copyToFlushInternal() to clear ALL cached chunks after every buffer flush. This ensures consumers always read fresh data from disk after a flush, preventing the stale cache bug. Expected Result: - 100% message delivery (no loss!) - 0 duplicates - Consumers can read all messages from 0 to HWM --- weed/util/log_buffer/log_buffer.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 4795227fa..50449db31 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -560,11 +560,29 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush logBuffer.hasOffsets = false logBuffer.minOffset = 0 logBuffer.maxOffset = 0 + + // CRITICAL FIX: Invalidate disk cache chunks after flush + // The cache may contain stale data from before this flush + // Invalidating ensures consumers will re-read fresh data from disk after flush + logBuffer.invalidateAllDiskCacheChunks() + return d } return nil } +// invalidateAllDiskCacheChunks clears all cached disk chunks +// This should be called after a buffer flush to ensure consumers read fresh data from disk +func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() { + logBuffer.diskChunkCache.mu.Lock() + defer logBuffer.diskChunkCache.mu.Unlock() + + if len(logBuffer.diskChunkCache.chunks) > 0 { + glog.Infof("[DiskCache] Invalidating all %d cached chunks after flush", len(logBuffer.diskChunkCache.chunks)) + logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk) + } +} + func (logBuffer *LogBuffer) GetEarliestTime() time.Time { return logBuffer.startTime }