Browse Source

debug: Add comprehensive Metadata response logging - METADATA IS CORRECT

CRITICAL FINDING: Metadata responses are CORRECT!

Verified:
 handleMetadata being called
 Topics include _schemas (the required topic)
 Broker information: nodeID=1339201522, host=kafka-gateway, port=9093
 Response size ~117 bytes (reasonable)
 Response is being generated without errors

IMPLICATION: The problem is NOT in Metadata responses.

Since Schema Registry client has:
1.  Received Metadata successfully (_schemas topic found)
2.  Never sends ListOffsets requests
3.  Never sends Fetch requests
4.  Never sends consumer group requests

The issue must be in Schema Registry's consumer thread after it gets
partition information from metadata. Likely causes:
1. partitionsFor() succeeded but something else blocks
2. Consumer is in assignPartitions() and blocking there
3. Something in seekToBeginning() is blocking
4. An exception is being thrown and caught silently

Need to check Schema Registry logs more carefully for ANY error/exception
or trace logs indicating where exactly it's blocking in initialization.
pull/7329/head
chrislu 5 days ago
parent
commit
244bbe37c3
  1. 35
      weed/mq/kafka/protocol/handler.go

35
weed/mq/kafka/protocol/handler.go

@ -1613,12 +1613,14 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
var topicsToReturn []string
if len(requestedTopics) == 0 {
topicsToReturn = h.seaweedMQHandler.ListTopics()
glog.Warningf("🟡 Metadata v3/v4: No specific topics requested, returning ALL topics: %v", topicsToReturn)
} else {
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
}
}
glog.Warningf("🟡 Metadata v3/v4: Requested topics: %v, matched topics: %v", requestedTopics, topicsToReturn)
}
var buf bytes.Buffer
@ -1639,6 +1641,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
nodeID := h.GetNodeID() // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
glog.Warningf("🟡 Metadata v3/v4: Returning broker - nodeID=%d, host=%s, port=%d", nodeID, host, port)
binary.Write(&buf, binary.BigEndian, nodeID)
// Host (STRING: 2 bytes length + data) - validate length fits in int16
@ -1777,6 +1780,8 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
// NOTE: Correlation ID is handled by writeResponseWithCorrelationID
// Do NOT include it in the response body
glog.Warningf("🟡 Metadata v%d: About to build response with topics=%v", apiVersion, topicsToReturn)
// ThrottleTimeMs (4 bytes) - v3+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
@ -1789,6 +1794,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
nodeID := h.GetNodeID() // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
glog.Warningf("🟡 Metadata v%d: Returning broker - nodeID=%d, host=%s, port=%d", apiVersion, nodeID, host, port)
binary.Write(&buf, binary.BigEndian, nodeID)
// Host (STRING: 2 bytes length + data) - validate length fits in int16
@ -2877,26 +2883,39 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey,
// handleMetadata routes to the appropriate version-specific handler
func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
glog.Warningf("🟡 handleMetadata ENTRY: apiVersion=%d, correlationID=%d, requestBodyLen=%d", apiVersion, correlationID, len(requestBody))
var response []byte
var err error
switch apiVersion {
case 0:
return h.HandleMetadataV0(correlationID, requestBody)
response, err = h.HandleMetadataV0(correlationID, requestBody)
case 1:
return h.HandleMetadataV1(correlationID, requestBody)
response, err = h.HandleMetadataV1(correlationID, requestBody)
case 2:
return h.HandleMetadataV2(correlationID, requestBody)
response, err = h.HandleMetadataV2(correlationID, requestBody)
case 3, 4:
return h.HandleMetadataV3V4(correlationID, requestBody)
response, err = h.HandleMetadataV3V4(correlationID, requestBody)
case 5, 6:
return h.HandleMetadataV5V6(correlationID, requestBody)
response, err = h.HandleMetadataV5V6(correlationID, requestBody)
case 7:
return h.HandleMetadataV7(correlationID, requestBody)
response, err = h.HandleMetadataV7(correlationID, requestBody)
default:
// For versions > 7, use the V7 handler (flexible format)
if apiVersion > 7 {
return h.HandleMetadataV7(correlationID, requestBody)
response, err = h.HandleMetadataV7(correlationID, requestBody)
} else {
err = fmt.Errorf("metadata version %d not implemented yet", apiVersion)
}
return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
}
if err != nil {
glog.Warningf("🟡 handleMetadata EXIT ERROR: apiVersion=%d, correlationID=%d, err=%v", apiVersion, correlationID, err)
} else {
glog.Warningf("🟡 handleMetadata EXIT OK: apiVersion=%d, correlationID=%d, responseLen=%d", apiVersion, correlationID, len(response))
}
return response, err
}
// getAPIName returns a human-readable name for Kafka API keys (for debugging)

Loading…
Cancel
Save