Browse Source

clean up consumer protocols

pull/7329/head
chrislu 1 month ago
parent
commit
f639c42472
  1. 195
      weed/mq/kafka/consumer/assignment.go
  2. 128
      weed/mq/kafka/consumer/assignment_test.go
  3. 199
      weed/mq/kafka/consumer/cooperative_sticky_test.go
  4. 23
      weed/mq/kafka/consumer/incremental_rebalancing.go
  5. 18
      weed/mq/kafka/protocol/consumer_group_metadata.go
  6. 10
      weed/mq/kafka/protocol/joingroup.go

195
weed/mq/kafka/consumer/assignment.go

@ -4,6 +4,14 @@ import (
"sort"
)
// Assignment strategy protocol names
const (
ProtocolNameRange = "range"
ProtocolNameRoundRobin = "roundrobin"
ProtocolNameSticky = "sticky"
ProtocolNameCooperativeSticky = "cooperative-sticky"
)
// AssignmentStrategy defines how partitions are assigned to consumers
type AssignmentStrategy interface {
Name() string
@ -15,7 +23,7 @@ type AssignmentStrategy interface {
type RangeAssignmentStrategy struct{}
func (r *RangeAssignmentStrategy) Name() string {
return "range"
return ProtocolNameRange
}
func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment {
@ -104,7 +112,7 @@ func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions
type RoundRobinAssignmentStrategy struct{}
func (rr *RoundRobinAssignmentStrategy) Name() string {
return "roundrobin"
return ProtocolNameRoundRobin
}
func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment {
@ -194,191 +202,14 @@ func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPart
return assignments
}
// CooperativeStickyAssignmentStrategy implements the cooperative-sticky assignment strategy
// This strategy tries to minimize partition movement during rebalancing while ensuring fairness
type CooperativeStickyAssignmentStrategy struct{}
func (cs *CooperativeStickyAssignmentStrategy) Name() string {
return "cooperative-sticky"
}
func (cs *CooperativeStickyAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment {
if len(members) == 0 {
return make(map[string][]PartitionAssignment)
}
assignments := make(map[string][]PartitionAssignment)
for _, member := range members {
assignments[member.ID] = make([]PartitionAssignment, 0)
}
// Sort members for consistent assignment
sortedMembers := make([]*GroupMember, len(members))
copy(sortedMembers, members)
sort.Slice(sortedMembers, func(i, j int) bool {
return sortedMembers[i].ID < sortedMembers[j].ID
})
// Get all subscribed topics
subscribedTopics := make(map[string]bool)
for _, member := range members {
for _, topic := range member.Subscription {
subscribedTopics[topic] = true
}
}
// Collect all partitions that need assignment
allPartitions := make([]PartitionAssignment, 0)
for topic := range subscribedTopics {
partitions, exists := topicPartitions[topic]
if !exists {
continue
}
for _, partition := range partitions {
allPartitions = append(allPartitions, PartitionAssignment{
Topic: topic,
Partition: partition,
})
}
}
// Sort partitions for consistent assignment
sort.Slice(allPartitions, func(i, j int) bool {
if allPartitions[i].Topic != allPartitions[j].Topic {
return allPartitions[i].Topic < allPartitions[j].Topic
}
return allPartitions[i].Partition < allPartitions[j].Partition
})
// Calculate target assignment counts for fairness
totalPartitions := len(allPartitions)
numMembers := len(sortedMembers)
baseAssignments := totalPartitions / numMembers
extraAssignments := totalPartitions % numMembers
// Phase 1: Try to preserve existing assignments (sticky behavior) but respect fairness
currentAssignments := make(map[string]map[PartitionAssignment]bool)
for _, member := range sortedMembers {
currentAssignments[member.ID] = make(map[PartitionAssignment]bool)
for _, assignment := range member.Assignment {
currentAssignments[member.ID][assignment] = true
}
}
// Track which partitions are already assigned
assignedPartitions := make(map[PartitionAssignment]bool)
// Preserve existing assignments where possible, but respect target counts
for i, member := range sortedMembers {
// Calculate target count for this member
targetCount := baseAssignments
if i < extraAssignments {
targetCount++
}
assignedCount := 0
for assignment := range currentAssignments[member.ID] {
// Stop if we've reached the target count for this member
if assignedCount >= targetCount {
break
}
// Check if member is still subscribed to this topic
subscribed := false
for _, topic := range member.Subscription {
if topic == assignment.Topic {
subscribed = true
break
}
}
if subscribed && !assignedPartitions[assignment] {
assignments[member.ID] = append(assignments[member.ID], assignment)
assignedPartitions[assignment] = true
assignedCount++
}
}
}
// Phase 2: Assign remaining partitions using round-robin for fairness
unassignedPartitions := make([]PartitionAssignment, 0)
for _, partition := range allPartitions {
if !assignedPartitions[partition] {
unassignedPartitions = append(unassignedPartitions, partition)
}
}
// Assign remaining partitions to achieve fairness
memberIndex := 0
for _, partition := range unassignedPartitions {
// Find a member that needs more partitions and is subscribed to this topic
assigned := false
startIndex := memberIndex
for !assigned {
member := sortedMembers[memberIndex]
// Check if this member is subscribed to the topic
subscribed := false
for _, topic := range member.Subscription {
if topic == partition.Topic {
subscribed = true
break
}
}
if subscribed {
// Calculate target count for this member
targetCount := baseAssignments
if memberIndex < extraAssignments {
targetCount++
}
// Assign if member needs more partitions
if len(assignments[member.ID]) < targetCount {
assignments[member.ID] = append(assignments[member.ID], partition)
assigned = true
}
}
memberIndex = (memberIndex + 1) % numMembers
// Prevent infinite loop
if memberIndex == startIndex && !assigned {
// Force assign to any subscribed member
for _, member := range sortedMembers {
subscribed := false
for _, topic := range member.Subscription {
if topic == partition.Topic {
subscribed = true
break
}
}
if subscribed {
assignments[member.ID] = append(assignments[member.ID], partition)
assigned = true
break
}
}
break
}
}
}
return assignments
}
// GetAssignmentStrategy returns the appropriate assignment strategy
func GetAssignmentStrategy(name string) AssignmentStrategy {
switch name {
case "range":
case ProtocolNameRange:
return &RangeAssignmentStrategy{}
case "roundrobin":
case ProtocolNameRoundRobin:
return &RoundRobinAssignmentStrategy{}
case "cooperative-sticky":
return &CooperativeStickyAssignmentStrategy{}
case "incremental-cooperative":
case ProtocolNameCooperativeSticky:
return NewIncrementalCooperativeAssignmentStrategy()
default:
// Default to range strategy

128
weed/mq/kafka/consumer/assignment_test.go

@ -8,11 +8,11 @@ import (
func TestRangeAssignmentStrategy(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
if strategy.Name() != "range" {
t.Errorf("Expected strategy name 'range', got '%s'", strategy.Name())
if strategy.Name() != ProtocolNameRange {
t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameRange, strategy.Name())
}
// Test with 2 members, 4 partitions on one topic
members := []*GroupMember{
{
@ -20,38 +20,38 @@ func TestRangeAssignmentStrategy(t *testing.T) {
Subscription: []string{"topic1"},
},
{
ID: "member2",
ID: "member2",
Subscription: []string{"topic1"},
},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
assignments := strategy.Assign(members, topicPartitions)
// Verify all members have assignments
if len(assignments) != 2 {
t.Fatalf("Expected assignments for 2 members, got %d", len(assignments))
}
// Verify total partitions assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
if totalAssigned != 4 {
t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
}
// Range assignment should distribute evenly: 2 partitions each
for memberID, assignment := range assignments {
if len(assignment) != 2 {
t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment))
}
// Verify all assignments are for the subscribed topic
for _, pa := range assignment {
if pa.Topic != "topic1" {
@ -63,27 +63,27 @@ func TestRangeAssignmentStrategy(t *testing.T) {
func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
// Test with 3 members, 4 partitions - should distribute 2,1,1
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1"}},
{ID: "member2", Subscription: []string{"topic1"}},
{ID: "member3", Subscription: []string{"topic1"}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
assignments := strategy.Assign(members, topicPartitions)
// Get assignment counts
counts := make([]int, 0, 3)
for _, assignment := range assignments {
counts = append(counts, len(assignment))
}
sort.Ints(counts)
// Should be distributed as [1, 1, 2] (first member gets extra partition)
expected := []int{1, 1, 2}
if !reflect.DeepEqual(counts, expected) {
@ -93,30 +93,30 @@ func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) {
func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) {
strategy := &RangeAssignmentStrategy{}
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1", "topic2"}},
{ID: "member2", Subscription: []string{"topic1"}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1},
"topic2": {0, 1},
}
assignments := strategy.Assign(members, topicPartitions)
// Member1 should get assignments from both topics
member1Assignments := assignments["member1"]
topicsAssigned := make(map[string]int)
for _, pa := range member1Assignments {
topicsAssigned[pa.Topic]++
}
if len(topicsAssigned) != 2 {
t.Errorf("Expected member1 to be assigned to 2 topics, got %d", len(topicsAssigned))
}
// Member2 should only get topic1 assignments
member2Assignments := assignments["member2"]
for _, pa := range member2Assignments {
@ -128,38 +128,38 @@ func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) {
func TestRoundRobinAssignmentStrategy(t *testing.T) {
strategy := &RoundRobinAssignmentStrategy{}
if strategy.Name() != "roundrobin" {
t.Errorf("Expected strategy name 'roundrobin', got '%s'", strategy.Name())
if strategy.Name() != ProtocolNameRoundRobin {
t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameRoundRobin, strategy.Name())
}
// Test with 2 members, 4 partitions on one topic
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1"}},
{ID: "member2", Subscription: []string{"topic1"}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
assignments := strategy.Assign(members, topicPartitions)
// Verify all members have assignments
if len(assignments) != 2 {
t.Fatalf("Expected assignments for 2 members, got %d", len(assignments))
}
// Verify total partitions assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
if totalAssigned != 4 {
t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
}
// Round robin should distribute evenly: 2 partitions each
for memberID, assignment := range assignments {
if len(assignment) != 2 {
@ -170,26 +170,26 @@ func TestRoundRobinAssignmentStrategy(t *testing.T) {
func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) {
strategy := &RoundRobinAssignmentStrategy{}
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1", "topic2"}},
{ID: "member2", Subscription: []string{"topic1", "topic2"}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1},
"topic2": {0, 1},
}
assignments := strategy.Assign(members, topicPartitions)
// Each member should get 2 partitions (round robin across topics)
for memberID, assignment := range assignments {
if len(assignment) != 2 {
t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment))
}
}
// Verify no partition is assigned twice
assignedPartitions := make(map[string]map[int32]bool)
for _, assignment := range assignments {
@ -206,19 +206,19 @@ func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) {
}
func TestGetAssignmentStrategy(t *testing.T) {
rangeStrategy := GetAssignmentStrategy("range")
if rangeStrategy.Name() != "range" {
rangeStrategy := GetAssignmentStrategy(ProtocolNameRange)
if rangeStrategy.Name() != ProtocolNameRange {
t.Errorf("Expected range strategy, got %s", rangeStrategy.Name())
}
rrStrategy := GetAssignmentStrategy("roundrobin")
if rrStrategy.Name() != "roundrobin" {
rrStrategy := GetAssignmentStrategy(ProtocolNameRoundRobin)
if rrStrategy.Name() != ProtocolNameRoundRobin {
t.Errorf("Expected roundrobin strategy, got %s", rrStrategy.Name())
}
// Unknown strategy should default to range
defaultStrategy := GetAssignmentStrategy("unknown")
if defaultStrategy.Name() != "range" {
if defaultStrategy.Name() != ProtocolNameRange {
t.Errorf("Expected default strategy to be range, got %s", defaultStrategy.Name())
}
}
@ -226,7 +226,7 @@ func TestGetAssignmentStrategy(t *testing.T) {
func TestConsumerGroup_AssignPartitions(t *testing.T) {
group := &ConsumerGroup{
ID: "test-group",
Protocol: "range",
Protocol: ProtocolNameRange,
Members: map[string]*GroupMember{
"member1": {
ID: "member1",
@ -234,25 +234,25 @@ func TestConsumerGroup_AssignPartitions(t *testing.T) {
State: MemberStateStable,
},
"member2": {
ID: "member2",
ID: "member2",
Subscription: []string{"topic1"},
State: MemberStateStable,
},
},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
group.AssignPartitions(topicPartitions)
// Verify assignments were created
for memberID, member := range group.Members {
if len(member.Assignment) == 0 {
t.Errorf("Expected member %s to have partition assignments", memberID)
}
// Verify all assignments are valid
for _, pa := range member.Assignment {
if pa.Topic != "topic1" {
@ -277,24 +277,24 @@ func TestConsumerGroup_GetMemberAssignments(t *testing.T) {
},
},
}
assignments := group.GetMemberAssignments()
if len(assignments) != 1 {
t.Fatalf("Expected 1 member assignment, got %d", len(assignments))
}
member1Assignments := assignments["member1"]
if len(member1Assignments) != 2 {
t.Errorf("Expected 2 partition assignments for member1, got %d", len(member1Assignments))
}
// Verify assignment content
expectedAssignments := []PartitionAssignment{
{Topic: "topic1", Partition: 0},
{Topic: "topic1", Partition: 1},
}
if !reflect.DeepEqual(member1Assignments, expectedAssignments) {
t.Errorf("Expected assignments %v, got %v", expectedAssignments, member1Assignments)
}
@ -317,21 +317,21 @@ func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) {
"topic2": true,
},
}
// Update member1's subscription
group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"})
// Verify member subscription updated
member1 := group.Members["member1"]
expectedSubscription := []string{"topic1", "topic3"}
if !reflect.DeepEqual(member1.Subscription, expectedSubscription) {
t.Errorf("Expected subscription %v, got %v", expectedSubscription, member1.Subscription)
}
// Verify group subscribed topics updated
expectedGroupTopics := []string{"topic1", "topic2", "topic3"}
actualGroupTopics := group.GetSubscribedTopics()
if !reflect.DeepEqual(actualGroupTopics, expectedGroupTopics) {
t.Errorf("Expected group topics %v, got %v", expectedGroupTopics, actualGroupTopics)
}
@ -340,19 +340,19 @@ func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) {
func TestAssignmentStrategy_EmptyMembers(t *testing.T) {
rangeStrategy := &RangeAssignmentStrategy{}
rrStrategy := &RoundRobinAssignmentStrategy{}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
// Both strategies should handle empty members gracefully
rangeAssignments := rangeStrategy.Assign([]*GroupMember{}, topicPartitions)
rrAssignments := rrStrategy.Assign([]*GroupMember{}, topicPartitions)
if len(rangeAssignments) != 0 {
t.Error("Expected empty assignments for empty members list (range)")
}
if len(rrAssignments) != 0 {
t.Error("Expected empty assignments for empty members list (round robin)")
}

199
weed/mq/kafka/consumer/cooperative_sticky_test.go

@ -5,43 +5,43 @@ import (
)
func TestCooperativeStickyAssignmentStrategy_Name(t *testing.T) {
strategy := &CooperativeStickyAssignmentStrategy{}
if strategy.Name() != "cooperative-sticky" {
t.Errorf("Expected strategy name 'cooperative-sticky', got '%s'", strategy.Name())
strategy := NewIncrementalCooperativeAssignmentStrategy()
if strategy.Name() != ProtocolNameCooperativeSticky {
t.Errorf("Expected strategy name '%s', got '%s'", ProtocolNameCooperativeSticky, strategy.Name())
}
}
func TestCooperativeStickyAssignmentStrategy_InitialAssignment(t *testing.T) {
strategy := &CooperativeStickyAssignmentStrategy{}
strategy := NewIncrementalCooperativeAssignmentStrategy()
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
{ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
assignments := strategy.Assign(members, topicPartitions)
// Verify all partitions are assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
if totalAssigned != 4 {
t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
}
// Verify fair distribution (2 partitions each)
for memberID, assignment := range assignments {
if len(assignment) != 2 {
t.Errorf("Expected member %s to get 2 partitions, got %d", memberID, len(assignment))
}
}
// Verify no partition is assigned twice
assignedPartitions := make(map[PartitionAssignment]bool)
for _, assignment := range assignments {
@ -55,38 +55,38 @@ func TestCooperativeStickyAssignmentStrategy_InitialAssignment(t *testing.T) {
}
func TestCooperativeStickyAssignmentStrategy_StickyBehavior(t *testing.T) {
strategy := &CooperativeStickyAssignmentStrategy{}
strategy := NewIncrementalCooperativeAssignmentStrategy()
// Initial state: member1 has partitions 0,1 and member2 has partitions 2,3
members := []*GroupMember{
{
ID: "member1",
Subscription: []string{"topic1"},
ID: "member1",
Subscription: []string{"topic1"},
Assignment: []PartitionAssignment{
{Topic: "topic1", Partition: 0},
{Topic: "topic1", Partition: 1},
},
},
{
ID: "member2",
Subscription: []string{"topic1"},
ID: "member2",
Subscription: []string{"topic1"},
Assignment: []PartitionAssignment{
{Topic: "topic1", Partition: 2},
{Topic: "topic1", Partition: 3},
},
},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
assignments := strategy.Assign(members, topicPartitions)
// Verify sticky behavior - existing assignments should be preserved
member1Assignment := assignments["member1"]
member2Assignment := assignments["member2"]
// Check that member1 still has partitions 0 and 1
hasPartition0 := false
hasPartition1 := false
@ -98,11 +98,11 @@ func TestCooperativeStickyAssignmentStrategy_StickyBehavior(t *testing.T) {
hasPartition1 = true
}
}
if !hasPartition0 || !hasPartition1 {
t.Errorf("Member1 should retain partitions 0 and 1, got %v", member1Assignment)
}
// Check that member2 still has partitions 2 and 3
hasPartition2 := false
hasPartition3 := false
@ -114,20 +114,20 @@ func TestCooperativeStickyAssignmentStrategy_StickyBehavior(t *testing.T) {
hasPartition3 = true
}
}
if !hasPartition2 || !hasPartition3 {
t.Errorf("Member2 should retain partitions 2 and 3, got %v", member2Assignment)
}
}
func TestCooperativeStickyAssignmentStrategy_NewMemberJoin(t *testing.T) {
strategy := &CooperativeStickyAssignmentStrategy{}
strategy := NewIncrementalCooperativeAssignmentStrategy()
// Scenario: member1 has all partitions, member2 joins
members := []*GroupMember{
{
ID: "member1",
Subscription: []string{"topic1"},
ID: "member1",
Subscription: []string{"topic1"},
Assignment: []PartitionAssignment{
{Topic: "topic1", Partition: 0},
{Topic: "topic1", Partition: 1},
@ -136,30 +136,41 @@ func TestCooperativeStickyAssignmentStrategy_NewMemberJoin(t *testing.T) {
},
},
{
ID: "member2",
Subscription: []string{"topic1"},
Assignment: []PartitionAssignment{}, // New member, no existing assignment
ID: "member2",
Subscription: []string{"topic1"},
Assignment: []PartitionAssignment{}, // New member, no existing assignment
},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3},
}
// First call: revocation phase
assignments1 := strategy.Assign(members, topicPartitions)
// Update members with revocation results
members[0].Assignment = assignments1["member1"]
members[1].Assignment = assignments1["member2"]
// Force completion of revocation timeout
strategy.GetRebalanceState().RevocationTimeout = 0
// Second call: assignment phase
assignments := strategy.Assign(members, topicPartitions)
// Verify fair redistribution (2 partitions each)
member1Assignment := assignments["member1"]
member2Assignment := assignments["member2"]
if len(member1Assignment) != 2 {
t.Errorf("Expected member1 to have 2 partitions after rebalance, got %d", len(member1Assignment))
}
if len(member2Assignment) != 2 {
t.Errorf("Expected member2 to have 2 partitions after rebalance, got %d", len(member2Assignment))
}
// Verify some stickiness - member1 should retain some of its original partitions
originalPartitions := map[int32]bool{0: true, 1: true, 2: true, 3: true}
retainedCount := 0
@ -168,22 +179,22 @@ func TestCooperativeStickyAssignmentStrategy_NewMemberJoin(t *testing.T) {
retainedCount++
}
}
if retainedCount == 0 {
t.Error("Member1 should retain at least some of its original partitions (sticky behavior)")
}
t.Logf("Member1 retained %d out of 4 original partitions", retainedCount)
}
func TestCooperativeStickyAssignmentStrategy_MemberLeave(t *testing.T) {
strategy := &CooperativeStickyAssignmentStrategy{}
strategy := NewIncrementalCooperativeAssignmentStrategy()
// Scenario: member2 leaves, member1 should get its partitions
members := []*GroupMember{
{
ID: "member1",
Subscription: []string{"topic1"},
ID: "member1",
Subscription: []string{"topic1"},
Assignment: []PartitionAssignment{
{Topic: "topic1", Partition: 0},
{Topic: "topic1", Partition: 1},
@ -191,20 +202,20 @@ func TestCooperativeStickyAssignmentStrategy_MemberLeave(t *testing.T) {
},
// member2 has left, so it's not in the members list
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3}, // All partitions still need to be assigned
}
assignments := strategy.Assign(members, topicPartitions)
// member1 should get all partitions
member1Assignment := assignments["member1"]
if len(member1Assignment) != 4 {
t.Errorf("Expected member1 to get all 4 partitions after member2 left, got %d", len(member1Assignment))
}
// Verify member1 retained its original partitions (sticky behavior)
hasPartition0 := false
hasPartition1 := false
@ -216,55 +227,55 @@ func TestCooperativeStickyAssignmentStrategy_MemberLeave(t *testing.T) {
hasPartition1 = true
}
}
if !hasPartition0 || !hasPartition1 {
t.Error("Member1 should retain its original partitions 0 and 1")
}
}
func TestCooperativeStickyAssignmentStrategy_MultipleTopics(t *testing.T) {
strategy := &CooperativeStickyAssignmentStrategy{}
strategy := NewIncrementalCooperativeAssignmentStrategy()
members := []*GroupMember{
{
ID: "member1",
Subscription: []string{"topic1", "topic2"},
ID: "member1",
Subscription: []string{"topic1", "topic2"},
Assignment: []PartitionAssignment{
{Topic: "topic1", Partition: 0},
{Topic: "topic2", Partition: 0},
},
},
{
ID: "member2",
Subscription: []string{"topic1", "topic2"},
ID: "member2",
Subscription: []string{"topic1", "topic2"},
Assignment: []PartitionAssignment{
{Topic: "topic1", Partition: 1},
{Topic: "topic2", Partition: 1},
},
},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1},
"topic2": {0, 1},
}
assignments := strategy.Assign(members, topicPartitions)
// Verify all partitions are assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
if totalAssigned != 4 {
t.Errorf("Expected 4 total partitions assigned across both topics, got %d", totalAssigned)
}
// Verify sticky behavior - each member should retain their original assignments
member1Assignment := assignments["member1"]
member2Assignment := assignments["member2"]
// Check member1 retains topic1:0 and topic2:0
hasT1P0 := false
hasT2P0 := false
@ -276,11 +287,11 @@ func TestCooperativeStickyAssignmentStrategy_MultipleTopics(t *testing.T) {
hasT2P0 = true
}
}
if !hasT1P0 || !hasT2P0 {
t.Errorf("Member1 should retain topic1:0 and topic2:0, got %v", member1Assignment)
}
// Check member2 retains topic1:1 and topic2:1
hasT1P1 := false
hasT2P1 := false
@ -292,41 +303,41 @@ func TestCooperativeStickyAssignmentStrategy_MultipleTopics(t *testing.T) {
hasT2P1 = true
}
}
if !hasT1P1 || !hasT2P1 {
t.Errorf("Member2 should retain topic1:1 and topic2:1, got %v", member2Assignment)
}
}
func TestCooperativeStickyAssignmentStrategy_UnevenPartitions(t *testing.T) {
strategy := &CooperativeStickyAssignmentStrategy{}
strategy := NewIncrementalCooperativeAssignmentStrategy()
// 5 partitions, 2 members - should distribute 3:2 or 2:3
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
{ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1, 2, 3, 4},
}
assignments := strategy.Assign(members, topicPartitions)
// Verify all partitions are assigned
totalAssigned := 0
for _, assignment := range assignments {
totalAssigned += len(assignment)
}
if totalAssigned != 5 {
t.Errorf("Expected 5 total partitions assigned, got %d", totalAssigned)
}
// Verify fair distribution
member1Count := len(assignments["member1"])
member2Count := len(assignments["member2"])
// Should be 3:2 or 2:3 distribution
if !((member1Count == 3 && member2Count == 2) || (member1Count == 2 && member2Count == 3)) {
t.Errorf("Expected 3:2 or 2:3 distribution, got %d:%d", member1Count, member2Count)
@ -334,79 +345,79 @@ func TestCooperativeStickyAssignmentStrategy_UnevenPartitions(t *testing.T) {
}
func TestCooperativeStickyAssignmentStrategy_PartialSubscription(t *testing.T) {
strategy := &CooperativeStickyAssignmentStrategy{}
strategy := NewIncrementalCooperativeAssignmentStrategy()
// member1 subscribes to both topics, member2 only to topic1
members := []*GroupMember{
{ID: "member1", Subscription: []string{"topic1", "topic2"}, Assignment: []PartitionAssignment{}},
{ID: "member2", Subscription: []string{"topic1"}, Assignment: []PartitionAssignment{}},
}
topicPartitions := map[string][]int32{
"topic1": {0, 1},
"topic2": {0, 1},
}
assignments := strategy.Assign(members, topicPartitions)
// member1 should get all topic2 partitions since member2 isn't subscribed
member1Assignment := assignments["member1"]
member2Assignment := assignments["member2"]
// Count topic2 partitions for each member
member1Topic2Count := 0
member2Topic2Count := 0
for _, pa := range member1Assignment {
if pa.Topic == "topic2" {
member1Topic2Count++
}
}
for _, pa := range member2Assignment {
if pa.Topic == "topic2" {
member2Topic2Count++
}
}
if member1Topic2Count != 2 {
t.Errorf("Expected member1 to get all 2 topic2 partitions, got %d", member1Topic2Count)
}
if member2Topic2Count != 0 {
t.Errorf("Expected member2 to get 0 topic2 partitions (not subscribed), got %d", member2Topic2Count)
}
// Both members should get some topic1 partitions
member1Topic1Count := 0
member2Topic1Count := 0
for _, pa := range member1Assignment {
if pa.Topic == "topic1" {
member1Topic1Count++
}
}
for _, pa := range member2Assignment {
if pa.Topic == "topic1" {
member2Topic1Count++
}
}
if member1Topic1Count + member2Topic1Count != 2 {
t.Errorf("Expected all topic1 partitions to be assigned, got %d + %d = %d",
member1Topic1Count, member2Topic1Count, member1Topic1Count + member2Topic1Count)
if member1Topic1Count+member2Topic1Count != 2 {
t.Errorf("Expected all topic1 partitions to be assigned, got %d + %d = %d",
member1Topic1Count, member2Topic1Count, member1Topic1Count+member2Topic1Count)
}
}
func TestGetAssignmentStrategy_CooperativeSticky(t *testing.T) {
strategy := GetAssignmentStrategy("cooperative-sticky")
if strategy.Name() != "cooperative-sticky" {
strategy := GetAssignmentStrategy(ProtocolNameCooperativeSticky)
if strategy.Name() != ProtocolNameCooperativeSticky {
t.Errorf("Expected cooperative-sticky strategy, got %s", strategy.Name())
}
// Verify it's the correct type
if _, ok := strategy.(*CooperativeStickyAssignmentStrategy); !ok {
t.Errorf("Expected CooperativeStickyAssignmentStrategy, got %T", strategy)
if _, ok := strategy.(*IncrementalCooperativeAssignmentStrategy); !ok {
t.Errorf("Expected IncrementalCooperativeAssignmentStrategy, got %T", strategy)
}
}

23
weed/mq/kafka/consumer/incremental_rebalancing.go

@ -31,8 +31,8 @@ func (rp RebalancePhase) String() string {
// IncrementalRebalanceState tracks the state of incremental cooperative rebalancing
type IncrementalRebalanceState struct {
Phase RebalancePhase
RevocationGeneration int32 // Generation when revocation started
AssignmentGeneration int32 // Generation when assignment started
RevocationGeneration int32 // Generation when revocation started
AssignmentGeneration int32 // Generation when assignment started
RevokedPartitions map[string][]PartitionAssignment // Member ID -> revoked partitions
PendingAssignments map[string][]PartitionAssignment // Member ID -> pending assignments
StartTime time.Time
@ -64,7 +64,7 @@ func NewIncrementalCooperativeAssignmentStrategy() *IncrementalCooperativeAssign
}
func (ics *IncrementalCooperativeAssignmentStrategy) Name() string {
return "cooperative-sticky"
return ProtocolNameCooperativeSticky
}
func (ics *IncrementalCooperativeAssignmentStrategy) Assign(
@ -99,10 +99,10 @@ func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance(
) map[string][]PartitionAssignment {
// Calculate ideal assignment
idealAssignment := ics.calculateIdealAssignment(members, topicPartitions)
// Determine which partitions need to be revoked
partitionsToRevoke := ics.calculateRevocations(members, idealAssignment)
if len(partitionsToRevoke) == 0 {
// No revocations needed, proceed with regular assignment
return idealAssignment
@ -112,7 +112,7 @@ func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance(
ics.rebalanceState.Phase = RebalancePhaseRevocation
ics.rebalanceState.StartTime = time.Now()
ics.rebalanceState.RevokedPartitions = partitionsToRevoke
// Return current assignments minus revoked partitions
return ics.applyRevocations(members, partitionsToRevoke)
}
@ -140,12 +140,12 @@ func (ics *IncrementalCooperativeAssignmentStrategy) handleAssignmentPhase(
) map[string][]PartitionAssignment {
// Calculate final assignment including previously revoked partitions
finalAssignment := ics.calculateIdealAssignment(members, topicPartitions)
// Complete the rebalance
ics.rebalanceState.Phase = RebalancePhaseNone
ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment)
ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment)
return finalAssignment
}
@ -333,10 +333,9 @@ func (ics *IncrementalCooperativeAssignmentStrategy) performRegularAssignment(
) map[string][]PartitionAssignment {
// Reset rebalance state
ics.rebalanceState = NewIncrementalRebalanceState()
// Use regular cooperative-sticky logic
cooperativeSticky := &CooperativeStickyAssignmentStrategy{}
return cooperativeSticky.Assign(members, topicPartitions)
// Use ideal assignment calculation (non-incremental cooperative assignment)
return ics.calculateIdealAssignment(members, topicPartitions)
}
// GetRebalanceState returns the current rebalance state (for monitoring/debugging)

18
weed/mq/kafka/protocol/consumer_group_metadata.go

@ -5,6 +5,8 @@ import (
"fmt"
"net"
"sync"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
// ConsumerProtocolMetadata represents parsed consumer protocol metadata
@ -148,10 +150,10 @@ func ParseConsumerProtocolMetadata(metadata []byte, strategyName string) (*Consu
// ValidateAssignmentStrategy checks if an assignment strategy is supported
func ValidateAssignmentStrategy(strategy string) bool {
supportedStrategies := map[string]bool{
"range": true,
"roundrobin": true,
"sticky": true,
"cooperative-sticky": false, // Not yet implemented
consumer.ProtocolNameRange: true,
consumer.ProtocolNameRoundRobin: true,
consumer.ProtocolNameSticky: true,
consumer.ProtocolNameCooperativeSticky: true, // Incremental cooperative rebalancing (Kafka 2.4+)
}
return supportedStrategies[strategy]
@ -184,7 +186,7 @@ func ExtractTopicsFromMetadata(protocols []GroupProtocol, fallbackTopics []strin
// SelectBestProtocol chooses the best assignment protocol from available options
func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) string {
// Priority order: sticky > roundrobin > range
protocolPriority := []string{"sticky", "roundrobin", "range"}
protocolPriority := []string{consumer.ProtocolNameSticky, consumer.ProtocolNameRoundRobin, consumer.ProtocolNameRange}
// Find supported protocols in client's list
clientProtocols := make(map[string]bool)
@ -218,8 +220,8 @@ func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) stri
// No common protocol found - handle special fallback case
// If client supports nothing we validate, but group supports "range", use "range"
if len(clientProtocols) == 0 && groupProtocolSet["range"] {
return "range"
if len(clientProtocols) == 0 && groupProtocolSet[consumer.ProtocolNameRange] {
return consumer.ProtocolNameRange
}
// Return empty string to indicate no compatible protocol found
@ -234,7 +236,7 @@ func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) stri
}
// Last resort
return "range"
return consumer.ProtocolNameRange
}
// ProtocolMetadataDebugInfo returns debug information about protocol metadata

10
weed/mq/kafka/protocol/joingroup.go

@ -232,7 +232,7 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
// Ensure we have a valid protocol - fallback to "range" if empty
if groupProtocol == "" {
groupProtocol = "range"
groupProtocol = consumer.ProtocolNameRange
}
// If a protocol is already selected for the group, reject joins that do not support it.
@ -615,7 +615,7 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
} else {
// NON-nullable compact string in v6 - must not be empty!
if response.ProtocolName == "" {
response.ProtocolName = "range" // fallback to default
response.ProtocolName = consumer.ProtocolNameRange // fallback to default
}
out = append(out, FlexibleString(response.ProtocolName)...)
}
@ -762,9 +762,9 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in
ThrottleTimeMs: 0,
ErrorCode: errorCode,
GenerationID: -1,
ProtocolName: "range", // Use "range" as default protocol instead of empty string
Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field
MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field
ProtocolName: consumer.ProtocolNameRange, // Use "range" as default protocol instead of empty string
Leader: "unknown", // Use "unknown" instead of empty string for non-nullable field
MemberID: "unknown", // Use "unknown" instead of empty string for non-nullable field
Version: apiVersion,
Members: []JoinGroupMember{},
}

Loading…
Cancel
Save