From 4e98483661ff939e59a6b419d384e2df369191b6 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Fri, 17 Oct 2025 12:37:59 -0700
Subject: [PATCH] feat: Add cache invalidation on extraction failure (incomplete fix)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Phase 6: Disk Read Fix Attempt #1

Added cache invalidation for the case where extraction fails because the
requested offset lies beyond the cached chunk:
- extractMessagesFromCache: returns an error when the offset is beyond the cache
- readHistoricalDataFromDisk: invalidates the bad cache entry and retries
- invalidateCachedDiskChunk: new function to remove a stale cache entry

Problem Discovered:
Cache invalidation works, but re-reading returns the SAME incomplete data!

Example:
- Request offset 1764
- Disk read returns 764 messages (1000-1763)
- Cache stores 1000-1763
- Request 1764 again → cache invalidated → re-read → SAME 764 messages!

Root Cause:
ReadFromDiskFn (GenLogOnDiskReadFunc) is NOT returning incomplete data;
the disk files actually only contain messages up to offset 1763.
Messages 1764+ are either:
1. Still in memory (not yet flushed)
2. In a different file that is not being read
3. Lost during flush

Test Results:
73.3% delivery (down from 87.9% before this change)
Cache thrashing is causing the performance degradation

Next: Fix the actual disk read to handle the gap between flushed data and
in-memory data.
---
 weed/mq/broker/broker_grpc_fetch.go        |  4 +-
 weed/util/log_buffer/log_read_stateless.go | 57 +++++++++++++++++++---
 2 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/weed/mq/broker/broker_grpc_fetch.go b/weed/mq/broker/broker_grpc_fetch.go
index 09503fb3b..19024d852 100644
--- a/weed/mq/broker/broker_grpc_fetch.go
+++ b/weed/mq/broker/broker_grpc_fetch.go
@@ -107,7 +107,7 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM
     // Read messages from LogBuffer (stateless read)
     glog.Infof("[FetchMessage] About to read from LogBuffer: topic=%s partition=%v offset=%d maxMessages=%d maxBytes=%d",
         t.Name, partition, requestedOffset, maxMessages, maxBytes)
-    
+
     logEntries, nextOffset, highWaterMark, endOfPartition, err := localPartition.LogBuffer.ReadMessagesAtOffset(
         requestedOffset,
         maxMessages,
@@ -121,7 +121,7 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM
         glog.Errorf("[FetchMessage] Details: nextOffset=%d, endOfPartition=%v, bufferStartOffset=%d",
             nextOffset, endOfPartition, localPartition.LogBuffer.GetLogStartOffset())
     }
-    
+
     glog.Infof("[FetchMessage] Read completed: topic=%s partition=%v offset=%d -> %d entries, nextOffset=%d, hwm=%d, eop=%v, err=%v",
         t.Name, partition, requestedOffset, len(logEntries), nextOffset, highWaterMark, endOfPartition, err)
diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go
index e44664f62..d57ddfd5e 100644
--- a/weed/util/log_buffer/log_read_stateless.go
+++ b/weed/util/log_buffer/log_read_stateless.go
@@ -144,7 +144,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
     // 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above)
     if startOffset < currentBufferEnd {
         glog.Infof("[StatelessRead] PATH: Data not in memory, attempting DISK READ")
-        
+
         // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured
         if startOffset < bufferStartOffset {
             glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d",
@@ -166,7 +166,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
         }

         glog.Infof("[StatelessRead] ReadFromDiskFn is configured, calling readHistoricalDataFromDisk...")
-        
+
         // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented)
         // The ReadFromDiskFn should handle its own timeouts and not block indefinitely
         diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk(
@@ -225,7 +225,19 @@ func readHistoricalDataFromDisk(
         glog.Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d), cachedMessages=%d",
             chunkStartOffset, startOffset, len(cachedMessages))
-        return extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)
+        result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)
+
+        if err != nil {
+            // Cache extraction failed (likely offset beyond cached chunk)
+            // Invalidate this cache entry and fall through to read from disk
+            glog.Errorf("[DiskCache] Cache extraction FAILED for offset %d: %v", startOffset, err)
+            glog.Infof("[DiskCache] Invalidating cache entry for chunk %d and re-reading from disk", chunkStartOffset)
+            invalidateCachedDiskChunk(logBuffer, chunkStartOffset)
+            // Fall through to read from disk with fresh data
+        } else {
+            // Success - return cached data
+            return result, nextOff, nil
+        }
     }

     glog.Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk via ReadFromDiskFn",
@@ -295,6 +307,18 @@ func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_
     return nil, false
 }

+// invalidateCachedDiskChunk removes a chunk from the cache
+// This is called when cached data is found to be incomplete or incorrect
+func invalidateCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) {
+    logBuffer.diskChunkCache.mu.Lock()
+    defer logBuffer.diskChunkCache.mu.Unlock()
+
+    if _, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists {
+        delete(logBuffer.diskChunkCache.chunks, chunkStartOffset)
+        glog.Infof("[DiskCache] Invalidated chunk at offset %d", chunkStartOffset)
+    }
+}
+
 // cacheDiskChunk stores a disk chunk in the cache with LRU eviction
 func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) {
     logBuffer.diskChunkCache.mu.Lock()
@@ -340,10 +364,29 @@ func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset in
     positionInChunk := int(startOffset - chunkStartOffset)

     // Check if requested offset is within the chunk
-    if positionInChunk < 0 || positionInChunk >= len(chunkMessages) {
-        glog.V(4).Infof("[DiskCache] Requested offset %d out of chunk range (chunk start: %d, size: %d)",
-            startOffset, chunkStartOffset, len(chunkMessages))
-        return nil, startOffset, nil // Return empty but no error
+    if positionInChunk < 0 {
+        glog.Errorf("[DiskCache] CRITICAL: Requested offset %d is BEFORE chunk start %d (positionInChunk=%d < 0)",
+            startOffset, chunkStartOffset, positionInChunk)
+        return nil, startOffset, fmt.Errorf("offset %d before chunk start %d", startOffset, chunkStartOffset)
+    }
+
+    if positionInChunk >= len(chunkMessages) {
+        // CRITICAL FIX: Requested offset is beyond the cached chunk
+        // This happens when:
+        // 1. Chunk only has partial data (e.g., 572 messages instead of 1000)
+        // 2. Request is for offset beyond what was cached
+        //
+        // Solution: Return RETRYABLE ERROR to force a new disk read with correct chunk start
+        glog.Errorf("[DiskCache] CRITICAL: Requested offset %d is BEYOND cached chunk (chunkStart=%d, cachedSize=%d, positionInChunk=%d)",
+            startOffset, chunkStartOffset, len(chunkMessages), positionInChunk)
+        glog.Errorf("[DiskCache] Chunk contains offsets %d-%d, but requested %d",
+            chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1, startOffset)
+
+        // Return error to trigger cache invalidation and re-read
+        // The next read will use a new chunk start aligned to the requested offset
+        return nil, startOffset, fmt.Errorf("offset %d beyond cached chunk (start=%d, size=%d): cached offsets %d-%d",
+            startOffset, chunkStartOffset, len(chunkMessages),
+            chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1)
     }

     // Extract messages starting from the requested position
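The test results above attribute the regression to cache thrashing: the same chunk is
invalidated and re-read even though a fresh read can never cover the requested offset.
Below is a minimal, self-contained Go sketch of one way to bound that loop. It is not
code from this patch; readWithSingleRetry, chunk, and readChunk are hypothetical names
used only to illustrate the policy of invalidating at most once before giving up.

package main

import (
    "errors"
    "fmt"
)

// chunk is an illustrative stand-in for a cached disk chunk: the first offset
// it holds and how many consecutive messages it contains.
type chunk struct {
    start int64
    count int
}

var errOffsetNotOnDisk = errors.New("requested offset not present in flushed data")

// readWithSingleRetry bounds the invalidate-and-reread loop: if a fresh read
// still does not cover the requested offset, the data is simply not on disk,
// so the caller should stop retrying instead of thrashing the cache.
func readWithSingleRetry(requested int64, readChunk func(int64) chunk) (chunk, error) {
    for attempt := 0; attempt < 2; attempt++ {
        c := readChunk(requested)
        if requested >= c.start && requested < c.start+int64(c.count) {
            return c, nil
        }
        // attempt 0: treat the cached chunk as stale and re-read once
        // attempt 1: the flushed data genuinely ends before the offset
    }
    return chunk{}, errOffsetNotOnDisk
}

func main() {
    // Numbers from the commit message: disk holds offsets 1000-1763 (764 messages).
    diskRead := func(int64) chunk { return chunk{start: 1000, count: 764} }

    if _, err := readWithSingleRetry(1764, diskRead); err != nil {
        fmt.Println("stop retrying, fall back to memory or wait:", err)
    }
}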
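The "Next" note points at the underlying gap: an offset that is past the last flushed
message but not (or not yet) served by the in-memory buffer cannot be satisfied by any
number of disk re-reads. The sketch below frames that decision in isolation, again with
hypothetical names (decideReadPath and its parameters are not SeaweedFS APIs); the
offsets reuse the example from the commit message, and the memory range is assumed.

package main

import "fmt"

// readDecision says where a stateless read should be served from.
type readDecision int

const (
    serveFromDisk   readDecision = iota // offset is covered by flushed files
    serveFromMemory                     // offset is inside the in-memory buffer
    waitForData                         // offset sits in the gap between the two
)

// decideReadPath sketches the gap handling: if the requested offset is past the
// highest flushed offset but before the in-memory buffer starts, neither another
// disk read nor a cache invalidation can ever satisfy it.
func decideReadPath(requested, diskHighest, bufferStart, bufferEnd int64) readDecision {
    switch {
    case requested <= diskHighest:
        return serveFromDisk
    case requested >= bufferStart && requested < bufferEnd:
        return serveFromMemory
    default:
        return waitForData
    }
}

func main() {
    // Disk ends at 1763 (as in the commit message); assume memory covers 1800-1999.
    fmt.Println(decideReadPath(1764, 1763, 1800, 2000)) // 2 = waitForData
    fmt.Println(decideReadPath(1500, 1763, 1800, 2000)) // 0 = serveFromDisk
    fmt.Println(decideReadPath(1900, 1763, 1800, 2000)) // 1 = serveFromMemory
}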