From 7e2c1fd9acc7d1506b3d2f1ad99272d2a63e47b4 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 10 Sep 2025 19:06:05 -0700
Subject: [PATCH] mq(kafka): Investigate SyncGroup workflow - kafka-go not calling SyncGroup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

🔍 CRITICAL FINDINGS - Consumer Group Protocol Analysis

✅ CONFIRMED WORKING:
- FindCoordinator API (key 10) ✅
- JoinGroup API (key 11) ✅
- Deterministic member ID generation ✅
- No more JoinGroup retries ✅

❌ CONFIRMED NOT WORKING:
- SyncGroup API (key 14) - NEVER called by kafka-go ❌
- Fetch API (key 1) - NEVER called by kafka-go ❌

🔍 OBSERVED BEHAVIOR:
- kafka-go calls: FindCoordinator → JoinGroup → (stops)
- kafka-go makes repeated Metadata requests
- No progression to SyncGroup or Fetch
- Test fails with 'context deadline exceeded'

🎯 HYPOTHESIS:
kafka-go may be:
1. Using simplified consumer protocol (no SyncGroup)
2. Expecting specific JoinGroup response format
3. Waiting for specific error codes/state transitions
4. Using different rebalancing strategy

📊 EVIDENCE:
- JoinGroup response: 215 bytes, includes member metadata
- Group state: Empty → PreparingRebalance → CompletingRebalance
- Member ID: consistent across calls (4b60f587)
- Protocol: 'range' selection working

NEXT: Research kafka-go consumer group implementation to understand why SyncGroup is bypassed.
--- weed/mq/kafka/protocol/handler.go | 1 + weed/mq/kafka/protocol/joingroup.go | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index c69a46efe..f70a206b2 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -235,6 +235,7 @@ func (h *Handler) HandleConn(conn net.Conn) error { } case 14: // SyncGroup fmt.Printf("DEBUG: *** SYNCGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) + fmt.Printf("DEBUG: *** THIS IS CRITICAL - SYNCGROUP WAS CALLED! ***\n") response, err = h.handleSyncGroup(correlationID, apiVersion, messageBuf[8:]) // skip header if err != nil { fmt.Printf("DEBUG: SyncGroup error: %v\n", err) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 770f85825..25e57c4a6 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -101,13 +101,13 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque // Handle member ID logic var memberID string var isNewMember bool - + // Use deterministic client identifier based on group + session timeout + protocol clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType) if request.MemberID == "" { // New member - check if we already have a member for this client - + // Look for existing member with same client characteristics var existingMemberID string for existingID, member := range group.Members { @@ -116,7 +116,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque break } } - + if existingMemberID != "" { // Reuse existing member ID for this client memberID = existingMemberID @@ -149,8 +149,13 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque group.Generation++ fmt.Printf("DEBUG: JoinGroup transitioned to PreparingRebalance, new 
generation: %d\n", group.Generation) } - case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance: - // Rebalance already in progress + case consumer.GroupStatePreparingRebalance: + // Rebalance in progress - if this is the leader and we have members, transition to CompletingRebalance + if len(group.Members) > 0 && memberID == group.Leader { + group.State = consumer.GroupStateCompletingRebalance + fmt.Printf("DEBUG: JoinGroup leader '%s' transitioning group to CompletingRebalance (ready for SyncGroup)\n", memberID) + } + case consumer.GroupStateCompletingRebalance: // Allow join but don't change generation until SyncGroup case consumer.GroupStateDead: return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil