From c5634470edd356719781aca374fa57a68d23c669 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 15 Oct 2025 21:35:39 -0700 Subject: [PATCH] feat: add disk I/O fallback for historical offset reads This commit implements async disk I/O fallback to handle cases where: 1. Data is flushed from memory before consumers can read it (CI issue) 2. Consumers request historical offsets not in memory 3. Small LogBuffer retention in resource-constrained environments Changes: - Add readHistoricalDataFromDisk() helper function - Update ReadMessagesAtOffset() to call ReadFromDiskFn when offset < bufferStartOffset - Properly handle maxMessages and maxBytes limits during disk reads - Return appropriate nextOffset after disk reads - Log disk read operations at V(2) and V(3) levels Benefits: - Fixes CI test failures where data is flushed before consumption - Enables consumers to catch up even if they fall behind memory retention - No blocking on hot path (disk read only for historical data) - Respects existing ReadFromDiskFn timeout handling How it works: 1. Try in-memory read first (fast path) 2. If offset too old and ReadFromDiskFn configured, read from disk 3. Return disk data with proper nextOffset 4. Consumer continues reading seamlessly This fixes the 'offset 0 too old (earliest in-memory: 5)' error in TestOffsetManagement where messages were flushed before consumer started. --- weed/util/log_buffer/log_read_stateless.go | 93 ++++++++++++++++++++-- 1 file changed, 88 insertions(+), 5 deletions(-) diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index 8d2b42033..dea522783 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -135,12 +135,34 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages // Offset is not in current buffer range if startOffset < bufferStartOffset { - // Historical data - for stateless fetch, we don't do disk I/O to avoid blocking - // Return empty with offset out of range indication - glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (too old), returning empty", - startOffset, bufferStartOffset) - return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)", + // Historical data - try to read from disk if ReadFromDiskFn is configured + glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read", startOffset, bufferStartOffset) + + // Check if disk read function is configured + if logBuffer.ReadFromDiskFn == nil { + glog.V(2).Infof("[StatelessRead] No disk read function configured, returning offset too old error") + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)", + startOffset, bufferStartOffset) + } + + // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented) + // The ReadFromDiskFn should handle its own timeouts and not block indefinitely + diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk( + logBuffer, startOffset, maxMessages, maxBytes, highWaterMark) + + if diskErr != nil { + glog.V(2).Infof("[StatelessRead] Disk read failed for offset %d: %v", startOffset, diskErr) + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d): %v", + startOffset, bufferStartOffset, diskErr) + } + + glog.V(2).Infof("[StatelessRead] Successfully read %d messages from disk, nextOffset=%d", + len(diskMessages), diskNextOffset) + + // Return disk data + endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages + return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil } // startOffset > currentBufferEnd - future offset, no data available yet @@ -149,6 +171,67 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages return messages, startOffset, highWaterMark, true, nil } +// readHistoricalDataFromDisk reads messages from disk for historical offsets +// This is called when the requested offset is older than what's in memory +func readHistoricalDataFromDisk( + logBuffer *LogBuffer, + startOffset int64, + maxMessages int, + maxBytes int, + highWaterMark int64, +) (messages []*filer_pb.LogEntry, nextOffset int64, err error) { + messages = make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset = startOffset + totalBytes := 0 + messageCount := 0 + + // Create a position for the start offset + startPosition := MessagePosition{ + IsOffsetBased: true, + Offset: startOffset, + } + + // Define the callback function to collect messages + eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + // Check if we've reached the maxMessages or maxBytes limit + if messageCount >= maxMessages { + return true, nil // Done, reached message limit + } + + entrySize := proto.Size(logEntry) + if totalBytes > 0 && totalBytes+entrySize > maxBytes { + return true, nil // Done, would exceed byte limit + } + + // Add this message to the results + messages = append(messages, logEntry) + messageCount++ + totalBytes += entrySize + nextOffset++ + + // Continue reading + return false, nil + } + + // Call the ReadFromDiskFn to read historical data + // This function should be non-blocking and have its own timeout handling + lastPosition, isDone, readErr := logBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) + + if readErr != nil { + return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr) + } + + // Update nextOffset based on what was actually read + if lastPosition.IsOffsetBased { + nextOffset = lastPosition.Offset + 1 // Next offset to read + } + + glog.V(3).Infof("[DiskRead] Read %d messages from disk (offset %d to %d), isDone=%v", + len(messages), startOffset, nextOffset-1, isDone) + + return messages, nextOffset, nil +} + // parseMessagesFromBuffer parses messages from a buffer byte slice // This is thread-safe as it operates on a copy of the buffer func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, maxBytes int) (