diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index dea522783..2769bc2e8 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -138,28 +138,28 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages // Historical data - try to read from disk if ReadFromDiskFn is configured glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read", startOffset, bufferStartOffset) - + // Check if disk read function is configured if logBuffer.ReadFromDiskFn == nil { glog.V(2).Infof("[StatelessRead] No disk read function configured, returning offset too old error") return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)", startOffset, bufferStartOffset) } - + // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented) // The ReadFromDiskFn should handle its own timeouts and not block indefinitely diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk( logBuffer, startOffset, maxMessages, maxBytes, highWaterMark) - + if diskErr != nil { glog.V(2).Infof("[StatelessRead] Disk read failed for offset %d: %v", startOffset, diskErr) return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d): %v", startOffset, bufferStartOffset, diskErr) } - + glog.V(2).Infof("[StatelessRead] Successfully read %d messages from disk, nextOffset=%d", len(diskMessages), diskNextOffset) - + // Return disk data endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil @@ -184,51 +184,51 @@ func readHistoricalDataFromDisk( nextOffset = startOffset totalBytes := 0 messageCount := 0 - + // Create a position for the start offset startPosition := MessagePosition{ IsOffsetBased: true, Offset: startOffset, } - + // Define the callback function to collect messages eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { // Check if we've reached the maxMessages or maxBytes limit if messageCount >= maxMessages { return true, nil // Done, reached message limit } - + entrySize := proto.Size(logEntry) if totalBytes > 0 && totalBytes+entrySize > maxBytes { return true, nil // Done, would exceed byte limit } - + // Add this message to the results messages = append(messages, logEntry) messageCount++ totalBytes += entrySize nextOffset++ - + // Continue reading return false, nil } - + // Call the ReadFromDiskFn to read historical data // This function should be non-blocking and have its own timeout handling lastPosition, isDone, readErr := logBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) - + if readErr != nil { return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr) } - + // Update nextOffset based on what was actually read if lastPosition.IsOffsetBased { nextOffset = lastPosition.Offset + 1 // Next offset to read } - + glog.V(3).Infof("[DiskRead] Read %d messages from disk (offset %d to %d), isDone=%v", len(messages), startOffset, nextOffset-1, isDone) - + return messages, nextOffset, nil }