diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index c69a46efe..f70a206b2 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -235,6 +235,7 @@ func (h *Handler) HandleConn(conn net.Conn) error { } case 14: // SyncGroup fmt.Printf("DEBUG: *** SYNCGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) + fmt.Printf("DEBUG: *** THIS IS CRITICAL - SYNCGROUP WAS CALLED! ***\n") response, err = h.handleSyncGroup(correlationID, apiVersion, messageBuf[8:]) // skip header if err != nil { fmt.Printf("DEBUG: SyncGroup error: %v\n", err) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 770f85825..25e57c4a6 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -101,13 +101,13 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque // Handle member ID logic var memberID string var isNewMember bool - + // Use deterministic client identifier based on group + session timeout + protocol clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType) if request.MemberID == "" { // New member - check if we already have a member for this client - + // Look for existing member with same client characteristics var existingMemberID string for existingID, member := range group.Members { @@ -116,7 +116,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque break } } - + if existingMemberID != "" { // Reuse existing member ID for this client memberID = existingMemberID @@ -149,8 +149,13 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque group.Generation++ fmt.Printf("DEBUG: JoinGroup transitioned to PreparingRebalance, new generation: %d\n", group.Generation) } - case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance: - // Rebalance already in progress + case consumer.GroupStatePreparingRebalance: + // Rebalance in progress - if this is the leader and we have members, transition to CompletingRebalance + if len(group.Members) > 0 && memberID == group.Leader { + group.State = consumer.GroupStateCompletingRebalance + fmt.Printf("DEBUG: JoinGroup leader '%s' transitioning group to CompletingRebalance (ready for SyncGroup)\n", memberID) + } + case consumer.GroupStateCompletingRebalance: // Allow join but don't change generation until SyncGroup case consumer.GroupStateDead: return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil