You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							357 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							357 lines
						
					
					
						
							12 KiB
						
					
					
				| package consumer | |
| 
 | |
| import ( | |
| 	"fmt" | |
| 	"sort" | |
| 	"time" | |
| ) | |
| 
 | |
// RebalancePhase identifies the stage an incremental cooperative rebalance
// is currently in.
type RebalancePhase int

// Phases of an incremental cooperative rebalance, numbered from zero in
// declaration order.
const (
	// RebalancePhaseNone means no incremental rebalance is in progress.
	RebalancePhaseNone = RebalancePhase(iota)
	// RebalancePhaseRevocation is the revocation stage of a rebalance.
	RebalancePhaseRevocation
	// RebalancePhaseAssignment is the assignment stage of a rebalance.
	RebalancePhaseAssignment
)

// String returns the human-readable name of the phase, or "Unknown" for a
// value that is not one of the declared phases.
func (rp RebalancePhase) String() string {
	// Lookup table indexed by phase value.
	names := [...]string{
		RebalancePhaseNone:       "None",
		RebalancePhaseRevocation: "Revocation",
		RebalancePhaseAssignment: "Assignment",
	}
	if rp < 0 || int(rp) >= len(names) {
		return "Unknown"
	}
	return names[rp]
}
| 
 | |
| // IncrementalRebalanceState tracks the state of incremental cooperative rebalancing | |
| type IncrementalRebalanceState struct { | |
| 	Phase                RebalancePhase | |
| 	RevocationGeneration int32 // Generation when revocation started | |
| 	AssignmentGeneration int32 // Generation when assignment started | |
| 	RevokedPartitions    map[string][]PartitionAssignment // Member ID -> revoked partitions | |
| 	PendingAssignments   map[string][]PartitionAssignment // Member ID -> pending assignments | |
| 	StartTime            time.Time | |
| 	RevocationTimeout    time.Duration | |
| } | |
| 
 | |
| // NewIncrementalRebalanceState creates a new incremental rebalance state | |
| func NewIncrementalRebalanceState() *IncrementalRebalanceState { | |
| 	return &IncrementalRebalanceState{ | |
| 		Phase:              RebalancePhaseNone, | |
| 		RevokedPartitions:  make(map[string][]PartitionAssignment), | |
| 		PendingAssignments: make(map[string][]PartitionAssignment), | |
| 		RevocationTimeout:  30 * time.Second, // Default revocation timeout | |
| 	} | |
| } | |
| 
 | |
| // IncrementalCooperativeAssignmentStrategy implements incremental cooperative rebalancing | |
| // This strategy performs rebalancing in two phases: | |
| // 1. Revocation phase: Members give up partitions that need to be reassigned | |
| // 2. Assignment phase: Members receive new partitions | |
| type IncrementalCooperativeAssignmentStrategy struct { | |
| 	rebalanceState *IncrementalRebalanceState | |
| } | |
| 
 | |
| func NewIncrementalCooperativeAssignmentStrategy() *IncrementalCooperativeAssignmentStrategy { | |
| 	return &IncrementalCooperativeAssignmentStrategy{ | |
| 		rebalanceState: NewIncrementalRebalanceState(), | |
| 	} | |
| } | |
| 
 | |
| func (ics *IncrementalCooperativeAssignmentStrategy) Name() string { | |
| 	return "cooperative-sticky" | |
| } | |
| 
 | |
| func (ics *IncrementalCooperativeAssignmentStrategy) Assign( | |
| 	members []*GroupMember, | |
| 	topicPartitions map[string][]int32, | |
| ) map[string][]PartitionAssignment { | |
| 	if len(members) == 0 { | |
| 		return make(map[string][]PartitionAssignment) | |
| 	} | |
| 
 | |
| 	// Check if we need to start a new rebalance | |
| 	if ics.rebalanceState.Phase == RebalancePhaseNone { | |
| 		return ics.startIncrementalRebalance(members, topicPartitions) | |
| 	} | |
| 
 | |
| 	// Continue existing rebalance based on current phase | |
| 	switch ics.rebalanceState.Phase { | |
| 	case RebalancePhaseRevocation: | |
| 		return ics.handleRevocationPhase(members, topicPartitions) | |
| 	case RebalancePhaseAssignment: | |
| 		return ics.handleAssignmentPhase(members, topicPartitions) | |
| 	default: | |
| 		// Fallback to regular assignment | |
| 		return ics.performRegularAssignment(members, topicPartitions) | |
| 	} | |
| } | |
| 
 | |
| // startIncrementalRebalance initiates a new incremental rebalance | |
| func (ics *IncrementalCooperativeAssignmentStrategy) startIncrementalRebalance( | |
| 	members []*GroupMember, | |
| 	topicPartitions map[string][]int32, | |
| ) map[string][]PartitionAssignment { | |
| 	// Calculate ideal assignment | |
| 	idealAssignment := ics.calculateIdealAssignment(members, topicPartitions) | |
| 	 | |
| 	// Determine which partitions need to be revoked | |
| 	partitionsToRevoke := ics.calculateRevocations(members, idealAssignment) | |
| 	 | |
| 	if len(partitionsToRevoke) == 0 { | |
| 		// No revocations needed, proceed with regular assignment | |
| 		return idealAssignment | |
| 	} | |
| 
 | |
| 	// Start revocation phase | |
| 	ics.rebalanceState.Phase = RebalancePhaseRevocation | |
| 	ics.rebalanceState.StartTime = time.Now() | |
| 	ics.rebalanceState.RevokedPartitions = partitionsToRevoke | |
| 	 | |
| 	// Return current assignments minus revoked partitions | |
| 	return ics.applyRevocations(members, partitionsToRevoke) | |
| } | |
| 
 | |
| // handleRevocationPhase manages the revocation phase of incremental rebalancing | |
| func (ics *IncrementalCooperativeAssignmentStrategy) handleRevocationPhase( | |
| 	members []*GroupMember, | |
| 	topicPartitions map[string][]int32, | |
| ) map[string][]PartitionAssignment { | |
| 	// Check if revocation timeout has passed | |
| 	if time.Since(ics.rebalanceState.StartTime) > ics.rebalanceState.RevocationTimeout { | |
| 		// Force move to assignment phase | |
| 		ics.rebalanceState.Phase = RebalancePhaseAssignment | |
| 		return ics.handleAssignmentPhase(members, topicPartitions) | |
| 	} | |
| 
 | |
| 	// Continue with revoked assignments (members should stop consuming revoked partitions) | |
| 	return ics.getCurrentAssignmentsWithRevocations(members) | |
| } | |
| 
 | |
| // handleAssignmentPhase manages the assignment phase of incremental rebalancing | |
| func (ics *IncrementalCooperativeAssignmentStrategy) handleAssignmentPhase( | |
| 	members []*GroupMember, | |
| 	topicPartitions map[string][]int32, | |
| ) map[string][]PartitionAssignment { | |
| 	// Calculate final assignment including previously revoked partitions | |
| 	finalAssignment := ics.calculateIdealAssignment(members, topicPartitions) | |
| 	 | |
| 	// Complete the rebalance | |
| 	ics.rebalanceState.Phase = RebalancePhaseNone | |
| 	ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment) | |
| 	ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment) | |
| 	 | |
| 	return finalAssignment | |
| } | |
| 
 | |
| // calculateIdealAssignment computes the ideal partition assignment | |
| func (ics *IncrementalCooperativeAssignmentStrategy) calculateIdealAssignment( | |
| 	members []*GroupMember, | |
| 	topicPartitions map[string][]int32, | |
| ) map[string][]PartitionAssignment { | |
| 	assignments := make(map[string][]PartitionAssignment) | |
| 	for _, member := range members { | |
| 		assignments[member.ID] = make([]PartitionAssignment, 0) | |
| 	} | |
| 
 | |
| 	// Sort members for consistent assignment | |
| 	sortedMembers := make([]*GroupMember, len(members)) | |
| 	copy(sortedMembers, members) | |
| 	sort.Slice(sortedMembers, func(i, j int) bool { | |
| 		return sortedMembers[i].ID < sortedMembers[j].ID | |
| 	}) | |
| 
 | |
| 	// Get all subscribed topics | |
| 	subscribedTopics := make(map[string]bool) | |
| 	for _, member := range members { | |
| 		for _, topic := range member.Subscription { | |
| 			subscribedTopics[topic] = true | |
| 		} | |
| 	} | |
| 
 | |
| 	// Collect all partitions that need assignment | |
| 	allPartitions := make([]PartitionAssignment, 0) | |
| 	for topic := range subscribedTopics { | |
| 		partitions, exists := topicPartitions[topic] | |
| 		if !exists { | |
| 			continue | |
| 		} | |
| 
 | |
| 		for _, partition := range partitions { | |
| 			allPartitions = append(allPartitions, PartitionAssignment{ | |
| 				Topic:     topic, | |
| 				Partition: partition, | |
| 			}) | |
| 		} | |
| 	} | |
| 
 | |
| 	// Sort partitions for consistent assignment | |
| 	sort.Slice(allPartitions, func(i, j int) bool { | |
| 		if allPartitions[i].Topic != allPartitions[j].Topic { | |
| 			return allPartitions[i].Topic < allPartitions[j].Topic | |
| 		} | |
| 		return allPartitions[i].Partition < allPartitions[j].Partition | |
| 	}) | |
| 
 | |
| 	// Distribute partitions based on subscriptions | |
| 	if len(allPartitions) > 0 && len(sortedMembers) > 0 { | |
| 		// Group partitions by topic | |
| 		partitionsByTopic := make(map[string][]PartitionAssignment) | |
| 		for _, partition := range allPartitions { | |
| 			partitionsByTopic[partition.Topic] = append(partitionsByTopic[partition.Topic], partition) | |
| 		} | |
| 
 | |
| 		// Assign partitions topic by topic | |
| 		for topic, topicPartitions := range partitionsByTopic { | |
| 			// Find members subscribed to this topic | |
| 			subscribedMembers := make([]*GroupMember, 0) | |
| 			for _, member := range sortedMembers { | |
| 				for _, subscribedTopic := range member.Subscription { | |
| 					if subscribedTopic == topic { | |
| 						subscribedMembers = append(subscribedMembers, member) | |
| 						break | |
| 					} | |
| 				} | |
| 			} | |
| 
 | |
| 			if len(subscribedMembers) == 0 { | |
| 				continue // No members subscribed to this topic | |
| 			} | |
| 
 | |
| 			// Distribute topic partitions among subscribed members | |
| 			partitionsPerMember := len(topicPartitions) / len(subscribedMembers) | |
| 			extraPartitions := len(topicPartitions) % len(subscribedMembers) | |
| 
 | |
| 			partitionIndex := 0 | |
| 			for i, member := range subscribedMembers { | |
| 				// Calculate how many partitions this member should get for this topic | |
| 				numPartitions := partitionsPerMember | |
| 				if i < extraPartitions { | |
| 					numPartitions++ | |
| 				} | |
| 
 | |
| 				// Assign partitions to this member | |
| 				for j := 0; j < numPartitions && partitionIndex < len(topicPartitions); j++ { | |
| 					assignments[member.ID] = append(assignments[member.ID], topicPartitions[partitionIndex]) | |
| 					partitionIndex++ | |
| 				} | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	return assignments | |
| } | |
| 
 | |
| // calculateRevocations determines which partitions need to be revoked for rebalancing | |
| func (ics *IncrementalCooperativeAssignmentStrategy) calculateRevocations( | |
| 	members []*GroupMember, | |
| 	idealAssignment map[string][]PartitionAssignment, | |
| ) map[string][]PartitionAssignment { | |
| 	revocations := make(map[string][]PartitionAssignment) | |
| 
 | |
| 	for _, member := range members { | |
| 		currentAssignment := member.Assignment | |
| 		memberIdealAssignment := idealAssignment[member.ID] | |
| 
 | |
| 		// Find partitions that are currently assigned but not in ideal assignment | |
| 		currentMap := make(map[string]bool) | |
| 		for _, assignment := range currentAssignment { | |
| 			key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition) | |
| 			currentMap[key] = true | |
| 		} | |
| 
 | |
| 		idealMap := make(map[string]bool) | |
| 		for _, assignment := range memberIdealAssignment { | |
| 			key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition) | |
| 			idealMap[key] = true | |
| 		} | |
| 
 | |
| 		// Identify partitions to revoke | |
| 		var toRevoke []PartitionAssignment | |
| 		for _, assignment := range currentAssignment { | |
| 			key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition) | |
| 			if !idealMap[key] { | |
| 				toRevoke = append(toRevoke, assignment) | |
| 			} | |
| 		} | |
| 
 | |
| 		if len(toRevoke) > 0 { | |
| 			revocations[member.ID] = toRevoke | |
| 		} | |
| 	} | |
| 
 | |
| 	return revocations | |
| } | |
| 
 | |
| // applyRevocations returns current assignments with specified partitions revoked | |
| func (ics *IncrementalCooperativeAssignmentStrategy) applyRevocations( | |
| 	members []*GroupMember, | |
| 	revocations map[string][]PartitionAssignment, | |
| ) map[string][]PartitionAssignment { | |
| 	assignments := make(map[string][]PartitionAssignment) | |
| 
 | |
| 	for _, member := range members { | |
| 		assignments[member.ID] = make([]PartitionAssignment, 0) | |
| 
 | |
| 		// Get revoked partitions for this member | |
| 		revokedPartitions := make(map[string]bool) | |
| 		if revoked, exists := revocations[member.ID]; exists { | |
| 			for _, partition := range revoked { | |
| 				key := fmt.Sprintf("%s:%d", partition.Topic, partition.Partition) | |
| 				revokedPartitions[key] = true | |
| 			} | |
| 		} | |
| 
 | |
| 		// Add current assignments except revoked ones | |
| 		for _, assignment := range member.Assignment { | |
| 			key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition) | |
| 			if !revokedPartitions[key] { | |
| 				assignments[member.ID] = append(assignments[member.ID], assignment) | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	return assignments | |
| } | |
| 
 | |
| // getCurrentAssignmentsWithRevocations returns current assignments with revocations applied | |
| func (ics *IncrementalCooperativeAssignmentStrategy) getCurrentAssignmentsWithRevocations( | |
| 	members []*GroupMember, | |
| ) map[string][]PartitionAssignment { | |
| 	return ics.applyRevocations(members, ics.rebalanceState.RevokedPartitions) | |
| } | |
| 
 | |
| // performRegularAssignment performs a regular (non-incremental) assignment as fallback | |
| func (ics *IncrementalCooperativeAssignmentStrategy) performRegularAssignment( | |
| 	members []*GroupMember, | |
| 	topicPartitions map[string][]int32, | |
| ) map[string][]PartitionAssignment { | |
| 	// Reset rebalance state | |
| 	ics.rebalanceState = NewIncrementalRebalanceState() | |
| 	 | |
| 	// Use regular cooperative-sticky logic | |
| 	cooperativeSticky := &CooperativeStickyAssignmentStrategy{} | |
| 	return cooperativeSticky.Assign(members, topicPartitions) | |
| } | |
| 
 | |
| // GetRebalanceState returns the current rebalance state (for monitoring/debugging) | |
| func (ics *IncrementalCooperativeAssignmentStrategy) GetRebalanceState() *IncrementalRebalanceState { | |
| 	return ics.rebalanceState | |
| } | |
| 
 | |
| // IsRebalanceInProgress returns true if an incremental rebalance is currently in progress | |
| func (ics *IncrementalCooperativeAssignmentStrategy) IsRebalanceInProgress() bool { | |
| 	return ics.rebalanceState.Phase != RebalancePhaseNone | |
| } | |
| 
 | |
| // ForceCompleteRebalance forces completion of the current rebalance (for timeout scenarios) | |
| func (ics *IncrementalCooperativeAssignmentStrategy) ForceCompleteRebalance() { | |
| 	ics.rebalanceState.Phase = RebalancePhaseNone | |
| 	ics.rebalanceState.RevokedPartitions = make(map[string][]PartitionAssignment) | |
| 	ics.rebalanceState.PendingAssignments = make(map[string][]PartitionAssignment) | |
| }
 |