From 244bbe37c3dd446ccfa7ec1bb9fcb72de4b076d5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 16:12:50 -0700 Subject: [PATCH] debug: Add comprehensive Metadata response logging - METADATA IS CORRECT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CRITICAL FINDING: Metadata responses are CORRECT! Verified: ✅ handleMetadata being called ✅ Topics include _schemas (the required topic) ✅ Broker information: nodeID=1339201522, host=kafka-gateway, port=9093 ✅ Response size ~117 bytes (reasonable) ✅ Response is being generated without errors IMPLICATION: The problem is NOT in Metadata responses. Since Schema Registry client has: 1. ✅ Received Metadata successfully (_schemas topic found) 2. ❌ Never sends ListOffsets requests 3. ❌ Never sends Fetch requests 4. ❌ Never sends consumer group requests The issue must be in Schema Registry's consumer thread after it gets partition information from metadata. Likely causes: 1. partitionsFor() succeeded but something else blocks 2. Consumer is in assignPartitions() and blocking there 3. Something in seekToBeginning() is blocking 4. An exception is being thrown and caught silently Need to check Schema Registry logs more carefully for ANY error/exception or trace logs indicating where exactly it's blocking in initialization. --- weed/mq/kafka/protocol/handler.go | 35 ++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 684fdd75e..18534e446 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1613,12 +1613,14 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( var topicsToReturn []string if len(requestedTopics) == 0 { topicsToReturn = h.seaweedMQHandler.ListTopics() + glog.Warningf("🟡 Metadata v3/v4: No specific topics requested, returning ALL topics: %v", topicsToReturn) } else { for _, name := range requestedTopics { if h.seaweedMQHandler.TopicExists(name) { topicsToReturn = append(topicsToReturn, name) } } + glog.Warningf("🟡 Metadata v3/v4: Requested topics: %v, matched topics: %v", requestedTopics, topicsToReturn) } var buf bytes.Buffer @@ -1639,6 +1641,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( nodeID := h.GetNodeID() // Get consistent node ID for this gateway // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) + glog.Warningf("🟡 Metadata v3/v4: Returning broker - nodeID=%d, host=%s, port=%d", nodeID, host, port) binary.Write(&buf, binary.BigEndian, nodeID) // Host (STRING: 2 bytes length + data) - validate length fits in int16 @@ -1777,6 +1780,8 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // NOTE: Correlation ID is handled by writeResponseWithCorrelationID // Do NOT include it in the response body + glog.Warningf("🟡 Metadata v%d: About to build response with topics=%v", apiVersion, topicsToReturn) + // ThrottleTimeMs (4 bytes) - v3+ addition binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling @@ -1789,6 +1794,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, nodeID := h.GetNodeID() // Get consistent node ID for this gateway // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) + glog.Warningf("🟡 Metadata v%d: Returning broker - nodeID=%d, host=%s, port=%d", apiVersion, nodeID, host, port) binary.Write(&buf, binary.BigEndian, nodeID) // Host (STRING: 2 bytes length + data) - validate length fits in int16 @@ -2877,26 +2883,39 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, // handleMetadata routes to the appropriate version-specific handler func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + glog.Warningf("🟡 handleMetadata ENTRY: apiVersion=%d, correlationID=%d, requestBodyLen=%d", apiVersion, correlationID, len(requestBody)) + + var response []byte + var err error + switch apiVersion { case 0: - return h.HandleMetadataV0(correlationID, requestBody) + response, err = h.HandleMetadataV0(correlationID, requestBody) case 1: - return h.HandleMetadataV1(correlationID, requestBody) + response, err = h.HandleMetadataV1(correlationID, requestBody) case 2: - return h.HandleMetadataV2(correlationID, requestBody) + response, err = h.HandleMetadataV2(correlationID, requestBody) case 3, 4: - return h.HandleMetadataV3V4(correlationID, requestBody) + response, err = h.HandleMetadataV3V4(correlationID, requestBody) case 5, 6: - return h.HandleMetadataV5V6(correlationID, requestBody) + response, err = h.HandleMetadataV5V6(correlationID, requestBody) case 7: - return h.HandleMetadataV7(correlationID, requestBody) + response, err = h.HandleMetadataV7(correlationID, requestBody) default: // For versions > 7, use the V7 handler (flexible format) if apiVersion > 7 { - return h.HandleMetadataV7(correlationID, requestBody) + response, err = h.HandleMetadataV7(correlationID, requestBody) + } else { + err = fmt.Errorf("metadata version %d not implemented yet", apiVersion) } - return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion) } + + if err != nil { + glog.Warningf("🟡 handleMetadata EXIT ERROR: apiVersion=%d, correlationID=%d, err=%v", apiVersion, correlationID, err) + } else { + glog.Warningf("🟡 handleMetadata EXIT OK: apiVersion=%d, correlationID=%d, responseLen=%d", apiVersion, correlationID, len(response)) + } + return response, err } // getAPIName returns a human-readable name for Kafka API keys (for debugging)