diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index 9c3672215..b57f7742f 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -134,9 +134,43 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read", startOffset, bufferStartOffset, currentBufferEnd) // Don't return error - continue to disk read check below + } else { + // Offset is not in current buffer - check previous buffers FIRST before going to disk + // This handles the case where data was just flushed but is still in prevBuffers + glog.Infof("[StatelessRead] PATH: Offset %d not in current buffer [%d-%d), checking previous buffers first", + startOffset, bufferStartOffset, currentBufferEnd) + + for _, prevBuf := range logBuffer.prevBuffers.buffers { + if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset { + if prevBuf.size > 0 { + // Found in previous buffer! + bufCopy := make([]byte, prevBuf.size) + copy(bufCopy, prevBuf.buf[:prevBuf.size]) + logBuffer.RUnlock() + + messages, nextOffset, _, err = parseMessagesFromBuffer( + bufCopy, startOffset, maxMessages, maxBytes) + + if err != nil { + return nil, startOffset, highWaterMark, false, err + } + + glog.Infof("[StatelessRead] SUCCESS: Found %d messages in previous buffer, nextOffset=%d", + len(messages), nextOffset) + + endOfPartition = false // More data might exist + return messages, nextOffset, highWaterMark, endOfPartition, nil + } + // Empty previous buffer - data was flushed to disk + glog.V(2).Infof("[StatelessRead] Found empty previous buffer for offset %d, will try disk", startOffset) + break + } + } + logBuffer.RUnlock() } - logBuffer.RUnlock() + // If we get here, unlock if not already unlocked + // (Note: logBuffer.RUnlock() was called above in all paths) // Data not in memory - try disk read // This handles two cases: @@ -226,7 +260,7 @@ func readHistoricalDataFromDisk( chunkStartOffset, startOffset, len(cachedMessages)) result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes) - + if err != nil { // CRITICAL: Cache extraction failed because requested offset is BEYOND cached chunk // This means disk files only contain partial data (e.g., 1000-1763) and the @@ -235,14 +269,14 @@ func readHistoricalDataFromDisk( // SOLUTION: Return empty result with NO ERROR to let ReadMessagesAtOffset // continue to check memory buffers. The data might be in memory even though // it's not on disk. - glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)", + glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)", startOffset, chunkStartOffset, len(cachedMessages)) glog.Infof("[DiskCache] Returning empty to let memory buffers handle offset %d", startOffset) - + // Return empty but NO ERROR - this signals "not on disk, try memory" return nil, startOffset, nil } - + // Success - return cached data return result, nextOff, nil } @@ -385,7 +419,7 @@ func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset in startOffset, chunkStartOffset, len(chunkMessages), positionInChunk) glog.Infof("[DiskCache] Chunk contains offsets %d-%d, requested %d - data not on disk", chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1, startOffset) - + // Return empty (data not on disk) - caller will check memory buffers return nil, startOffset, nil }