From d34feeb7e9b2216ec3b7841149875cd946cd300b Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 18:43:26 -0700 Subject: [PATCH] check for GroupStatePreparingRebalance --- weed/mq/kafka/protocol/joingroup.go | 31 ++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 69ef6b4a4..192865221 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -863,12 +863,16 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque } // Check if this is the group leader with assignments + glog.Infof("[SYNCGROUP] Member=%s Leader=%s GroupState=%s HasAssignments=%v MemberCount=%d Gen=%d", + request.MemberID, group.Leader, group.State, len(request.GroupAssignments) > 0, len(group.Members), request.GenerationID) + if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { // Leader is providing assignments - process and store them - glog.V(2).Infof("SyncGroup: Leader %s providing client-side assignments for group %s (%d members)", + glog.Infof("[SYNCGROUP] Leader %s providing client-side assignments for group %s (%d assignments)", request.MemberID, request.GroupID, len(request.GroupAssignments)) err = h.processGroupAssignments(group, request.GroupAssignments) if err != nil { + glog.Errorf("[SYNCGROUP] ERROR processing leader assignments: %v", err) return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil } @@ -879,14 +883,18 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque for _, m := range group.Members { m.State = consumer.MemberStateStable } - } else if group.State == consumer.GroupStateCompletingRebalance { - // Non-leader member waiting for assignments - // Assignments should already be processed by leader - glog.V(2).Infof("SyncGroup: Non-leader %s waiting for assignments in group %s", - request.MemberID, request.GroupID) + glog.Infof("[SYNCGROUP] Leader assignments processed successfully, group now STABLE") + } else if group.State == consumer.GroupStateCompletingRebalance || group.State == consumer.GroupStatePreparingRebalance { + // Non-leader member waiting for leader to provide assignments + // CRITICAL FIX: Non-leader members must wait for leader to process client-side assignments + // Do NOT use server-side assignment if group is still rebalancing + glog.Infof("[SYNCGROUP] Non-leader %s waiting for leader assignments in group %s (state=%s)", + request.MemberID, request.GroupID, group.State) } else { - // Trigger partition assignment using built-in strategy - glog.V(2).Infof("SyncGroup: Using server-side assignment for group %s", request.GroupID) + // Trigger partition assignment using built-in strategy (server-side assignment) + // This should only happen for server-side assignment protocols (not Sarama's client-side) + glog.Warningf("[SYNCGROUP] Using server-side assignment for group %s (Leader=%s State=%s) - this should not happen with Sarama!", + request.GroupID, group.Leader, group.State) topicPartitions := h.getTopicPartitions(group) group.AssignPartitions(topicPartitions) @@ -908,7 +916,7 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque } // Log member assignment details - glog.V(2).Infof("SyncGroup: Member %s in group %s assigned %d partitions: %v", + glog.Infof("[SYNCGROUP] Member %s in group %s assigned %d partitions: %v", request.MemberID, request.GroupID, len(member.Assignment), member.Assignment) // Build response @@ -1218,6 +1226,8 @@ func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode in func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignments []GroupAssignment) error { // Apply leader-provided assignments + glog.Infof("[PROCESS_ASSIGNMENTS] Processing %d member assignments from leader", len(assignments)) + // Clear current assignments for _, m := range group.Members { m.Assignment = nil @@ -1227,14 +1237,17 @@ func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignm m, ok := group.Members[ga.MemberID] if !ok { // Skip unknown members + glog.Warningf("[PROCESS_ASSIGNMENTS] Skipping unknown member: %s", ga.MemberID) continue } parsed, err := h.parseMemberAssignment(ga.Assignment) if err != nil { + glog.Errorf("[PROCESS_ASSIGNMENTS] Failed to parse assignment for member %s: %v", ga.MemberID, err) return err } m.Assignment = parsed + glog.Infof("[PROCESS_ASSIGNMENTS] Member %s assigned %d partitions: %v", ga.MemberID, len(parsed), parsed) } return nil