You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

359 lines
9.7 KiB

package consumer
import (
"reflect"
"sort"
"testing"
)
func TestRangeAssignmentStrategy(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
if strategy.Name() != "range" {
t.Errorf("Expected strategy name 'range', got '%s'", strategy.Name())
}
// Test with 2 members, 4 partitions on one topic
members := []*GroupMember{
{
ID: "member1",
Subscription: []string{"topic1"},
},
{
ID: "member2",
Subscription: []string{"topic1"},
},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
assignments := strategy.Assign(members, topicPartitions)
// Verify all members have assignments
if len(assignments) != 2 {
t.Fatalf("Expected assignments for 2 members, got %d", len(assignments))
}
// Verify total partitions assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
if totalAssigned != 4 {
t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
}
// Range assignment should distribute evenly: 2 partitions each
for memberID, assignment := range assignments {
if len(assignment) != 2 {
t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment))
}
// Verify all assignments are for the subscribed topic
for _, pa := range assignment {
if pa.Topic != "topic1" {
t.Errorf("Expected topic 'topic1', got '%s'", pa.Topic)
}
}
}
}
func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
// Test with 3 members, 4 partitions - should distribute 2,1,1
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1"}},
{ID: "member2", Subscription: []string{"topic1"}},
{ID: "member3", Subscription: []string{"topic1"}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
assignments := strategy.Assign(members, topicPartitions)
// Get assignment counts
counts := make([]int, 0, 3)
for _, assignment := range assignments {
counts = append(counts, len(assignment))
}
sort.Ints(counts)
// Should be distributed as [1, 1, 2] (first member gets extra partition)
expected := []int{1, 1, 2}
if !reflect.DeepEqual(counts, expected) {
t.Errorf("Expected partition distribution %v, got %v", expected, counts)
}
}
func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1", "topic2"}},
{ID: "member2", Subscription: []string{"topic1"}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1},
"topic2": {0, 1},
}
assignments := strategy.Assign(members, topicPartitions)
// Member1 should get assignments from both topics
member1Assignments := assignments["member1"]
topicsAssigned := make(map[string]int)
for _, pa := range member1Assignments {
topicsAssigned[pa.Topic]++
}
if len(topicsAssigned) != 2 {
t.Errorf("Expected member1 to be assigned to 2 topics, got %d", len(topicsAssigned))
}
// Member2 should only get topic1 assignments
member2Assignments := assignments["member2"]
for _, pa := range member2Assignments {
if pa.Topic != "topic1" {
t.Errorf("Expected member2 to only get topic1, but got %s", pa.Topic)
}
}
}
func TestRoundRobinAssignmentStrategy(t *testing.T) {
strategy := &RoundRobinAssignmentStrategy{}
if strategy.Name() != "roundrobin" {
t.Errorf("Expected strategy name 'roundrobin', got '%s'", strategy.Name())
}
// Test with 2 members, 4 partitions on one topic
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1"}},
{ID: "member2", Subscription: []string{"topic1"}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
assignments := strategy.Assign(members, topicPartitions)
// Verify all members have assignments
if len(assignments) != 2 {
t.Fatalf("Expected assignments for 2 members, got %d", len(assignments))
}
// Verify total partitions assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
if totalAssigned != 4 {
t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
}
// Round robin should distribute evenly: 2 partitions each
for memberID, assignment := range assignments {
if len(assignment) != 2 {
t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment))
}
}
}
func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) {
strategy := &RoundRobinAssignmentStrategy{}
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1", "topic2"}},
{ID: "member2", Subscription: []string{"topic1", "topic2"}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1},
"topic2": {0, 1},
}
assignments := strategy.Assign(members, topicPartitions)
// Each member should get 2 partitions (round robin across topics)
for memberID, assignment := range assignments {
if len(assignment) != 2 {
t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment))
}
}
// Verify no partition is assigned twice
assignedPartitions := make(map[string]map[int32]bool)
for _, assignment := range assignments {
for _, pa := range assignment {
if assignedPartitions[pa.Topic] == nil {
assignedPartitions[pa.Topic] = make(map[int32]bool)
}
if assignedPartitions[pa.Topic][pa.Partition] {
t.Errorf("Partition %d of topic %s assigned multiple times", pa.Partition, pa.Topic)
}
assignedPartitions[pa.Topic][pa.Partition] = true
}
}
}
func TestGetAssignmentStrategy(t *testing.T) {
rangeStrategy := GetAssignmentStrategy("range")
if rangeStrategy.Name() != "range" {
t.Errorf("Expected range strategy, got %s", rangeStrategy.Name())
}
rrStrategy := GetAssignmentStrategy("roundrobin")
if rrStrategy.Name() != "roundrobin" {
t.Errorf("Expected roundrobin strategy, got %s", rrStrategy.Name())
}
// Unknown strategy should default to range
defaultStrategy := GetAssignmentStrategy("unknown")
if defaultStrategy.Name() != "range" {
t.Errorf("Expected default strategy to be range, got %s", defaultStrategy.Name())
}
}
func TestConsumerGroup_AssignPartitions(t *testing.T) {
group := &ConsumerGroup{
ID: "test-group",
Protocol: "range",
Members: map[string]*GroupMember{
"member1": {
ID: "member1",
Subscription: []string{"topic1"},
State: MemberStateStable,
},
"member2": {
ID: "member2",
Subscription: []string{"topic1"},
State: MemberStateStable,
},
},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
group.AssignPartitions(topicPartitions)
// Verify assignments were created
for memberID, member := range group.Members {
if len(member.Assignment) == 0 {
t.Errorf("Expected member %s to have partition assignments", memberID)
}
// Verify all assignments are valid
for _, pa := range member.Assignment {
if pa.Topic != "topic1" {
t.Errorf("Unexpected topic assignment: %s", pa.Topic)
}
if pa.Partition < 0 || pa.Partition >= 4 {
t.Errorf("Unexpected partition assignment: %d", pa.Partition)
}
}
}
}
func TestConsumerGroup_GetMemberAssignments(t *testing.T) {
group := &ConsumerGroup{
Members: map[string]*GroupMember{
"member1": {
ID: "member1",
Assignment: []PartitionAssignment{
{Topic: "topic1", Partition: 0},
{Topic: "topic1", Partition: 1},
},
},
},
}
assignments := group.GetMemberAssignments()
if len(assignments) != 1 {
t.Fatalf("Expected 1 member assignment, got %d", len(assignments))
}
member1Assignments := assignments["member1"]
if len(member1Assignments) != 2 {
t.Errorf("Expected 2 partition assignments for member1, got %d", len(member1Assignments))
}
// Verify assignment content
expectedAssignments := []PartitionAssignment{
{Topic: "topic1", Partition: 0},
{Topic: "topic1", Partition: 1},
}
if !reflect.DeepEqual(member1Assignments, expectedAssignments) {
t.Errorf("Expected assignments %v, got %v", expectedAssignments, member1Assignments)
}
}
func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) {
group := &ConsumerGroup{
Members: map[string]*GroupMember{
"member1": {
ID: "member1",
Subscription: []string{"topic1"},
},
"member2": {
ID: "member2",
Subscription: []string{"topic2"},
},
},
SubscribedTopics: map[string]bool{
"topic1": true,
"topic2": true,
},
}
// Update member1's subscription
group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"})
// Verify member subscription updated
member1 := group.Members["member1"]
expectedSubscription := []string{"topic1", "topic3"}
if !reflect.DeepEqual(member1.Subscription, expectedSubscription) {
t.Errorf("Expected subscription %v, got %v", expectedSubscription, member1.Subscription)
}
// Verify group subscribed topics updated
expectedGroupTopics := []string{"topic1", "topic2", "topic3"}
actualGroupTopics := group.GetSubscribedTopics()
if !reflect.DeepEqual(actualGroupTopics, expectedGroupTopics) {
t.Errorf("Expected group topics %v, got %v", expectedGroupTopics, actualGroupTopics)
}
}
func TestAssignmentStrategy_EmptyMembers(t *testing.T) {
rangeStrategy := &RangeAssignmentStrategy{}
rrStrategy := &RoundRobinAssignmentStrategy{}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
// Both strategies should handle empty members gracefully
rangeAssignments := rangeStrategy.Assign([]*GroupMember{}, topicPartitions)
rrAssignments := rrStrategy.Assign([]*GroupMember{}, topicPartitions)
if len(rangeAssignments) != 0 {
t.Error("Expected empty assignments for empty members list (range)")
}
if len(rrAssignments) != 0 {
t.Error("Expected empty assignments for empty members list (round robin)")
}
}