You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							351 lines
						
					
					
						
							10 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							351 lines
						
					
					
						
							10 KiB
						
					
					
				| package integration | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"fmt" | |
| 	"sync" | |
| 	"testing" | |
| 	"time" | |
| 
 | |
| 	"github.com/IBM/sarama" | |
| 	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" | |
| ) | |
| 
 | |
| // TestConsumerGroups tests consumer group functionality | |
| // This test requires SeaweedFS masters to be running and will skip if not available | |
| func TestConsumerGroups(t *testing.T) { | |
| 	gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired) | |
| 	defer gateway.CleanupAndClose() | |
| 
 | |
| 	addr := gateway.StartAndWait() | |
| 
 | |
| 	t.Logf("Running consumer group tests with SMQ backend for offset persistence") | |
| 
 | |
| 	t.Run("BasicFunctionality", func(t *testing.T) { | |
| 		testConsumerGroupBasicFunctionality(t, addr) | |
| 	}) | |
| 
 | |
| 	t.Run("OffsetCommitAndFetch", func(t *testing.T) { | |
| 		testConsumerGroupOffsetCommitAndFetch(t, addr) | |
| 	}) | |
| 
 | |
| 	t.Run("Rebalancing", func(t *testing.T) { | |
| 		testConsumerGroupRebalancing(t, addr) | |
| 	}) | |
| } | |
| 
 | |
| func testConsumerGroupBasicFunctionality(t *testing.T, addr string) { | |
| 	topicName := testutil.GenerateUniqueTopicName("consumer-group-basic") | |
| 	groupID := testutil.GenerateUniqueGroupID("basic-group") | |
| 
 | |
| 	client := testutil.NewSaramaClient(t, addr) | |
| 	msgGen := testutil.NewMessageGenerator() | |
| 
 | |
| 	// Create topic and produce messages | |
| 	err := client.CreateTopic(topicName, 1, 1) | |
| 	testutil.AssertNoError(t, err, "Failed to create topic") | |
| 
 | |
| 	messages := msgGen.GenerateStringMessages(9) // 3 messages per consumer | |
| 	err = client.ProduceMessages(topicName, messages) | |
| 	testutil.AssertNoError(t, err, "Failed to produce messages") | |
| 
 | |
| 	// Test with multiple consumers in the same group | |
| 	numConsumers := 3 | |
| 	handler := &ConsumerGroupHandler{ | |
| 		messages: make(chan *sarama.ConsumerMessage, len(messages)), | |
| 		ready:    make(chan bool), | |
| 		t:        t, | |
| 	} | |
| 
 | |
| 	var wg sync.WaitGroup | |
| 	consumerErrors := make(chan error, numConsumers) | |
| 
 | |
| 	for i := 0; i < numConsumers; i++ { | |
| 		wg.Add(1) | |
| 		go func(consumerID int) { | |
| 			defer wg.Done() | |
| 
 | |
| 			consumerGroup, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig()) | |
| 			if err != nil { | |
| 				consumerErrors <- fmt.Errorf("consumer %d: failed to create consumer group: %v", consumerID, err) | |
| 				return | |
| 			} | |
| 			defer consumerGroup.Close() | |
| 
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
| 			defer cancel() | |
| 
 | |
| 			err = consumerGroup.Consume(ctx, []string{topicName}, handler) | |
| 			if err != nil && err != context.DeadlineExceeded { | |
| 				consumerErrors <- fmt.Errorf("consumer %d: consumption error: %v", consumerID, err) | |
| 				return | |
| 			} | |
| 		}(i) | |
| 	} | |
| 
 | |
| 	// Wait for consumers to be ready | |
| 	readyCount := 0 | |
| 	for readyCount < numConsumers { | |
| 		select { | |
| 		case <-handler.ready: | |
| 			readyCount++ | |
| 		case <-time.After(5 * time.Second): | |
| 			t.Fatalf("Timeout waiting for consumers to be ready") | |
| 		} | |
| 	} | |
| 
 | |
| 	// Collect consumed messages | |
| 	consumedMessages := make([]*sarama.ConsumerMessage, 0, len(messages)) | |
| 	messageTimeout := time.After(10 * time.Second) | |
| 
 | |
| 	for len(consumedMessages) < len(messages) { | |
| 		select { | |
| 		case msg := <-handler.messages: | |
| 			consumedMessages = append(consumedMessages, msg) | |
| 		case err := <-consumerErrors: | |
| 			t.Fatalf("Consumer error: %v", err) | |
| 		case <-messageTimeout: | |
| 			t.Fatalf("Timeout waiting for messages. Got %d/%d messages", len(consumedMessages), len(messages)) | |
| 		} | |
| 	} | |
| 
 | |
| 	wg.Wait() | |
| 
 | |
| 	// Verify all messages were consumed exactly once | |
| 	testutil.AssertEqual(t, len(messages), len(consumedMessages), "Message count mismatch") | |
| 
 | |
| 	// Verify message uniqueness (no duplicates) | |
| 	messageKeys := make(map[string]bool) | |
| 	for _, msg := range consumedMessages { | |
| 		key := string(msg.Key) | |
| 		if messageKeys[key] { | |
| 			t.Errorf("Duplicate message key: %s", key) | |
| 		} | |
| 		messageKeys[key] = true | |
| 	} | |
| } | |
| 
 | |
| func testConsumerGroupOffsetCommitAndFetch(t *testing.T, addr string) { | |
| 	topicName := testutil.GenerateUniqueTopicName("offset-commit-test") | |
| 	groupID := testutil.GenerateUniqueGroupID("offset-group") | |
| 
 | |
| 	client := testutil.NewSaramaClient(t, addr) | |
| 	msgGen := testutil.NewMessageGenerator() | |
| 
 | |
| 	// Create topic and produce messages | |
| 	err := client.CreateTopic(topicName, 1, 1) | |
| 	testutil.AssertNoError(t, err, "Failed to create topic") | |
| 
 | |
| 	messages := msgGen.GenerateStringMessages(5) | |
| 	err = client.ProduceMessages(topicName, messages) | |
| 	testutil.AssertNoError(t, err, "Failed to produce messages") | |
| 
 | |
| 	// First consumer: consume first 3 messages and commit offsets | |
| 	handler1 := &OffsetTestHandler{ | |
| 		messages:  make(chan *sarama.ConsumerMessage, len(messages)), | |
| 		ready:     make(chan bool), | |
| 		stopAfter: 3, | |
| 		t:         t, | |
| 	} | |
| 
 | |
| 	consumerGroup1, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig()) | |
| 	testutil.AssertNoError(t, err, "Failed to create first consumer group") | |
| 
 | |
| 	ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second) | |
| 	defer cancel1() | |
| 
 | |
| 	go func() { | |
| 		err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1) | |
| 		if err != nil && err != context.DeadlineExceeded { | |
| 			t.Logf("First consumer error: %v", err) | |
| 		} | |
| 	}() | |
| 
 | |
| 	// Wait for first consumer to be ready and consume messages | |
| 	<-handler1.ready | |
| 	consumedCount := 0 | |
| 	for consumedCount < 3 { | |
| 		select { | |
| 		case <-handler1.messages: | |
| 			consumedCount++ | |
| 		case <-time.After(5 * time.Second): | |
| 			t.Fatalf("Timeout waiting for first consumer messages") | |
| 		} | |
| 	} | |
| 
 | |
| 	consumerGroup1.Close() | |
| 	cancel1() | |
| 	time.Sleep(500 * time.Millisecond) // Wait for cleanup | |
|  | |
| 	// Stop the first consumer after N messages | |
| 	// Allow a brief moment for commit/heartbeat to flush | |
| 	time.Sleep(1 * time.Second) | |
| 
 | |
| 	// Start a second consumer in the same group to verify resumption from committed offset | |
| 	handler2 := &OffsetTestHandler{ | |
| 		messages:  make(chan *sarama.ConsumerMessage, len(messages)), | |
| 		ready:     make(chan bool), | |
| 		stopAfter: 2, | |
| 		t:         t, | |
| 	} | |
| 	consumerGroup2, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig()) | |
| 	testutil.AssertNoError(t, err, "Failed to create second consumer group") | |
| 	defer consumerGroup2.Close() | |
| 
 | |
| 	ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) | |
| 	defer cancel2() | |
| 
 | |
| 	go func() { | |
| 		err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2) | |
| 		if err != nil && err != context.DeadlineExceeded { | |
| 			t.Logf("Second consumer error: %v", err) | |
| 		} | |
| 	}() | |
| 
 | |
| 	// Wait for second consumer and collect remaining messages | |
| 	<-handler2.ready | |
| 	secondConsumerMessages := make([]*sarama.ConsumerMessage, 0) | |
| 	consumedCount = 0 | |
| 	for consumedCount < 2 { | |
| 		select { | |
| 		case msg := <-handler2.messages: | |
| 			consumedCount++ | |
| 			secondConsumerMessages = append(secondConsumerMessages, msg) | |
| 		case <-time.After(5 * time.Second): | |
| 			t.Fatalf("Timeout waiting for second consumer messages. Got %d/2", consumedCount) | |
| 		} | |
| 	} | |
| 
 | |
| 	// Verify second consumer started from correct offset | |
| 	if len(secondConsumerMessages) > 0 { | |
| 		firstMessageOffset := secondConsumerMessages[0].Offset | |
| 		if firstMessageOffset < 3 { | |
| 			t.Fatalf("Second consumer should start from offset >= 3: got %d", firstMessageOffset) | |
| 		} | |
| 	} | |
| } | |
| 
 | |
| func testConsumerGroupRebalancing(t *testing.T, addr string) { | |
| 	topicName := testutil.GenerateUniqueTopicName("rebalancing-test") | |
| 	groupID := testutil.GenerateUniqueGroupID("rebalance-group") | |
| 
 | |
| 	client := testutil.NewSaramaClient(t, addr) | |
| 	msgGen := testutil.NewMessageGenerator() | |
| 
 | |
| 	// Create topic with multiple partitions for rebalancing | |
| 	err := client.CreateTopic(topicName, 4, 1) // 4 partitions | |
| 	testutil.AssertNoError(t, err, "Failed to create topic") | |
| 
 | |
| 	// Produce messages to all partitions | |
| 	messages := msgGen.GenerateStringMessages(12) // 3 messages per partition | |
| 	for i, msg := range messages { | |
| 		partition := int32(i % 4) | |
| 		err = client.ProduceMessageToPartition(topicName, partition, msg) | |
| 		testutil.AssertNoError(t, err, "Failed to produce message") | |
| 	} | |
| 
 | |
| 	t.Logf("Produced %d messages across 4 partitions", len(messages)) | |
| 
 | |
| 	// Test scenario 1: Single consumer gets all partitions | |
| 	t.Run("SingleConsumerAllPartitions", func(t *testing.T) { | |
| 		testSingleConsumerAllPartitions(t, addr, topicName, groupID+"-single") | |
| 	}) | |
| 
 | |
| 	// Test scenario 2: Add second consumer, verify rebalancing | |
| 	t.Run("TwoConsumersRebalance", func(t *testing.T) { | |
| 		testTwoConsumersRebalance(t, addr, topicName, groupID+"-two") | |
| 	}) | |
| 
 | |
| 	// Test scenario 3: Remove consumer, verify rebalancing | |
| 	t.Run("ConsumerLeaveRebalance", func(t *testing.T) { | |
| 		testConsumerLeaveRebalance(t, addr, topicName, groupID+"-leave") | |
| 	}) | |
| 
 | |
| 	// Test scenario 4: Multiple consumers join simultaneously | |
| 	t.Run("MultipleConsumersJoin", func(t *testing.T) { | |
| 		testMultipleConsumersJoin(t, addr, topicName, groupID+"-multi") | |
| 	}) | |
| } | |
| 
 | |
| // ConsumerGroupHandler implements sarama.ConsumerGroupHandler | |
| type ConsumerGroupHandler struct { | |
| 	messages  chan *sarama.ConsumerMessage | |
| 	ready     chan bool | |
| 	readyOnce sync.Once | |
| 	t         *testing.T | |
| } | |
| 
 | |
| func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { | |
| 	h.t.Logf("Consumer group session setup") | |
| 	h.readyOnce.Do(func() { | |
| 		close(h.ready) | |
| 	}) | |
| 	return nil | |
| } | |
| 
 | |
| func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { | |
| 	h.t.Logf("Consumer group session cleanup") | |
| 	return nil | |
| } | |
| 
 | |
| func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
| 	for { | |
| 		select { | |
| 		case message := <-claim.Messages(): | |
| 			if message == nil { | |
| 				return nil | |
| 			} | |
| 			h.messages <- message | |
| 			session.MarkMessage(message, "") | |
| 		case <-session.Context().Done(): | |
| 			return nil | |
| 		} | |
| 	} | |
| } | |
| 
 | |
| // OffsetTestHandler implements sarama.ConsumerGroupHandler for offset testing | |
| type OffsetTestHandler struct { | |
| 	messages  chan *sarama.ConsumerMessage | |
| 	ready     chan bool | |
| 	readyOnce sync.Once | |
| 	stopAfter int | |
| 	consumed  int | |
| 	t         *testing.T | |
| } | |
| 
 | |
| func (h *OffsetTestHandler) Setup(sarama.ConsumerGroupSession) error { | |
| 	h.t.Logf("Offset test consumer setup") | |
| 	h.readyOnce.Do(func() { | |
| 		close(h.ready) | |
| 	}) | |
| 	return nil | |
| } | |
| 
 | |
| func (h *OffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error { | |
| 	h.t.Logf("Offset test consumer cleanup") | |
| 	return nil | |
| } | |
| 
 | |
| func (h *OffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
| 	for { | |
| 		select { | |
| 		case message := <-claim.Messages(): | |
| 			if message == nil { | |
| 				return nil | |
| 			} | |
| 			h.consumed++ | |
| 			h.messages <- message | |
| 			session.MarkMessage(message, "") | |
| 
 | |
| 			// Stop after consuming the specified number of messages | |
| 			if h.consumed >= h.stopAfter { | |
| 				h.t.Logf("Stopping consumer after %d messages", h.consumed) | |
| 				// Ensure commits are flushed before exiting the claim | |
| 				session.Commit() | |
| 				return nil | |
| 			} | |
| 		case <-session.Context().Done(): | |
| 			return nil | |
| 		} | |
| 	} | |
| }
 |