|
|
@ -44,11 +44,11 @@ func TestConcurrentProducerConsumer(t *testing.T) { |
|
|
|
|
|
|
|
for consumerID := 0; consumerID < numConsumers; consumerID++ { |
|
|
|
consumerWg.Add(1) |
|
|
|
go func(id int, startOffset int64) { |
|
|
|
go func(id int, startOffset int64, endOffset int64) { |
|
|
|
defer consumerWg.Done() |
|
|
|
|
|
|
|
currentOffset := startOffset |
|
|
|
for currentOffset < startOffset+int64(messagesPerConsumer) { |
|
|
|
for currentOffset < endOffset { |
|
|
|
// Read 10 messages at a time (like integration test)
|
|
|
|
messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240) |
|
|
|
if err != nil { |
|
|
@ -62,18 +62,22 @@ func TestConcurrentProducerConsumer(t *testing.T) { |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
// Verify sequential offsets
|
|
|
|
// Count only messages in this consumer's assigned range
|
|
|
|
messagesInRange := 0 |
|
|
|
for i, msg := range messages { |
|
|
|
expectedOffset := currentOffset + int64(i) |
|
|
|
if msg.Offset != expectedOffset { |
|
|
|
t.Errorf("Consumer %d: Expected offset %d, got %d", id, expectedOffset, msg.Offset) |
|
|
|
if msg.Offset >= startOffset && msg.Offset < endOffset { |
|
|
|
messagesInRange++ |
|
|
|
expectedOffset := currentOffset + int64(i) |
|
|
|
if msg.Offset != expectedOffset { |
|
|
|
t.Errorf("Consumer %d: Expected offset %d, got %d", id, expectedOffset, msg.Offset) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
atomic.AddInt64(&consumedCounts[id], int64(len(messages))) |
|
|
|
atomic.AddInt64(&consumedCounts[id], int64(messagesInRange)) |
|
|
|
currentOffset = nextOffset |
|
|
|
} |
|
|
|
}(consumerID, int64(consumerID*messagesPerConsumer)) |
|
|
|
}(consumerID, int64(consumerID*messagesPerConsumer), int64((consumerID+1)*messagesPerConsumer)) |
|
|
|
} |
|
|
|
|
|
|
|
// Wait for producer to finish
|
|
|
@ -142,15 +146,23 @@ func TestBackwardSeeksWhileProducing(t *testing.T) { |
|
|
|
|
|
|
|
for currentOffset < numMessages { |
|
|
|
// Read some messages
|
|
|
|
messages, nextOffset, _, _, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240) |
|
|
|
messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(currentOffset, 10, 10240) |
|
|
|
if err != nil { |
|
|
|
t.Errorf("Read error: %v", err) |
|
|
|
consumerDone <- true |
|
|
|
return |
|
|
|
// For stateless reads, "offset out of range" means data not in memory yet
|
|
|
|
// This is expected when reading historical data or before production starts
|
|
|
|
time.Sleep(5 * time.Millisecond) |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
if len(messages) == 0 { |
|
|
|
time.Sleep(5 * time.Millisecond) |
|
|
|
// No data available yet or caught up to producer
|
|
|
|
if !endOfPartition { |
|
|
|
// Data might be coming, wait
|
|
|
|
time.Sleep(5 * time.Millisecond) |
|
|
|
} else { |
|
|
|
// At end of partition, wait for more production
|
|
|
|
time.Sleep(5 * time.Millisecond) |
|
|
|
} |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|