You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
218 lines
7.2 KiB
218 lines
7.2 KiB
package consumer
|
|
|
|
import (
|
|
"time"
|
|
)
|
|
|
|
// RebalanceTimeoutManager handles rebalance timeout logic and member eviction
|
|
type RebalanceTimeoutManager struct {
|
|
coordinator *GroupCoordinator
|
|
}
|
|
|
|
// NewRebalanceTimeoutManager creates a new rebalance timeout manager
|
|
func NewRebalanceTimeoutManager(coordinator *GroupCoordinator) *RebalanceTimeoutManager {
|
|
return &RebalanceTimeoutManager{
|
|
coordinator: coordinator,
|
|
}
|
|
}
|
|
|
|
// CheckRebalanceTimeouts checks for members that have exceeded rebalance timeouts
|
|
func (rtm *RebalanceTimeoutManager) CheckRebalanceTimeouts() {
|
|
now := time.Now()
|
|
rtm.coordinator.groupsMu.RLock()
|
|
defer rtm.coordinator.groupsMu.RUnlock()
|
|
|
|
for _, group := range rtm.coordinator.groups {
|
|
group.Mu.Lock()
|
|
|
|
// Only check timeouts for groups in rebalancing states
|
|
if group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance {
|
|
rtm.checkGroupRebalanceTimeout(group, now)
|
|
}
|
|
|
|
group.Mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// checkGroupRebalanceTimeout checks and handles rebalance timeout for a specific group
|
|
func (rtm *RebalanceTimeoutManager) checkGroupRebalanceTimeout(group *ConsumerGroup, now time.Time) {
|
|
expiredMembers := make([]string, 0)
|
|
|
|
for memberID, member := range group.Members {
|
|
// Check if member has exceeded its rebalance timeout
|
|
rebalanceTimeout := time.Duration(member.RebalanceTimeout) * time.Millisecond
|
|
if rebalanceTimeout == 0 {
|
|
// Use default rebalance timeout if not specified
|
|
rebalanceTimeout = time.Duration(rtm.coordinator.rebalanceTimeoutMs) * time.Millisecond
|
|
}
|
|
|
|
// For members in pending state during rebalance, check against join time
|
|
if member.State == MemberStatePending {
|
|
if now.Sub(member.JoinedAt) > rebalanceTimeout {
|
|
expiredMembers = append(expiredMembers, memberID)
|
|
}
|
|
}
|
|
|
|
// Also check session timeout as a fallback
|
|
sessionTimeout := time.Duration(member.SessionTimeout) * time.Millisecond
|
|
if now.Sub(member.LastHeartbeat) > sessionTimeout {
|
|
expiredMembers = append(expiredMembers, memberID)
|
|
}
|
|
}
|
|
|
|
// Remove expired members and trigger rebalance if necessary
|
|
if len(expiredMembers) > 0 {
|
|
rtm.evictExpiredMembers(group, expiredMembers)
|
|
}
|
|
}
|
|
|
|
// evictExpiredMembers removes expired members and updates group state
|
|
func (rtm *RebalanceTimeoutManager) evictExpiredMembers(group *ConsumerGroup, expiredMembers []string) {
|
|
for _, memberID := range expiredMembers {
|
|
delete(group.Members, memberID)
|
|
|
|
// If the leader was evicted, clear leader
|
|
if group.Leader == memberID {
|
|
group.Leader = ""
|
|
}
|
|
}
|
|
|
|
// Update group state based on remaining members
|
|
if len(group.Members) == 0 {
|
|
group.State = GroupStateEmpty
|
|
group.Generation++
|
|
group.Leader = ""
|
|
} else {
|
|
// If we were in the middle of rebalancing, restart the process
|
|
if group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance {
|
|
// Select new leader if needed
|
|
if group.Leader == "" {
|
|
for memberID := range group.Members {
|
|
group.Leader = memberID
|
|
break
|
|
}
|
|
}
|
|
|
|
// Reset to preparing rebalance to restart the process
|
|
group.State = GroupStatePreparingRebalance
|
|
group.Generation++
|
|
|
|
// Mark remaining members as pending
|
|
for _, member := range group.Members {
|
|
member.State = MemberStatePending
|
|
}
|
|
}
|
|
}
|
|
|
|
group.LastActivity = time.Now()
|
|
}
|
|
|
|
// IsRebalanceStuck checks if a group has been stuck in rebalancing for too long
|
|
func (rtm *RebalanceTimeoutManager) IsRebalanceStuck(group *ConsumerGroup, maxRebalanceDuration time.Duration) bool {
|
|
if group.State != GroupStatePreparingRebalance && group.State != GroupStateCompletingRebalance {
|
|
return false
|
|
}
|
|
|
|
return time.Since(group.LastActivity) > maxRebalanceDuration
|
|
}
|
|
|
|
// ForceCompleteRebalance forces completion of a stuck rebalance
|
|
func (rtm *RebalanceTimeoutManager) ForceCompleteRebalance(group *ConsumerGroup) {
|
|
group.Mu.Lock()
|
|
defer group.Mu.Unlock()
|
|
|
|
// If stuck in preparing rebalance, move to completing
|
|
if group.State == GroupStatePreparingRebalance {
|
|
group.State = GroupStateCompletingRebalance
|
|
group.LastActivity = time.Now()
|
|
return
|
|
}
|
|
|
|
// If stuck in completing rebalance, force to stable
|
|
if group.State == GroupStateCompletingRebalance {
|
|
group.State = GroupStateStable
|
|
for _, member := range group.Members {
|
|
member.State = MemberStateStable
|
|
}
|
|
group.LastActivity = time.Now()
|
|
return
|
|
}
|
|
}
|
|
|
|
// GetRebalanceStatus returns the current rebalance status for a group
|
|
func (rtm *RebalanceTimeoutManager) GetRebalanceStatus(groupID string) *RebalanceStatus {
|
|
group := rtm.coordinator.GetGroup(groupID)
|
|
if group == nil {
|
|
return nil
|
|
}
|
|
|
|
group.Mu.RLock()
|
|
defer group.Mu.RUnlock()
|
|
|
|
status := &RebalanceStatus{
|
|
GroupID: groupID,
|
|
State: group.State,
|
|
Generation: group.Generation,
|
|
MemberCount: len(group.Members),
|
|
Leader: group.Leader,
|
|
LastActivity: group.LastActivity,
|
|
IsRebalancing: group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance,
|
|
RebalanceDuration: time.Since(group.LastActivity),
|
|
}
|
|
|
|
// Calculate member timeout status
|
|
now := time.Now()
|
|
for memberID, member := range group.Members {
|
|
memberStatus := MemberTimeoutStatus{
|
|
MemberID: memberID,
|
|
State: member.State,
|
|
LastHeartbeat: member.LastHeartbeat,
|
|
JoinedAt: member.JoinedAt,
|
|
SessionTimeout: time.Duration(member.SessionTimeout) * time.Millisecond,
|
|
RebalanceTimeout: time.Duration(member.RebalanceTimeout) * time.Millisecond,
|
|
}
|
|
|
|
// Calculate time until session timeout
|
|
sessionTimeRemaining := memberStatus.SessionTimeout - now.Sub(member.LastHeartbeat)
|
|
if sessionTimeRemaining < 0 {
|
|
sessionTimeRemaining = 0
|
|
}
|
|
memberStatus.SessionTimeRemaining = sessionTimeRemaining
|
|
|
|
// Calculate time until rebalance timeout
|
|
rebalanceTimeRemaining := memberStatus.RebalanceTimeout - now.Sub(member.JoinedAt)
|
|
if rebalanceTimeRemaining < 0 {
|
|
rebalanceTimeRemaining = 0
|
|
}
|
|
memberStatus.RebalanceTimeRemaining = rebalanceTimeRemaining
|
|
|
|
status.Members = append(status.Members, memberStatus)
|
|
}
|
|
|
|
return status
|
|
}
|
|
|
|
// RebalanceStatus represents the current status of a group's rebalance
|
|
type RebalanceStatus struct {
|
|
GroupID string `json:"group_id"`
|
|
State GroupState `json:"state"`
|
|
Generation int32 `json:"generation"`
|
|
MemberCount int `json:"member_count"`
|
|
Leader string `json:"leader"`
|
|
LastActivity time.Time `json:"last_activity"`
|
|
IsRebalancing bool `json:"is_rebalancing"`
|
|
RebalanceDuration time.Duration `json:"rebalance_duration"`
|
|
Members []MemberTimeoutStatus `json:"members"`
|
|
}
|
|
|
|
// MemberTimeoutStatus represents timeout status for a group member
|
|
type MemberTimeoutStatus struct {
|
|
MemberID string `json:"member_id"`
|
|
State MemberState `json:"state"`
|
|
LastHeartbeat time.Time `json:"last_heartbeat"`
|
|
JoinedAt time.Time `json:"joined_at"`
|
|
SessionTimeout time.Duration `json:"session_timeout"`
|
|
RebalanceTimeout time.Duration `json:"rebalance_timeout"`
|
|
SessionTimeRemaining time.Duration `json:"session_time_remaining"`
|
|
RebalanceTimeRemaining time.Duration `json:"rebalance_time_remaining"`
|
|
}
|