Browse Source

feat: add disk I/O fallback for historical offset reads

This commit implements an async disk I/O fallback to handle cases where:
1. Data is flushed from memory before consumers can read it (CI issue)
2. Consumers request historical offsets that are no longer in memory
3. LogBuffer retention is small in resource-constrained environments

Changes:
- Add readHistoricalDataFromDisk() helper function
- Update ReadMessagesAtOffset() to call ReadFromDiskFn when offset < bufferStartOffset
- Properly handle maxMessages and maxBytes limits during disk reads
- Return appropriate nextOffset after disk reads
- Log disk read operations at V(2) and V(3) levels

Benefits:
- Fixes CI test failures where data is flushed before consumption
- Enables consumers to catch up even if they fall behind memory retention
- No blocking on hot path (disk read only for historical data)
- Respects existing ReadFromDiskFn timeout handling

How it works:
1. Try in-memory read first (fast path)
2. If offset too old and ReadFromDiskFn configured, read from disk
3. Return disk data with proper nextOffset
4. Consumer continues reading seamlessly

This fixes the 'offset 0 too old (earliest in-memory: 5)' error in
TestOffsetManagement where messages were flushed before consumer started.
pull/7329/head
chrislu 5 days ago
parent
commit
c5634470ed
  1. 93
      weed/util/log_buffer/log_read_stateless.go

93
weed/util/log_buffer/log_read_stateless.go

@ -135,12 +135,34 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
// Offset is not in current buffer range
if startOffset < bufferStartOffset {
// Historical data - for stateless fetch, we don't do disk I/O to avoid blocking
// Return empty with offset out of range indication
glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (too old), returning empty",
startOffset, bufferStartOffset)
return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)",
// Historical data - try to read from disk if ReadFromDiskFn is configured
glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read",
startOffset, bufferStartOffset)
// Check if disk read function is configured
if logBuffer.ReadFromDiskFn == nil {
glog.V(2).Infof("[StatelessRead] No disk read function configured, returning offset too old error")
return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)",
startOffset, bufferStartOffset)
}
// Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented)
// The ReadFromDiskFn should handle its own timeouts and not block indefinitely
diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk(
logBuffer, startOffset, maxMessages, maxBytes, highWaterMark)
if diskErr != nil {
glog.V(2).Infof("[StatelessRead] Disk read failed for offset %d: %v", startOffset, diskErr)
return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d): %v",
startOffset, bufferStartOffset, diskErr)
}
glog.V(2).Infof("[StatelessRead] Successfully read %d messages from disk, nextOffset=%d",
len(diskMessages), diskNextOffset)
// Return disk data
endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages
return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil
}
// startOffset > currentBufferEnd - future offset, no data available yet
@ -149,6 +171,67 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
return messages, startOffset, highWaterMark, true, nil
}
// readHistoricalDataFromDisk reads messages from disk for historical offsets.
// It is called when the requested offset is older than what is held in memory.
//
// Entries are streamed through logBuffer.ReadFromDiskFn starting at
// startOffset and collected until either maxMessages entries have been
// gathered or adding another entry would push the accumulated encoded size
// past maxBytes. The first entry is always accepted regardless of maxBytes so
// that a single oversized message cannot stall a consumer.
//
// Parameters:
//   - logBuffer: buffer whose ReadFromDiskFn performs the actual disk read.
//   - startOffset: first offset the caller wants.
//   - maxMessages: upper bound on returned entries; values <= 0 yield no entries.
//   - maxBytes: soft upper bound on the summed proto.Size of returned entries.
//   - highWaterMark: currently unused; kept for signature compatibility.
//
// Returns the collected entries, the offset the caller should request next,
// and a wrapped error if the disk read failed (in which case messages is nil
// and nextOffset equals startOffset).
func readHistoricalDataFromDisk(
	logBuffer *LogBuffer,
	startOffset int64,
	maxMessages int,
	maxBytes int,
	highWaterMark int64,
) (messages []*filer_pb.LogEntry, nextOffset int64, err error) {
	// Calling a nil function value panics; fail with an error instead so this
	// helper is safe even if a caller forgets the nil check.
	if logBuffer == nil || logBuffer.ReadFromDiskFn == nil {
		return nil, startOffset, fmt.Errorf("failed to read from disk: no disk read function configured")
	}

	// maxMessages is caller-supplied: a negative capacity makes make() panic
	// and a very large one would allocate far more than is likely to be used.
	// Clamp the preallocation; append grows the slice if more is needed.
	const maxPrealloc = 1024
	capacity := maxMessages
	if capacity < 0 {
		capacity = 0
	} else if capacity > maxPrealloc {
		capacity = maxPrealloc
	}
	messages = make([]*filer_pb.LogEntry, 0, capacity)
	nextOffset = startOffset
	totalBytes := 0
	// limitReached records that reading stopped because of maxMessages or
	// maxBytes, i.e. the entry handed to the callback last was NOT collected.
	limitReached := false

	// Create a position for the start offset.
	startPosition := MessagePosition{
		IsOffsetBased: true,
		Offset:        startOffset,
	}

	// Callback that collects messages until one of the limits is hit.
	eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
		if len(messages) >= maxMessages {
			limitReached = true
			return true, nil // Done, reached message limit
		}

		entrySize := proto.Size(logEntry)
		if totalBytes > 0 && totalBytes+entrySize > maxBytes {
			limitReached = true
			return true, nil // Done, would exceed byte limit
		}

		messages = append(messages, logEntry)
		totalBytes += entrySize
		// NOTE(review): this assumes entries arrive with consecutive offsets
		// beginning at startOffset — confirm against ReadFromDiskFn.
		nextOffset++

		return false, nil // Continue reading
	}

	// Call the ReadFromDiskFn to read historical data.
	// NOTE(review): the call is made synchronously here; any timeout handling
	// is presumably the responsibility of the ReadFromDiskFn implementation.
	lastPosition, isDone, readErr := logBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
	if readErr != nil {
		return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr)
	}

	// Prefer the reader's reported position, but only when it cannot move the
	// consumer past data it never received:
	//   - with zero collected messages the position may just echo the start
	//     position, and +1 would skip startOffset itself;
	//   - when a limit stopped the read, the position may refer to the entry
	//     that was rejected, and +1 would skip that entry.
	// In both cases keep the count-based nextOffset computed by the callback.
	if lastPosition.IsOffsetBased && !limitReached && len(messages) > 0 {
		nextOffset = lastPosition.Offset + 1 // Next offset to read
	}

	glog.V(3).Infof("[DiskRead] Read %d messages from disk (offset %d to %d), isDone=%v",
		len(messages), startOffset, nextOffset-1, isDone)

	return messages, nextOffset, nil
}
// parseMessagesFromBuffer parses messages from a buffer byte slice
// This is thread-safe as it operates on a copy of the buffer
func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, maxBytes int) (

Loading…
Cancel
Save