From 84842eb6e9c037e15d10ed644addfeadd41227bd Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 16:21:09 -0700 Subject: [PATCH] debug: Add raw request logging - CONSUMER STUCK IN SEEK LOOP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BREAKTHROUGH: Found the exact point where consumer hangs! ## Request Statistics 2049 × Metadata (apiKey=3) - Repeatedly sent 22 × ApiVersions (apiKey=18) 6 × DescribeCluster (apiKey=60) 0 × ListOffsets (apiKey=2) - NEVER SENT 0 × Fetch (apiKey=1) - NEVER SENT 0 × Produce (apiKey=0) - NEVER SENT ## Consumer Initialization Sequence ✅ Consumer created successfully ✅ partitionsFor() succeeds - finds _schemas topic with 1 partition ✅ assign() called - assigns partition to consumer ❌ seekToBeginning() BLOCKS HERE - never sends ListOffsets ❌ Never reaches poll() loop ## Why Metadata is Requested 2049 Times Consumer stuck in retry loop: 1. Get metadata → works 2. Assign partition → works 3. Try to seek → blocks indefinitely 4. Timeout on seek 5. Retry metadata to find alternate broker 6. Loop back to step 1 ## The Real Issue Java KafkaConsumer is stuck at seekToBeginning() but NOT sending ListOffsets requests. This indicates a BROKER CONNECTIVITY ISSUE during offset seeking phase. Root causes to investigate: 1. Metadata response missing critical fields (cluster ID, controller ID) 2. Broker address unreachable for seeks 3. Consumer group coordination incomplete 4. Network connectivity issue specific to seek operations The 2049 metadata requests prove consumer can communicate with gateway, but something in the broker assignment prevents seeking. --- weed/mq/kafka/protocol/handler.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 18534e446..509ed1330 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -908,8 +908,13 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - - glog.Warningf("🔴 REQUEST LOOP: Parsed apiKey=%d, apiVersion=%d, correlationID=%d from %s", apiKey, apiVersion, correlationID, connectionID) + + // LOG EVERY SINGLE REQUEST REGARDLESS OF apiKey + maxLen := len(messageBuf) + if maxLen > 16 { + maxLen = 16 + } + glog.Warningf("🔴 RAW REQUEST: correlationID=%d, firstTwoBytes=[%02x %02x] = apiKey=%d, first16bytes=%v", correlationID, messageBuf[0], messageBuf[1], apiKey, messageBuf[:maxLen]) // Validate API version against what we support if err := h.validateAPIVersion(apiKey, apiVersion); err != nil { @@ -2884,10 +2889,10 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, // handleMetadata routes to the appropriate version-specific handler func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { glog.Warningf("🟡 handleMetadata ENTRY: apiVersion=%d, correlationID=%d, requestBodyLen=%d", apiVersion, correlationID, len(requestBody)) - + var response []byte var err error - + switch apiVersion { case 0: response, err = h.HandleMetadataV0(correlationID, requestBody) @@ -2909,7 +2914,7 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques err = fmt.Errorf("metadata version %d not implemented yet", apiVersion) } } - + if err != nil { glog.Warningf("🟡 handleMetadata EXIT ERROR: apiVersion=%d, correlationID=%d, err=%v", apiVersion, correlationID, err) } else {