diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index 8d2b42033..dea522783 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -135,12 +135,34 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages // Offset is not in current buffer range if startOffset < bufferStartOffset { - // Historical data - for stateless fetch, we don't do disk I/O to avoid blocking - // Return empty with offset out of range indication - glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (too old), returning empty", - startOffset, bufferStartOffset) - return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)", + // Historical data - try to read from disk if ReadFromDiskFn is configured + glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read", startOffset, bufferStartOffset) + + // Check if disk read function is configured + if logBuffer.ReadFromDiskFn == nil { + glog.V(2).Infof("[StatelessRead] No disk read function configured, returning offset too old error") + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)", + startOffset, bufferStartOffset) + } + + // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented) + // The ReadFromDiskFn should handle its own timeouts and not block indefinitely + diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk( + logBuffer, startOffset, maxMessages, maxBytes, highWaterMark) + + if diskErr != nil { + glog.V(2).Infof("[StatelessRead] Disk read failed for offset %d: %v", startOffset, diskErr) + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d): %v", + startOffset, bufferStartOffset, diskErr) + } + + glog.V(2).Infof("[StatelessRead] Successfully read %d messages from disk, nextOffset=%d", + len(diskMessages), diskNextOffset) + + // Return disk data + endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages + return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil } // startOffset > currentBufferEnd - future offset, no data available yet @@ -149,6 +171,67 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages return messages, startOffset, highWaterMark, true, nil } +// readHistoricalDataFromDisk reads messages from disk for historical offsets +// This is called when the requested offset is older than what's in memory +func readHistoricalDataFromDisk( + logBuffer *LogBuffer, + startOffset int64, + maxMessages int, + maxBytes int, + highWaterMark int64, +) (messages []*filer_pb.LogEntry, nextOffset int64, err error) { + messages = make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset = startOffset + totalBytes := 0 + messageCount := 0 + + // Create a position for the start offset + startPosition := MessagePosition{ + IsOffsetBased: true, + Offset: startOffset, + } + + // Define the callback function to collect messages + eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + // Check if we've reached the maxMessages or maxBytes limit + if messageCount >= maxMessages { + return true, nil // Done, reached message limit + } + + entrySize := proto.Size(logEntry) + if totalBytes > 0 && totalBytes+entrySize > maxBytes { + return true, nil // Done, would exceed byte limit + } + + // Add this message to the results + messages = append(messages, logEntry) + messageCount++ + totalBytes += entrySize + nextOffset++ + + // Continue reading + return false, nil + } + + // Call the ReadFromDiskFn to read historical data + // This function should be non-blocking and have its own timeout handling + lastPosition, isDone, readErr := logBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) + + if readErr != nil { + return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr) + } + + // Update nextOffset based on what was actually read + if lastPosition.IsOffsetBased { + nextOffset = lastPosition.Offset + 1 // Next offset to read + } + + glog.V(3).Infof("[DiskRead] Read %d messages from disk (offset %d to %d), isDone=%v", + len(messages), startOffset, nextOffset-1, isDone) + + return messages, nextOffset, nil +} + // parseMessagesFromBuffer parses messages from a buffer byte slice // This is thread-safe as it operates on a copy of the buffer func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, maxBytes int) (