
debug: add comprehensive consumer group tests and identify FindCoordinator issue

- Created consumer group tests for basic functionality, offset management, and rebalancing
- Added debug test to isolate consumer group coordination issues
- Root cause identified: Sarama repeatedly calls FindCoordinator but never progresses to JoinGroup
- Issue: connections are closed right after the FindCoordinator response, so the group coordination protocol never starts
- The consumer group implementation exists but is never reached by Sarama clients

Next: Fix coordinator connection handling to enable the JoinGroup protocol
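
For context on the planned fix: repeatedly calling FindCoordinator is what Sarama does when it cannot get a usable connection to the coordinator it was told about, so the gateway needs to advertise itself as the coordinator and keep that connection serviceable. Below is a minimal, illustrative sketch of a FindCoordinator v0 response body; the function name and wiring are hypothetical, not the actual SeaweedFS handler code.

package main

import (
	"encoding/binary"
	"fmt"
)

// Sketch of a FindCoordinator v0 response body (the correlation ID lives in the
// response header). Layout: error_code INT16, node_id INT32, host STRING
// (INT16 length + bytes), port INT32. If the gateway returns itself here and
// keeps the connection open, the client can proceed to JoinGroup instead of
// looping on FindCoordinator.
func encodeFindCoordinatorV0(host string, port int32) []byte {
	buf := make([]byte, 0, 16+len(host))
	buf = binary.BigEndian.AppendUint16(buf, 0)                 // error_code: 0 (none)
	buf = binary.BigEndian.AppendUint32(buf, 0)                 // node_id: gateway acts as node 0
	buf = binary.BigEndian.AppendUint16(buf, uint16(len(host))) // host length
	buf = append(buf, host...)                                  // host
	buf = binary.BigEndian.AppendUint32(buf, uint32(port))      // port
	return buf
}

func main() {
	// Example: advertise the gateway itself as coordinator.
	fmt.Printf("% x\n", encodeFindCoordinatorV0("127.0.0.1", 9092))
}

Once the client accepts this coordinator, the next requests on the connection should be JoinGroup and SyncGroup, which is exactly the step the debug test currently never reaches.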
pull/7231/head
chrislu 2 months ago
parent commit 687eaddedd

Changed files:
  1. test/kafka/consumer_group_debug_test.go (154)
  2. test/kafka/consumer_group_test.go (698)
  3. weed/mq/kafka/protocol/handler.go (10)

test/kafka/consumer_group_debug_test.go (154)

@@ -0,0 +1,154 @@
package kafka
import (
"context"
"fmt"
"testing"
"time"
"github.com/IBM/sarama"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
)
func TestConsumerGroup_Debug(t *testing.T) {
// Start Kafka gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Logf("Gateway server error: %v", err)
}
}()
defer gatewayServer.Close()
// Wait for server to start
time.Sleep(100 * time.Millisecond)
host, port := gatewayServer.GetListenerAddr()
brokerAddr := fmt.Sprintf("%s:%d", host, port)
// Test configuration
topicName := "debug-test"
groupID := "debug-group"
// Add topic for testing
gatewayServer.GetHandler().AddTopicForTesting(topicName, 1)
// Create Sarama config
config := sarama.NewConfig()
config.Version = sarama.V2_6_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true
// Produce one test message
t.Logf("=== Producing 1 test message ===")
producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
if err != nil {
t.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
message := &sarama.ProducerMessage{
Topic: topicName,
Key: sarama.StringEncoder("debug-key"),
Value: sarama.StringEncoder("Debug Message"),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
t.Fatalf("Failed to produce message: %v", err)
}
t.Logf("✅ Produced message: partition=%d, offset=%d", partition, offset)
// Create a simple consumer group handler
handler := &DebugHandler{
messages: make(chan *sarama.ConsumerMessage, 1),
ready: make(chan bool),
t: t,
}
// Start one consumer
t.Logf("=== Starting 1 consumer in group '%s' ===", groupID)
consumerGroup, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
if err != nil {
t.Fatalf("Failed to create consumer group: %v", err)
}
defer consumerGroup.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Start consuming in a goroutine
go func() {
t.Logf("Starting consumption...")
err := consumerGroup.Consume(ctx, []string{topicName}, handler)
if err != nil && err != context.DeadlineExceeded {
t.Logf("Consumer error: %v", err)
}
t.Logf("Consumption finished")
}()
// Wait for consumer to be ready or timeout
t.Logf("Waiting for consumer to be ready...")
select {
case <-handler.ready:
t.Logf("✅ Consumer is ready!")
// Try to consume the message
select {
case msg := <-handler.messages:
t.Logf("✅ Consumed message: key=%s, value=%s, offset=%d",
string(msg.Key), string(msg.Value), msg.Offset)
case <-time.After(5 * time.Second):
t.Logf("⚠️ No message received within timeout")
}
case <-time.After(8 * time.Second):
t.Logf("❌ Timeout waiting for consumer to be ready")
}
t.Logf("🎉 Debug test completed")
}
// DebugHandler implements sarama.ConsumerGroupHandler for debugging
type DebugHandler struct {
messages chan *sarama.ConsumerMessage
ready chan bool
t *testing.T
}
func (h *DebugHandler) Setup(session sarama.ConsumerGroupSession) error {
h.t.Logf("🔧 Consumer group session setup - Generation: %d, Claims: %v",
session.GenerationID(), session.Claims())
close(h.ready)
return nil
}
func (h *DebugHandler) Cleanup(session sarama.ConsumerGroupSession) error {
h.t.Logf("🧹 Consumer group session cleanup")
return nil
}
func (h *DebugHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
h.t.Logf("🍽️ Starting to consume partition %d from offset %d",
claim.Partition(), claim.InitialOffset())
for {
select {
case message := <-claim.Messages():
if message == nil {
h.t.Logf("📭 Received nil message, ending consumption")
return nil
}
h.t.Logf("📨 Received message: key=%s, value=%s, offset=%d",
string(message.Key), string(message.Value), message.Offset)
h.messages <- message
session.MarkMessage(message, "")
case <-session.Context().Done():
h.t.Logf("🛑 Session context done")
return nil
}
}
}

test/kafka/consumer_group_test.go (698)

@@ -0,0 +1,698 @@
package kafka
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/IBM/sarama"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
)
func TestConsumerGroup_BasicFunctionality(t *testing.T) {
// Start Kafka gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Logf("Gateway server error: %v", err)
}
}()
defer gatewayServer.Close()
// Wait for server to start
time.Sleep(100 * time.Millisecond)
host, port := gatewayServer.GetListenerAddr()
brokerAddr := fmt.Sprintf("%s:%d", host, port)
// Test configuration
topicName := "consumer-group-test"
// Add topic for testing
gatewayServer.GetHandler().AddTopicForTesting(topicName, 1)
groupID := "test-consumer-group"
numConsumers := 3
numMessages := 9 // nominally 3 per consumer; with a single partition, one consumer will actually receive them all
// Create Sarama config for consumer group
config := sarama.NewConfig()
config.Version = sarama.V2_6_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true
// Produce test messages first
t.Logf("=== Producing %d test messages ===", numMessages)
producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
if err != nil {
t.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
for i := 0; i < numMessages; i++ {
message := &sarama.ProducerMessage{
Topic: topicName,
Key: sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
Value: sarama.StringEncoder(fmt.Sprintf("Consumer Group Message %d", i+1)),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
t.Fatalf("Failed to produce message %d: %v", i, err)
}
t.Logf("✅ Produced message %d: partition=%d, offset=%d", i, partition, offset)
}
// Create consumer group handler
handler := &ConsumerGroupHandler{
messages: make(chan *sarama.ConsumerMessage, numMessages),
ready: make(chan bool, numConsumers), // buffered: Setup signals once per consumer session
t: t,
}
// Start multiple consumers in the same group
t.Logf("=== Starting %d consumers in group '%s' ===", numConsumers, groupID)
var wg sync.WaitGroup
consumerErrors := make(chan error, numConsumers)
for i := 0; i < numConsumers; i++ {
wg.Add(1)
go func(consumerID int) {
defer wg.Done()
consumerGroup, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
if err != nil {
consumerErrors <- fmt.Errorf("consumer %d: failed to create consumer group: %v", consumerID, err)
return
}
defer consumerGroup.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
t.Logf("Consumer %d: Starting consumption", consumerID)
// Start consuming
err = consumerGroup.Consume(ctx, []string{topicName}, handler)
if err != nil && err != context.DeadlineExceeded {
consumerErrors <- fmt.Errorf("consumer %d: consumption error: %v", consumerID, err)
return
}
t.Logf("Consumer %d: Finished consumption", consumerID)
}(i)
}
// Wait for consumers to be ready
t.Logf("Waiting for consumers to be ready...")
readyCount := 0
for readyCount < numConsumers {
select {
case <-handler.ready:
readyCount++
t.Logf("Consumer ready: %d/%d", readyCount, numConsumers)
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for consumers to be ready")
}
}
// Collect consumed messages
t.Logf("=== Collecting consumed messages ===")
consumedMessages := make([]*sarama.ConsumerMessage, 0, numMessages)
messageTimeout := time.After(10 * time.Second)
for len(consumedMessages) < numMessages {
select {
case msg := <-handler.messages:
consumedMessages = append(consumedMessages, msg)
t.Logf("✅ Consumed message %d: key=%s, value=%s, partition=%d, offset=%d",
len(consumedMessages), string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
case err := <-consumerErrors:
t.Fatalf("Consumer error: %v", err)
case <-messageTimeout:
t.Fatalf("Timeout waiting for messages. Got %d/%d messages", len(consumedMessages), numMessages)
}
}
// Wait for all consumers to finish
wg.Wait()
// Verify all messages were consumed exactly once
if len(consumedMessages) != numMessages {
t.Errorf("Expected %d messages, got %d", numMessages, len(consumedMessages))
}
// Verify message uniqueness (no duplicates)
messageKeys := make(map[string]bool)
for _, msg := range consumedMessages {
key := string(msg.Key)
if messageKeys[key] {
t.Errorf("Duplicate message key: %s", key)
}
messageKeys[key] = true
}
// Verify all expected keys were received
for i := 0; i < numMessages; i++ {
expectedKey := fmt.Sprintf("key-%d", i)
if !messageKeys[expectedKey] {
t.Errorf("Missing message key: %s", expectedKey)
}
}
t.Logf("🎉 SUCCESS: Consumer group test completed with %d messages consumed by %d consumers",
len(consumedMessages), numConsumers)
}
// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
type ConsumerGroupHandler struct {
messages chan *sarama.ConsumerMessage
ready chan bool
t *testing.T
}
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
h.t.Logf("Consumer group session setup")
// Signal readiness with a non-blocking send: Setup runs once per session, and
// with several consumers (and rebalances) sharing this handler, close(h.ready)
// would panic on the second call.
select {
case h.ready <- true:
default:
}
return nil
}
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
h.t.Logf("Consumer group session cleanup")
return nil
}
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message := <-claim.Messages():
if message == nil {
return nil
}
h.messages <- message
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}
func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
// Start Kafka gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Logf("Gateway server error: %v", err)
}
}()
defer gatewayServer.Close()
// Wait for server to start
time.Sleep(100 * time.Millisecond)
host, port := gatewayServer.GetListenerAddr()
brokerAddr := fmt.Sprintf("%s:%d", host, port)
// Test configuration
topicName := "offset-commit-test"
groupID := "offset-test-group"
numMessages := 5
// Add topic for testing
gatewayServer.GetHandler().AddTopicForTesting(topicName, 1)
// Create Sarama config
config := sarama.NewConfig()
config.Version = sarama.V2_6_0_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true
// Produce test messages
t.Logf("=== Producing %d test messages ===", numMessages)
producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
if err != nil {
t.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
for i := 0; i < numMessages; i++ {
message := &sarama.ProducerMessage{
Topic: topicName,
Key: sarama.StringEncoder(fmt.Sprintf("offset-key-%d", i)),
Value: sarama.StringEncoder(fmt.Sprintf("Offset Test Message %d", i+1)),
}
_, _, err := producer.SendMessage(message)
if err != nil {
t.Fatalf("Failed to produce message %d: %v", i, err)
}
}
// First consumer: consume first 3 messages and commit offsets
t.Logf("=== First consumer: consuming first 3 messages ===")
handler1 := &OffsetTestHandler{
messages: make(chan *sarama.ConsumerMessage, numMessages),
ready: make(chan bool),
stopAfter: 3,
t: t,
}
consumerGroup1, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
if err != nil {
t.Fatalf("Failed to create first consumer group: %v", err)
}
ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel1()
// Start first consumer
go func() {
err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
if err != nil && err != context.DeadlineExceeded {
t.Logf("First consumer error: %v", err)
}
}()
// Wait for first consumer to be ready
<-handler1.ready
// Collect messages from first consumer
consumedCount := 0
for consumedCount < 3 {
select {
case msg := <-handler1.messages:
consumedCount++
t.Logf("✅ First consumer message %d: key=%s, offset=%d",
consumedCount, string(msg.Key), msg.Offset)
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for first consumer messages")
}
}
// Close first consumer (this should commit offsets)
consumerGroup1.Close()
cancel1()
// Wait a bit for cleanup
time.Sleep(500 * time.Millisecond)
// Second consumer: should start from offset 3 (after committed offset)
t.Logf("=== Second consumer: should resume from offset 3 ===")
handler2 := &OffsetTestHandler{
messages: make(chan *sarama.ConsumerMessage, numMessages),
ready: make(chan bool),
stopAfter: 2, // Should get remaining 2 messages
t: t,
}
consumerGroup2, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
if err != nil {
t.Fatalf("Failed to create second consumer group: %v", err)
}
defer consumerGroup2.Close()
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel2()
// Start second consumer
go func() {
err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
if err != nil && err != context.DeadlineExceeded {
t.Logf("Second consumer error: %v", err)
}
}()
// Wait for second consumer to be ready
<-handler2.ready
// Collect messages from second consumer
secondConsumerMessages := make([]*sarama.ConsumerMessage, 0)
consumedCount = 0
for consumedCount < 2 {
select {
case msg := <-handler2.messages:
consumedCount++
secondConsumerMessages = append(secondConsumerMessages, msg)
t.Logf("✅ Second consumer message %d: key=%s, offset=%d",
consumedCount, string(msg.Key), msg.Offset)
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for second consumer messages. Got %d/2", consumedCount)
}
}
// Verify second consumer started from correct offset
if len(secondConsumerMessages) > 0 {
firstMessageOffset := secondConsumerMessages[0].Offset
if firstMessageOffset < 3 {
t.Errorf("Expected second consumer to start from offset >= 3, got %d", firstMessageOffset)
} else {
t.Logf("✅ Second consumer correctly resumed from offset %d", firstMessageOffset)
}
}
t.Logf("🎉 SUCCESS: Offset commit/fetch test completed successfully")
}
// OffsetTestHandler implements sarama.ConsumerGroupHandler for offset testing
type OffsetTestHandler struct {
messages chan *sarama.ConsumerMessage
ready chan bool
stopAfter int
consumed int
t *testing.T
}
func (h *OffsetTestHandler) Setup(sarama.ConsumerGroupSession) error {
h.t.Logf("Offset test consumer setup")
close(h.ready)
return nil
}
func (h *OffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
h.t.Logf("Offset test consumer cleanup")
return nil
}
func (h *OffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message := <-claim.Messages():
if message == nil {
return nil
}
h.consumed++
h.messages <- message
session.MarkMessage(message, "")
// Stop after consuming the specified number of messages
if h.consumed >= h.stopAfter {
h.t.Logf("Stopping consumer after %d messages", h.consumed)
return nil
}
case <-session.Context().Done():
return nil
}
}
}
func TestConsumerGroup_Rebalancing(t *testing.T) {
// Start Kafka gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Logf("Gateway server error: %v", err)
}
}()
defer gatewayServer.Close()
// Wait for server to start
time.Sleep(100 * time.Millisecond)
host, port := gatewayServer.GetListenerAddr()
brokerAddr := fmt.Sprintf("%s:%d", host, port)
// Test configuration
topicName := "rebalance-test"
groupID := "rebalance-test-group"
numMessages := 12
// Add topic for testing
gatewayServer.GetHandler().AddTopicForTesting(topicName, 1)
// Create Sarama config
config := sarama.NewConfig()
config.Version = sarama.V2_6_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Return.Errors = true
config.Consumer.Group.Session.Timeout = 10 * time.Second
config.Consumer.Group.Heartbeat.Interval = 3 * time.Second
config.Producer.Return.Successes = true
// Produce test messages
t.Logf("=== Producing %d test messages ===", numMessages)
producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
if err != nil {
t.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
for i := 0; i < numMessages; i++ {
message := &sarama.ProducerMessage{
Topic: topicName,
Key: sarama.StringEncoder(fmt.Sprintf("rebalance-key-%d", i)),
Value: sarama.StringEncoder(fmt.Sprintf("Rebalance Test Message %d", i+1)),
}
_, _, err := producer.SendMessage(message)
if err != nil {
t.Fatalf("Failed to produce message %d: %v", i, err)
}
}
// Start with 2 consumers
t.Logf("=== Starting 2 initial consumers ===")
handler1 := &RebalanceTestHandler{
messages: make(chan *sarama.ConsumerMessage, numMessages),
ready: make(chan bool),
rebalanced: make(chan bool, 5),
consumerID: "consumer-1",
t: t,
}
handler2 := &RebalanceTestHandler{
messages: make(chan *sarama.ConsumerMessage, numMessages),
ready: make(chan bool),
rebalanced: make(chan bool, 5),
consumerID: "consumer-2",
t: t,
}
// Start first consumer
consumerGroup1, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
if err != nil {
t.Fatalf("Failed to create consumer group 1: %v", err)
}
defer consumerGroup1.Close()
ctx1, cancel1 := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel1()
go func() {
err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
if err != nil && err != context.DeadlineExceeded {
t.Logf("Consumer 1 error: %v", err)
}
}()
// Wait for first consumer to be ready
<-handler1.ready
t.Logf("Consumer 1 ready")
// Start second consumer
consumerGroup2, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
if err != nil {
t.Fatalf("Failed to create consumer group 2: %v", err)
}
defer consumerGroup2.Close()
ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel2()
go func() {
err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
if err != nil && err != context.DeadlineExceeded {
t.Logf("Consumer 2 error: %v", err)
}
}()
// Wait for second consumer to be ready
<-handler2.ready
t.Logf("Consumer 2 ready")
// Wait for initial rebalancing
t.Logf("Waiting for initial rebalancing...")
rebalanceCount := 0
initialRebalance:
for rebalanceCount < 2 {
select {
case <-handler1.rebalanced:
rebalanceCount++
t.Logf("Consumer 1 rebalanced (%d/2)", rebalanceCount)
case <-handler2.rebalanced:
rebalanceCount++
t.Logf("Consumer 2 rebalanced (%d/2)", rebalanceCount)
case <-time.After(10 * time.Second):
t.Logf("Warning: Timeout waiting for initial rebalancing")
// a bare break only exits the select, not the loop
break initialRebalance
}
}
// Collect some messages
t.Logf("=== Collecting messages from 2 consumers ===")
allMessages := make([]*sarama.ConsumerMessage, 0)
messageTimeout := time.After(10 * time.Second)
// Collect at least half the messages
initialCollect:
for len(allMessages) < numMessages/2 {
select {
case msg := <-handler1.messages:
allMessages = append(allMessages, msg)
t.Logf("Consumer 1 message: key=%s, offset=%d", string(msg.Key), msg.Offset)
case msg := <-handler2.messages:
allMessages = append(allMessages, msg)
t.Logf("Consumer 2 message: key=%s, offset=%d", string(msg.Key), msg.Offset)
case <-messageTimeout:
break initialCollect
}
}
t.Logf("Collected %d messages from 2 consumers", len(allMessages))
// Add a third consumer to trigger rebalancing
t.Logf("=== Adding third consumer to trigger rebalancing ===")
handler3 := &RebalanceTestHandler{
messages: make(chan *sarama.ConsumerMessage, numMessages),
ready: make(chan bool),
rebalanced: make(chan bool, 5),
consumerID: "consumer-3",
t: t,
}
consumerGroup3, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
if err != nil {
t.Fatalf("Failed to create consumer group 3: %v", err)
}
defer consumerGroup3.Close()
ctx3, cancel3 := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel3()
go func() {
err := consumerGroup3.Consume(ctx3, []string{topicName}, handler3)
if err != nil && err != context.DeadlineExceeded {
t.Logf("Consumer 3 error: %v", err)
}
}()
// Wait for third consumer to be ready
<-handler3.ready
t.Logf("Consumer 3 ready")
// Wait for rebalancing after adding third consumer
t.Logf("Waiting for rebalancing after adding third consumer...")
rebalanceCount = 0
rebalanceTimeout := time.After(15 * time.Second)
secondRebalance:
for rebalanceCount < 3 {
select {
case <-handler1.rebalanced:
rebalanceCount++
t.Logf("Consumer 1 rebalanced after adding consumer 3 (%d/3)", rebalanceCount)
case <-handler2.rebalanced:
rebalanceCount++
t.Logf("Consumer 2 rebalanced after adding consumer 3 (%d/3)", rebalanceCount)
case <-handler3.rebalanced:
rebalanceCount++
t.Logf("Consumer 3 rebalanced (%d/3)", rebalanceCount)
case <-rebalanceTimeout:
t.Logf("Warning: Timeout waiting for rebalancing after adding consumer 3")
break secondRebalance
}
}
// Collect remaining messages from all 3 consumers
t.Logf("=== Collecting remaining messages from 3 consumers ===")
finalTimeout := time.After(10 * time.Second)
finalCollect:
for len(allMessages) < numMessages {
select {
case msg := <-handler1.messages:
allMessages = append(allMessages, msg)
t.Logf("Consumer 1 message: key=%s, offset=%d", string(msg.Key), msg.Offset)
case msg := <-handler2.messages:
allMessages = append(allMessages, msg)
t.Logf("Consumer 2 message: key=%s, offset=%d", string(msg.Key), msg.Offset)
case msg := <-handler3.messages:
allMessages = append(allMessages, msg)
t.Logf("Consumer 3 message: key=%s, offset=%d", string(msg.Key), msg.Offset)
case <-finalTimeout:
break finalCollect
}
}
t.Logf("Final message count: %d/%d", len(allMessages), numMessages)
// Verify no duplicate messages
messageKeys := make(map[string]bool)
duplicates := 0
for _, msg := range allMessages {
key := string(msg.Key)
if messageKeys[key] {
duplicates++
t.Logf("Duplicate message key: %s", key)
}
messageKeys[key] = true
}
if duplicates > 0 {
t.Errorf("Found %d duplicate messages during rebalancing", duplicates)
}
t.Logf("🎉 SUCCESS: Rebalancing test completed. Consumed %d unique messages with %d consumers",
len(messageKeys), 3)
}
// RebalanceTestHandler implements sarama.ConsumerGroupHandler for rebalancing tests
type RebalanceTestHandler struct {
messages chan *sarama.ConsumerMessage
ready chan bool
rebalanced chan bool
consumerID string
t *testing.T
}
func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error {
h.t.Logf("%s: Setup - Generation: %d, Claims: %v",
h.consumerID, session.GenerationID(), session.Claims())
select {
case h.rebalanced <- true:
default:
}
select {
case h.ready <- true:
default:
}
return nil
}
func (h *RebalanceTestHandler) Cleanup(session sarama.ConsumerGroupSession) error {
h.t.Logf("%s: Cleanup", h.consumerID)
return nil
}
func (h *RebalanceTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
h.t.Logf("%s: Starting to consume partition %d", h.consumerID, claim.Partition())
for {
select {
case message := <-claim.Messages():
if message == nil {
return nil
}
h.messages <- message
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}

weed/mq/kafka/protocol/handler.go (10)

@@ -152,22 +152,22 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
// StoreRecordBatch stores a record batch for later retrieval during Fetch operations
func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
key := fmt.Sprintf("%s:%d:%d", topicName, partition, baseOffset)
// Fix the base offset in the record batch binary data to match the assigned offset
// The base offset is stored in the first 8 bytes of the record batch
if len(recordBatch) >= 8 {
// Create a copy to avoid modifying the original
fixedBatch := make([]byte, len(recordBatch))
copy(fixedBatch, recordBatch)
// Update the base offset (first 8 bytes, big endian)
binary.BigEndian.PutUint64(fixedBatch[0:8], uint64(baseOffset))
h.recordBatchMu.Lock()
defer h.recordBatchMu.Unlock()
h.recordBatches[key] = fixedBatch
fmt.Printf("DEBUG: Stored record batch with corrected base offset %d (was %d)\n",
fmt.Printf("DEBUG: Stored record batch with corrected base offset %d (was %d)\n",
baseOffset, binary.BigEndian.Uint64(recordBatch[0:8]))
} else {
h.recordBatchMu.Lock()
