@ -334,6 +334,257 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
}
}
// TestFlushOffsetGap_ProductionScenario reproduces the actual production scenario
// where the broker uses AddLogEntryToBuffer with explicit Kafka offsets.
// This simulates leader publishing with offset assignment.
func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
	// One record per flush callback: the offset range reported by the
	// buffer plus the entries decoded from the flushed bytes.
	type flushRecord struct {
		min     int64
		max     int64
		entries []*filer_pb.LogEntry
	}
	var (
		records []flushRecord
		mu      sync.Mutex
	)

	// decodeEntries parses the length-prefixed (4-byte big-endian size)
	// LogEntry frames packed into a flush buffer.
	decodeEntries := func(buf []byte) []*filer_pb.LogEntry {
		entries := []*filer_pb.LogEntry{}
		pos := 0
		for pos+4 < len(buf) {
			size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
			end := pos + 4 + int(size)
			if end > len(buf) {
				break
			}
			entry := &filer_pb.LogEntry{}
			if err := proto.Unmarshal(buf[pos+4:end], entry); err == nil {
				entries = append(entries, entry)
			}
			pos = end
		}
		return entries
	}

	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
		decoded := decodeEntries(buf)
		mu.Lock()
		records = append(records, flushRecord{minOffset, maxOffset, decoded})
		mu.Unlock()
		t.Logf("FLUSH: minOffset=%d maxOffset=%d, parsed %d messages", minOffset, maxOffset, len(decoded))
	}

	logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil)
	defer logBuffer.ShutdownLogBuffer()

	// Simulate broker behavior: assign Kafka offsets and add to buffer
	// This is what PublishWithOffset() does
	nextKafkaOffset := int64(0)
	publish := func(count int) {
		for i := 0; i < count; i++ {
			logBuffer.AddLogEntryToBuffer(&filer_pb.LogEntry{
				Key:    []byte(fmt.Sprintf("key-%d", nextKafkaOffset)),
				Data:   []byte(fmt.Sprintf("message-%d", nextKafkaOffset)),
				TsNs:   time.Now().UnixNano(),
				Offset: nextKafkaOffset, // Explicit Kafka offset
			})
			nextKafkaOffset++
		}
	}

	// Round 1: Add 50 messages with Kafka offsets 0-49
	t.Logf("\n=== ROUND 1: Adding messages 0-49 ===")
	publish(50)

	// Check buffer state before flush
	logBuffer.RLock()
	beforeFlushOffset := logBuffer.offset
	beforeFlushStart := logBuffer.bufferStartOffset
	logBuffer.RUnlock()
	t.Logf("Before flush: logBuffer.offset=%d, bufferStartOffset=%d, nextKafkaOffset=%d",
		beforeFlushOffset, beforeFlushStart, nextKafkaOffset)

	// Flush
	logBuffer.ForceFlush()
	time.Sleep(100 * time.Millisecond)

	// Check buffer state after flush
	logBuffer.RLock()
	afterFlushOffset := logBuffer.offset
	afterFlushStart := logBuffer.bufferStartOffset
	logBuffer.RUnlock()
	t.Logf("After flush: logBuffer.offset=%d, bufferStartOffset=%d",
		afterFlushOffset, afterFlushStart)

	// Round 2: Add another 50 messages with Kafka offsets 50-99
	t.Logf("\n=== ROUND 2: Adding messages 50-99 ===")
	publish(50)

	logBuffer.ForceFlush()
	time.Sleep(100 * time.Millisecond)

	// Verification: Check if all Kafka offsets are accounted for
	mu.Lock()
	t.Logf("\n=== VERIFICATION ===")
	t.Logf("Expected Kafka offsets: 0-%d", nextKafkaOffset-1)
	seen := make(map[int64]bool)
	for idx, rec := range records {
		t.Logf("Flush #%d: minOffset=%d, maxOffset=%d, messages=%d",
			idx, rec.min, rec.max, len(rec.entries))
		for _, entry := range rec.entries {
			if seen[entry.Offset] {
				t.Errorf(" ❌ DUPLICATE: Offset %d appears multiple times!", entry.Offset)
			}
			seen[entry.Offset] = true
		}
	}
	mu.Unlock()

	// Check for missing offsets
	missing := []int64{}
	for want := int64(0); want < nextKafkaOffset; want++ {
		if !seen[want] {
			missing = append(missing, want)
		}
	}
	if len(missing) > 0 {
		t.Errorf("\n❌ MISSING OFFSETS DETECTED: %d offsets missing", len(missing))
		if len(missing) <= 20 {
			t.Errorf("Missing: %v", missing)
		} else {
			t.Errorf("Missing: %v ... and %d more", missing[:20], len(missing)-20)
		}
		t.Errorf("\nThis reproduces the production bug!")
	} else {
		t.Logf("\n✅ SUCCESS: All %d Kafka offsets accounted for (0-%d)", nextKafkaOffset, nextKafkaOffset-1)
	}

	// Check buffer offset consistency
	logBuffer.RLock()
	finalOffset := logBuffer.offset
	finalBufferStart := logBuffer.bufferStartOffset
	logBuffer.RUnlock()
	t.Logf("\nFinal buffer state:")
	t.Logf(" logBuffer.offset: %d", finalOffset)
	t.Logf(" bufferStartOffset: %d", finalBufferStart)
	t.Logf(" Expected (nextKafkaOffset): %d", nextKafkaOffset)
	if finalOffset != nextKafkaOffset {
		t.Errorf("❌ logBuffer.offset mismatch: expected %d, got %d", nextKafkaOffset, finalOffset)
	}
}
// TestFlushOffsetGap_ConcurrentReadDuringFlush tests if concurrent reads
// during flush can cause messages to be missed.
func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
	// Offsets "persisted to disk" by the flush callback, guarded by mu.
	var (
		diskOffsets []int64
		mu          sync.Mutex
	)

	// Simulate reading from disk: replay every stored offset at or past
	// the requested start position as a synthetic LogEntry.
	readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
		mu.Lock()
		defer mu.Unlock()
		for _, off := range diskOffsets {
			if off < startPosition.Offset {
				continue
			}
			entry := &filer_pb.LogEntry{
				Key:    []byte(fmt.Sprintf("key-%d", off)),
				Data:   []byte(fmt.Sprintf("message-%d", off)),
				TsNs:   time.Now().UnixNano(),
				Offset: off,
			}
			isDone, err := eachLogEntryFn(entry)
			if err != nil || isDone {
				return NewMessagePositionFromOffset(off + 1), isDone, err
			}
		}
		return startPosition, false, nil
	}

	// Decode the length-prefixed LogEntry frames from the flushed buffer
	// and record their offsets as "on disk".
	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
		mu.Lock()
		defer mu.Unlock()
		pos := 0
		for pos+4 < len(buf) {
			size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
			end := pos + 4 + int(size)
			if end > len(buf) {
				break
			}
			entry := &filer_pb.LogEntry{}
			if err := proto.Unmarshal(buf[pos+4:end], entry); err == nil {
				diskOffsets = append(diskOffsets, entry.Offset)
			}
			pos = end
		}
		t.Logf("FLUSH: Stored %d offsets to disk (minOffset=%d, maxOffset=%d)",
			len(diskOffsets), minOffset, maxOffset)
	}

	logBuffer := NewLogBuffer("test", time.Hour, flushFn, readFromDiskFn, nil)
	defer logBuffer.ShutdownLogBuffer()

	// Add 100 messages
	t.Logf("Adding 100 messages...")
	for i := int64(0); i < 100; i++ {
		logBuffer.AddLogEntryToBuffer(&filer_pb.LogEntry{
			Key:    []byte(fmt.Sprintf("key-%d", i)),
			Data:   []byte(fmt.Sprintf("message-%d", i)),
			TsNs:   time.Now().UnixNano(),
			Offset: i,
		})
	}

	// Flush (moves data to disk)
	t.Logf("Flushing...")
	logBuffer.ForceFlush()
	time.Sleep(100 * time.Millisecond)

	// Now try to read all messages using ReadMessagesAtOffset
	t.Logf("\nReading messages from offset 0...")
	messages, nextOffset, hwm, endOfPartition, err := logBuffer.ReadMessagesAtOffset(0, 1000, 1024*1024)
	t.Logf("Read result: messages=%d, nextOffset=%d, hwm=%d, endOfPartition=%v, err=%v",
		len(messages), nextOffset, hwm, endOfPartition, err)

	// Verify all offsets can be read
	got := make(map[int64]bool)
	for _, msg := range messages {
		got[msg.Offset] = true
	}
	missing := []int64{}
	for want := int64(0); want < 100; want++ {
		if !got[want] {
			missing = append(missing, want)
		}
	}
	if len(missing) > 0 {
		t.Errorf("❌ MISSING OFFSETS after flush: %d offsets cannot be read", len(missing))
		if len(missing) <= 20 {
			t.Errorf("Missing: %v", missing)
		} else {
			t.Errorf("Missing: %v ... and %d more", missing[:20], len(missing)-20)
		}
	} else {
		t.Logf("✅ All 100 offsets can be read after flush")
	}
}
// TestFlushOffsetGap_ForceFlushAdvancesBuffer tests if ForceFlush
// properly advances bufferStartOffset after flushing.
func TestFlushOffsetGap_ForceFlushAdvancesBuffer ( t * testing . T ) {