You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

218 lines
7.2 KiB

package consumer
import (
"time"
)
// RebalanceTimeoutManager handles rebalance timeout logic and member eviction
// for the consumer groups tracked by a GroupCoordinator.
type RebalanceTimeoutManager struct {
// coordinator is the group coordinator whose groups are inspected; its
// rebalanceTimeoutMs is used as the default when a member specifies none.
coordinator *GroupCoordinator
}
// NewRebalanceTimeoutManager returns a RebalanceTimeoutManager bound to the
// given coordinator. The coordinator pointer is stored as-is; no validation
// is performed here.
func NewRebalanceTimeoutManager(coordinator *GroupCoordinator) *RebalanceTimeoutManager {
	manager := new(RebalanceTimeoutManager)
	manager.coordinator = coordinator
	return manager
}
// CheckRebalanceTimeouts walks every group known to the coordinator and, for
// those currently in a rebalancing state, evicts members whose timeouts have
// elapsed.
//
// The coordinator's group map is read-locked for the whole sweep, and each
// group's own mutex is held while that group is examined. A single timestamp
// taken at the start is used for all groups.
func (rtm *RebalanceTimeoutManager) CheckRebalanceTimeouts() {
	sweepTime := time.Now()

	coord := rtm.coordinator
	coord.groupsMu.RLock()
	defer coord.groupsMu.RUnlock()

	for _, g := range coord.groups {
		g.Mu.Lock()
		switch g.State {
		case GroupStatePreparingRebalance, GroupStateCompletingRebalance:
			// Timeouts are only enforced while a rebalance is in flight.
			rtm.checkGroupRebalanceTimeout(g, sweepTime)
		}
		g.Mu.Unlock()
	}
}
// checkGroupRebalanceTimeout finds the members of group whose timeouts have
// elapsed as of now and hands them to evictExpiredMembers.
//
// A member is considered expired when either of the following holds:
//   - it is in MemberStatePending and more than its rebalance timeout has
//     passed since JoinedAt (a zero RebalanceTimeout falls back to the
//     coordinator's rebalanceTimeoutMs), or
//   - more than its session timeout has passed since LastHeartbeat.
//
// Each expired member is reported exactly once, even when both conditions
// hold. The function does not lock group.Mu itself; CheckRebalanceTimeouts
// calls it with that mutex held.
func (rtm *RebalanceTimeoutManager) checkGroupRebalanceTimeout(group *ConsumerGroup, now time.Time) {
	var expiredMembers []string

	for memberID, member := range group.Members {
		rebalanceTimeout := time.Duration(member.RebalanceTimeout) * time.Millisecond
		if rebalanceTimeout == 0 {
			// Use default rebalance timeout if not specified.
			rebalanceTimeout = time.Duration(rtm.coordinator.rebalanceTimeoutMs) * time.Millisecond
		}
		// NOTE(review): a zero SessionTimeout is taken literally, so such a
		// member expires as soon as any time has passed since its last
		// heartbeat — confirm that zero is never a valid "unset" value.
		sessionTimeout := time.Duration(member.SessionTimeout) * time.Millisecond

		// The rebalance deadline only applies to members still pending.
		rebalanceExpired := member.State == MemberStatePending &&
			now.Sub(member.JoinedAt) > rebalanceTimeout
		// The session deadline applies regardless of member state.
		sessionExpired := now.Sub(member.LastHeartbeat) > sessionTimeout

		// Combine both checks so a member failing both is listed only once.
		if rebalanceExpired || sessionExpired {
			expiredMembers = append(expiredMembers, memberID)
		}
	}

	// Remove expired members and trigger rebalance if necessary.
	if len(expiredMembers) > 0 {
		rtm.evictExpiredMembers(group, expiredMembers)
	}
}
// evictExpiredMembers deletes the listed members from group and then brings
// the group's state in line with whoever is left:
//
//   - no members remain: the group becomes GroupStateEmpty, its generation is
//     bumped and the leader is cleared;
//   - members remain and the group was rebalancing: a leader is chosen if the
//     previous one was evicted (whichever member map iteration yields first),
//     the group is reset to GroupStatePreparingRebalance with a new
//     generation, and every remaining member is marked MemberStatePending;
//   - otherwise the state is left alone.
//
// LastActivity is refreshed in every case. The function does not lock
// group.Mu itself; within this file it is reached via CheckRebalanceTimeouts,
// which holds that mutex.
func (rtm *RebalanceTimeoutManager) evictExpiredMembers(group *ConsumerGroup, expiredMembers []string) {
	for _, evictedID := range expiredMembers {
		delete(group.Members, evictedID)
		if evictedID == group.Leader {
			// The leader is gone; a replacement is picked below if needed.
			group.Leader = ""
		}
	}

	wasRebalancing := group.State == GroupStatePreparingRebalance ||
		group.State == GroupStateCompletingRebalance

	switch {
	case len(group.Members) == 0:
		group.State = GroupStateEmpty
		group.Generation++
		group.Leader = ""

	case wasRebalancing:
		if group.Leader == "" {
			// Take the first member the map yields as the new leader.
			for candidateID := range group.Members {
				group.Leader = candidateID
				break
			}
		}
		// Restart the rebalance from the beginning with a new generation.
		group.State = GroupStatePreparingRebalance
		group.Generation++
		for _, remaining := range group.Members {
			remaining.State = MemberStatePending
		}
	}

	group.LastActivity = time.Now()
}
// IsRebalanceStuck reports whether group is in a rebalancing state
// (preparing or completing) and its LastActivity is older than
// maxRebalanceDuration. Groups in any other state are never considered stuck.
//
// The group's fields are read without taking group.Mu.
// NOTE(review): callers presumably need to hold the lock themselves — confirm.
func (rtm *RebalanceTimeoutManager) IsRebalanceStuck(group *ConsumerGroup, maxRebalanceDuration time.Duration) bool {
	switch group.State {
	case GroupStatePreparingRebalance, GroupStateCompletingRebalance:
		idleFor := time.Since(group.LastActivity)
		return idleFor > maxRebalanceDuration
	default:
		return false
	}
}
// ForceCompleteRebalance pushes a rebalancing group one step forward while
// holding group.Mu:
//
//   - GroupStatePreparingRebalance  -> GroupStateCompletingRebalance
//   - GroupStateCompletingRebalance -> GroupStateStable, with every member
//     marked MemberStateStable
//
// LastActivity is refreshed whenever a transition happens. Groups in any
// other state are left untouched.
func (rtm *RebalanceTimeoutManager) ForceCompleteRebalance(group *ConsumerGroup) {
	group.Mu.Lock()
	defer group.Mu.Unlock()

	switch group.State {
	case GroupStatePreparingRebalance:
		group.State = GroupStateCompletingRebalance
	case GroupStateCompletingRebalance:
		group.State = GroupStateStable
		for _, m := range group.Members {
			m.State = MemberStateStable
		}
	default:
		// Not rebalancing: nothing to force, and LastActivity stays as is.
		return
	}

	group.LastActivity = time.Now()
}
// GetRebalanceStatus returns a snapshot of the rebalance state of the group
// identified by groupID, or nil when the coordinator's GetGroup returns nil
// for that ID.
//
// The snapshot is built under the group's read lock using a single timestamp,
// so all reported durations are measured against the same instant. For each
// member, the reported RebalanceTimeout is the effective one: a member whose
// RebalanceTimeout is zero is shown with the coordinator's rebalanceTimeoutMs,
// matching the fallback used when timeouts are enforced. Remaining times are
// clamped at zero. Members are listed in map iteration order, which is not
// stable between calls.
func (rtm *RebalanceTimeoutManager) GetRebalanceStatus(groupID string) *RebalanceStatus {
	group := rtm.coordinator.GetGroup(groupID)
	if group == nil {
		return nil
	}

	group.Mu.RLock()
	defer group.Mu.RUnlock()

	now := time.Now()
	status := &RebalanceStatus{
		GroupID:           groupID,
		State:             group.State,
		Generation:        group.Generation,
		MemberCount:       len(group.Members),
		Leader:            group.Leader,
		LastActivity:      group.LastActivity,
		IsRebalancing:     group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance,
		RebalanceDuration: now.Sub(group.LastActivity),
	}

	// Pre-size only when there is something to add, so an empty group keeps a
	// nil Members slice exactly as before.
	if n := len(group.Members); n > 0 {
		status.Members = make([]MemberTimeoutStatus, 0, n)
	}

	for memberID, member := range group.Members {
		sessionTimeout := time.Duration(member.SessionTimeout) * time.Millisecond
		rebalanceTimeout := time.Duration(member.RebalanceTimeout) * time.Millisecond
		if rebalanceTimeout == 0 {
			// Same default the timeout check applies when none is specified;
			// without it the member would always be reported as out of time.
			rebalanceTimeout = time.Duration(rtm.coordinator.rebalanceTimeoutMs) * time.Millisecond
		}

		// Time left until the session timeout, never negative.
		sessionTimeRemaining := sessionTimeout - now.Sub(member.LastHeartbeat)
		if sessionTimeRemaining < 0 {
			sessionTimeRemaining = 0
		}

		// Time left until the rebalance timeout, never negative.
		rebalanceTimeRemaining := rebalanceTimeout - now.Sub(member.JoinedAt)
		if rebalanceTimeRemaining < 0 {
			rebalanceTimeRemaining = 0
		}

		status.Members = append(status.Members, MemberTimeoutStatus{
			MemberID:               memberID,
			State:                  member.State,
			LastHeartbeat:          member.LastHeartbeat,
			JoinedAt:               member.JoinedAt,
			SessionTimeout:         sessionTimeout,
			RebalanceTimeout:       rebalanceTimeout,
			SessionTimeRemaining:   sessionTimeRemaining,
			RebalanceTimeRemaining: rebalanceTimeRemaining,
		})
	}

	return status
}
// RebalanceStatus represents the current status of a group's rebalance.
// It is the snapshot type returned by GetRebalanceStatus. The time.Duration
// fields have no custom JSON encoding, so they serialize as integer
// nanosecond counts.
type RebalanceStatus struct {
// GroupID is the ID the status was requested for.
GroupID string `json:"group_id"`
// State is the group's state at snapshot time.
State GroupState `json:"state"`
// Generation is the group's generation counter at snapshot time.
Generation int32 `json:"generation"`
// MemberCount is the number of members in the group.
MemberCount int `json:"member_count"`
// Leader is the member ID of the group leader; empty when none is set.
Leader string `json:"leader"`
// LastActivity is the group's last recorded activity time.
LastActivity time.Time `json:"last_activity"`
// IsRebalancing is true when State is preparing or completing rebalance.
IsRebalancing bool `json:"is_rebalancing"`
// RebalanceDuration is the time elapsed since LastActivity; it is filled
// in regardless of whether the group is rebalancing.
RebalanceDuration time.Duration `json:"rebalance_duration"`
// Members holds one timeout entry per group member.
Members []MemberTimeoutStatus `json:"members"`
}
// MemberTimeoutStatus represents timeout status for a group member.
// The time.Duration fields have no custom JSON encoding, so they serialize
// as integer nanosecond counts.
type MemberTimeoutStatus struct {
// MemberID is the member's key in the group's member map.
MemberID string `json:"member_id"`
// State is the member's state at snapshot time.
State MemberState `json:"state"`
// LastHeartbeat is the member's last recorded heartbeat time.
LastHeartbeat time.Time `json:"last_heartbeat"`
// JoinedAt is the member's recorded join time.
JoinedAt time.Time `json:"joined_at"`
// SessionTimeout is the member's session timeout as a duration.
SessionTimeout time.Duration `json:"session_timeout"`
// RebalanceTimeout is the member's rebalance timeout as a duration.
RebalanceTimeout time.Duration `json:"rebalance_timeout"`
// SessionTimeRemaining is SessionTimeout minus the time since
// LastHeartbeat, clamped at zero.
SessionTimeRemaining time.Duration `json:"session_time_remaining"`
// RebalanceTimeRemaining is RebalanceTimeout minus the time since
// JoinedAt, clamped at zero.
RebalanceTimeRemaining time.Duration `json:"rebalance_time_remaining"`
}