@@ -334,184 +334,6 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
	}
}

// TestFlushOffsetGap_AddToBufferDoesNotIncrementOffset reproduces THE ACTUAL BUG:
// the broker uses AddToBuffer(), which calls AddDataToBuffer(), and
// AddDataToBuffer() does NOT increment logBuffer.offset!
// When a flush happens, bufferStartOffset is taken from the stale offset → GAP!
func TestFlushOffsetGap_AddToBufferDoesNotIncrementOffset(t *testing.T) {
	flushedRanges := []struct{ min, max int64 }{}
	var flushMu sync.Mutex

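	// The flush callback records each (minOffset, maxOffset) range it receives under
	// flushMu, so the assertions below can inspect exactly what a flush covered.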
	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
		flushMu.Lock()
		flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset})
		flushMu.Unlock()
		t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf))
	}

	logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil)
	defer logBuffer.ShutdownLogBuffer()

	// Manually initialize offset to simulate broker state after restart
	// (e.g., the broker finds 1000 messages on disk and sets offset=1000)
	logBuffer.Lock()
	logBuffer.offset = 1000
	logBuffer.bufferStartOffset = 1000
	logBuffer.Unlock()
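	// The buffer now believes offsets 0-999 are already on disk, so the next
	// appended message should be assigned offset 1000.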

	t.Logf("Initial state: offset=%d, bufferStartOffset=%d", 1000, 1000)

	// Now add messages using AddToBuffer (like the broker does).
	// This is the REAL PRODUCTION PATH.
	for i := 0; i < 100; i++ {
		message := &mq_pb.DataMessage{
			Key:   []byte(fmt.Sprintf("key-%d", i)),
			Value: []byte(fmt.Sprintf("message-%d", i)),
			TsNs:  time.Now().UnixNano(),
		}
		logBuffer.AddToBuffer(message) // ← BUG: does NOT increment logBuffer.offset!
	}
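
	// If offsets were assigned correctly, the buffer now holds offsets 1000-1099
	// and logBuffer.offset has advanced to 1100.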

	// Check state before flush
	logBuffer.RLock()
	beforeFlushOffset := logBuffer.offset
	beforeFlushStart := logBuffer.bufferStartOffset
	logBuffer.RUnlock()

	t.Logf("Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart)

	// Force flush
	logBuffer.ForceFlush()
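	// ForceFlush appears to complete asynchronously; the sleep gives the flush
	// a chance to invoke flushFn before the buffer state is read back.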
	time.Sleep(100 * time.Millisecond)

	// Check state after flush
	logBuffer.RLock()
	afterFlushOffset := logBuffer.offset
	afterFlushStart := logBuffer.bufferStartOffset
	logBuffer.RUnlock()

	flushMu.Lock()
	if len(flushedRanges) > 0 {
		flushedMin := flushedRanges[0].min
		flushedMax := flushedRanges[0].max
		flushMu.Unlock()

		t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart)
		t.Logf("Flushed range: %d-%d (minOffset/maxOffset are not tracked on this path, so they stay 0)", flushedMin, flushedMax)

		// Expected behavior: offset should increment from 1000 to 1100
		expectedOffsetAfterAdd := int64(1100)
		expectedBufferStartAfterFlush := int64(1100)
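		// 1000 (initial offset) + 100 appended messages = 1100, for both the next
		// offset to assign and the post-flush buffer start.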

		// Check if offset was incremented
		if beforeFlushOffset != expectedOffsetAfterAdd {
			t.Errorf("")
			t.Errorf("❌❌❌ BUG: offset not incremented correctly ❌❌❌")
			t.Errorf(" Expected offset after adding 100 messages: %d", expectedOffsetAfterAdd)
			t.Errorf(" Actual offset: %d", beforeFlushOffset)
			t.Errorf(" This means AddDataToBuffer() is not incrementing offset!")
		} else {
			t.Logf("✅ offset correctly incremented to %d", beforeFlushOffset)
		}

		// Check if bufferStartOffset was set correctly after flush
		if afterFlushStart != expectedBufferStartAfterFlush {
			t.Errorf("")
			t.Errorf("❌❌❌ BUG: bufferStartOffset not set correctly after flush ❌❌❌")
			t.Errorf(" Expected bufferStartOffset after flush: %d", expectedBufferStartAfterFlush)
			t.Errorf(" Actual bufferStartOffset: %d", afterFlushStart)
			t.Errorf(" Gap: %d offsets will be LOST!", expectedBufferStartAfterFlush-afterFlushStart)
		} else {
			t.Logf("✅ bufferStartOffset correctly set to %d after flush", afterFlushStart)
		}

		// Overall verdict
		if beforeFlushOffset == expectedOffsetAfterAdd && afterFlushStart == expectedBufferStartAfterFlush {
			t.Logf("")
			t.Logf("🎉🎉🎉 FIX VERIFIED: Buffer flush offset gap bug is FIXED! 🎉🎉🎉")
		}
	} else {
		flushMu.Unlock()
		t.Error("No flush occurred!")
	}
}

// TestFlushOffsetGap_WithExplicitOffsets reproduces the bug using explicit offset assignment.
// This matches how the MQ broker uses LogBuffer in production.
func TestFlushOffsetGap_WithExplicitOffsets(t *testing.T) {
	flushedRanges := []struct{ min, max int64 }{}
	var flushMu sync.Mutex

	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
		flushMu.Lock()
		flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset})
		flushMu.Unlock()
		t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf))
	}

	logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil)
	defer logBuffer.ShutdownLogBuffer()

	// Simulate MQ broker behavior: add messages with explicit offsets
	for i := int64(0); i < 30; i++ {
		logEntry := &filer_pb.LogEntry{
			Key:    []byte(fmt.Sprintf("key-%d", i)),
			Data:   []byte(fmt.Sprintf("message-%d", i)),
			TsNs:   time.Now().UnixNano(),
			Offset: i, // EXPLICIT OFFSET
		}
		logBuffer.AddLogEntryToBuffer(logEntry)
	}
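
	// The buffer now holds 30 entries with explicit offsets 0-29, so a correct
	// flush should report minOffset=0 and maxOffset=29.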

	// Check state before flush
	logBuffer.RLock()
	beforeFlushOffset := logBuffer.offset
	beforeFlushStart := logBuffer.bufferStartOffset
	logBuffer.RUnlock()

	t.Logf("Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart)

	// Force flush
	logBuffer.ForceFlush()
	time.Sleep(100 * time.Millisecond)

	// Check state after flush
	logBuffer.RLock()
	afterFlushOffset := logBuffer.offset
	afterFlushStart := logBuffer.bufferStartOffset
	logBuffer.RUnlock()

	flushMu.Lock()
	if len(flushedRanges) > 0 {
		flushedMin := flushedRanges[0].min
		flushedMax := flushedRanges[0].max
		flushMu.Unlock()

		t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart)
		t.Logf("Flushed range: %d-%d", flushedMin, flushedMax)

		// CRITICAL BUG CHECK: bufferStartOffset should be flushedMax + 1
		expectedBufferStart := flushedMax + 1
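		// With offsets 0-29 flushed, the in-memory buffer must restart at 30;
		// anything larger means the offsets in between were silently dropped.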
		if afterFlushStart != expectedBufferStart {
			t.Errorf("❌ CRITICAL BUG REPRODUCED!")
			t.Errorf(" Flushed messages: offsets %d-%d", flushedMin, flushedMax)
			t.Errorf(" Expected bufferStartOffset: %d (= maxOffset + 1)", expectedBufferStart)
			t.Errorf(" Actual bufferStartOffset: %d", afterFlushStart)
			t.Errorf(" This creates a GAP: offsets %d-%d will be LOST!", expectedBufferStart, afterFlushStart-1)
			t.Errorf("")
			t.Errorf("ROOT CAUSE: logBuffer.offset is not updated when AddLogEntryToBuffer is called.")
			t.Errorf(" AddLogEntryToBuffer only updates minOffset/maxOffset,")
			t.Errorf(" but copyToFlush sets bufferStartOffset = logBuffer.offset.")
			t.Errorf(" Since logBuffer.offset is stale, a gap is created!")
		} else {
			t.Logf("✅ bufferStartOffset correctly set to %d", afterFlushStart)
		}
	} else {
		flushMu.Unlock()
		t.Error("No flush occurred!")
	}
}

// TestFlushOffsetGap_ForceFlushAdvancesBuffer tests whether ForceFlush
// properly advances bufferStartOffset after flushing.
func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {