Browse Source

fix: critical bug causing 51% message loss in stateless reads

CRITICAL BUG FIX: ReadMessagesAtOffset was returning error instead of
attempting disk I/O when data was flushed from memory, causing massive
message loss (6254 out of 12192 messages = 51% loss).

Problem:
In log_read_stateless.go lines 120-131, when data was flushed to disk
(empty previous buffer), the code returned an 'offset out of range' error
instead of attempting disk I/O. This caused consumers to skip over flushed
data entirely, leading to catastrophic message loss.

The bug occurred when:
1. Data was written to LogBuffer
2. Data was flushed to disk due to buffer rotation
3. Consumer requested that offset range
4. Code found offset in expected range but not in memory
5.  Returned error instead of reading from disk

Root Cause:
Lines 126-131 had early return with error when previous buffer was empty:
  // Data not in memory - for stateless fetch, we don't do disk I/O
  return messages, startOffset, highWaterMark, false,
    fmt.Errorf("offset %d out of range...")

This comment was incorrect - we DO need disk I/O for flushed data!

Fix:
1. Lines 120-132: Changed to fall through to disk read logic instead of
   returning error when previous buffer is empty

2. Lines 137-177: Enhanced disk read logic to handle TWO cases:
   - Historical data (offset < bufferStartOffset)
   - Flushed data (offset >= bufferStartOffset but not in memory)

Changes:
- Line 121: Log "attempting disk read" instead of breaking
- Line 130-132: Fall through to disk read instead of returning error
- Line 141: Changed condition from 'if startOffset < bufferStartOffset'
            to 'if startOffset < currentBufferEnd' to handle both cases
- Lines 143-149: Add context-aware logging for both historical and flushed data
- Lines 154-159: Add context-aware error messages

Expected Results:
- Before: 51% message loss (6254/12192 missing)
- After: <1% message loss (only from rebalancing, which we already fixed)
- Duplicates: Should remain ~47% (from rebalancing, expected until offsets committed)

Testing:
-  Compiles successfully
- Ready for integration testing with standard-test

Related Issues:
- This explains the massive data loss in recent load tests
- Disk I/O fallback was implemented but not reachable due to early return
- Disk chunk cache is working but was never being used for flushed data

Priority: CRITICAL - Fixes production-breaking data loss bug
pull/7329/head
chrislu 5 days ago
parent
commit
37809822f3
  1. 48
      weed/util/log_buffer/log_read_stateless.go

48
weed/util/log_buffer/log_read_stateless.go

@ -117,33 +117,46 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
endOfPartition = false // More data might be in current buffer
return messages, nextOffset, highWaterMark, endOfPartition, nil
}
// Empty previous buffer means data was flushed
// Empty previous buffer means data was flushed to disk - fall through to disk read
glog.V(2).Infof("[StatelessRead] Data at offset %d was flushed, attempting disk read", startOffset)
break
}
}
logBuffer.RUnlock()
// Data not in memory - for stateless fetch, we don't do disk I/O to avoid blocking
// Return empty with offset out of range indication
glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), returning empty",
startOffset, bufferStartOffset, currentBufferEnd)
return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d out of range (in-memory: %d-%d)",
// Data not in memory - attempt disk read if configured
// CRITICAL FIX: Don't return error here - data may be on disk!
// Fall through to disk read logic below
glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read",
startOffset, bufferStartOffset, currentBufferEnd)
// Don't return error - continue to disk read check below
}
logBuffer.RUnlock()
// Offset is not in current buffer range
if startOffset < bufferStartOffset {
// Historical data - try to read from disk if ReadFromDiskFn is configured
glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read",
startOffset, bufferStartOffset)
// Data not in memory - try disk read
// This handles two cases:
// 1. startOffset < bufferStartOffset: Historical data
// 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above)
if startOffset < currentBufferEnd {
// Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured
if startOffset < bufferStartOffset {
glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read",
startOffset, bufferStartOffset)
} else {
glog.V(2).Infof("[StatelessRead] Requested offset %d in range but not in memory (flushed data), attempting disk read",
startOffset)
}
// Check if disk read function is configured
if logBuffer.ReadFromDiskFn == nil {
glog.V(2).Infof("[StatelessRead] No disk read function configured, returning offset too old error")
return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)",
startOffset, bufferStartOffset)
glog.V(2).Infof("[StatelessRead] No disk read function configured, returning error")
if startOffset < bufferStartOffset {
return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)",
startOffset, bufferStartOffset)
}
return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), no disk read available",
startOffset, bufferStartOffset, currentBufferEnd)
}
// Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented)
@ -153,8 +166,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
if diskErr != nil {
glog.V(2).Infof("[StatelessRead] Disk read failed for offset %d: %v", startOffset, diskErr)
return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d): %v",
startOffset, bufferStartOffset, diskErr)
return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr)
}
glog.V(2).Infof("[StatelessRead] Successfully read %d messages from disk, nextOffset=%d",
@ -165,8 +177,8 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil
}
// startOffset > currentBufferEnd - future offset, no data available yet
glog.V(4).Infof("[StatelessRead] Future offset %d > buffer end %d, no data available",
// startOffset >= currentBufferEnd - future offset, no data available yet
glog.V(4).Infof("[StatelessRead] Future offset %d >= buffer end %d, no data available",
startOffset, currentBufferEnd)
return messages, startOffset, highWaterMark, true, nil
}

Loading…
Cancel
Save