From 687eaddeddea54a8c160105776dad4ee49bd90c1 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Fri, 12 Sep 2025 15:08:55 -0700
Subject: [PATCH] debug: add comprehensive consumer group tests and identify FindCoordinator issue

- Created consumer group tests for basic functionality, offset management, and rebalancing
- Added debug test to isolate consumer group coordination issues
- Root cause identified: Sarama repeatedly calls FindCoordinator but never progresses to JoinGroup
- Issue: Connections closed after FindCoordinator, preventing coordinator protocol
- Consumer group implementation exists but not being reached by Sarama clients

Next: Fix coordinator connection handling to enable JoinGroup protocol
---
 test/kafka/consumer_group_debug_test.go | 154 ++++++
 test/kafka/consumer_group_test.go       | 698 ++++++++++++++++++++++++
 weed/mq/kafka/protocol/handler.go       |  10 +-
 3 files changed, 857 insertions(+), 5 deletions(-)
 create mode 100644 test/kafka/consumer_group_debug_test.go
 create mode 100644 test/kafka/consumer_group_test.go

diff --git a/test/kafka/consumer_group_debug_test.go b/test/kafka/consumer_group_debug_test.go
new file mode 100644
index 000000000..664628803
--- /dev/null
+++ b/test/kafka/consumer_group_debug_test.go
@@ -0,0 +1,154 @@
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/IBM/sarama"
+	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+)
+
+func TestConsumerGroup_Debug(t *testing.T) {
+	// Start Kafka gateway
+	gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
+
+	go func() {
+		if err := gatewayServer.Start(); err != nil {
+			t.Logf("Gateway server error: %v", err)
+		}
+	}()
+	defer gatewayServer.Close()
+
+	// Wait for server to start
+	time.Sleep(100 * time.Millisecond)
+	host, port := gatewayServer.GetListenerAddr()
+	brokerAddr := fmt.Sprintf("%s:%d", host, port)
+
+	// Test configuration
+	topicName := "debug-test"
+	groupID := "debug-group"
+
+	// Add topic for testing
+	gatewayServer.GetHandler().AddTopicForTesting(topicName, 1)
+
+	// Create Sarama config
+	config := sarama.NewConfig()
+	config.Version = sarama.V2_6_0_0
+	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	config.Consumer.Return.Errors = true
+	config.Producer.Return.Successes = true
+
+	// Produce one test message
+	t.Logf("=== Producing 1 test message ===")
+	producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
+	if err != nil {
+		t.Fatalf("Failed to create producer: %v", err)
+	}
+	defer producer.Close()
+
+	message := &sarama.ProducerMessage{
+		Topic: topicName,
+		Key:   sarama.StringEncoder("debug-key"),
+		Value: sarama.StringEncoder("Debug Message"),
+	}
+
+	partition, offset, err := producer.SendMessage(message)
+	if err != nil {
+		t.Fatalf("Failed to produce message: %v", err)
+	}
+	t.Logf("✅ Produced message: partition=%d, offset=%d", partition, offset)
+
+	// Create a simple consumer group handler
+	handler := &DebugHandler{
+		messages: make(chan *sarama.ConsumerMessage, 1),
+		ready:    make(chan bool),
+		t:        t,
+	}
+
+	// Start one consumer
+	t.Logf("=== Starting 1 consumer in group '%s' ===", groupID)
+
+	consumerGroup, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
+	if err != nil {
+		t.Fatalf("Failed to create consumer group: %v", err)
+	}
+	defer consumerGroup.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	// Start consuming in a goroutine
+	go func() {
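+		// Consume blocks until the session ends (rebalance, error, or context
+		// cancellation); long-lived consumers call it in a loop, but a single
+		// call is enough for this one-shot debug test.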
t.Logf("Starting consumption...") + err := consumerGroup.Consume(ctx, []string{topicName}, handler) + if err != nil && err != context.DeadlineExceeded { + t.Logf("Consumer error: %v", err) + } + t.Logf("Consumption finished") + }() + + // Wait for consumer to be ready or timeout + t.Logf("Waiting for consumer to be ready...") + select { + case <-handler.ready: + t.Logf("✅ Consumer is ready!") + + // Try to consume the message + select { + case msg := <-handler.messages: + t.Logf("✅ Consumed message: key=%s, value=%s, offset=%d", + string(msg.Key), string(msg.Value), msg.Offset) + case <-time.After(5 * time.Second): + t.Logf("⚠️ No message received within timeout") + } + + case <-time.After(8 * time.Second): + t.Logf("❌ Timeout waiting for consumer to be ready") + } + + t.Logf("🎉 Debug test completed") +} + +// DebugHandler implements sarama.ConsumerGroupHandler for debugging +type DebugHandler struct { + messages chan *sarama.ConsumerMessage + ready chan bool + t *testing.T +} + +func (h *DebugHandler) Setup(session sarama.ConsumerGroupSession) error { + h.t.Logf("🔧 Consumer group session setup - Generation: %d, Claims: %v", + session.GenerationID(), session.Claims()) + close(h.ready) + return nil +} + +func (h *DebugHandler) Cleanup(session sarama.ConsumerGroupSession) error { + h.t.Logf("🧹 Consumer group session cleanup") + return nil +} + +func (h *DebugHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + h.t.Logf("🍽️ Starting to consume partition %d from offset %d", + claim.Partition(), claim.InitialOffset()) + + for { + select { + case message := <-claim.Messages(): + if message == nil { + h.t.Logf("📭 Received nil message, ending consumption") + return nil + } + h.t.Logf("📨 Received message: key=%s, value=%s, offset=%d", + string(message.Key), string(message.Value), message.Offset) + h.messages <- message + session.MarkMessage(message, "") + case <-session.Context().Done(): + h.t.Logf("🛑 Session context done") + return nil + } + } +} diff --git a/test/kafka/consumer_group_test.go b/test/kafka/consumer_group_test.go new file mode 100644 index 000000000..2a9a2b3b6 --- /dev/null +++ b/test/kafka/consumer_group_test.go @@ -0,0 +1,698 @@ +package kafka + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +func TestConsumerGroup_BasicFunctionality(t *testing.T) { + // Start Kafka gateway + gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"}) + + go func() { + if err := gatewayServer.Start(); err != nil { + t.Logf("Gateway server error: %v", err) + } + }() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + host, port := gatewayServer.GetListenerAddr() + brokerAddr := fmt.Sprintf("%s:%d", host, port) + + // Test configuration + topicName := "consumer-group-test" + + // Add topic for testing + gatewayServer.GetHandler().AddTopicForTesting(topicName, 1) + groupID := "test-consumer-group" + numConsumers := 3 + numMessages := 9 // 3 messages per consumer + + // Create Sarama config for consumer group + config := sarama.NewConfig() + config.Version = sarama.V2_6_0_0 + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + config.Consumer.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Return.Errors = true + config.Producer.Return.Successes = true + + // Produce test messages first + t.Logf("=== Producing %d test messages ===", numMessages) + producer, err := 
+	// Produce test messages first
+	t.Logf("=== Producing %d test messages ===", numMessages)
+	producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
+	if err != nil {
+		t.Fatalf("Failed to create producer: %v", err)
+	}
+	defer producer.Close()
+
+	for i := 0; i < numMessages; i++ {
+		message := &sarama.ProducerMessage{
+			Topic: topicName,
+			Key:   sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
+			Value: sarama.StringEncoder(fmt.Sprintf("Consumer Group Message %d", i+1)),
+		}
+
+		partition, offset, err := producer.SendMessage(message)
+		if err != nil {
+			t.Fatalf("Failed to produce message %d: %v", i, err)
+		}
+		t.Logf("✅ Produced message %d: partition=%d, offset=%d", i, partition, offset)
+	}
+
+	// Create consumer group handler
+	handler := &ConsumerGroupHandler{
+		messages: make(chan *sarama.ConsumerMessage, numMessages),
+		ready:    make(chan bool),
+		t:        t,
+	}
+
+	// Start multiple consumers in the same group
+	t.Logf("=== Starting %d consumers in group '%s' ===", numConsumers, groupID)
+
+	var wg sync.WaitGroup
+	consumerErrors := make(chan error, numConsumers)
+
+	for i := 0; i < numConsumers; i++ {
+		wg.Add(1)
+		go func(consumerID int) {
+			defer wg.Done()
+
+			consumerGroup, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
+			if err != nil {
+				consumerErrors <- fmt.Errorf("consumer %d: failed to create consumer group: %v", consumerID, err)
+				return
+			}
+			defer consumerGroup.Close()
+
+			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+			defer cancel()
+
+			t.Logf("Consumer %d: Starting consumption", consumerID)
+
+			// Start consuming
+			err = consumerGroup.Consume(ctx, []string{topicName}, handler)
+			if err != nil && err != context.DeadlineExceeded {
+				consumerErrors <- fmt.Errorf("consumer %d: consumption error: %v", consumerID, err)
+				return
+			}
+
+			t.Logf("Consumer %d: Finished consumption", consumerID)
+		}(i)
+	}
+
+	// Wait for consumers to be ready
+	t.Logf("Waiting for consumers to be ready...")
+	readyCount := 0
+	for readyCount < numConsumers {
+		select {
+		case <-handler.ready:
+			readyCount++
+			t.Logf("Consumer ready: %d/%d", readyCount, numConsumers)
+		case <-time.After(5 * time.Second):
+			t.Fatalf("Timeout waiting for consumers to be ready")
+		}
+	}
+
+	// Collect consumed messages
+	t.Logf("=== Collecting consumed messages ===")
+	consumedMessages := make([]*sarama.ConsumerMessage, 0, numMessages)
+	messageTimeout := time.After(10 * time.Second)
+
+	for len(consumedMessages) < numMessages {
+		select {
+		case msg := <-handler.messages:
+			consumedMessages = append(consumedMessages, msg)
+			t.Logf("✅ Consumed message %d: key=%s, value=%s, partition=%d, offset=%d",
+				len(consumedMessages), string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
+		case err := <-consumerErrors:
+			t.Fatalf("Consumer error: %v", err)
+		case <-messageTimeout:
+			t.Fatalf("Timeout waiting for messages. Got %d/%d messages", len(consumedMessages), numMessages)
+		}
+	}
+
+	// Wait for all consumers to finish
+	wg.Wait()
+
+	// Verify all messages were consumed exactly once
+	if len(consumedMessages) != numMessages {
+		t.Errorf("Expected %d messages, got %d", numMessages, len(consumedMessages))
+	}
+
+	// Verify message uniqueness (no duplicates)
+	messageKeys := make(map[string]bool)
+	for _, msg := range consumedMessages {
+		key := string(msg.Key)
+		if messageKeys[key] {
+			t.Errorf("Duplicate message key: %s", key)
+		}
+		messageKeys[key] = true
+	}
+
+	// Verify all expected keys were received
+	for i := 0; i < numMessages; i++ {
+		expectedKey := fmt.Sprintf("key-%d", i)
+		if !messageKeys[expectedKey] {
+			t.Errorf("Missing message key: %s", expectedKey)
+		}
+	}
+
+	t.Logf("🎉 SUCCESS: Consumer group test completed with %d messages consumed by %d consumers",
+		len(consumedMessages), numConsumers)
+}
+
+// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
+type ConsumerGroupHandler struct {
+	messages  chan *sarama.ConsumerMessage
+	ready     chan bool
+	readyOnce sync.Once
+	t         *testing.T
+}
+
+func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+	h.t.Logf("Consumer group session setup")
+	// The same handler is shared by every consumer in this test, so guard the
+	// close with sync.Once; a second Setup call would otherwise panic on
+	// closing an already-closed channel.
+	h.readyOnce.Do(func() { close(h.ready) })
+	return nil
+}
+
+func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
+	h.t.Logf("Consumer group session cleanup")
+	return nil
+}
+
+func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for {
+		select {
+		case message := <-claim.Messages():
+			if message == nil {
+				return nil
+			}
+			h.messages <- message
+			session.MarkMessage(message, "")
+		case <-session.Context().Done():
+			return nil
+		}
+	}
+}
+
+func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
+	// Start Kafka gateway
+	gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
+
+	go func() {
+		if err := gatewayServer.Start(); err != nil {
+			t.Logf("Gateway server error: %v", err)
+		}
+	}()
+	defer gatewayServer.Close()
+
+	// Wait for server to start
+	time.Sleep(100 * time.Millisecond)
+	host, port := gatewayServer.GetListenerAddr()
+	brokerAddr := fmt.Sprintf("%s:%d", host, port)
+
+	// Test configuration
+	topicName := "offset-commit-test"
+	groupID := "offset-test-group"
+	numMessages := 5
+
+	// Add topic for testing
+	gatewayServer.GetHandler().AddTopicForTesting(topicName, 1)
+
+	// Create Sarama config
+	config := sarama.NewConfig()
+	config.Version = sarama.V2_6_0_0
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	config.Consumer.Return.Errors = true
+	config.Producer.Return.Successes = true
+
+	// Produce test messages
+	t.Logf("=== Producing %d test messages ===", numMessages)
+	producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
+	if err != nil {
+		t.Fatalf("Failed to create producer: %v", err)
+	}
+	defer producer.Close()
+
+	for i := 0; i < numMessages; i++ {
+		message := &sarama.ProducerMessage{
+			Topic: topicName,
+			Key:   sarama.StringEncoder(fmt.Sprintf("offset-key-%d", i)),
+			Value: sarama.StringEncoder(fmt.Sprintf("Offset Test Message %d", i+1)),
+		}
+
+		_, _, err := producer.SendMessage(message)
+		if err != nil {
+			t.Fatalf("Failed to produce message %d: %v", i, err)
+		}
+	}
+
+	// First consumer: consume first 3 messages and commit offsets
+	t.Logf("=== First consumer: consuming first 3 messages ===")
+	handler1 := &OffsetTestHandler{
+		messages:  make(chan *sarama.ConsumerMessage, numMessages),
+		ready:     make(chan bool),
+		stopAfter: 3,
+		t:         t,
+	}
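+	// Note: MarkMessage only marks an offset in memory; the commit itself happens
+	// through Sarama's auto-commit (enabled by default) and when this consumer is closed.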
+
+	consumerGroup1, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
+	if err != nil {
+		t.Fatalf("Failed to create first consumer group: %v", err)
+	}
+
+	ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel1()
+
+	// Start first consumer
+	go func() {
+		err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
+		if err != nil && err != context.DeadlineExceeded {
+			t.Logf("First consumer error: %v", err)
+		}
+	}()
+
+	// Wait for first consumer to be ready
+	<-handler1.ready
+
+	// Collect messages from first consumer
+	consumedCount := 0
+	for consumedCount < 3 {
+		select {
+		case msg := <-handler1.messages:
+			consumedCount++
+			t.Logf("✅ First consumer message %d: key=%s, offset=%d",
+				consumedCount, string(msg.Key), msg.Offset)
+		case <-time.After(5 * time.Second):
+			t.Fatalf("Timeout waiting for first consumer messages")
+		}
+	}
+
+	// Close first consumer (this should commit offsets)
+	consumerGroup1.Close()
+	cancel1()
+
+	// Wait a bit for cleanup
+	time.Sleep(500 * time.Millisecond)
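+
+	// Consumer.Offsets.Initial only applies when the group has no committed offset,
+	// so the second consumer should resume from the offsets committed above.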
+	// Second consumer: should start from offset 3 (after committed offset)
+	t.Logf("=== Second consumer: should resume from offset 3 ===")
+	handler2 := &OffsetTestHandler{
+		messages:  make(chan *sarama.ConsumerMessage, numMessages),
+		ready:     make(chan bool),
+		stopAfter: 2, // Should get remaining 2 messages
+		t:         t,
+	}
+
+	consumerGroup2, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config)
+	if err != nil {
+		t.Fatalf("Failed to create second consumer group: %v", err)
+	}
+	defer consumerGroup2.Close()
+
+	ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel2()
+
+	// Start second consumer
+	go func() {
+		err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
+		if err != nil && err != context.DeadlineExceeded {
+			t.Logf("Second consumer error: %v", err)
+		}
+	}()
+
+	// Wait for second consumer to be ready
+	<-handler2.ready
+
+	// Collect messages from second consumer
+	secondConsumerMessages := make([]*sarama.ConsumerMessage, 0)
+	consumedCount = 0
+	for consumedCount < 2 {
+		select {
+		case msg := <-handler2.messages:
+			consumedCount++
+			secondConsumerMessages = append(secondConsumerMessages, msg)
+			t.Logf("✅ Second consumer message %d: key=%s, offset=%d",
+				consumedCount, string(msg.Key), msg.Offset)
+		case <-time.After(5 * time.Second):
+			t.Fatalf("Timeout waiting for second consumer messages. Got %d/2", consumedCount)
+		}
+	}
+
+	// Verify second consumer started from correct offset
+	if len(secondConsumerMessages) > 0 {
+		firstMessageOffset := secondConsumerMessages[0].Offset
+		if firstMessageOffset < 3 {
+			t.Errorf("Expected second consumer to start from offset >= 3, got %d", firstMessageOffset)
+		} else {
+			t.Logf("✅ Second consumer correctly resumed from offset %d", firstMessageOffset)
+		}
+	}
+
+	t.Logf("🎉 SUCCESS: Offset commit/fetch test completed successfully")
+}
+
+// OffsetTestHandler implements sarama.ConsumerGroupHandler for offset testing
+type OffsetTestHandler struct {
+	messages  chan *sarama.ConsumerMessage
+	ready     chan bool
+	stopAfter int
+	consumed  int
+	t         *testing.T
+}
+
+func (h *OffsetTestHandler) Setup(sarama.ConsumerGroupSession) error {
+	h.t.Logf("Offset test consumer setup")
+	close(h.ready)
+	return nil
+}
+
+func (h *OffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
+	h.t.Logf("Offset test consumer cleanup")
+	return nil
+}
+
+func (h *OffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for {
+		select {
+		case message := <-claim.Messages():
+			if message == nil {
+				return nil
+			}
+			h.consumed++
+			h.messages <- message
+			session.MarkMessage(message, "")
+
+			// Stop after consuming the specified number of messages
+			if h.consumed >= h.stopAfter {
+				h.t.Logf("Stopping consumer after %d messages", h.consumed)
+				return nil
+			}
+		case <-session.Context().Done():
+			return nil
+		}
+	}
+}
+
+func TestConsumerGroup_Rebalancing(t *testing.T) {
+	// Start Kafka gateway
+	gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
+
+	go func() {
+		if err := gatewayServer.Start(); err != nil {
+			t.Logf("Gateway server error: %v", err)
+		}
+	}()
+	defer gatewayServer.Close()
+
+	// Wait for server to start
+	time.Sleep(100 * time.Millisecond)
+	host, port := gatewayServer.GetListenerAddr()
+	brokerAddr := fmt.Sprintf("%s:%d", host, port)
+
+	// Test configuration
+	topicName := "rebalance-test"
+	groupID := "rebalance-test-group"
+	numMessages := 12
+
+	// Add topic for testing
+	gatewayServer.GetHandler().AddTopicForTesting(topicName, 1)
+
+	// Create Sarama config
+	config := sarama.NewConfig()
+	config.Version = sarama.V2_6_0_0
+	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	config.Consumer.Return.Errors = true
+	config.Consumer.Group.Session.Timeout = 10 * time.Second
+	config.Consumer.Group.Heartbeat.Interval = 3 * time.Second
+	config.Producer.Return.Successes = true
+
+	// Produce test messages
+	t.Logf("=== Producing %d test messages ===", numMessages)
+	producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
+	if err != nil {
+		t.Fatalf("Failed to create producer: %v", err)
+	}
+	defer producer.Close()
+
+	for i := 0; i < numMessages; i++ {
+		message := &sarama.ProducerMessage{
+			Topic: topicName,
+			Key:   sarama.StringEncoder(fmt.Sprintf("rebalance-key-%d", i)),
+			Value: sarama.StringEncoder(fmt.Sprintf("Rebalance Test Message %d", i+1)),
+		}
+
+		_, _, err := producer.SendMessage(message)
+		if err != nil {
+			t.Fatalf("Failed to produce message %d: %v", i, err)
+		}
+	}
+
+	// Start with 2 consumers
+	t.Logf("=== Starting 2 initial consumers ===")
+
+	handler1 := &RebalanceTestHandler{
+		messages:   make(chan *sarama.ConsumerMessage, numMessages),
+		ready:      make(chan bool),
+		rebalanced: make(chan bool, 5),
+		consumerID: "consumer-1",
+		t:          t,
+	}
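+
+	// Each consumer gets its own handler instance so its ready/rebalanced signals
+	// can be tracked independently during the rebalance checks below.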
&RebalanceTestHandler{ + messages: make(chan *sarama.ConsumerMessage, numMessages), + ready: make(chan bool), + rebalanced: make(chan bool, 5), + consumerID: "consumer-2", + t: t, + } + + // Start first consumer + consumerGroup1, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config) + if err != nil { + t.Fatalf("Failed to create consumer group 1: %v", err) + } + defer consumerGroup1.Close() + + ctx1, cancel1 := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel1() + + go func() { + err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1) + if err != nil && err != context.DeadlineExceeded { + t.Logf("Consumer 1 error: %v", err) + } + }() + + // Wait for first consumer to be ready + <-handler1.ready + t.Logf("Consumer 1 ready") + + // Start second consumer + consumerGroup2, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config) + if err != nil { + t.Fatalf("Failed to create consumer group 2: %v", err) + } + defer consumerGroup2.Close() + + ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel2() + + go func() { + err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2) + if err != nil && err != context.DeadlineExceeded { + t.Logf("Consumer 2 error: %v", err) + } + }() + + // Wait for second consumer to be ready + <-handler2.ready + t.Logf("Consumer 2 ready") + + // Wait for initial rebalancing + t.Logf("Waiting for initial rebalancing...") + rebalanceCount := 0 + for rebalanceCount < 2 { + select { + case <-handler1.rebalanced: + rebalanceCount++ + t.Logf("Consumer 1 rebalanced (%d/2)", rebalanceCount) + case <-handler2.rebalanced: + rebalanceCount++ + t.Logf("Consumer 2 rebalanced (%d/2)", rebalanceCount) + case <-time.After(10 * time.Second): + t.Logf("Warning: Timeout waiting for initial rebalancing") + break + } + } + + // Collect some messages + t.Logf("=== Collecting messages from 2 consumers ===") + allMessages := make([]*sarama.ConsumerMessage, 0) + messageTimeout := time.After(10 * time.Second) + + // Collect at least half the messages + for len(allMessages) < numMessages/2 { + select { + case msg := <-handler1.messages: + allMessages = append(allMessages, msg) + t.Logf("Consumer 1 message: key=%s, offset=%d", string(msg.Key), msg.Offset) + case msg := <-handler2.messages: + allMessages = append(allMessages, msg) + t.Logf("Consumer 2 message: key=%s, offset=%d", string(msg.Key), msg.Offset) + case <-messageTimeout: + break + } + } + + t.Logf("Collected %d messages from 2 consumers", len(allMessages)) + + // Add a third consumer to trigger rebalancing + t.Logf("=== Adding third consumer to trigger rebalancing ===") + + handler3 := &RebalanceTestHandler{ + messages: make(chan *sarama.ConsumerMessage, numMessages), + ready: make(chan bool), + rebalanced: make(chan bool, 5), + consumerID: "consumer-3", + t: t, + } + + consumerGroup3, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config) + if err != nil { + t.Fatalf("Failed to create consumer group 3: %v", err) + } + defer consumerGroup3.Close() + + ctx3, cancel3 := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel3() + + go func() { + err := consumerGroup3.Consume(ctx3, []string{topicName}, handler3) + if err != nil && err != context.DeadlineExceeded { + t.Logf("Consumer 3 error: %v", err) + } + }() + + // Wait for third consumer to be ready + <-handler3.ready + t.Logf("Consumer 3 ready") + + // Wait for rebalancing after adding third consumer + t.Logf("Waiting for rebalancing after adding 
+	rebalanceCount = 0
+	rebalanceTimeout := time.After(15 * time.Second)
+
+rebalanceAfterJoin:
+	for rebalanceCount < 3 {
+		select {
+		case <-handler1.rebalanced:
+			rebalanceCount++
+			t.Logf("Consumer 1 rebalanced after adding consumer 3 (%d/3)", rebalanceCount)
+		case <-handler2.rebalanced:
+			rebalanceCount++
+			t.Logf("Consumer 2 rebalanced after adding consumer 3 (%d/3)", rebalanceCount)
+		case <-handler3.rebalanced:
+			rebalanceCount++
+			t.Logf("Consumer 3 rebalanced (%d/3)", rebalanceCount)
+		case <-rebalanceTimeout:
+			t.Logf("Warning: Timeout waiting for rebalancing after adding consumer 3")
+			// A bare break would only leave the select; break the labeled loop instead.
+			break rebalanceAfterJoin
+		}
+	}
+
+	// Collect remaining messages from all 3 consumers
+	t.Logf("=== Collecting remaining messages from 3 consumers ===")
+	finalTimeout := time.After(10 * time.Second)
+
+collectRemaining:
+	for len(allMessages) < numMessages {
+		select {
+		case msg := <-handler1.messages:
+			allMessages = append(allMessages, msg)
+			t.Logf("Consumer 1 message: key=%s, offset=%d", string(msg.Key), msg.Offset)
+		case msg := <-handler2.messages:
+			allMessages = append(allMessages, msg)
+			t.Logf("Consumer 2 message: key=%s, offset=%d", string(msg.Key), msg.Offset)
+		case msg := <-handler3.messages:
+			allMessages = append(allMessages, msg)
+			t.Logf("Consumer 3 message: key=%s, offset=%d", string(msg.Key), msg.Offset)
+		case <-finalTimeout:
+			// Stop collecting once the shared timeout fires.
+			break collectRemaining
+		}
+	}
+
+	t.Logf("Final message count: %d/%d", len(allMessages), numMessages)
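+
+	// The topic has a single partition, so only one group member owns it at a time;
+	// the check below guards against replays across rebalances rather than overlap
+	// between concurrent consumers.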
+	// Verify no duplicate messages
+	messageKeys := make(map[string]bool)
+	duplicates := 0
+	for _, msg := range allMessages {
+		key := string(msg.Key)
+		if messageKeys[key] {
+			duplicates++
+			t.Logf("Duplicate message key: %s", key)
+		}
+		messageKeys[key] = true
+	}
+
+	if duplicates > 0 {
+		t.Errorf("Found %d duplicate messages during rebalancing", duplicates)
+	}
+
+	t.Logf("🎉 SUCCESS: Rebalancing test completed. Consumed %d unique messages with %d consumers",
+		len(messageKeys), 3)
+}
+
+// RebalanceTestHandler implements sarama.ConsumerGroupHandler for rebalancing tests
+type RebalanceTestHandler struct {
+	messages   chan *sarama.ConsumerMessage
+	ready      chan bool
+	rebalanced chan bool
+	consumerID string
+	t          *testing.T
+}
+
+func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error {
+	h.t.Logf("%s: Setup - Generation: %d, Claims: %v",
+		h.consumerID, session.GenerationID(), session.Claims())
+
+	select {
+	case h.rebalanced <- true:
+	default:
+	}
+
+	select {
+	case h.ready <- true:
+	default:
+	}
+
+	return nil
+}
+
+func (h *RebalanceTestHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+	h.t.Logf("%s: Cleanup", h.consumerID)
+	return nil
+}
+
+func (h *RebalanceTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	h.t.Logf("%s: Starting to consume partition %d", h.consumerID, claim.Partition())
+
+	for {
+		select {
+		case message := <-claim.Messages():
+			if message == nil {
+				return nil
+			}
+			h.messages <- message
+			session.MarkMessage(message, "")
+		case <-session.Context().Done():
+			return nil
+		}
+	}
+}
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 35e9a1ab6..ab9227d31 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -152,22 +152,22 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
 // StoreRecordBatch stores a record batch for later retrieval during Fetch operations
 func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
 	key := fmt.Sprintf("%s:%d:%d", topicName, partition, baseOffset)
-	
+
 	// Fix the base offset in the record batch binary data to match the assigned offset
 	// The base offset is stored in the first 8 bytes of the record batch
 	if len(recordBatch) >= 8 {
 		// Create a copy to avoid modifying the original
 		fixedBatch := make([]byte, len(recordBatch))
 		copy(fixedBatch, recordBatch)
-		
+
 		// Update the base offset (first 8 bytes, big endian)
 		binary.BigEndian.PutUint64(fixedBatch[0:8], uint64(baseOffset))
-		
+
 		h.recordBatchMu.Lock()
 		defer h.recordBatchMu.Unlock()
 		h.recordBatches[key] = fixedBatch
-		
-		fmt.Printf("DEBUG: Stored record batch with corrected base offset %d (was %d)\n", 
+
+		fmt.Printf("DEBUG: Stored record batch with corrected base offset %d (was %d)\n",
 			baseOffset, binary.BigEndian.Uint64(recordBatch[0:8]))
 	} else {
 		h.recordBatchMu.Lock()