|
|
@ -863,12 +863,16 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque |
|
|
|
} |
|
|
|
|
|
|
|
// Check if this is the group leader with assignments
|
|
|
|
glog.Infof("[SYNCGROUP] Member=%s Leader=%s GroupState=%s HasAssignments=%v MemberCount=%d Gen=%d", |
|
|
|
request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID) |
|
|
|
|
|
|
|
if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { |
|
|
|
// Leader is providing assignments - process and store them
|
|
|
|
glog.V(2).Infof("SyncGroup: Leader %s providing client-side assignments for group %s (%d members)", |
|
|
|
glog.Infof("[SYNCGROUP] Leader %s providing client-side assignments for group %s (%d assignments)", |
|
|
|
request.MemberID, request.GroupID, len(request.GroupAssignments)) |
|
|
|
err = h.processGroupAssignments(group, request.GroupAssignments) |
|
|
|
if err != nil { |
|
|
|
glog.Errorf("[SYNCGROUP] ERROR processing leader assignments: %v", err) |
|
|
|
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil |
|
|
|
} |
|
|
|
|
|
|
@ -879,14 +883,18 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque |
|
|
|
for _, m := range group.Members { |
|
|
|
m.State = consumer.MemberStateStable |
|
|
|
} |
|
|
|
} else if group.State == consumer.GroupStateCompletingRebalance { |
|
|
|
// Non-leader member waiting for assignments
|
|
|
|
// Assignments should already be processed by leader
|
|
|
|
glog.V(2).Infof("SyncGroup: Non-leader %s waiting for assignments in group %s", |
|
|
|
request.MemberID, request.GroupID) |
|
|
|
glog.Infof("[SYNCGROUP] Leader assignments processed successfully, group now STABLE") |
|
|
|
} else if group.State == consumer.GroupStateCompletingRebalance || group.State == consumer.GroupStatePreparingRebalance { |
|
|
|
// Non-leader member waiting for leader to provide assignments
|
|
|
|
// CRITICAL FIX: Non-leader members must wait for leader to process client-side assignments
|
|
|
|
// Do NOT use server-side assignment if group is still rebalancing
|
|
|
|
glog.Infof("[SYNCGROUP] Non-leader %s waiting for leader assignments in group %s (state=%s)", |
|
|
|
request.MemberID, request.GroupID, group.State) |
|
|
|
} else { |
|
|
|
// Trigger partition assignment using built-in strategy
|
|
|
|
glog.V(2).Infof("SyncGroup: Using server-side assignment for group %s", request.GroupID) |
|
|
|
// Trigger partition assignment using built-in strategy (server-side assignment)
|
|
|
|
// This should only happen for server-side assignment protocols (not Sarama's client-side)
|
|
|
|
glog.Warningf("[SYNCGROUP] Using server-side assignment for group %s (Leader=%s State=%s) - this should not happen with Sarama!", |
|
|
|
request.GroupID, group.Leader, group.State) |
|
|
|
topicPartitions := h.getTopicPartitions(group) |
|
|
|
group.AssignPartitions(topicPartitions) |
|
|
|
|
|
|
@ -908,7 +916,7 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque |
|
|
|
} |
|
|
|
|
|
|
|
// Log member assignment details
|
|
|
|
glog.V(2).Infof("SyncGroup: Member %s in group %s assigned %d partitions: %v", |
|
|
|
glog.Infof("[SYNCGROUP] Member %s in group %s assigned %d partitions: %v", |
|
|
|
request.MemberID, request.GroupID, len(member.Assignment), member.Assignment) |
|
|
|
|
|
|
|
// Build response
|
|
|
@ -1218,6 +1226,8 @@ func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode in |
|
|
|
|
|
|
|
func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignments []GroupAssignment) error { |
|
|
|
// Apply leader-provided assignments
|
|
|
|
glog.Infof("[PROCESS_ASSIGNMENTS] Processing %d member assignments from leader", len(assignments)) |
|
|
|
|
|
|
|
// Clear current assignments
|
|
|
|
for _, m := range group.Members { |
|
|
|
m.Assignment = nil |
|
|
@ -1227,14 +1237,17 @@ func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignm |
|
|
|
m, ok := group.Members[ga.MemberID] |
|
|
|
if !ok { |
|
|
|
// Skip unknown members
|
|
|
|
glog.Warningf("[PROCESS_ASSIGNMENTS] Skipping unknown member: %s", ga.MemberID) |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
parsed, err := h.parseMemberAssignment(ga.Assignment) |
|
|
|
if err != nil { |
|
|
|
glog.Errorf("[PROCESS_ASSIGNMENTS] Failed to parse assignment for member %s: %v", ga.MemberID, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
m.Assignment = parsed |
|
|
|
glog.Infof("[PROCESS_ASSIGNMENTS] Member %s assigned %d partitions: %v", ga.MemberID, len(parsed), parsed) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|