|
|
|
@ -15,10 +15,11 @@ import ( |
|
|
|
// are lost in the gap between flushed disk data and in-memory buffer.
|
|
|
|
//
|
|
|
|
// OBSERVED BEHAVIOR FROM LOGS:
|
|
|
|
// Request offset: 1764
|
|
|
|
// Disk contains: 1000-1763 (764 messages)
|
|
|
|
// Memory buffer starts at: 1800
|
|
|
|
// Gap: 1764-1799 (36 messages) ← MISSING!
|
|
|
|
//
|
|
|
|
// Request offset: 1764
|
|
|
|
// Disk contains: 1000-1763 (764 messages)
|
|
|
|
// Memory buffer starts at: 1800
|
|
|
|
// Gap: 1764-1799 (36 messages) ← MISSING!
|
|
|
|
//
|
|
|
|
// This test verifies:
|
|
|
|
// 1. All messages sent to buffer are accounted for
|
|
|
|
@ -27,46 +28,46 @@ import ( |
|
|
|
func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { |
|
|
|
var flushedMessages []*filer_pb.LogEntry |
|
|
|
var flushMu sync.Mutex |
|
|
|
|
|
|
|
|
|
|
|
flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { |
|
|
|
t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf)) |
|
|
|
|
|
|
|
|
|
|
|
// Parse and store flushed messages
|
|
|
|
flushMu.Lock() |
|
|
|
defer flushMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
// Parse buffer to extract messages
|
|
|
|
parsedCount := 0 |
|
|
|
for pos := 0; pos+4 < len(buf); { |
|
|
|
if pos+4 > len(buf) { |
|
|
|
break |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3]) |
|
|
|
if pos+4+int(size) > len(buf) { |
|
|
|
break |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
entryData := buf[pos+4 : pos+4+int(size)] |
|
|
|
logEntry := &filer_pb.LogEntry{} |
|
|
|
if err := proto.Unmarshal(entryData, logEntry); err == nil { |
|
|
|
flushedMessages = append(flushedMessages, logEntry) |
|
|
|
parsedCount++ |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pos += 4 + int(size) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
t.Logf(" Parsed %d messages from flush buffer", parsedCount) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil) |
|
|
|
defer logBuffer.ShutdownLogBuffer() |
|
|
|
|
|
|
|
|
|
|
|
// Send 100 messages
|
|
|
|
messageCount := 100 |
|
|
|
t.Logf("Sending %d messages...", messageCount) |
|
|
|
|
|
|
|
|
|
|
|
for i := 0; i < messageCount; i++ { |
|
|
|
logBuffer.AddToBuffer(&mq_pb.DataMessage{ |
|
|
|
Key: []byte(fmt.Sprintf("key-%d", i)), |
|
|
|
@ -74,11 +75,11 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { |
|
|
|
TsNs: time.Now().UnixNano(), |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Force flush multiple times to simulate real workload
|
|
|
|
t.Logf("Forcing flush...") |
|
|
|
logBuffer.ForceFlush() |
|
|
|
|
|
|
|
|
|
|
|
// Add more messages after flush
|
|
|
|
for i := messageCount; i < messageCount+50; i++ { |
|
|
|
logBuffer.AddToBuffer(&mq_pb.DataMessage{ |
|
|
|
@ -87,18 +88,18 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { |
|
|
|
TsNs: time.Now().UnixNano(), |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Force another flush
|
|
|
|
logBuffer.ForceFlush() |
|
|
|
time.Sleep(200 * time.Millisecond) // Wait for flush to complete
|
|
|
|
|
|
|
|
|
|
|
|
// Now check the buffer state
|
|
|
|
logBuffer.RLock() |
|
|
|
bufferStartOffset := logBuffer.bufferStartOffset |
|
|
|
currentOffset := logBuffer.offset |
|
|
|
pos := logBuffer.pos |
|
|
|
logBuffer.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
flushMu.Lock() |
|
|
|
flushedCount := len(flushedMessages) |
|
|
|
var maxFlushedOffset int64 = -1 |
|
|
|
@ -108,23 +109,23 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { |
|
|
|
maxFlushedOffset = flushedMessages[flushedCount-1].Offset |
|
|
|
} |
|
|
|
flushMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("\nBUFFER STATE AFTER FLUSH:") |
|
|
|
t.Logf(" bufferStartOffset: %d", bufferStartOffset) |
|
|
|
t.Logf(" currentOffset (HWM): %d", currentOffset) |
|
|
|
t.Logf(" pos (bytes in buffer): %d", pos) |
|
|
|
t.Logf(" Messages sent: %d (offsets 0-%d)", messageCount+50, messageCount+49) |
|
|
|
t.Logf(" Messages flushed to disk: %d (offsets %d-%d)", flushedCount, minFlushedOffset, maxFlushedOffset) |
|
|
|
|
|
|
|
|
|
|
|
// CRITICAL CHECK: Is there a gap between flushed data and memory buffer?
|
|
|
|
if flushedCount > 0 && maxFlushedOffset >= 0 { |
|
|
|
gap := bufferStartOffset - (maxFlushedOffset + 1) |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("\nOFFSET CONTINUITY CHECK:") |
|
|
|
t.Logf(" Last flushed offset: %d", maxFlushedOffset) |
|
|
|
t.Logf(" Buffer starts at: %d", bufferStartOffset) |
|
|
|
t.Logf(" Gap: %d offsets", gap) |
|
|
|
|
|
|
|
|
|
|
|
if gap > 0 { |
|
|
|
t.Errorf("❌ CRITICAL BUG REPRODUCED: OFFSET GAP DETECTED!") |
|
|
|
t.Errorf(" Disk has offsets %d-%d", minFlushedOffset, maxFlushedOffset) |
|
|
|
@ -137,22 +138,22 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { |
|
|
|
} else { |
|
|
|
t.Logf("✅ PASS: No gap detected - offsets are continuous") |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check if we can read all expected offsets
|
|
|
|
t.Logf("\nREADABILITY CHECK:") |
|
|
|
for testOffset := int64(0); testOffset < currentOffset; testOffset += 10 { |
|
|
|
// Try to read from buffer
|
|
|
|
requestPosition := NewMessagePositionFromOffset(testOffset) |
|
|
|
buf, _, err := logBuffer.ReadFromBuffer(requestPosition) |
|
|
|
|
|
|
|
|
|
|
|
isReadable := (buf != nil && len(buf.Bytes()) > 0) || err == ResumeFromDiskError |
|
|
|
status := "✅" |
|
|
|
if !isReadable && err == nil { |
|
|
|
status = "❌ NOT READABLE" |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
t.Logf(" Offset %d: %s (buf=%v, err=%v)", testOffset, status, buf != nil, err) |
|
|
|
|
|
|
|
|
|
|
|
// If offset is in the gap, it should fail to read
|
|
|
|
if flushedCount > 0 && testOffset > maxFlushedOffset && testOffset < bufferStartOffset { |
|
|
|
if isReadable { |
|
|
|
@ -163,19 +164,19 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check that all sent messages are accounted for
|
|
|
|
expectedMessageCount := messageCount + 50 |
|
|
|
messagesInMemory := int(currentOffset - bufferStartOffset) |
|
|
|
totalAccountedFor := flushedCount + messagesInMemory |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("\nMESSAGE ACCOUNTING:") |
|
|
|
t.Logf(" Expected: %d messages", expectedMessageCount) |
|
|
|
t.Logf(" Flushed to disk: %d", flushedCount) |
|
|
|
t.Logf(" In memory buffer: %d (offset range %d-%d)", messagesInMemory, bufferStartOffset, currentOffset-1) |
|
|
|
t.Logf(" Total accounted for: %d", totalAccountedFor) |
|
|
|
t.Logf(" Missing: %d messages", expectedMessageCount-totalAccountedFor) |
|
|
|
|
|
|
|
|
|
|
|
if totalAccountedFor < expectedMessageCount { |
|
|
|
t.Errorf("❌ DATA LOSS CONFIRMED: %d messages are missing!", expectedMessageCount-totalAccountedFor) |
|
|
|
} else { |
|
|
|
@ -188,23 +189,23 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { |
|
|
|
func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) { |
|
|
|
var flushCount int |
|
|
|
var flushMu sync.Mutex |
|
|
|
|
|
|
|
|
|
|
|
flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { |
|
|
|
flushMu.Lock() |
|
|
|
flushCount++ |
|
|
|
count := flushCount |
|
|
|
flushMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("FLUSH #%d: minOffset=%d maxOffset=%d size=%d bytes", count, minOffset, maxOffset, len(buf)) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil) |
|
|
|
defer logBuffer.ShutdownLogBuffer() |
|
|
|
|
|
|
|
|
|
|
|
// Send messages in batches with flushes in between
|
|
|
|
for batch := 0; batch < 5; batch++ { |
|
|
|
t.Logf("\nBatch %d:", batch) |
|
|
|
|
|
|
|
|
|
|
|
// Send 20 messages
|
|
|
|
for i := 0; i < 20; i++ { |
|
|
|
offset := int64(batch*20 + i) |
|
|
|
@ -214,28 +215,28 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) { |
|
|
|
TsNs: time.Now().UnixNano(), |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check state before flush
|
|
|
|
logBuffer.RLock() |
|
|
|
beforeFlushOffset := logBuffer.offset |
|
|
|
beforeFlushStart := logBuffer.bufferStartOffset |
|
|
|
logBuffer.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
// Force flush
|
|
|
|
logBuffer.ForceFlush() |
|
|
|
time.Sleep(50 * time.Millisecond) |
|
|
|
|
|
|
|
|
|
|
|
// Check state after flush
|
|
|
|
logBuffer.RLock() |
|
|
|
afterFlushOffset := logBuffer.offset |
|
|
|
afterFlushStart := logBuffer.bufferStartOffset |
|
|
|
prevBufferCount := len(logBuffer.prevBuffers.buffers) |
|
|
|
|
|
|
|
|
|
|
|
// Check prevBuffers state
|
|
|
|
t.Logf(" Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart) |
|
|
|
t.Logf(" After flush: offset=%d, bufferStartOffset=%d, prevBuffers=%d", |
|
|
|
afterFlushOffset, afterFlushStart, prevBufferCount) |
|
|
|
|
|
|
|
|
|
|
|
// Check each prevBuffer
|
|
|
|
for i, prevBuf := range logBuffer.prevBuffers.buffers { |
|
|
|
if prevBuf.size > 0 { |
|
|
|
@ -244,7 +245,7 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
logBuffer.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
// CRITICAL: Check if bufferStartOffset advanced correctly
|
|
|
|
expectedNewStart := beforeFlushOffset |
|
|
|
if afterFlushStart != expectedNewStart { |
|
|
|
@ -261,10 +262,10 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) { |
|
|
|
func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) { |
|
|
|
var allFlushedOffsets []int64 |
|
|
|
var flushMu sync.Mutex |
|
|
|
|
|
|
|
|
|
|
|
flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { |
|
|
|
t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf)) |
|
|
|
|
|
|
|
|
|
|
|
flushMu.Lock() |
|
|
|
// Record the offset range that was flushed
|
|
|
|
for offset := minOffset; offset <= maxOffset; offset++ { |
|
|
|
@ -272,13 +273,13 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) { |
|
|
|
} |
|
|
|
flushMu.Unlock() |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logBuffer := NewLogBuffer("test", 50*time.Millisecond, flushFn, nil, nil) |
|
|
|
defer logBuffer.ShutdownLogBuffer() |
|
|
|
|
|
|
|
|
|
|
|
// Concurrently write messages and force flushes
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
|
|
|
|
|
|
|
// Writer goroutine
|
|
|
|
wg.Add(1) |
|
|
|
go func() { |
|
|
|
@ -294,7 +295,7 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
|
|
|
|
// Flusher goroutine
|
|
|
|
wg.Add(1) |
|
|
|
go func() { |
|
|
|
@ -304,31 +305,31 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) { |
|
|
|
logBuffer.ForceFlush() |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
|
|
|
|
wg.Wait() |
|
|
|
time.Sleep(200 * time.Millisecond) // Wait for final flush
|
|
|
|
|
|
|
|
|
|
|
|
// Check final state
|
|
|
|
logBuffer.RLock() |
|
|
|
finalOffset := logBuffer.offset |
|
|
|
finalBufferStart := logBuffer.bufferStartOffset |
|
|
|
logBuffer.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
flushMu.Lock() |
|
|
|
flushedCount := len(allFlushedOffsets) |
|
|
|
flushMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
expectedCount := int(finalOffset) |
|
|
|
inMemory := int(finalOffset - finalBufferStart) |
|
|
|
totalAccountedFor := flushedCount + inMemory |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("\nFINAL STATE:") |
|
|
|
t.Logf(" Total messages sent: %d (offsets 0-%d)", expectedCount, expectedCount-1) |
|
|
|
t.Logf(" Flushed to disk: %d", flushedCount) |
|
|
|
t.Logf(" In memory: %d (offsets %d-%d)", inMemory, finalBufferStart, finalOffset-1) |
|
|
|
t.Logf(" Total accounted: %d", totalAccountedFor) |
|
|
|
t.Logf(" Missing: %d", expectedCount-totalAccountedFor) |
|
|
|
|
|
|
|
|
|
|
|
if totalAccountedFor < expectedCount { |
|
|
|
t.Errorf("❌ DATA LOSS in concurrent scenario: %d messages missing!", expectedCount-totalAccountedFor) |
|
|
|
} |
|
|
|
@ -344,7 +345,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
messages []*filer_pb.LogEntry |
|
|
|
} |
|
|
|
var flushMu sync.Mutex |
|
|
|
|
|
|
|
|
|
|
|
flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { |
|
|
|
// Parse messages from buffer
|
|
|
|
messages := []*filer_pb.LogEntry{} |
|
|
|
@ -360,7 +361,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
} |
|
|
|
pos += 4 + int(size) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
flushMu.Lock() |
|
|
|
flushedData = append(flushedData, struct { |
|
|
|
minOffset int64 |
|
|
|
@ -368,17 +369,17 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
messages []*filer_pb.LogEntry |
|
|
|
}{minOffset, maxOffset, messages}) |
|
|
|
flushMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("FLUSH: minOffset=%d maxOffset=%d, parsed %d messages", minOffset, maxOffset, len(messages)) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) |
|
|
|
defer logBuffer.ShutdownLogBuffer() |
|
|
|
|
|
|
|
|
|
|
|
// Simulate broker behavior: assign Kafka offsets and add to buffer
|
|
|
|
// This is what PublishWithOffset() does
|
|
|
|
nextKafkaOffset := int64(0) |
|
|
|
|
|
|
|
|
|
|
|
// Round 1: Add 50 messages with Kafka offsets 0-49
|
|
|
|
t.Logf("\n=== ROUND 1: Adding messages 0-49 ===") |
|
|
|
for i := 0; i < 50; i++ { |
|
|
|
@ -391,7 +392,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
logBuffer.AddLogEntryToBuffer(logEntry) |
|
|
|
nextKafkaOffset++ |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check buffer state before flush
|
|
|
|
logBuffer.RLock() |
|
|
|
beforeFlushOffset := logBuffer.offset |
|
|
|
@ -399,11 +400,11 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
logBuffer.RUnlock() |
|
|
|
t.Logf("Before flush: logBuffer.offset=%d, bufferStartOffset=%d, nextKafkaOffset=%d", |
|
|
|
beforeFlushOffset, beforeFlushStart, nextKafkaOffset) |
|
|
|
|
|
|
|
|
|
|
|
// Flush
|
|
|
|
logBuffer.ForceFlush() |
|
|
|
time.Sleep(100 * time.Millisecond) |
|
|
|
|
|
|
|
|
|
|
|
// Check buffer state after flush
|
|
|
|
logBuffer.RLock() |
|
|
|
afterFlushOffset := logBuffer.offset |
|
|
|
@ -411,7 +412,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
logBuffer.RUnlock() |
|
|
|
t.Logf("After flush: logBuffer.offset=%d, bufferStartOffset=%d", |
|
|
|
afterFlushOffset, afterFlushStart) |
|
|
|
|
|
|
|
|
|
|
|
// Round 2: Add another 50 messages with Kafka offsets 50-99
|
|
|
|
t.Logf("\n=== ROUND 2: Adding messages 50-99 ===") |
|
|
|
for i := 0; i < 50; i++ { |
|
|
|
@ -424,20 +425,20 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
logBuffer.AddLogEntryToBuffer(logEntry) |
|
|
|
nextKafkaOffset++ |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logBuffer.ForceFlush() |
|
|
|
time.Sleep(100 * time.Millisecond) |
|
|
|
|
|
|
|
|
|
|
|
// Verification: Check if all Kafka offsets are accounted for
|
|
|
|
flushMu.Lock() |
|
|
|
t.Logf("\n=== VERIFICATION ===") |
|
|
|
t.Logf("Expected Kafka offsets: 0-%d", nextKafkaOffset-1) |
|
|
|
|
|
|
|
|
|
|
|
allOffsets := make(map[int64]bool) |
|
|
|
for flushIdx, flush := range flushedData { |
|
|
|
t.Logf("Flush #%d: minOffset=%d, maxOffset=%d, messages=%d", |
|
|
|
flushIdx, flush.minOffset, flush.maxOffset, len(flush.messages)) |
|
|
|
|
|
|
|
|
|
|
|
for _, msg := range flush.messages { |
|
|
|
if allOffsets[msg.Offset] { |
|
|
|
t.Errorf(" ❌ DUPLICATE: Offset %d appears multiple times!", msg.Offset) |
|
|
|
@ -446,7 +447,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
flushMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
// Check for missing offsets
|
|
|
|
missingOffsets := []int64{} |
|
|
|
for expectedOffset := int64(0); expectedOffset < nextKafkaOffset; expectedOffset++ { |
|
|
|
@ -454,7 +455,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
missingOffsets = append(missingOffsets, expectedOffset) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if len(missingOffsets) > 0 { |
|
|
|
t.Errorf("\n❌ MISSING OFFSETS DETECTED: %d offsets missing", len(missingOffsets)) |
|
|
|
if len(missingOffsets) <= 20 { |
|
|
|
@ -466,18 +467,18 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
} else { |
|
|
|
t.Logf("\n✅ SUCCESS: All %d Kafka offsets accounted for (0-%d)", nextKafkaOffset, nextKafkaOffset-1) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check buffer offset consistency
|
|
|
|
logBuffer.RLock() |
|
|
|
finalOffset := logBuffer.offset |
|
|
|
finalBufferStart := logBuffer.bufferStartOffset |
|
|
|
logBuffer.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("\nFinal buffer state:") |
|
|
|
t.Logf(" logBuffer.offset: %d", finalOffset) |
|
|
|
t.Logf(" bufferStartOffset: %d", finalBufferStart) |
|
|
|
t.Logf(" Expected (nextKafkaOffset): %d", nextKafkaOffset) |
|
|
|
|
|
|
|
|
|
|
|
if finalOffset != nextKafkaOffset { |
|
|
|
t.Errorf("❌ logBuffer.offset mismatch: expected %d, got %d", nextKafkaOffset, finalOffset) |
|
|
|
} |
|
|
|
@ -488,12 +489,12 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { |
|
|
|
func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) { |
|
|
|
var flushedOffsets []int64 |
|
|
|
var flushMu sync.Mutex |
|
|
|
|
|
|
|
|
|
|
|
readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { |
|
|
|
// Simulate reading from disk - return flushed offsets
|
|
|
|
flushMu.Lock() |
|
|
|
defer flushMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
for _, offset := range flushedOffsets { |
|
|
|
if offset >= startPosition.Offset { |
|
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
|
@ -510,12 +511,12 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) { |
|
|
|
} |
|
|
|
return startPosition, false, nil |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { |
|
|
|
// Parse and store flushed offsets
|
|
|
|
flushMu.Lock() |
|
|
|
defer flushMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
for pos := 0; pos+4 < len(buf); { |
|
|
|
size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3]) |
|
|
|
if pos+4+int(size) > len(buf) { |
|
|
|
@ -528,14 +529,14 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) { |
|
|
|
} |
|
|
|
pos += 4 + int(size) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("FLUSH: Stored %d offsets to disk (minOffset=%d, maxOffset=%d)", |
|
|
|
len(flushedOffsets), minOffset, maxOffset) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logBuffer := NewLogBuffer("test", time.Hour, flushFn, readFromDiskFn, nil) |
|
|
|
defer logBuffer.ShutdownLogBuffer() |
|
|
|
|
|
|
|
|
|
|
|
// Add 100 messages
|
|
|
|
t.Logf("Adding 100 messages...") |
|
|
|
for i := int64(0); i < 100; i++ { |
|
|
|
@ -547,32 +548,32 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) { |
|
|
|
} |
|
|
|
logBuffer.AddLogEntryToBuffer(logEntry) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Flush (moves data to disk)
|
|
|
|
t.Logf("Flushing...") |
|
|
|
logBuffer.ForceFlush() |
|
|
|
time.Sleep(100 * time.Millisecond) |
|
|
|
|
|
|
|
|
|
|
|
// Now try to read all messages using ReadMessagesAtOffset
|
|
|
|
t.Logf("\nReading messages from offset 0...") |
|
|
|
messages, nextOffset, hwm, endOfPartition, err := logBuffer.ReadMessagesAtOffset(0, 1000, 1024*1024) |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("Read result: messages=%d, nextOffset=%d, hwm=%d, endOfPartition=%v, err=%v", |
|
|
|
len(messages), nextOffset, hwm, endOfPartition, err) |
|
|
|
|
|
|
|
|
|
|
|
// Verify all offsets can be read
|
|
|
|
readOffsets := make(map[int64]bool) |
|
|
|
for _, msg := range messages { |
|
|
|
readOffsets[msg.Offset] = true |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
missingOffsets := []int64{} |
|
|
|
for expectedOffset := int64(0); expectedOffset < 100; expectedOffset++ { |
|
|
|
if !readOffsets[expectedOffset] { |
|
|
|
missingOffsets = append(missingOffsets, expectedOffset) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if len(missingOffsets) > 0 { |
|
|
|
t.Errorf("❌ MISSING OFFSETS after flush: %d offsets cannot be read", len(missingOffsets)) |
|
|
|
if len(missingOffsets) <= 20 { |
|
|
|
@ -590,29 +591,29 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) { |
|
|
|
func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) { |
|
|
|
flushedRanges := []struct{ min, max int64 }{} |
|
|
|
var flushMu sync.Mutex |
|
|
|
|
|
|
|
|
|
|
|
flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { |
|
|
|
flushMu.Lock() |
|
|
|
flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset}) |
|
|
|
flushMu.Unlock() |
|
|
|
t.Logf("FLUSH: offsets %d-%d", minOffset, maxOffset) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) // Long interval, manual flush only
|
|
|
|
defer logBuffer.ShutdownLogBuffer() |
|
|
|
|
|
|
|
|
|
|
|
// Send messages, flush, check state - repeat
|
|
|
|
for round := 0; round < 3; round++ { |
|
|
|
t.Logf("\n=== ROUND %d ===", round) |
|
|
|
|
|
|
|
|
|
|
|
// Check state before adding messages
|
|
|
|
logBuffer.RLock() |
|
|
|
beforeOffset := logBuffer.offset |
|
|
|
beforeStart := logBuffer.bufferStartOffset |
|
|
|
logBuffer.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("Before adding: offset=%d, bufferStartOffset=%d", beforeOffset, beforeStart) |
|
|
|
|
|
|
|
|
|
|
|
// Add 10 messages
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
logBuffer.AddToBuffer(&mq_pb.DataMessage{ |
|
|
|
@ -621,28 +622,28 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) { |
|
|
|
TsNs: time.Now().UnixNano(), |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check state after adding
|
|
|
|
logBuffer.RLock() |
|
|
|
afterAddOffset := logBuffer.offset |
|
|
|
afterAddStart := logBuffer.bufferStartOffset |
|
|
|
logBuffer.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("After adding: offset=%d, bufferStartOffset=%d", afterAddOffset, afterAddStart) |
|
|
|
|
|
|
|
|
|
|
|
// Force flush
|
|
|
|
t.Logf("Forcing flush...") |
|
|
|
logBuffer.ForceFlush() |
|
|
|
time.Sleep(100 * time.Millisecond) |
|
|
|
|
|
|
|
|
|
|
|
// Check state after flush
|
|
|
|
logBuffer.RLock() |
|
|
|
afterFlushOffset := logBuffer.offset |
|
|
|
afterFlushStart := logBuffer.bufferStartOffset |
|
|
|
logBuffer.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart) |
|
|
|
|
|
|
|
|
|
|
|
// CRITICAL CHECK: bufferStartOffset should advance to where offset was before flush
|
|
|
|
if afterFlushStart != afterAddOffset { |
|
|
|
t.Errorf("❌ FLUSH BUG: bufferStartOffset did NOT advance correctly!") |
|
|
|
@ -653,19 +654,19 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) { |
|
|
|
t.Logf("✅ bufferStartOffset correctly advanced to %d", afterFlushStart) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Final verification: check all offset ranges are continuous
|
|
|
|
flushMu.Lock() |
|
|
|
t.Logf("\n=== FLUSHED RANGES ===") |
|
|
|
for i, r := range flushedRanges { |
|
|
|
t.Logf("Flush #%d: offsets %d-%d", i, r.min, r.max) |
|
|
|
|
|
|
|
|
|
|
|
// Check continuity with previous flush
|
|
|
|
if i > 0 { |
|
|
|
prevMax := flushedRanges[i-1].max |
|
|
|
currentMin := r.min |
|
|
|
gap := currentMin - (prevMax + 1) |
|
|
|
|
|
|
|
|
|
|
|
if gap > 0 { |
|
|
|
t.Errorf("❌ GAP between flush #%d and #%d: %d offsets missing!", i-1, i, gap) |
|
|
|
} else if gap < 0 { |
|
|
|
@ -677,4 +678,3 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) { |
|
|
|
} |
|
|
|
flushMu.Unlock() |
|
|
|
} |
|
|
|
|