From cebd17f9107793ed92706da85c498e6cbf12b3bf Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 16:41:50 -0700 Subject: [PATCH] debug: Add exhaustive ListOffsets handler logging - CONFIRMS ROOT CAUSE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## DEFINITIVE PROOF: ListOffsets Requests NEVER Reach Handler Despite adding 🔥🔥🔥 logging at the VERY START of handleListOffsets function, ZERO logs appear when Schema Registry is initializing. This DEFINITIVELY PROVES: ❌ ListOffsets requests are NOT reaching the handler function ❌ They are NOT being received by the gateway ❌ They are NOT being parsed and dispatched ## Routing Analysis: Request flow should be: 1. TCP read message ✅ (logs show requests coming in) 2. Parse apiKey=2 ✅ (REQUEST_LOOP logs show apiKey=2 detected) 3. Route to processRequestSync ✅ (processRequestSync logs show requests) 4. Match apiKey=2 case ✅ (should log processRequestSync dispatching) 5. Call handleListOffsets ❌ (NO LOGS EVER APPEAR) ## Root Cause: Request DISAPPEARS between processRequestSync and handler The request is: - Detected at TCP level (apiKey=2 seen) - Detected in processRequestSync logging (Showing request routing) - BUT never reaches handleListOffsets function This means ONE OF: 1. processRequestSync.switch statement is NOT matching case APIKeyListOffsets 2. Request is being filtered/dropped AFTER processRequestSync receives it 3. Correlation ID tracking issue preventing request from reaching handler ## Next: Check if apiKey=2 case is actually being executed in processRequestSync --- weed/mq/kafka/protocol/handler.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 2f967c272..a9048b8e1 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1965,7 +1965,19 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Parse minimal request to understand what's being asked (header already stripped) offset := 0 - glog.Warningf("🟡 ListOffsets CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) + glog.Warningf("🔥🔥🔥 ListOffsets HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) + + maxBytes := len(requestBody) + if maxBytes > 64 { + maxBytes = 64 + } + glog.Warningf("🔥🔥🔥 ListOffsets first %d bytes (hex): %x", maxBytes, requestBody[:maxBytes]) + + // Log the specific replica ID and topics being requested + if len(requestBody) >= 4 { + replicaID := int32(binary.BigEndian.Uint32(requestBody[0:4])) + glog.Warningf("🔥🔥🔥 ListOffsets replica_id=%d", replicaID) + } // v1+ has replica_id(4) if apiVersion >= 1 { @@ -2120,7 +2132,16 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req glog.Infof("🟢 ListOffsets: Response OK - requested %d topics, actual %d, size=%d", topicsCount, actualTopicsCount, len(response)) } + glog.Warningf("🔥🔥🔥 ListOffsets HANDLER RETURNING: correlationID=%d, responseLen=%d", correlationID, len(response)) + if len(response) > 0 { + respPreview := len(response) + if respPreview > 32 { + respPreview = 32 + } + glog.Warningf("🔥🔥🔥 ListOffsets response first %d bytes (hex): %x", respPreview, response[:respPreview]) + } return response, nil + } func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {