
feat: Add comprehensive broker-side logging for disk read debugging

Phase 6: Root Cause Debugging - Broker Disk Read Path

Added extensive logging to trace disk read failures:
  - FetchMessage: Logs every read attempt with full details
  - ReadMessagesAtOffset: Tracks which code path (memory/disk)
  - readHistoricalDataFromDisk: Logs cache hits/misses
  - extractMessagesFromCache: Traces extraction logic

Changes:
  - broker_grpc_fetch.go: Added CRITICAL detection for empty reads
  - log_read_stateless.go: Comprehensive PATH and state logging

Test Results:
  - 87.9% delivery (consistent)
  - FOUND THE BUG: Cache hit but extraction returns empty!

Root Cause Identified:
  [DiskCache] Cache HIT: cachedMessages=572
  [StatelessRead] WARNING: Disk read returned 0 messages

The Problem:
  - Request offset 1572
  - Chunk start: 1000
  - Position in chunk: 572
  - Chunk has messages 0-571 (572 total)
  - Check: positionInChunk (572) >= len(chunkMessages) (572) → TRUE
  - Returns empty!

This is an OFF-BY-ONE ERROR in extractMessagesFromCache:
  The chunk contains offsets 1000-1571, so a request for offset 1572 falls just past its end.
  The real issue: the chunk was only read up to offset 1571, but the HWM says offsets 1572+ exist.
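
A minimal sketch of the suspected boundary check, using the numbers above (extractFromChunk is an illustrative stand-in; the real extractMessagesFromCache in log_read_stateless.go also enforces maxMessages/maxBytes and may differ in detail):

  package main

  import "fmt"

  // extractFromChunk mimics the suspected logic: the cached chunk starts at
  // chunkStart and holds len(chunk) consecutive messages.
  func extractFromChunk(chunk []string, chunkStart, startOffset int64) []string {
      positionInChunk := startOffset - chunkStart
      if positionInChunk < 0 || positionInChunk >= int64(len(chunk)) {
          return nil // 572 >= 572: cache hit, but nothing to extract
      }
      return chunk[positionInChunk:]
  }

  func main() {
      chunk := make([]string, 572)               // cached chunk covers offsets 1000-1571
      got := extractFromChunk(chunk, 1000, 1572) // request for offset 1572
      fmt.Println(len(got))                      // prints 0
  }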

Next: Fix the chunk reading logic or offset calculation
Branch: pull/7329/head
Author: chrislu, 4 days ago
Commit: 2bc29f93d3
3 changed files:
  weed/mq/broker/broker_grpc_fetch.go (16 lines changed)
  weed/mq/kafka/integration/broker_client_fetch.go (4 lines changed)
  weed/util/log_buffer/log_read_stateless.go (59 lines changed)

weed/mq/broker/broker_grpc_fetch.go (16 lines changed)

@@ -105,15 +105,25 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM
    requestedOffset := req.StartOffset
    // Read messages from LogBuffer (stateless read)
    glog.V(3).Infof("[FetchMessage] About to read from LogBuffer at offset %d (requested=%d)", requestedOffset, req.StartOffset)
    glog.Infof("[FetchMessage] About to read from LogBuffer: topic=%s partition=%v offset=%d maxMessages=%d maxBytes=%d",
        t.Name, partition, requestedOffset, maxMessages, maxBytes)
    logEntries, nextOffset, highWaterMark, endOfPartition, err := localPartition.LogBuffer.ReadMessagesAtOffset(
        requestedOffset,
        maxMessages,
        maxBytes,
    )
    glog.V(3).Infof("[FetchMessage] Read completed: %d entries, nextOffset=%d, hwm=%d, eop=%v, err=%v",
        len(logEntries), nextOffset, highWaterMark, endOfPartition, err)
    // CRITICAL: Log the result with full details
    if len(logEntries) == 0 && highWaterMark > requestedOffset && err == nil {
        glog.Errorf("[FetchMessage] CRITICAL: ReadMessagesAtOffset returned 0 entries but HWM=%d > requestedOffset=%d (should return data!)",
            highWaterMark, requestedOffset)
        glog.Errorf("[FetchMessage] Details: nextOffset=%d, endOfPartition=%v, bufferStartOffset=%d",
            nextOffset, endOfPartition, localPartition.LogBuffer.GetLogStartOffset())
    }
    glog.Infof("[FetchMessage] Read completed: topic=%s partition=%v offset=%d -> %d entries, nextOffset=%d, hwm=%d, eop=%v, err=%v",
        t.Name, partition, requestedOffset, len(logEntries), nextOffset, highWaterMark, endOfPartition, err)
    if err != nil {
        // Check if this is an "offset out of range" error
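
The detection above rests on one invariant: a read below the high-water mark must return either data or an error, never an empty success. A standalone illustration of that predicate (emptyReadBelowHWM is a hypothetical helper, not part of the broker code; the client-side check in broker_client_fetch.go below mirrors the same condition):

  package main

  import "fmt"

  // emptyReadBelowHWM reports the condition behind the new CRITICAL log:
  // zero entries and a nil error, yet the high-water mark says more data
  // exists past the requested offset.
  func emptyReadBelowHWM(numEntries int, highWaterMark, requestedOffset int64, err error) bool {
      return numEntries == 0 && highWaterMark > requestedOffset && err == nil
  }

  func main() {
      // Example values only: offset 1572 requested while the HWM is above it.
      fmt.Println(emptyReadBelowHWM(0, 1573, 1572, nil)) // true -> flagged as CRITICAL
  }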

weed/mq/kafka/integration/broker_client_fetch.go (4 lines changed)

@@ -83,7 +83,7 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string
    // CRITICAL DEBUGGING: Log what broker returned
    glog.Infof("[FETCH-STATELESS-CLIENT] Broker response for %s[%d] offset %d: messages=%d, nextOffset=%d, hwm=%d, logStart=%d, endOfPartition=%v",
        topic, partition, startOffset, len(resp.Messages), resp.NextOffset, resp.HighWaterMark, resp.LogStartOffset, resp.EndOfPartition)
    // CRITICAL: If broker returns 0 messages but hwm > startOffset, something is wrong
    if len(resp.Messages) == 0 && resp.HighWaterMark > startOffset {
        glog.Errorf("[FETCH-STATELESS-CLIENT] CRITICAL BUG: Broker returned 0 messages for %s[%d] offset %d, but HWM=%d (should have %d messages available)",

@@ -103,7 +103,7 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string
            Offset: startOffset + int64(i), // Sequential offset assignment
        }
        records = append(records, record)
        // Log each message for debugging
        glog.V(4).Infof("[FETCH-STATELESS-CLIENT] Message %d: offset=%d, keyLen=%d, valueLen=%d",
            i, record.Offset, len(msg.Key), len(msg.Value))

weed/util/log_buffer/log_read_stateless.go (59 lines changed)

@@ -34,7 +34,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
    endOfPartition bool,
    err error,
) {
    glog.V(4).Infof("[StatelessRead] Reading from offset %d, maxMessages=%d, maxBytes=%d",
    glog.Infof("[StatelessRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d",
        startOffset, maxMessages, maxBytes)
    // Quick validation
@@ -54,10 +54,13 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
    bufferStartOffset := logBuffer.bufferStartOffset
    highWaterMark = currentBufferEnd
    glog.Infof("[StatelessRead] Buffer state: startOffset=%d, bufferStart=%d, bufferEnd=%d, HWM=%d, pos=%d",
        startOffset, bufferStartOffset, currentBufferEnd, highWaterMark, logBuffer.pos)
    // Special case: empty buffer (no data written yet)
    if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 {
        logBuffer.RUnlock()
        glog.V(4).Infof("[StatelessRead] Empty buffer, returning no data with endOfPartition=true")
        glog.Infof("[StatelessRead] PATH: Empty buffer (no data written yet)")
        // Return empty result - partition exists but has no data yet
        // Preserve the requested offset in nextOffset
        return messages, startOffset, 0, true, nil
@@ -65,6 +68,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
    // Check if requested offset is in current buffer
    if startOffset >= bufferStartOffset && startOffset < currentBufferEnd {
        glog.Infof("[StatelessRead] PATH: Attempting to read from current/previous memory buffers")
        // Read from current buffer
        glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d",
            bufferStartOffset, currentBufferEnd)
@@ -139,38 +143,48 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
    // 1. startOffset < bufferStartOffset: Historical data
    // 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above)
    if startOffset < currentBufferEnd {
        glog.Infof("[StatelessRead] PATH: Data not in memory, attempting DISK READ")
        // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured
        if startOffset < bufferStartOffset {
            glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read",
            glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d",
                startOffset, bufferStartOffset)
        } else {
            glog.V(2).Infof("[StatelessRead] Requested offset %d in range but not in memory (flushed data), attempting disk read",
                startOffset)
            glog.Errorf("[StatelessRead] CASE 2: Flushed data - offset %d in range [%d, %d) but not in memory",
                startOffset, bufferStartOffset, currentBufferEnd)
        }
        // Check if disk read function is configured
        if logBuffer.ReadFromDiskFn == nil {
            glog.V(2).Infof("[StatelessRead] No disk read function configured, returning error")
            glog.Errorf("[StatelessRead] CRITICAL: ReadFromDiskFn is NIL! Cannot read from disk.")
            if startOffset < bufferStartOffset {
                return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)",
                return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d), and ReadFromDiskFn is nil",
                    startOffset, bufferStartOffset)
            }
            return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), no disk read available",
            return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), and ReadFromDiskFn is nil",
                startOffset, bufferStartOffset, currentBufferEnd)
        }
        glog.Infof("[StatelessRead] ReadFromDiskFn is configured, calling readHistoricalDataFromDisk...")
        // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented)
        // The ReadFromDiskFn should handle its own timeouts and not block indefinitely
        diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk(
            logBuffer, startOffset, maxMessages, maxBytes, highWaterMark)
        if diskErr != nil {
            glog.V(2).Infof("[StatelessRead] Disk read failed for offset %d: %v", startOffset, diskErr)
            glog.Errorf("[StatelessRead] CRITICAL: Disk read FAILED for offset %d: %v", startOffset, diskErr)
            // IMPORTANT: Return retryable error instead of silently returning empty!
            return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr)
        }
        glog.V(2).Infof("[StatelessRead] Successfully read %d messages from disk, nextOffset=%d",
            len(diskMessages), diskNextOffset)
        if len(diskMessages) == 0 {
            glog.Errorf("[StatelessRead] WARNING: Disk read returned 0 messages for offset %d (HWM=%d, bufferStart=%d)",
                startOffset, highWaterMark, bufferStartOffset)
        } else {
            glog.Infof("[StatelessRead] SUCCESS: Disk read returned %d messages, nextOffset=%d",
                len(diskMessages), diskNextOffset)
        }
        // Return disk data
        endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages
@@ -195,21 +209,26 @@ func readHistoricalDataFromDisk(
) (messages []*filer_pb.LogEntry, nextOffset int64, err error) {
    const chunkSize = 1000 // Size of each cached chunk
    glog.Infof("[DiskRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d, HWM=%d",
        startOffset, maxMessages, maxBytes, highWaterMark)
    // Calculate chunk start offset (aligned to chunkSize boundary)
    chunkStartOffset := (startOffset / chunkSize) * chunkSize
    glog.Infof("[DiskRead] Calculated chunkStartOffset=%d (aligned from %d)", chunkStartOffset, startOffset)
    // Try to get from cache first
    cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset)
    if cacheHit {
        // Found in cache - extract requested messages
        glog.V(3).Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d)",
            chunkStartOffset, startOffset)
        glog.Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d), cachedMessages=%d",
            chunkStartOffset, startOffset, len(cachedMessages))
        return extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)
    }
    glog.V(3).Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk",
    glog.Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk via ReadFromDiskFn",
        chunkStartOffset)
    // Not in cache - read entire chunk from disk for caching
@@ -237,21 +256,29 @@ func readHistoricalDataFromDisk(
    }
    // Read chunk from disk
    glog.Infof("[DiskRead] Calling ReadFromDiskFn with position offset=%d...", chunkStartOffset)
    _, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn)
    if readErr != nil {
        glog.Errorf("[DiskRead] CRITICAL: ReadFromDiskFn returned ERROR: %v", readErr)
        return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr)
    }
    glog.Infof("[DiskRead] ReadFromDiskFn completed successfully, read %d messages", len(chunkMessages))
    // Cache the chunk for future reads
    if len(chunkMessages) > 0 {
        cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages)
        glog.V(3).Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)",
        glog.Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)",
            chunkStartOffset, chunkNextOffset-1, len(chunkMessages))
    } else {
        glog.Errorf("[DiskRead] WARNING: ReadFromDiskFn returned 0 messages for chunkStart=%d", chunkStartOffset)
    }
    // Extract requested messages from the chunk
    return extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes)
    result, resNextOffset, resErr := extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes)
    glog.Infof("[DiskRead] EXIT: Returning %d messages, nextOffset=%d, err=%v", len(result), resNextOffset, resErr)
    return result, resNextOffset, resErr
}

// getCachedDiskChunk retrieves a cached disk chunk if available
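
For context, the chunk alignment traced by the [DiskRead]/[DiskCache] logs maps every offset in [1000, 2000) to the chunk cached at 1000, so the request for 1572 hits a chunk that was only filled up to 1571 when it was read from disk. A small sketch (chunkSize and the alignment formula are taken from the diff above; the helper name is illustrative):

  package main

  import "fmt"

  const chunkSize = 1000 // matches readHistoricalDataFromDisk above

  // chunkStartFor aligns an offset down to its chunk boundary, mirroring
  // chunkStartOffset := (startOffset / chunkSize) * chunkSize.
  func chunkStartFor(offset int64) int64 {
      return (offset / chunkSize) * chunkSize
  }

  func main() {
      fmt.Println(chunkStartFor(1572)) // 1000 - same cache key as offsets 1000-1571
      // The cached chunk holds 572 messages (offsets 1000-1571), so offset 1572
      // is exactly one past its end even though the cache lookup reports a hit.
  }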
