From 0529a2af28ad1a7d894463ce22301b47ef2849bd Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 15:34:49 -0700 Subject: [PATCH] debug: Add consumer coordination logging to pinpoint consumer init issue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added logging for consumer group coordination API keys (9,11,12,14) to identify where consumer gets stuck during initialization. KEY FINDING: Consumer is NOT stuck in group coordination! Instead, consumer is stuck in seek/metadata discovery phase. Evidence from test logs: - Metadata (apiKey=3): 2,137 requests ✅ - ApiVersions (apiKey=18): 22 requests ✅ - ListOffsets (apiKey=2): 6 requests ✅ (but not completing!) - JoinGroup (apiKey=11): 0 requests ❌ - SyncGroup (apiKey=14): 0 requests ❌ - Fetch (apiKey=1): 0 requests ❌ Consumer is stuck trying to execute seekToBeginning(): 1. Consumer.assign() succeeds 2. Consumer.seekToBeginning() called 3. Consumer sends ListOffsets request (succeeds) 4. Stuck waiting for metadata or broker connection 5. Consumer.poll() never called 6. Initialization never completes Root cause likely in: - ListOffsets (apiKey=2) response format or content - Metadata response broker assignment - Partition leader discovery This is separate from the context timeout bug (Bug #1). Both must be fixed for Schema Registry to work. --- .gitignore | 1 + weed/mq/kafka/protocol/handler.go | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/.gitignore b/.gitignore index 044120bcd..cd240ab6d 100644 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,4 @@ ADVANCED_IAM_DEVELOPMENT_PLAN.md /test/s3/iam/test-volume-data *.log weed-iam +test/kafka/kafka-client-loadtest/weed-linux-arm64 diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 89a46718c..76727628a 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -934,6 +934,18 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { glog.V(4).Infof("API version validated: Key=%d (%s), Version=%d, Correlation=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) + // Log consumer group coordination requests + switch apiKey { + case 9: + glog.Warningf("🟡 OffsetFetch: correlationID=%d from %s", correlationID, connectionID) + case 11: + glog.Warningf("🟡 JoinGroup: correlationID=%d from %s", correlationID, connectionID) + case 12: + glog.Warningf("🟡 Heartbeat: correlationID=%d from %s", correlationID, connectionID) + case 14: + glog.Warningf("🟡 SyncGroup: correlationID=%d from %s", correlationID, connectionID) + } + // Extract request body - special handling for ApiVersions requests var requestBody []byte if apiKey == uint16(APIKeyApiVersions) && apiVersion >= 3 {