You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

357 lines
12 KiB

package consumer
import (
"fmt"
"sort"
"time"
)
// RebalancePhase represents the phase of incremental cooperative rebalancing
type RebalancePhase int
const (
RebalancePhaseNone RebalancePhase = iota
RebalancePhaseRevocation
RebalancePhaseAssignment
)
func (rp RebalancePhase) String() string {
switch rp {
case RebalancePhaseNone:
return "None"
case RebalancePhaseRevocation:
return "Revocation"
case RebalancePhaseAssignment:
return "Assignment"
default:
return "Unknown"
}
}
// IncrementalRebalanceState tracks the state of incremental cooperative rebalancing
type IncrementalRebalanceState struct {
Phase RebalancePhase
RevocationGeneration int32 // Generation when revocation started
AssignmentGeneration int32 // Generation when assignment started
RevokedPartitions map[string][]PartitionAssignment // Member ID -> revoked partitions
PendingAssignments map[string][]PartitionAssignment // Member ID -> pending assignments
StartTime time.Time
RevocationTimeout time.Duration
}
// NewIncrementalRebalanceState creates a new incremental rebalance state
func NewIncrementalRebalanceState() *IncrementalRebalanceState {
return &IncrementalRebalanceState{
Phase: RebalancePhaseNone,
RevokedPartitions: make(map[string][]PartitionAssignment),
PendingAssignments: make(map[string][]PartitionAssignment),
RevocationTimeout: 30 * time.Second, // Default revocation timeout
}
}
// IncrementalCooperativeAssignmentStrategy implements incremental cooperative rebalancing
// This strategy performs rebalancing in two phases:
// 1. Revocation phase: Members give up partitions that need to be reassigned
// 2. Assignment phase: Members receive new partitions
type IncrementalCooperativeAssignmentStrategy struct {
rebalanceState *IncrementalRebalanceState
}
func NewIncrementalCooperativeAssignmentStrategy() *IncrementalCooperativeAssignmentStrategy {
return &IncrementalCooperativeAssignmentStrategy{
rebalanceState: NewIncrementalRebalanceState(),
}
}
func (ics *IncrementalCooperativeAssignmentStrategy) Name() string {
return "cooperative-sticky"
}
func (ics *IncrementalCooperativeAssignmentStrategy) Assign(
members []*GroupMember,
topicPartitions map[string][]int32,
) map[string][]PartitionAssignment {
if len(members) == 0 {
return make(map[string][]PartitionAssignment)
}
// Check if we need to start a new rebalance
if ics.rebalanceState.Phase == RebalancePhaseNone {
return ics.startIncrementalRebalance(members, topicPartitions)
}
// Continue existing rebalance based on current phase
switch ics.rebalanceState.Phase {
case RebalancePhaseRevocation:
return ics.handleRevocationPhase(members, topicPartitions)
case RebalancePhaseAssignment:
return ics.handleAssignmentPhase(members, topicPartitions)
default:
// Fallback to regular assignment
return ics.performRegularAssignment(members, topicPartitions)
}
}
// startIncrementalRebalance initiates a new incremental rebalance
func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance(
members []*GroupMember,
topicPartitions map[string][]int32,
) map[string][]PartitionAssignment {
// Calculate ideal assignment
idealAssignment := ics.calculateIdealAssignment(members, topicPartitions)
// Determine which partitions need to be revoked
partitionsToRevoke := ics.calculateRevocations(members, idealAssignment)
if len(partitionsToRevoke) == 0 {
// No revocations needed, proceed with regular assignment
return idealAssignment
}
// Start revocation phase
ics.rebalanceState.Phase = RebalancePhaseRevocation
ics.rebalanceState.StartTime = time.Now()
ics.rebalanceState.RevokedPartitions = partitionsToRevoke
// Return current assignments minus revoked partitions
return ics.applyRevocations(members, partitionsToRevoke)
}
// handleRevocationPhase manages the revocation phase of incremental rebalancing
func (ics *IncrementalCooperativeAssignmentStrategy) handleRevocationPhase(
members []*GroupMember,
topicPartitions map[string][]int32,
) map[string][]PartitionAssignment {
// Check if revocation timeout has passed
if time.Since(ics.rebalanceState.StartTime) > ics.rebalanceState.RevocationTimeout {
// Force move to assignment phase
ics.rebalanceState.Phase = RebalancePhaseAssignment
return ics.handleAssignmentPhase(members, topicPartitions)
}
// Continue with revoked assignments (members should stop consuming revoked partitions)
return ics.getCurrentAssignmentsWithRevocations(members)
}
// handleAssignmentPhase manages the assignment phase of incremental rebalancing
func (ics *IncrementalCooperativeAssignmentStrategy) handleAssignmentPhase(
members []*GroupMember,
topicPartitions map[string][]int32,
) map[string][]PartitionAssignment {
// Calculate final assignment including previously revoked partitions
finalAssignment := ics.calculateIdealAssignment(members, topicPartitions)
// Complete the rebalance
ics.rebalanceState.Phase = RebalancePhaseNone
ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment)
ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment)
return finalAssignment
}
// calculateIdealAssignment computes the ideal partition assignment
func (ics *IncrementalCooperativeAssignmentStrategy) calculateIdealAssignment(
members []*GroupMember,
topicPartitions map[string][]int32,
) map[string][]PartitionAssignment {
assignments := make(map[string][]PartitionAssignment)
for _, member := range members {
assignments[member.ID] = make([]PartitionAssignment, 0)
}
// Sort members for consistent assignment
sortedMembers := make([]*GroupMember, len(members))
copy(sortedMembers, members)
sort.Slice(sortedMembers, func(i, j int) bool {
return sortedMembers[i].ID < sortedMembers[j].ID
})
// Get all subscribed topics
subscribedTopics := make(map[string]bool)
for _, member := range members {
for _, topic := range member.Subscription {
subscribedTopics[topic] = true
}
}
// Collect all partitions that need assignment
allPartitions := make([]PartitionAssignment, 0)
for topic := range subscribedTopics {
partitions, exists := topicPartitions[topic]
if !exists {
continue
}
for _, partition := range partitions {
allPartitions = append(allPartitions, PartitionAssignment{
Topic: topic,
Partition: partition,
})
}
}
// Sort partitions for consistent assignment
sort.Slice(allPartitions, func(i, j int) bool {
if allPartitions[i].Topic != allPartitions[j].Topic {
return allPartitions[i].Topic < allPartitions[j].Topic
}
return allPartitions[i].Partition < allPartitions[j].Partition
})
// Distribute partitions based on subscriptions
if len(allPartitions) > 0 && len(sortedMembers) > 0 {
// Group partitions by topic
partitionsByTopic := make(map[string][]PartitionAssignment)
for _, partition := range allPartitions {
partitionsByTopic[partition.Topic] = append(partitionsByTopic[partition.Topic], partition)
}
// Assign partitions topic by topic
for topic, topicPartitions := range partitionsByTopic {
// Find members subscribed to this topic
subscribedMembers := make([]*GroupMember, 0)
for _, member := range sortedMembers {
for _, subscribedTopic := range member.Subscription {
if subscribedTopic == topic {
subscribedMembers = append(subscribedMembers, member)
break
}
}
}
if len(subscribedMembers) == 0 {
continue // No members subscribed to this topic
}
// Distribute topic partitions among subscribed members
partitionsPerMember := len(topicPartitions) / len(subscribedMembers)
extraPartitions := len(topicPartitions) % len(subscribedMembers)
partitionIndex := 0
for i, member := range subscribedMembers {
// Calculate how many partitions this member should get for this topic
numPartitions := partitionsPerMember
if i < extraPartitions {
numPartitions++
}
// Assign partitions to this member
for j := 0; j < numPartitions && partitionIndex < len(topicPartitions); j++ {
assignments[member.ID] = append(assignments[member.ID], topicPartitions[partitionIndex])
partitionIndex++
}
}
}
}
return assignments
}
// calculateRevocations determines which partitions need to be revoked for rebalancing
func (ics *IncrementalCooperativeAssignmentStrategy) calculateRevocations(
members []*GroupMember,
idealAssignment map[string][]PartitionAssignment,
) map[string][]PartitionAssignment {
revocations := make(map[string][]PartitionAssignment)
for _, member := range members {
currentAssignment := member.Assignment
memberIdealAssignment := idealAssignment[member.ID]
// Find partitions that are currently assigned but not in ideal assignment
currentMap := make(map[string]bool)
for _, assignment := range currentAssignment {
key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition)
currentMap[key] = true
}
idealMap := make(map[string]bool)
for _, assignment := range memberIdealAssignment {
key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition)
idealMap[key] = true
}
// Identify partitions to revoke
var toRevoke []PartitionAssignment
for _, assignment := range currentAssignment {
key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition)
if !idealMap[key] {
toRevoke = append(toRevoke, assignment)
}
}
if len(toRevoke) > 0 {
revocations[member.ID] = toRevoke
}
}
return revocations
}
// applyRevocations returns current assignments with specified partitions revoked
func (ics *IncrementalCooperativeAssignmentStrategy) applyRevocations(
members []*GroupMember,
revocations map[string][]PartitionAssignment,
) map[string][]PartitionAssignment {
assignments := make(map[string][]PartitionAssignment)
for _, member := range members {
assignments[member.ID] = make([]PartitionAssignment, 0)
// Get revoked partitions for this member
revokedPartitions := make(map[string]bool)
if revoked, exists := revocations[member.ID]; exists {
for _, partition := range revoked {
key := fmt.Sprintf("%s:%d", partition.Topic, partition.Partition)
revokedPartitions[key] = true
}
}
// Add current assignments except revoked ones
for _, assignment := range member.Assignment {
key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition)
if !revokedPartitions[key] {
assignments[member.ID] = append(assignments[member.ID], assignment)
}
}
}
return assignments
}
// getCurrentAssignmentsWithRevocations returns current assignments with revocations applied
func (ics *IncrementalCooperativeAssignmentStrategy) getCurrentAssignmentsWithRevocations(
members []*GroupMember,
) map[string][]PartitionAssignment {
return ics.applyRevocations(members, ics.rebalanceState.RevokedPartitions)
}
// performRegularAssignment performs a regular (non-incremental) assignment as fallback
func (ics *IncrementalCooperativeAssignmentStrategy) performRegularAssignment(
members []*GroupMember,
topicPartitions map[string][]int32,
) map[string][]PartitionAssignment {
// Reset rebalance state
ics.rebalanceState = NewIncrementalRebalanceState()
// Use regular cooperative-sticky logic
cooperativeSticky := &CooperativeStickyAssignmentStrategy{}
return cooperativeSticky.Assign(members, topicPartitions)
}
// GetRebalanceState returns the current rebalance state (for monitoring/debugging)
func (ics *IncrementalCooperativeAssignmentStrategy) GetRebalanceState() *IncrementalRebalanceState {
return ics.rebalanceState
}
// IsRebalanceInProgress returns true if an incremental rebalance is currently in progress
func (ics *IncrementalCooperativeAssignmentStrategy) IsRebalanceInProgress() bool {
return ics.rebalanceState.Phase != RebalancePhaseNone
}
// ForceCompleteRebalance forces completion of the current rebalance (for timeout scenarios)
func (ics *IncrementalCooperativeAssignmentStrategy) ForceCompleteRebalance() {
ics.rebalanceState.Phase = RebalancePhaseNone
ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment)
ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment)
}