From f639c4247258913f13573f86b948497af601f4f1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 19:01:12 -0700 Subject: [PATCH] clean up consumer protocols --- weed/mq/kafka/consumer/assignment.go | 195 ++--------------- weed/mq/kafka/consumer/assignment_test.go | 128 +++++------ .../kafka/consumer/cooperative_sticky_test.go | 199 +++++++++--------- .../kafka/consumer/incremental_rebalancing.go | 23 +- .../kafka/protocol/consumer_group_metadata.go | 18 +- weed/mq/kafka/protocol/joingroup.go | 10 +- 6 files changed, 208 insertions(+), 365 deletions(-) diff --git a/weed/mq/kafka/consumer/assignment.go b/weed/mq/kafka/consumer/assignment.go index 5799ed2b5..706efe5c9 100644 --- a/weed/mq/kafka/consumer/assignment.go +++ b/weed/mq/kafka/consumer/assignment.go @@ -4,6 +4,14 @@ import ( "sort" ) +// Assignment strategy protocol names +const ( + ProtocolNameRange = "range" + ProtocolNameRoundRobin = "roundrobin" + ProtocolNameSticky = "sticky" + ProtocolNameCooperativeSticky = "cooperative-sticky" +) + // AssignmentStrategy defines how partitions are assigned to consumers type AssignmentStrategy interface { Name() string @@ -15,7 +23,7 @@ type AssignmentStrategy interface { type RangeAssignmentStrategy struct{} func (r *RangeAssignmentStrategy) Name() string { - return "range" + return ProtocolNameRange } func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { @@ -104,7 +112,7 @@ func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions type RoundRobinAssignmentStrategy struct{} func (rr *RoundRobinAssignmentStrategy) Name() string { - return "roundrobin" + return ProtocolNameRoundRobin } func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { @@ -194,191 +202,14 @@ func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPart return assignments } -// CooperativeStickyAssignmentStrategy implements the cooperative-sticky assignment strategy -// This strategy tries to minimize partition movement during rebalancing while ensuring fairness -type CooperativeStickyAssignmentStrategy struct{} - -func (cs *CooperativeStickyAssignmentStrategy) Name() string { - return "cooperative-sticky" -} - -func (cs *CooperativeStickyAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { - if len(members) == 0 { - return make(map[string][]PartitionAssignment) - } - - assignments := make(map[string][]PartitionAssignment) - for _, member := range members { - assignments[member.ID] = make([]PartitionAssignment, 0) - } - - // Sort members for consistent assignment - sortedMembers := make([]*GroupMember, len(members)) - copy(sortedMembers, members) - sort.Slice(sortedMembers, func(i, j int) bool { - return sortedMembers[i].ID < sortedMembers[j].ID - }) - - // Get all subscribed topics - subscribedTopics := make(map[string]bool) - for _, member := range members { - for _, topic := range member.Subscription { - subscribedTopics[topic] = true - } - } - - // Collect all partitions that need assignment - allPartitions := make([]PartitionAssignment, 0) - for topic := range subscribedTopics { - partitions, exists := topicPartitions[topic] - if !exists { - continue - } - - for _, partition := range partitions { - allPartitions = append(allPartitions, PartitionAssignment{ - Topic: topic, - Partition: partition, - }) - } - } - - // Sort partitions for consistent assignment - sort.Slice(allPartitions, func(i, j int) bool { - if allPartitions[i].Topic != allPartitions[j].Topic { - return allPartitions[i].Topic < allPartitions[j].Topic - } - return allPartitions[i].Partition < allPartitions[j].Partition - }) - - // Calculate target assignment counts for fairness - totalPartitions := len(allPartitions) - numMembers := len(sortedMembers) - baseAssignments := totalPartitions / numMembers - extraAssignments := totalPartitions % numMembers - - // Phase 1: Try to preserve existing assignments (sticky behavior) but respect fairness - currentAssignments := make(map[string]map[PartitionAssignment]bool) - for _, member := range sortedMembers { - currentAssignments[member.ID] = make(map[PartitionAssignment]bool) - for _, assignment := range member.Assignment { - currentAssignments[member.ID][assignment] = true - } - } - - // Track which partitions are already assigned - assignedPartitions := make(map[PartitionAssignment]bool) - - // Preserve existing assignments where possible, but respect target counts - for i, member := range sortedMembers { - // Calculate target count for this member - targetCount := baseAssignments - if i < extraAssignments { - targetCount++ - } - - assignedCount := 0 - for assignment := range currentAssignments[member.ID] { - // Stop if we've reached the target count for this member - if assignedCount >= targetCount { - break - } - - // Check if member is still subscribed to this topic - subscribed := false - for _, topic := range member.Subscription { - if topic == assignment.Topic { - subscribed = true - break - } - } - - if subscribed && !assignedPartitions[assignment] { - assignments[member.ID] = append(assignments[member.ID], assignment) - assignedPartitions[assignment] = true - assignedCount++ - } - } - } - - // Phase 2: Assign remaining partitions using round-robin for fairness - unassignedPartitions := make([]PartitionAssignment, 0) - for _, partition := range allPartitions { - if !assignedPartitions[partition] { - unassignedPartitions = append(unassignedPartitions, partition) - } - } - - // Assign remaining partitions to achieve fairness - memberIndex := 0 - for _, partition := range unassignedPartitions { - // Find a member that needs more partitions and is subscribed to this topic - assigned := false - startIndex := memberIndex - - for !assigned { - member := sortedMembers[memberIndex] - - // Check if this member is subscribed to the topic - subscribed := false - for _, topic := range member.Subscription { - if topic == partition.Topic { - subscribed = true - break - } - } - - if subscribed { - // Calculate target count for this member - targetCount := baseAssignments - if memberIndex < extraAssignments { - targetCount++ - } - - // Assign if member needs more partitions - if len(assignments[member.ID]) < targetCount { - assignments[member.ID] = append(assignments[member.ID], partition) - assigned = true - } - } - - memberIndex = (memberIndex + 1) % numMembers - - // Prevent infinite loop - if memberIndex == startIndex && !assigned { - // Force assign to any subscribed member - for _, member := range sortedMembers { - subscribed := false - for _, topic := range member.Subscription { - if topic == partition.Topic { - subscribed = true - break - } - } - if subscribed { - assignments[member.ID] = append(assignments[member.ID], partition) - assigned = true - break - } - } - break - } - } - } - - return assignments -} - // GetAssignmentStrategy returns the appropriate assignment strategy func GetAssignmentStrategy(name string) AssignmentStrategy { switch name { - case "range": + case ProtocolNameRange: return &RangeAssignmentStrategy{} - case "roundrobin": + case ProtocolNameRoundRobin: return &RoundRobinAssignmentStrategy{} - case "cooperative-sticky": - return &CooperativeStickyAssignmentStrategy{} - case "incremental-cooperative": + case ProtocolNameCooperativeSticky: return NewIncrementalCooperativeAssignmentStrategy() default: // Default to range strategy diff --git a/weed/mq/kafka/consumer/assignment_test.go b/weed/mq/kafka/consumer/assignment_test.go index 520200ed3..14200366f 100644 --- a/weed/mq/kafka/consumer/assignment_test.go +++ b/weed/mq/kafka/consumer/assignment_test.go @@ -8,11 +8,11 @@ import ( func TestRangeAssignmentStrategy(t *testing.T) { strategy := &RangeAssignmentStrategy{} - - if strategy.Name() != "range" { - t.Errorf("Expected strategy name 'range', got '%s'", strategy.Name()) + + if strategy.Name() != ProtocolNameRange { + t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameRange, strategy.Name()) } - + // Test with 2 members, 4 partitions on one topic members := []*GroupMember{ { @@ -20,38 +20,38 @@ func TestRangeAssignmentStrategy(t *testing.T) { Subscription: []string{"topic1"}, }, { - ID: "member2", + ID: "member2", Subscription: []string{"topic1"}, }, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Verify all members have assignments if len(assignments) != 2 { t.Fatalf("Expected assignments for 2 members, got %d", len(assignments)) } - + // Verify total partitions assigned totalAssigned := 0 for _, assignment := range assignments { totalAssigned += len(assignment) } - + if totalAssigned != 4 { t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned) } - + // Range assignment should distribute evenly: 2 partitions each for memberID, assignment := range assignments { if len(assignment) != 2 { t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment)) } - + // Verify all assignments are for the subscribed topic for _, pa := range assignment { if pa.Topic != "topic1" { @@ -63,27 +63,27 @@ func TestRangeAssignmentStrategy(t *testing.T) { func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) { strategy := &RangeAssignmentStrategy{} - + // Test with 3 members, 4 partitions - should distribute 2,1,1 members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1"}}, {ID: "member2", Subscription: []string{"topic1"}}, {ID: "member3", Subscription: []string{"topic1"}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Get assignment counts counts := make([]int, 0, 3) for _, assignment := range assignments { counts = append(counts, len(assignment)) } sort.Ints(counts) - + // Should be distributed as [1, 1, 2] (first member gets extra partition) expected := []int{1, 1, 2} if !reflect.DeepEqual(counts, expected) { @@ -93,30 +93,30 @@ func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) { func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) { strategy := &RangeAssignmentStrategy{} - + members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1", "topic2"}}, {ID: "member2", Subscription: []string{"topic1"}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1}, "topic2": {0, 1}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Member1 should get assignments from both topics member1Assignments := assignments["member1"] topicsAssigned := make(map[string]int) for _, pa := range member1Assignments { topicsAssigned[pa.Topic]++ } - + if len(topicsAssigned) != 2 { t.Errorf("Expected member1 to be assigned to 2 topics, got %d", len(topicsAssigned)) } - + // Member2 should only get topic1 assignments member2Assignments := assignments["member2"] for _, pa := range member2Assignments { @@ -128,38 +128,38 @@ func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) { func TestRoundRobinAssignmentStrategy(t *testing.T) { strategy := &RoundRobinAssignmentStrategy{} - - if strategy.Name() != "roundrobin" { - t.Errorf("Expected strategy name 'roundrobin', got '%s'", strategy.Name()) + + if strategy.Name() != ProtocolNameRoundRobin { + t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameRoundRobin, strategy.Name()) } - + // Test with 2 members, 4 partitions on one topic members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1"}}, {ID: "member2", Subscription: []string{"topic1"}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Verify all members have assignments if len(assignments) != 2 { t.Fatalf("Expected assignments for 2 members, got %d", len(assignments)) } - + // Verify total partitions assigned totalAssigned := 0 for _, assignment := range assignments { totalAssigned += len(assignment) } - + if totalAssigned != 4 { t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned) } - + // Round robin should distribute evenly: 2 partitions each for memberID, assignment := range assignments { if len(assignment) != 2 { @@ -170,26 +170,26 @@ func TestRoundRobinAssignmentStrategy(t *testing.T) { func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) { strategy := &RoundRobinAssignmentStrategy{} - + members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1", "topic2"}}, {ID: "member2", Subscription: []string{"topic1", "topic2"}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1}, "topic2": {0, 1}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Each member should get 2 partitions (round robin across topics) for memberID, assignment := range assignments { if len(assignment) != 2 { t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment)) } } - + // Verify no partition is assigned twice assignedPartitions := make(map[string]map[int32]bool) for _, assignment := range assignments { @@ -206,19 +206,19 @@ func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) { } func TestGetAssignmentStrategy(t *testing.T) { - rangeStrategy := GetAssignmentStrategy("range") - if rangeStrategy.Name() != "range" { + rangeStrategy := GetAssignmentStrategy(ProtocolNameRange) + if rangeStrategy.Name() != ProtocolNameRange { t.Errorf("Expected range strategy, got %s", rangeStrategy.Name()) } - - rrStrategy := GetAssignmentStrategy("roundrobin") - if rrStrategy.Name() != "roundrobin" { + + rrStrategy := GetAssignmentStrategy(ProtocolNameRoundRobin) + if rrStrategy.Name() != ProtocolNameRoundRobin { t.Errorf("Expected roundrobin strategy, got %s", rrStrategy.Name()) } - + // Unknown strategy should default to range defaultStrategy := GetAssignmentStrategy("unknown") - if defaultStrategy.Name() != "range" { + if defaultStrategy.Name() != ProtocolNameRange { t.Errorf("Expected default strategy to be range, got %s", defaultStrategy.Name()) } } @@ -226,7 +226,7 @@ func TestGetAssignmentStrategy(t *testing.T) { func TestConsumerGroup_AssignPartitions(t *testing.T) { group := &ConsumerGroup{ ID: "test-group", - Protocol: "range", + Protocol: ProtocolNameRange, Members: map[string]*GroupMember{ "member1": { ID: "member1", @@ -234,25 +234,25 @@ func TestConsumerGroup_AssignPartitions(t *testing.T) { State: MemberStateStable, }, "member2": { - ID: "member2", + ID: "member2", Subscription: []string{"topic1"}, State: MemberStateStable, }, }, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + group.AssignPartitions(topicPartitions) - + // Verify assignments were created for memberID, member := range group.Members { if len(member.Assignment) == 0 { t.Errorf("Expected member %s to have partition assignments", memberID) } - + // Verify all assignments are valid for _, pa := range member.Assignment { if pa.Topic != "topic1" { @@ -277,24 +277,24 @@ func TestConsumerGroup_GetMemberAssignments(t *testing.T) { }, }, } - + assignments := group.GetMemberAssignments() - + if len(assignments) != 1 { t.Fatalf("Expected 1 member assignment, got %d", len(assignments)) } - + member1Assignments := assignments["member1"] if len(member1Assignments) != 2 { t.Errorf("Expected 2 partition assignments for member1, got %d", len(member1Assignments)) } - + // Verify assignment content expectedAssignments := []PartitionAssignment{ {Topic: "topic1", Partition: 0}, {Topic: "topic1", Partition: 1}, } - + if !reflect.DeepEqual(member1Assignments, expectedAssignments) { t.Errorf("Expected assignments %v, got %v", expectedAssignments, member1Assignments) } @@ -317,21 +317,21 @@ func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) { "topic2": true, }, } - + // Update member1's subscription group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"}) - + // Verify member subscription updated member1 := group.Members["member1"] expectedSubscription := []string{"topic1", "topic3"} if !reflect.DeepEqual(member1.Subscription, expectedSubscription) { t.Errorf("Expected subscription %v, got %v", expectedSubscription, member1.Subscription) } - + // Verify group subscribed topics updated expectedGroupTopics := []string{"topic1", "topic2", "topic3"} actualGroupTopics := group.GetSubscribedTopics() - + if !reflect.DeepEqual(actualGroupTopics, expectedGroupTopics) { t.Errorf("Expected group topics %v, got %v", expectedGroupTopics, actualGroupTopics) } @@ -340,19 +340,19 @@ func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) { func TestAssignmentStrategy_EmptyMembers(t *testing.T) { rangeStrategy := &RangeAssignmentStrategy{} rrStrategy := &RoundRobinAssignmentStrategy{} - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + // Both strategies should handle empty members gracefully rangeAssignments := rangeStrategy.Assign([]*GroupMember{}, topicPartitions) rrAssignments := rrStrategy.Assign([]*GroupMember{}, topicPartitions) - + if len(rangeAssignments) != 0 { t.Error("Expected empty assignments for empty members list (range)") } - + if len(rrAssignments) != 0 { t.Error("Expected empty assignments for empty members list (round robin)") } diff --git a/weed/mq/kafka/consumer/cooperative_sticky_test.go b/weed/mq/kafka/consumer/cooperative_sticky_test.go index 373ff67ec..0c579d3f4 100644 --- a/weed/mq/kafka/consumer/cooperative_sticky_test.go +++ b/weed/mq/kafka/consumer/cooperative_sticky_test.go @@ -5,43 +5,43 @@ import ( ) func TestCooperativeStickyAssignmentStrategy_Name(t *testing.T) { - strategy := &CooperativeStickyAssignmentStrategy{} - if strategy.Name() != "cooperative-sticky" { - t.Errorf("Expected strategy name 'cooperative-sticky', got '%s'", strategy.Name()) + strategy := NewIncrementalCooperativeAssignmentStrategy() + if strategy.Name() != ProtocolNameCooperativeSticky { + t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameCooperativeSticky, strategy.Name()) } } func TestCooperativeStickyAssignmentStrategy_InitialAssignment(t *testing.T) { - strategy := &CooperativeStickyAssignmentStrategy{} - + strategy := NewIncrementalCooperativeAssignmentStrategy() + members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Verify all partitions are assigned totalAssigned := 0 for _, assignment := range assignments { totalAssigned += len(assignment) } - + if totalAssigned != 4 { t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned) } - + // Verify fair distribution (2 partitions each) for memberID, assignment := range assignments { if len(assignment) != 2 { t.Errorf("Expected member %s to get 2 partitions, got %d", memberID, len(assignment)) } } - + // Verify no partition is assigned twice assignedPartitions := make(map[PartitionAssignment]bool) for _, assignment := range assignments { @@ -55,38 +55,38 @@ func TestCooperativeStickyAssignmentStrategy_InitialAssignment(t *testing.T) { } func TestCooperativeStickyAssignmentStrategy_StickyBehavior(t *testing.T) { - strategy := &CooperativeStickyAssignmentStrategy{} - + strategy := NewIncrementalCooperativeAssignmentStrategy() + // Initial state: member1 has partitions 0,1 and member2 has partitions 2,3 members := []*GroupMember{ { - ID: "member1", - Subscription: []string{"topic1"}, + ID: "member1", + Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{ {Topic: "topic1", Partition: 0}, {Topic: "topic1", Partition: 1}, }, }, { - ID: "member2", - Subscription: []string{"topic1"}, + ID: "member2", + Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{ {Topic: "topic1", Partition: 2}, {Topic: "topic1", Partition: 3}, }, }, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Verify sticky behavior - existing assignments should be preserved member1Assignment := assignments["member1"] member2Assignment := assignments["member2"] - + // Check that member1 still has partitions 0 and 1 hasPartition0 := false hasPartition1 := false @@ -98,11 +98,11 @@ func TestCooperativeStickyAssignmentStrategy_StickyBehavior(t *testing.T) { hasPartition1 = true } } - + if !hasPartition0 || !hasPartition1 { t.Errorf("Member1 should retain partitions 0 and 1, got %v", member1Assignment) } - + // Check that member2 still has partitions 2 and 3 hasPartition2 := false hasPartition3 := false @@ -114,20 +114,20 @@ func TestCooperativeStickyAssignmentStrategy_StickyBehavior(t *testing.T) { hasPartition3 = true } } - + if !hasPartition2 || !hasPartition3 { t.Errorf("Member2 should retain partitions 2 and 3, got %v", member2Assignment) } } func TestCooperativeStickyAssignmentStrategy_NewMemberJoin(t *testing.T) { - strategy := &CooperativeStickyAssignmentStrategy{} - + strategy := NewIncrementalCooperativeAssignmentStrategy() + // Scenario: member1 has all partitions, member2 joins members := []*GroupMember{ { - ID: "member1", - Subscription: []string{"topic1"}, + ID: "member1", + Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{ {Topic: "topic1", Partition: 0}, {Topic: "topic1", Partition: 1}, @@ -136,30 +136,41 @@ func TestCooperativeStickyAssignmentStrategy_NewMemberJoin(t *testing.T) { }, }, { - ID: "member2", - Subscription: []string{"topic1"}, - Assignment: []PartitionAssignment{}, // New member, no existing assignment + ID: "member2", + Subscription: []string{"topic1"}, + Assignment: []PartitionAssignment{}, // New member, no existing assignment }, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, } - + + // First call: revocation phase + assignments1 := strategy.Assign(members, topicPartitions) + + // Update members with revocation results + members[0].Assignment = assignments1["member1"] + members[1].Assignment = assignments1["member2"] + + // Force completion of revocation timeout + strategy.GetRebalanceState().RevocationTimeout = 0 + + // Second call: assignment phase assignments := strategy.Assign(members, topicPartitions) - + // Verify fair redistribution (2 partitions each) member1Assignment := assignments["member1"] member2Assignment := assignments["member2"] - + if len(member1Assignment) != 2 { t.Errorf("Expected member1 to have 2 partitions after rebalance, got %d", len(member1Assignment)) } - + if len(member2Assignment) != 2 { t.Errorf("Expected member2 to have 2 partitions after rebalance, got %d", len(member2Assignment)) } - + // Verify some stickiness - member1 should retain some of its original partitions originalPartitions := map[int32]bool{0: true, 1: true, 2: true, 3: true} retainedCount := 0 @@ -168,22 +179,22 @@ func TestCooperativeStickyAssignmentStrategy_NewMemberJoin(t *testing.T) { retainedCount++ } } - + if retainedCount == 0 { t.Error("Member1 should retain at least some of its original partitions (sticky behavior)") } - + t.Logf("Member1 retained %d out of 4 original partitions", retainedCount) } func TestCooperativeStickyAssignmentStrategy_MemberLeave(t *testing.T) { - strategy := &CooperativeStickyAssignmentStrategy{} - + strategy := NewIncrementalCooperativeAssignmentStrategy() + // Scenario: member2 leaves, member1 should get its partitions members := []*GroupMember{ { - ID: "member1", - Subscription: []string{"topic1"}, + ID: "member1", + Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{ {Topic: "topic1", Partition: 0}, {Topic: "topic1", Partition: 1}, @@ -191,20 +202,20 @@ func TestCooperativeStickyAssignmentStrategy_MemberLeave(t *testing.T) { }, // member2 has left, so it's not in the members list } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3}, // All partitions still need to be assigned } - + assignments := strategy.Assign(members, topicPartitions) - + // member1 should get all partitions member1Assignment := assignments["member1"] - + if len(member1Assignment) != 4 { t.Errorf("Expected member1 to get all 4 partitions after member2 left, got %d", len(member1Assignment)) } - + // Verify member1 retained its original partitions (sticky behavior) hasPartition0 := false hasPartition1 := false @@ -216,55 +227,55 @@ func TestCooperativeStickyAssignmentStrategy_MemberLeave(t *testing.T) { hasPartition1 = true } } - + if !hasPartition0 || !hasPartition1 { t.Error("Member1 should retain its original partitions 0 and 1") } } func TestCooperativeStickyAssignmentStrategy_MultipleTopics(t *testing.T) { - strategy := &CooperativeStickyAssignmentStrategy{} - + strategy := NewIncrementalCooperativeAssignmentStrategy() + members := []*GroupMember{ { - ID: "member1", - Subscription: []string{"topic1", "topic2"}, + ID: "member1", + Subscription: []string{"topic1", "topic2"}, Assignment: []PartitionAssignment{ {Topic: "topic1", Partition: 0}, {Topic: "topic2", Partition: 0}, }, }, { - ID: "member2", - Subscription: []string{"topic1", "topic2"}, + ID: "member2", + Subscription: []string{"topic1", "topic2"}, Assignment: []PartitionAssignment{ {Topic: "topic1", Partition: 1}, {Topic: "topic2", Partition: 1}, }, }, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1}, "topic2": {0, 1}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Verify all partitions are assigned totalAssigned := 0 for _, assignment := range assignments { totalAssigned += len(assignment) } - + if totalAssigned != 4 { t.Errorf("Expected 4 total partitions assigned across both topics, got %d", totalAssigned) } - + // Verify sticky behavior - each member should retain their original assignments member1Assignment := assignments["member1"] member2Assignment := assignments["member2"] - + // Check member1 retains topic1:0 and topic2:0 hasT1P0 := false hasT2P0 := false @@ -276,11 +287,11 @@ func TestCooperativeStickyAssignmentStrategy_MultipleTopics(t *testing.T) { hasT2P0 = true } } - + if !hasT1P0 || !hasT2P0 { t.Errorf("Member1 should retain topic1:0 and topic2:0, got %v", member1Assignment) } - + // Check member2 retains topic1:1 and topic2:1 hasT1P1 := false hasT2P1 := false @@ -292,41 +303,41 @@ func TestCooperativeStickyAssignmentStrategy_MultipleTopics(t *testing.T) { hasT2P1 = true } } - + if !hasT1P1 || !hasT2P1 { t.Errorf("Member2 should retain topic1:1 and topic2:1, got %v", member2Assignment) } } func TestCooperativeStickyAssignmentStrategy_UnevenPartitions(t *testing.T) { - strategy := &CooperativeStickyAssignmentStrategy{} - + strategy := NewIncrementalCooperativeAssignmentStrategy() + // 5 partitions, 2 members - should distribute 3:2 or 2:3 members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1, 2, 3, 4}, } - + assignments := strategy.Assign(members, topicPartitions) - + // Verify all partitions are assigned totalAssigned := 0 for _, assignment := range assignments { totalAssigned += len(assignment) } - + if totalAssigned != 5 { t.Errorf("Expected 5 total partitions assigned, got %d", totalAssigned) } - + // Verify fair distribution member1Count := len(assignments["member1"]) member2Count := len(assignments["member2"]) - + // Should be 3:2 or 2:3 distribution if !((member1Count == 3 && member2Count == 2) || (member1Count == 2 && member2Count == 3)) { t.Errorf("Expected 3:2 or 2:3 distribution, got %d:%d", member1Count, member2Count) @@ -334,79 +345,79 @@ func TestCooperativeStickyAssignmentStrategy_UnevenPartitions(t *testing.T) { } func TestCooperativeStickyAssignmentStrategy_PartialSubscription(t *testing.T) { - strategy := &CooperativeStickyAssignmentStrategy{} - + strategy := NewIncrementalCooperativeAssignmentStrategy() + // member1 subscribes to both topics, member2 only to topic1 members := []*GroupMember{ {ID: "member1", Subscription: []string{"topic1", "topic2"}, Assignment: []PartitionAssignment{}}, {ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}}, } - + topicPartitions := map[string][]int32{ "topic1": {0, 1}, "topic2": {0, 1}, } - + assignments := strategy.Assign(members, topicPartitions) - + // member1 should get all topic2 partitions since member2 isn't subscribed member1Assignment := assignments["member1"] member2Assignment := assignments["member2"] - + // Count topic2 partitions for each member member1Topic2Count := 0 member2Topic2Count := 0 - + for _, pa := range member1Assignment { if pa.Topic == "topic2" { member1Topic2Count++ } } - + for _, pa := range member2Assignment { if pa.Topic == "topic2" { member2Topic2Count++ } } - + if member1Topic2Count != 2 { t.Errorf("Expected member1 to get all 2 topic2 partitions, got %d", member1Topic2Count) } - + if member2Topic2Count != 0 { t.Errorf("Expected member2 to get 0 topic2 partitions (not subscribed), got %d", member2Topic2Count) } - + // Both members should get some topic1 partitions member1Topic1Count := 0 member2Topic1Count := 0 - + for _, pa := range member1Assignment { if pa.Topic == "topic1" { member1Topic1Count++ } } - + for _, pa := range member2Assignment { if pa.Topic == "topic1" { member2Topic1Count++ } } - - if member1Topic1Count + member2Topic1Count != 2 { - t.Errorf("Expected all topic1 partitions to be assigned, got %d + %d = %d", - member1Topic1Count, member2Topic1Count, member1Topic1Count + member2Topic1Count) + + if member1Topic1Count+member2Topic1Count != 2 { + t.Errorf("Expected all topic1 partitions to be assigned, got %d + %d = %d", + member1Topic1Count, member2Topic1Count, member1Topic1Count+member2Topic1Count) } } func TestGetAssignmentStrategy_CooperativeSticky(t *testing.T) { - strategy := GetAssignmentStrategy("cooperative-sticky") - if strategy.Name() != "cooperative-sticky" { + strategy := GetAssignmentStrategy(ProtocolNameCooperativeSticky) + if strategy.Name() != ProtocolNameCooperativeSticky { t.Errorf("Expected cooperative-sticky strategy, got %s", strategy.Name()) } - + // Verify it's the correct type - if _, ok := strategy.(*CooperativeStickyAssignmentStrategy); !ok { - t.Errorf("Expected CooperativeStickyAssignmentStrategy, got %T", strategy) + if _, ok := strategy.(*IncrementalCooperativeAssignmentStrategy); !ok { + t.Errorf("Expected IncrementalCooperativeAssignmentStrategy, got %T", strategy) } } diff --git a/weed/mq/kafka/consumer/incremental_rebalancing.go b/weed/mq/kafka/consumer/incremental_rebalancing.go index 10c794375..49509bc76 100644 --- a/weed/mq/kafka/consumer/incremental_rebalancing.go +++ b/weed/mq/kafka/consumer/incremental_rebalancing.go @@ -31,8 +31,8 @@ func (rp RebalancePhase) String() string { // IncrementalRebalanceState tracks the state of incremental cooperative rebalancing type IncrementalRebalanceState struct { Phase RebalancePhase - RevocationGeneration int32 // Generation when revocation started - AssignmentGeneration int32 // Generation when assignment started + RevocationGeneration int32 // Generation when revocation started + AssignmentGeneration int32 // Generation when assignment started RevokedPartitions map[string][]PartitionAssignment // Member ID -> revoked partitions PendingAssignments map[string][]PartitionAssignment // Member ID -> pending assignments StartTime time.Time @@ -64,7 +64,7 @@ func NewIncrementalCooperativeAssignmentStrategy() *IncrementalCooperativeAssign } func (ics *IncrementalCooperativeAssignmentStrategy) Name() string { - return "cooperative-sticky" + return ProtocolNameCooperativeSticky } func (ics *IncrementalCooperativeAssignmentStrategy) Assign( @@ -99,10 +99,10 @@ func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance( ) map[string][]PartitionAssignment { // Calculate ideal assignment idealAssignment := ics.calculateIdealAssignment(members, topicPartitions) - + // Determine which partitions need to be revoked partitionsToRevoke := ics.calculateRevocations(members, idealAssignment) - + if len(partitionsToRevoke) == 0 { // No revocations needed, proceed with regular assignment return idealAssignment @@ -112,7 +112,7 @@ func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance( ics.rebalanceState.Phase = RebalancePhaseRevocation ics.rebalanceState.StartTime = time.Now() ics.rebalanceState.RevokedPartitions = partitionsToRevoke - + // Return current assignments minus revoked partitions return ics.applyRevocations(members, partitionsToRevoke) } @@ -140,12 +140,12 @@ func (ics *IncrementalCooperativeAssignmentStrategy) handleAssignmentPhase( ) map[string][]PartitionAssignment { // Calculate final assignment including previously revoked partitions finalAssignment := ics.calculateIdealAssignment(members, topicPartitions) - + // Complete the rebalance ics.rebalanceState.Phase = RebalancePhaseNone ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment) ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment) - + return finalAssignment } @@ -333,10 +333,9 @@ func (ics *IncrementalCooperativeAssignmentStrategy) performRegularAssignment( ) map[string][]PartitionAssignment { // Reset rebalance state ics.rebalanceState = NewIncrementalRebalanceState() - - // Use regular cooperative-sticky logic - cooperativeSticky := &CooperativeStickyAssignmentStrategy{} - return cooperativeSticky.Assign(members, topicPartitions) + + // Use ideal assignment calculation (non-incremental cooperative assignment) + return ics.calculateIdealAssignment(members, topicPartitions) } // GetRebalanceState returns the current rebalance state (for monitoring/debugging) diff --git a/weed/mq/kafka/protocol/consumer_group_metadata.go b/weed/mq/kafka/protocol/consumer_group_metadata.go index e820f21c4..7c0b7f665 100644 --- a/weed/mq/kafka/protocol/consumer_group_metadata.go +++ b/weed/mq/kafka/protocol/consumer_group_metadata.go @@ -5,6 +5,8 @@ import ( "fmt" "net" "sync" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) // ConsumerProtocolMetadata represents parsed consumer protocol metadata @@ -148,10 +150,10 @@ func ParseConsumerProtocolMetadata(metadata []byte, strategyName string) (*Consu // ValidateAssignmentStrategy checks if an assignment strategy is supported func ValidateAssignmentStrategy(strategy string) bool { supportedStrategies := map[string]bool{ - "range": true, - "roundrobin": true, - "sticky": true, - "cooperative-sticky": false, // Not yet implemented + consumer.ProtocolNameRange: true, + consumer.ProtocolNameRoundRobin: true, + consumer.ProtocolNameSticky: true, + consumer.ProtocolNameCooperativeSticky: true, // Incremental cooperative rebalancing (Kafka 2.4+) } return supportedStrategies[strategy] @@ -184,7 +186,7 @@ func ExtractTopicsFromMetadata(protocols []GroupProtocol, fallbackTopics []strin // SelectBestProtocol chooses the best assignment protocol from available options func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) string { // Priority order: sticky > roundrobin > range - protocolPriority := []string{"sticky", "roundrobin", "range"} + protocolPriority := []string{consumer.ProtocolNameSticky, consumer.ProtocolNameRoundRobin, consumer.ProtocolNameRange} // Find supported protocols in client's list clientProtocols := make(map[string]bool) @@ -218,8 +220,8 @@ func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) stri // No common protocol found - handle special fallback case // If client supports nothing we validate, but group supports "range", use "range" - if len(clientProtocols) == 0 && groupProtocolSet["range"] { - return "range" + if len(clientProtocols) == 0 && groupProtocolSet[consumer.ProtocolNameRange] { + return consumer.ProtocolNameRange } // Return empty string to indicate no compatible protocol found @@ -234,7 +236,7 @@ func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) stri } // Last resort - return "range" + return consumer.ProtocolNameRange } // ProtocolMetadataDebugInfo returns debug information about protocol metadata diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 8f92f7cfd..fb68c5821 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -232,7 +232,7 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID // Ensure we have a valid protocol - fallback to "range" if empty if groupProtocol == "" { - groupProtocol = "range" + groupProtocol = consumer.ProtocolNameRange } // If a protocol is already selected for the group, reject joins that do not support it. @@ -615,7 +615,7 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { } else { // NON-nullable compact string in v6 - must not be empty! if response.ProtocolName == "" { - response.ProtocolName = "range" // fallback to default + response.ProtocolName = consumer.ProtocolNameRange // fallback to default } out = append(out, FlexibleString(response.ProtocolName)...) } @@ -762,9 +762,9 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in ThrottleTimeMs: 0, ErrorCode: errorCode, GenerationID: -1, - ProtocolName: "range", // Use "range" as default protocol instead of empty string - Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field - MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field + ProtocolName: consumer.ProtocolNameRange, // Use "range" as default protocol instead of empty string + Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field + MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field Version: apiVersion, Members: []JoinGroupMember{}, }