package integration

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/IBM/sarama"
	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
)

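// NOTE: these helpers assume a calling test (not shown in this excerpt) that has
// already created topicName with 4 partitions and produced at least one message
// per partition before they are invoked.

// testSingleConsumerAllPartitions verifies that a lone member of the consumer
// group is assigned all 4 partitions and can consume messages from them.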
func testSingleConsumerAllPartitions(t *testing.T, addr, topicName, groupID string) {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Return.Errors = true

	client, err := sarama.NewClient([]string{addr}, config)
	testutil.AssertNoError(t, err, "Failed to create client")
	defer client.Close()

	consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
	testutil.AssertNoError(t, err, "Failed to create consumer group")
	defer consumerGroup.Close()

	handler := &RebalanceTestHandler{
		messages:    make(chan *sarama.ConsumerMessage, 20),
		ready:       make(chan bool),
		assignments: make(chan []int32, 5),
		t:           t,
		name:        "SingleConsumer",
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Start consumer; Consume is re-entered after each session so the member
	// stays in the group until the context expires.
	go func() {
		for {
			err := consumerGroup.Consume(ctx, []string{topicName}, handler)
			if err != nil && err != context.DeadlineExceeded {
				t.Logf("Consumer error: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()

	// Wait for consumer to be ready
	<-handler.ready

	// Wait for assignment
	select {
	case partitions := <-handler.assignments:
		t.Logf("Single consumer assigned partitions: %v", partitions)
		if len(partitions) != 4 {
			t.Errorf("Expected single consumer to get all 4 partitions, got %d", len(partitions))
		}
	case <-time.After(10 * time.Second):
		t.Fatal("Timeout waiting for partition assignment")
	}

	// Consume some messages to verify functionality
	consumedCount := 0
consumeLoop:
	for consumedCount < 4 { // At least one from each partition
		select {
		case msg := <-handler.messages:
			t.Logf("Consumed message from partition %d: %s", msg.Partition, string(msg.Value))
			consumedCount++
		case <-time.After(5 * time.Second):
			t.Logf("Consumed %d messages so far", consumedCount)
			// Stop waiting; exit the whole loop, not just the select.
			break consumeLoop
		}
	}

	if consumedCount == 0 {
		t.Error("No messages consumed by single consumer")
	}
}

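// testTwoConsumersRebalance verifies that when a second member joins the group,
// the 4 partitions are redistributed across both consumers with no overlap.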
func testTwoConsumersRebalance(t *testing.T, addr, topicName, groupID string) {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Return.Errors = true

	// Start first consumer
	client1, err := sarama.NewClient([]string{addr}, config)
	testutil.AssertNoError(t, err, "Failed to create client1")
	defer client1.Close()

	consumerGroup1, err := sarama.NewConsumerGroupFromClient(groupID, client1)
	testutil.AssertNoError(t, err, "Failed to create consumer group 1")
	defer consumerGroup1.Close()

	handler1 := &RebalanceTestHandler{
		messages:    make(chan *sarama.ConsumerMessage, 20),
		ready:       make(chan bool),
		assignments: make(chan []int32, 5),
		t:           t,
		name:        "Consumer1",
	}

	ctx1, cancel1 := context.WithTimeout(context.Background(), 45*time.Second)
	defer cancel1()

	// Consume must be re-entered after every rebalance: each call covers a single
	// session, and the member only rejoins the group on the next call.
	go func() {
		for {
			err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
			if err != nil && err != context.DeadlineExceeded {
				t.Logf("Consumer1 error: %v", err)
			}
			if ctx1.Err() != nil {
				return
			}
		}
	}()

	// Wait for first consumer to be ready and get initial assignment
	<-handler1.ready
	select {
	case partitions := <-handler1.assignments:
		t.Logf("Consumer1 initial assignment: %v", partitions)
		if len(partitions) != 4 {
			t.Errorf("Expected Consumer1 to initially get all 4 partitions, got %d", len(partitions))
		}
	case <-time.After(10 * time.Second):
		t.Fatal("Timeout waiting for Consumer1 initial assignment")
	}

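	// Starting a second member in the same group triggers a rebalance: the range
	// strategy should split the 4 partitions between the two consumers, and each
	// handler reports its new claims through Setup.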
	// Start second consumer
	client2, err := sarama.NewClient([]string{addr}, config)
	testutil.AssertNoError(t, err, "Failed to create client2")
	defer client2.Close()

	consumerGroup2, err := sarama.NewConsumerGroupFromClient(groupID, client2)
	testutil.AssertNoError(t, err, "Failed to create consumer group 2")
	defer consumerGroup2.Close()

	handler2 := &RebalanceTestHandler{
		messages:    make(chan *sarama.ConsumerMessage, 20),
		ready:       make(chan bool),
		assignments: make(chan []int32, 5),
		t:           t,
		name:        "Consumer2",
	}

	ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel2()

	go func() {
		for {
			err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
			if err != nil && err != context.DeadlineExceeded {
				t.Logf("Consumer2 error: %v", err)
			}
			if ctx2.Err() != nil {
				return
			}
		}
	}()

	// Wait for second consumer to be ready
	<-handler2.ready

	// Wait for rebalancing to occur - both consumers should get new assignments
	var rebalancedAssignment1, rebalancedAssignment2 []int32

	// Consumer1 should get a rebalance assignment
	select {
	case partitions := <-handler1.assignments:
		rebalancedAssignment1 = partitions
		t.Logf("Consumer1 rebalanced assignment: %v", partitions)
	case <-time.After(15 * time.Second):
		t.Error("Timeout waiting for Consumer1 rebalance assignment")
	}

	// Consumer2 should get its assignment
	select {
	case partitions := <-handler2.assignments:
		rebalancedAssignment2 = partitions
		t.Logf("Consumer2 assignment: %v", partitions)
	case <-time.After(15 * time.Second):
		t.Error("Timeout waiting for Consumer2 assignment")
	}

	// Verify rebalancing occurred correctly
	totalPartitions := len(rebalancedAssignment1) + len(rebalancedAssignment2)
	if totalPartitions != 4 {
		t.Errorf("Expected total of 4 partitions assigned, got %d", totalPartitions)
	}

	// Each consumer should have at least 1 partition, and no more than 3
	if len(rebalancedAssignment1) == 0 || len(rebalancedAssignment1) > 3 {
		t.Errorf("Consumer1 should have 1-3 partitions, got %d", len(rebalancedAssignment1))
	}
	if len(rebalancedAssignment2) == 0 || len(rebalancedAssignment2) > 3 {
		t.Errorf("Consumer2 should have 1-3 partitions, got %d", len(rebalancedAssignment2))
	}

	// Verify no partition overlap
	partitionSet := make(map[int32]bool)
	for _, p := range rebalancedAssignment1 {
		if partitionSet[p] {
			t.Errorf("Partition %d assigned to multiple consumers", p)
		}
		partitionSet[p] = true
	}
	for _, p := range rebalancedAssignment2 {
		if partitionSet[p] {
			t.Errorf("Partition %d assigned to multiple consumers", p)
		}
		partitionSet[p] = true
	}

	t.Logf("Rebalancing test completed successfully")
}

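// testConsumerLeaveRebalance verifies that when one of two members leaves the
// group, the remaining consumer is rebalanced onto all 4 partitions.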
func testConsumerLeaveRebalance(t *testing.T, addr, topicName, groupID string) {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Return.Errors = true

	// Start two consumers
	client1, err := sarama.NewClient([]string{addr}, config)
	testutil.AssertNoError(t, err, "Failed to create client1")
	defer client1.Close()

	client2, err := sarama.NewClient([]string{addr}, config)
	testutil.AssertNoError(t, err, "Failed to create client2")
	defer client2.Close()

	consumerGroup1, err := sarama.NewConsumerGroupFromClient(groupID, client1)
	testutil.AssertNoError(t, err, "Failed to create consumer group 1")
	defer consumerGroup1.Close()

	consumerGroup2, err := sarama.NewConsumerGroupFromClient(groupID, client2)
	testutil.AssertNoError(t, err, "Failed to create consumer group 2")

	handler1 := &RebalanceTestHandler{
		messages:    make(chan *sarama.ConsumerMessage, 20),
		ready:       make(chan bool),
		assignments: make(chan []int32, 5),
		t:           t,
		name:        "Consumer1",
	}

	handler2 := &RebalanceTestHandler{
		messages:    make(chan *sarama.ConsumerMessage, 20),
		ready:       make(chan bool),
		assignments: make(chan []int32, 5),
		t:           t,
		name:        "Consumer2",
	}

	ctx1, cancel1 := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel1()

	ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel2()

	// Start both consumers; Consume is called in a loop so each member rejoins
	// the group after every rebalance until its context is cancelled.
	go func() {
		for {
			err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
			if err != nil && err != context.DeadlineExceeded {
				t.Logf("Consumer1 error: %v", err)
			}
			if ctx1.Err() != nil {
				return
			}
		}
	}()

	go func() {
		for {
			err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
			if err != nil && err != context.DeadlineExceeded {
				t.Logf("Consumer2 error: %v", err)
			}
			if ctx2.Err() != nil {
				return
			}
		}
	}()

	// Wait for both consumers to be ready
	<-handler1.ready
	<-handler2.ready

	// Wait for initial assignments
	<-handler1.assignments
	<-handler2.assignments

	t.Logf("Both consumers started, now stopping Consumer2")

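	// Cancelling Consumer2's context and closing its group makes it leave the
	// group, which forces the coordinator to rebalance the remaining member.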
	// Stop second consumer (simulate leave)
	cancel2()
	consumerGroup2.Close()

	// Wait for Consumer1 to be rebalanced onto all partitions. Assignments from
	// earlier rebalances may still be buffered, so keep reading until an
	// assignment covering all 4 partitions arrives or the deadline passes.
	deadline := time.After(20 * time.Second)
	var finalAssignment []int32
waitRebalance:
	for {
		select {
		case partitions := <-handler1.assignments:
			t.Logf("Consumer1 assignment after Consumer2 left: %v", partitions)
			finalAssignment = partitions
			if len(partitions) == 4 {
				break waitRebalance
			}
		case <-deadline:
			t.Error("Timeout waiting for Consumer1 rebalance after Consumer2 left")
			break waitRebalance
		}
	}
	if len(finalAssignment) != 4 {
		t.Errorf("Expected Consumer1 to get all 4 partitions after Consumer2 left, got %d", len(finalAssignment))
	}

	t.Logf("Consumer leave rebalancing test completed successfully")
}

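// testMultipleConsumersJoin verifies that with 4 members in the group, the range
// strategy assigns exactly one of the 4 partitions to each consumer.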
func testMultipleConsumersJoin(t *testing.T, addr, topicName, groupID string) {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Return.Errors = true

	numConsumers := 4
	consumers := make([]sarama.ConsumerGroup, numConsumers)
	clients := make([]sarama.Client, numConsumers)
	handlers := make([]*RebalanceTestHandler, numConsumers)
	contexts := make([]context.Context, numConsumers)
	cancels := make([]context.CancelFunc, numConsumers)

	// Start all consumers simultaneously
	for i := 0; i < numConsumers; i++ {
		client, err := sarama.NewClient([]string{addr}, config)
		testutil.AssertNoError(t, err, fmt.Sprintf("Failed to create client%d", i))
		clients[i] = client

		consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
		testutil.AssertNoError(t, err, fmt.Sprintf("Failed to create consumer group %d", i))
		consumers[i] = consumerGroup

		handlers[i] = &RebalanceTestHandler{
			messages:    make(chan *sarama.ConsumerMessage, 20),
			ready:       make(chan bool),
			assignments: make(chan []int32, 5),
			t:           t,
			name:        fmt.Sprintf("Consumer%d", i),
		}

		contexts[i], cancels[i] = context.WithTimeout(context.Background(), 45*time.Second)

		go func(idx int) {
			// Re-enter Consume after every rebalance so the member stays in the group.
			for {
				err := consumers[idx].Consume(contexts[idx], []string{topicName}, handlers[idx])
				if err != nil && err != context.DeadlineExceeded {
					t.Logf("Consumer%d error: %v", idx, err)
				}
				if contexts[idx].Err() != nil {
					return
				}
			}
		}(i)
	}

	// Cleanup
	defer func() {
		for i := 0; i < numConsumers; i++ {
			cancels[i]()
			consumers[i].Close()
			clients[i].Close()
		}
	}()

	// Wait for all consumers to be ready
	for i := 0; i < numConsumers; i++ {
		select {
		case <-handlers[i].ready:
			t.Logf("Consumer%d ready", i)
		case <-time.After(15 * time.Second):
			t.Fatalf("Timeout waiting for Consumer%d to be ready", i)
		}
	}

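	// With four members joining at nearly the same time, several rebalance
	// generations can occur before the group settles, so a handler may have more
	// than one assignment buffered.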
	// Collect final assignments from all consumers
	assignments := make([][]int32, numConsumers)
	for i := 0; i < numConsumers; i++ {
		select {
		case partitions := <-handlers[i].assignments:
			// Keep the newest assignment that arrives within a short settle
			// window; earlier ones may belong to intermediate generations.
		settle:
			for {
				select {
				case p := <-handlers[i].assignments:
					partitions = p
				case <-time.After(3 * time.Second):
					break settle
				}
			}
			assignments[i] = partitions
			t.Logf("Consumer%d final assignment: %v", i, partitions)
		case <-time.After(20 * time.Second):
			t.Errorf("Timeout waiting for Consumer%d assignment", i)
		}
	}

	// Verify all partitions are assigned exactly once
	assignedPartitions := make(map[int32]int)
	totalAssigned := 0
	for i, assignment := range assignments {
		totalAssigned += len(assignment)
		for _, partition := range assignment {
			assignedPartitions[partition]++
			if assignedPartitions[partition] > 1 {
				t.Errorf("Partition %d assigned to multiple consumers", partition)
			}
		}

		// Each consumer should get exactly 1 partition (4 partitions / 4 consumers)
		if len(assignment) != 1 {
			t.Errorf("Consumer%d should get exactly 1 partition, got %d", i, len(assignment))
		}
	}

	if totalAssigned != 4 {
		t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
	}

	// Verify all partitions 0-3 are assigned
	for i := int32(0); i < 4; i++ {
		if assignedPartitions[i] != 1 {
			t.Errorf("Partition %d assigned %d times, expected 1", i, assignedPartitions[i])
		}
	}

	t.Logf("Multiple consumers join test completed successfully")
}

// RebalanceTestHandler implements sarama.ConsumerGroupHandler with rebalancing awareness
type RebalanceTestHandler struct {
	messages    chan *sarama.ConsumerMessage
	ready       chan bool
	assignments chan []int32
	readyOnce   sync.Once // ensures ready is closed only once, even though Setup runs on every rebalance
	t           *testing.T
	name        string
}

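// Setup is called at the start of every consumer group session (including each
// rebalance); it signals readiness once and reports the session's partition claims.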
func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error {
	h.t.Logf("%s: Consumer group session setup", h.name)
	h.readyOnce.Do(func() {
		close(h.ready)
	})

	// Send partition assignment
	partitions := make([]int32, 0)
	for topic, partitionList := range session.Claims() {
		h.t.Logf("%s: Assigned topic %s with partitions %v", h.name, topic, partitionList)
		partitions = append(partitions, partitionList...)
	}

	select {
	case h.assignments <- partitions:
	default:
		// Channel might be full, that's ok
	}

	return nil
}

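// Cleanup is called at the end of every consumer group session, after all
// ConsumeClaim loops have exited.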
func (h *RebalanceTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
	h.t.Logf("%s: Consumer group session cleanup", h.name)
	return nil
}

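// ConsumeClaim drains a single partition claim, forwarding messages to the test
// and marking them consumed, until the claim's channel closes or the session ends.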
func (h *RebalanceTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message := <-claim.Messages():
			if message == nil {
				// Messages() is closed when the claim ends (e.g. on rebalance).
				return nil
			}
			h.t.Logf("%s: Received message from partition %d: %s", h.name, message.Partition, string(message.Value))
			select {
			case h.messages <- message:
			default:
				// Channel full, drop message for test
			}
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			return nil
		}
	}
}