Browse Source

fix: Check previous buffers even when offset < bufferStart

Phase 10: CRITICAL FIX - Read from Previous Buffers During Flush

Problem:
  Consumer stopped at offset 1550, missing the last 48 messages (offsets
  1551-1598), which had been flushed but were still held in prevBuffers.

Root Cause:
  ReadMessagesAtOffset only checked prevBuffers if:
    startOffset >= bufferStartOffset && startOffset < currentBufferEnd

  But after flush:
    - bufferStartOffset advanced to 1599
    - startOffset = 1551 < 1599 (condition FAILS!)
    - Code skipped the prevBuffers check and went straight to disk
    - Disk cache was stale (covered only 1000-1550)
    - Read returned empty, so the consumer stalled

The Timeline:
  1. Producer flushes offsets 1551-1598 to disk
  2. Buffer advances: bufferStart = 1599, pos = 0
  3. Data STILL in prevBuffers (not yet released)
  4. Consumer requests offset 1551
  5. Code sees 1551 < 1599, skips prevBuffer check
  6. Goes to disk, finds stale cache (1000-1550)
  7. Returns empty!

Fix:
  Added an else branch that ALWAYS checks prevBuffers when the offset
  is not in the current buffer, BEFORE attempting a disk read.

  This ensures we read from memory when data is still available
  in prevBuffers, even after bufferStart has advanced.

Expected Result:
  - 100% message delivery (no loss!)
  - Consumer reads 1551-1598 from prevBuffers
  - No more premature stops
pull/7329/head
chrislu 2 months ago
parent
commit
ae94e4d8f1
  1. 46
      weed/util/log_buffer/log_read_stateless.go

46
weed/util/log_buffer/log_read_stateless.go

@ -134,9 +134,43 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read",
startOffset, bufferStartOffset, currentBufferEnd)
// Don't return error - continue to disk read check below
} else {
// Offset is not in current buffer - check previous buffers FIRST before going to disk
// This handles the case where data was just flushed but is still in prevBuffers
glog.Infof("[StatelessRead] PATH: Offset %d not in current buffer [%d-%d), checking previous buffers first",
startOffset, bufferStartOffset, currentBufferEnd)
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset {
if prevBuf.size > 0 {
// Found in previous buffer!
bufCopy := make([]byte, prevBuf.size)
copy(bufCopy, prevBuf.buf[:prevBuf.size])
logBuffer.RUnlock()
messages, nextOffset, _, err = parseMessagesFromBuffer(
bufCopy, startOffset, maxMessages, maxBytes)
if err != nil {
return nil, startOffset, highWaterMark, false, err
}
glog.Infof("[StatelessRead] SUCCESS: Found %d messages in previous buffer, nextOffset=%d",
len(messages), nextOffset)
endOfPartition = false // More data might exist
return messages, nextOffset, highWaterMark, endOfPartition, nil
}
// Empty previous buffer - data was flushed to disk
glog.V(2).Infof("[StatelessRead] Found empty previous buffer for offset %d, will try disk", startOffset)
break
}
}
logBuffer.RUnlock()
}
logBuffer.RUnlock()
// If we get here, unlock if not already unlocked
// (Note: logBuffer.RUnlock() was called above in all paths)
// Data not in memory - try disk read
// This handles two cases:
@ -226,7 +260,7 @@ func readHistoricalDataFromDisk(
chunkStartOffset, startOffset, len(cachedMessages))
result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)
if err != nil {
// CRITICAL: Cache extraction failed because requested offset is BEYOND cached chunk
// This means disk files only contain partial data (e.g., 1000-1763) and the
@ -235,14 +269,14 @@ func readHistoricalDataFromDisk(
// SOLUTION: Return empty result with NO ERROR to let ReadMessagesAtOffset
// continue to check memory buffers. The data might be in memory even though
// it's not on disk.
glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)",
glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)",
startOffset, chunkStartOffset, len(cachedMessages))
glog.Infof("[DiskCache] Returning empty to let memory buffers handle offset %d", startOffset)
// Return empty but NO ERROR - this signals "not on disk, try memory"
return nil, startOffset, nil
}
// Success - return cached data
return result, nextOff, nil
}
@ -385,7 +419,7 @@ func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset in
startOffset, chunkStartOffset, len(chunkMessages), positionInChunk)
glog.Infof("[DiskCache] Chunk contains offsets %d-%d, requested %d - data not on disk",
chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1, startOffset)
// Return empty (data not on disk) - caller will check memory buffers
return nil, startOffset, nil
}

Loading…
Cancel
Save