You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							453 lines
						
					
					
						
							14 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							453 lines
						
					
					
						
							14 KiB
						
					
					
				
								package integration
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"sync"
							 | 
						|
									"testing"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/IBM/sarama"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func testSingleConsumerAllPartitions(t *testing.T, addr, topicName, groupID string) {
							 | 
						|
									config := sarama.NewConfig()
							 | 
						|
									config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
							 | 
						|
									config.Consumer.Offsets.Initial = sarama.OffsetOldest
							 | 
						|
									config.Consumer.Return.Errors = true
							 | 
						|
								
							 | 
						|
									client, err := sarama.NewClient([]string{addr}, config)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create client")
							 | 
						|
									defer client.Close()
							 | 
						|
								
							 | 
						|
									consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create consumer group")
							 | 
						|
									defer consumerGroup.Close()
							 | 
						|
								
							 | 
						|
									handler := &RebalanceTestHandler{
							 | 
						|
										messages:    make(chan *sarama.ConsumerMessage, 20),
							 | 
						|
										ready:       make(chan bool),
							 | 
						|
										assignments: make(chan []int32, 5),
							 | 
						|
										t:           t,
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
							 | 
						|
									defer cancel()
							 | 
						|
								
							 | 
						|
									// Start consumer
							 | 
						|
									go func() {
							 | 
						|
										err := consumerGroup.Consume(ctx, []string{topicName}, handler)
							 | 
						|
										if err != nil && err != context.DeadlineExceeded {
							 | 
						|
											t.Logf("Consumer error: %v", err)
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Wait for consumer to be ready
							 | 
						|
									<-handler.ready
							 | 
						|
								
							 | 
						|
									// Wait for assignment
							 | 
						|
									select {
							 | 
						|
									case partitions := <-handler.assignments:
							 | 
						|
										t.Logf("Single consumer assigned partitions: %v", partitions)
							 | 
						|
										if len(partitions) != 4 {
							 | 
						|
											t.Errorf("Expected single consumer to get all 4 partitions, got %d", len(partitions))
							 | 
						|
										}
							 | 
						|
									case <-time.After(10 * time.Second):
							 | 
						|
										t.Fatal("Timeout waiting for partition assignment")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Consume some messages to verify functionality
							 | 
						|
									consumedCount := 0
							 | 
						|
									for consumedCount < 4 { // At least one from each partition
							 | 
						|
										select {
							 | 
						|
										case msg := <-handler.messages:
							 | 
						|
											t.Logf("Consumed message from partition %d: %s", msg.Partition, string(msg.Value))
							 | 
						|
											consumedCount++
							 | 
						|
										case <-time.After(5 * time.Second):
							 | 
						|
											t.Logf("Consumed %d messages so far", consumedCount)
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if consumedCount == 0 {
							 | 
						|
										t.Error("No messages consumed by single consumer")
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testTwoConsumersRebalance(t *testing.T, addr, topicName, groupID string) {
							 | 
						|
									config := sarama.NewConfig()
							 | 
						|
									config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
							 | 
						|
									config.Consumer.Offsets.Initial = sarama.OffsetOldest
							 | 
						|
									config.Consumer.Return.Errors = true
							 | 
						|
								
							 | 
						|
									// Start first consumer
							 | 
						|
									client1, err := sarama.NewClient([]string{addr}, config)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create client1")
							 | 
						|
									defer client1.Close()
							 | 
						|
								
							 | 
						|
									consumerGroup1, err := sarama.NewConsumerGroupFromClient(groupID, client1)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create consumer group 1")
							 | 
						|
									defer consumerGroup1.Close()
							 | 
						|
								
							 | 
						|
									handler1 := &RebalanceTestHandler{
							 | 
						|
										messages:    make(chan *sarama.ConsumerMessage, 20),
							 | 
						|
										ready:       make(chan bool),
							 | 
						|
										assignments: make(chan []int32, 5),
							 | 
						|
										t:           t,
							 | 
						|
										name:        "Consumer1",
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									ctx1, cancel1 := context.WithTimeout(context.Background(), 45*time.Second)
							 | 
						|
									defer cancel1()
							 | 
						|
								
							 | 
						|
									go func() {
							 | 
						|
										err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
							 | 
						|
										if err != nil && err != context.DeadlineExceeded {
							 | 
						|
											t.Logf("Consumer1 error: %v", err)
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Wait for first consumer to be ready and get initial assignment
							 | 
						|
									<-handler1.ready
							 | 
						|
									select {
							 | 
						|
									case partitions := <-handler1.assignments:
							 | 
						|
										t.Logf("Consumer1 initial assignment: %v", partitions)
							 | 
						|
										if len(partitions) != 4 {
							 | 
						|
											t.Errorf("Expected Consumer1 to initially get all 4 partitions, got %d", len(partitions))
							 | 
						|
										}
							 | 
						|
									case <-time.After(10 * time.Second):
							 | 
						|
										t.Fatal("Timeout waiting for Consumer1 initial assignment")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Start second consumer
							 | 
						|
									client2, err := sarama.NewClient([]string{addr}, config)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create client2")
							 | 
						|
									defer client2.Close()
							 | 
						|
								
							 | 
						|
									consumerGroup2, err := sarama.NewConsumerGroupFromClient(groupID, client2)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create consumer group 2")
							 | 
						|
									defer consumerGroup2.Close()
							 | 
						|
								
							 | 
						|
									handler2 := &RebalanceTestHandler{
							 | 
						|
										messages:    make(chan *sarama.ConsumerMessage, 20),
							 | 
						|
										ready:       make(chan bool),
							 | 
						|
										assignments: make(chan []int32, 5),
							 | 
						|
										t:           t,
							 | 
						|
										name:        "Consumer2",
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
							 | 
						|
									defer cancel2()
							 | 
						|
								
							 | 
						|
									go func() {
							 | 
						|
										err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
							 | 
						|
										if err != nil && err != context.DeadlineExceeded {
							 | 
						|
											t.Logf("Consumer2 error: %v", err)
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Wait for second consumer to be ready
							 | 
						|
									<-handler2.ready
							 | 
						|
								
							 | 
						|
									// Wait for rebalancing to occur - both consumers should get new assignments
							 | 
						|
									var rebalancedAssignment1, rebalancedAssignment2 []int32
							 | 
						|
									
							 | 
						|
									// Consumer1 should get a rebalance assignment
							 | 
						|
									select {
							 | 
						|
									case partitions := <-handler1.assignments:
							 | 
						|
										rebalancedAssignment1 = partitions
							 | 
						|
										t.Logf("Consumer1 rebalanced assignment: %v", partitions)
							 | 
						|
									case <-time.After(15 * time.Second):
							 | 
						|
										t.Error("Timeout waiting for Consumer1 rebalance assignment")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Consumer2 should get its assignment
							 | 
						|
									select {
							 | 
						|
									case partitions := <-handler2.assignments:
							 | 
						|
										rebalancedAssignment2 = partitions
							 | 
						|
										t.Logf("Consumer2 assignment: %v", partitions)
							 | 
						|
									case <-time.After(15 * time.Second):
							 | 
						|
										t.Error("Timeout waiting for Consumer2 assignment")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Verify rebalancing occurred correctly
							 | 
						|
									totalPartitions := len(rebalancedAssignment1) + len(rebalancedAssignment2)
							 | 
						|
									if totalPartitions != 4 {
							 | 
						|
										t.Errorf("Expected total of 4 partitions assigned, got %d", totalPartitions)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Each consumer should have at least 1 partition, and no more than 3
							 | 
						|
									if len(rebalancedAssignment1) == 0 || len(rebalancedAssignment1) > 3 {
							 | 
						|
										t.Errorf("Consumer1 should have 1-3 partitions, got %d", len(rebalancedAssignment1))
							 | 
						|
									}
							 | 
						|
									if len(rebalancedAssignment2) == 0 || len(rebalancedAssignment2) > 3 {
							 | 
						|
										t.Errorf("Consumer2 should have 1-3 partitions, got %d", len(rebalancedAssignment2))
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Verify no partition overlap
							 | 
						|
									partitionSet := make(map[int32]bool)
							 | 
						|
									for _, p := range rebalancedAssignment1 {
							 | 
						|
										if partitionSet[p] {
							 | 
						|
											t.Errorf("Partition %d assigned to multiple consumers", p)
							 | 
						|
										}
							 | 
						|
										partitionSet[p] = true
							 | 
						|
									}
							 | 
						|
									for _, p := range rebalancedAssignment2 {
							 | 
						|
										if partitionSet[p] {
							 | 
						|
											t.Errorf("Partition %d assigned to multiple consumers", p)
							 | 
						|
										}
							 | 
						|
										partitionSet[p] = true
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.Logf("Rebalancing test completed successfully")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testConsumerLeaveRebalance(t *testing.T, addr, topicName, groupID string) {
							 | 
						|
									config := sarama.NewConfig()
							 | 
						|
									config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
							 | 
						|
									config.Consumer.Offsets.Initial = sarama.OffsetOldest
							 | 
						|
									config.Consumer.Return.Errors = true
							 | 
						|
								
							 | 
						|
									// Start two consumers
							 | 
						|
									client1, err := sarama.NewClient([]string{addr}, config)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create client1")
							 | 
						|
									defer client1.Close()
							 | 
						|
								
							 | 
						|
									client2, err := sarama.NewClient([]string{addr}, config)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create client2")
							 | 
						|
									defer client2.Close()
							 | 
						|
								
							 | 
						|
									consumerGroup1, err := sarama.NewConsumerGroupFromClient(groupID, client1)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create consumer group 1")
							 | 
						|
									defer consumerGroup1.Close()
							 | 
						|
								
							 | 
						|
									consumerGroup2, err := sarama.NewConsumerGroupFromClient(groupID, client2)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create consumer group 2")
							 | 
						|
								
							 | 
						|
									handler1 := &RebalanceTestHandler{
							 | 
						|
										messages:    make(chan *sarama.ConsumerMessage, 20),
							 | 
						|
										ready:       make(chan bool),
							 | 
						|
										assignments: make(chan []int32, 5),
							 | 
						|
										t:           t,
							 | 
						|
										name:        "Consumer1",
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									handler2 := &RebalanceTestHandler{
							 | 
						|
										messages:    make(chan *sarama.ConsumerMessage, 20),
							 | 
						|
										ready:       make(chan bool),
							 | 
						|
										assignments: make(chan []int32, 5),
							 | 
						|
										t:           t,
							 | 
						|
										name:        "Consumer2",
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									ctx1, cancel1 := context.WithTimeout(context.Background(), 60*time.Second)
							 | 
						|
									defer cancel1()
							 | 
						|
								
							 | 
						|
									ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
							 | 
						|
								
							 | 
						|
									// Start both consumers
							 | 
						|
									go func() {
							 | 
						|
										err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
							 | 
						|
										if err != nil && err != context.DeadlineExceeded {
							 | 
						|
											t.Logf("Consumer1 error: %v", err)
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									go func() {
							 | 
						|
										err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
							 | 
						|
										if err != nil && err != context.DeadlineExceeded {
							 | 
						|
											t.Logf("Consumer2 error: %v", err)
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Wait for both consumers to be ready
							 | 
						|
									<-handler1.ready
							 | 
						|
									<-handler2.ready
							 | 
						|
								
							 | 
						|
									// Wait for initial assignments
							 | 
						|
									<-handler1.assignments
							 | 
						|
									<-handler2.assignments
							 | 
						|
								
							 | 
						|
									t.Logf("Both consumers started, now stopping Consumer2")
							 | 
						|
								
							 | 
						|
									// Stop second consumer (simulate leave)
							 | 
						|
									cancel2()
							 | 
						|
									consumerGroup2.Close()
							 | 
						|
								
							 | 
						|
									// Wait for Consumer1 to get rebalanced assignment (should get all partitions)
							 | 
						|
									select {
							 | 
						|
									case partitions := <-handler1.assignments:
							 | 
						|
										t.Logf("Consumer1 rebalanced assignment after Consumer2 left: %v", partitions)
							 | 
						|
										if len(partitions) != 4 {
							 | 
						|
											t.Errorf("Expected Consumer1 to get all 4 partitions after Consumer2 left, got %d", len(partitions))
							 | 
						|
										}
							 | 
						|
									case <-time.After(20 * time.Second):
							 | 
						|
										t.Error("Timeout waiting for Consumer1 rebalance after Consumer2 left")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.Logf("Consumer leave rebalancing test completed successfully")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testMultipleConsumersJoin(t *testing.T, addr, topicName, groupID string) {
							 | 
						|
									config := sarama.NewConfig()
							 | 
						|
									config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
							 | 
						|
									config.Consumer.Offsets.Initial = sarama.OffsetOldest
							 | 
						|
									config.Consumer.Return.Errors = true
							 | 
						|
								
							 | 
						|
									numConsumers := 4
							 | 
						|
									consumers := make([]sarama.ConsumerGroup, numConsumers)
							 | 
						|
									clients := make([]sarama.Client, numConsumers)
							 | 
						|
									handlers := make([]*RebalanceTestHandler, numConsumers)
							 | 
						|
									contexts := make([]context.Context, numConsumers)
							 | 
						|
									cancels := make([]context.CancelFunc, numConsumers)
							 | 
						|
								
							 | 
						|
									// Start all consumers simultaneously
							 | 
						|
									for i := 0; i < numConsumers; i++ {
							 | 
						|
										client, err := sarama.NewClient([]string{addr}, config)
							 | 
						|
										testutil.AssertNoError(t, err, fmt.Sprintf("Failed to create client%d", i))
							 | 
						|
										clients[i] = client
							 | 
						|
								
							 | 
						|
										consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
							 | 
						|
										testutil.AssertNoError(t, err, fmt.Sprintf("Failed to create consumer group %d", i))
							 | 
						|
										consumers[i] = consumerGroup
							 | 
						|
								
							 | 
						|
										handlers[i] = &RebalanceTestHandler{
							 | 
						|
											messages:    make(chan *sarama.ConsumerMessage, 20),
							 | 
						|
											ready:       make(chan bool),
							 | 
						|
											assignments: make(chan []int32, 5),
							 | 
						|
											t:           t,
							 | 
						|
											name:        fmt.Sprintf("Consumer%d", i),
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										contexts[i], cancels[i] = context.WithTimeout(context.Background(), 45*time.Second)
							 | 
						|
								
							 | 
						|
										go func(idx int) {
							 | 
						|
											err := consumers[idx].Consume(contexts[idx], []string{topicName}, handlers[idx])
							 | 
						|
											if err != nil && err != context.DeadlineExceeded {
							 | 
						|
												t.Logf("Consumer%d error: %v", idx, err)
							 | 
						|
											}
							 | 
						|
										}(i)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Cleanup
							 | 
						|
									defer func() {
							 | 
						|
										for i := 0; i < numConsumers; i++ {
							 | 
						|
											cancels[i]()
							 | 
						|
											consumers[i].Close()
							 | 
						|
											clients[i].Close()
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Wait for all consumers to be ready
							 | 
						|
									for i := 0; i < numConsumers; i++ {
							 | 
						|
										select {
							 | 
						|
										case <-handlers[i].ready:
							 | 
						|
											t.Logf("Consumer%d ready", i)
							 | 
						|
										case <-time.After(15 * time.Second):
							 | 
						|
											t.Fatalf("Timeout waiting for Consumer%d to be ready", i)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Collect final assignments from all consumers
							 | 
						|
									assignments := make([][]int32, numConsumers)
							 | 
						|
									for i := 0; i < numConsumers; i++ {
							 | 
						|
										select {
							 | 
						|
										case partitions := <-handlers[i].assignments:
							 | 
						|
											assignments[i] = partitions
							 | 
						|
											t.Logf("Consumer%d final assignment: %v", i, partitions)
							 | 
						|
										case <-time.After(20 * time.Second):
							 | 
						|
											t.Errorf("Timeout waiting for Consumer%d assignment", i)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Verify all partitions are assigned exactly once
							 | 
						|
									assignedPartitions := make(map[int32]int)
							 | 
						|
									totalAssigned := 0
							 | 
						|
									for i, assignment := range assignments {
							 | 
						|
										totalAssigned += len(assignment)
							 | 
						|
										for _, partition := range assignment {
							 | 
						|
											assignedPartitions[partition]++
							 | 
						|
											if assignedPartitions[partition] > 1 {
							 | 
						|
												t.Errorf("Partition %d assigned to multiple consumers", partition)
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
										
							 | 
						|
										// Each consumer should get exactly 1 partition (4 partitions / 4 consumers)
							 | 
						|
										if len(assignment) != 1 {
							 | 
						|
											t.Errorf("Consumer%d should get exactly 1 partition, got %d", i, len(assignment))
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if totalAssigned != 4 {
							 | 
						|
										t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Verify all partitions 0-3 are assigned
							 | 
						|
									for i := int32(0); i < 4; i++ {
							 | 
						|
										if assignedPartitions[i] != 1 {
							 | 
						|
											t.Errorf("Partition %d assigned %d times, expected 1", i, assignedPartitions[i])
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.Logf("Multiple consumers join test completed successfully")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// RebalanceTestHandler implements sarama.ConsumerGroupHandler with rebalancing awareness
							 | 
						|
								type RebalanceTestHandler struct {
							 | 
						|
									messages    chan *sarama.ConsumerMessage
							 | 
						|
									ready       chan bool
							 | 
						|
									assignments chan []int32
							 | 
						|
									readyOnce   sync.Once
							 | 
						|
									t           *testing.T
							 | 
						|
									name        string
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error {
							 | 
						|
									h.t.Logf("%s: Consumer group session setup", h.name)
							 | 
						|
									h.readyOnce.Do(func() {
							 | 
						|
										close(h.ready)
							 | 
						|
									})
							 | 
						|
									
							 | 
						|
									// Send partition assignment
							 | 
						|
									partitions := make([]int32, 0)
							 | 
						|
									for topic, partitionList := range session.Claims() {
							 | 
						|
										h.t.Logf("%s: Assigned topic %s with partitions %v", h.name, topic, partitionList)
							 | 
						|
										for _, partition := range partitionList {
							 | 
						|
											partitions = append(partitions, partition)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
									
							 | 
						|
									select {
							 | 
						|
									case h.assignments <- partitions:
							 | 
						|
									default:
							 | 
						|
										// Channel might be full, that's ok
							 | 
						|
									}
							 | 
						|
									
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (h *RebalanceTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
							 | 
						|
									h.t.Logf("%s: Consumer group session cleanup", h.name)
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (h *RebalanceTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
							 | 
						|
									for {
							 | 
						|
										select {
							 | 
						|
										case message := <-claim.Messages():
							 | 
						|
											if message == nil {
							 | 
						|
												return nil
							 | 
						|
											}
							 | 
						|
											h.t.Logf("%s: Received message from partition %d: %s", h.name, message.Partition, string(message.Value))
							 | 
						|
											select {
							 | 
						|
											case h.messages <- message:
							 | 
						|
											default:
							 | 
						|
												// Channel full, drop message for test
							 | 
						|
											}
							 | 
						|
											session.MarkMessage(message, "")
							 | 
						|
										case <-session.Context().Done():
							 | 
						|
											return nil
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								}
							 |