From 410259060fdff626b25956d89b1224b23ff221f7 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 16:31:43 -0700 Subject: [PATCH] debug: Add Metadata response hex logging and enable SR debug logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Key Findings from Enhanced Logging ### Gateway Metadata Response (HEX): 00000000000000014fd297f2000d6b61666b612d6761746577617900002385000000177365617765656466732d6b61666b612d676174657761794fd297f200000001000000085f736368656d617300000000010000000000000000000100000000000000 ### Schema Registry Consumer Log Trace: ✅ [Consumer...] Assigned to partition(s): _schemas-0 ✅ [Consumer...] Seeking to beginning for all partitions ✅ [Consumer...] Seeking to AutoOffsetResetStrategy{type=earliest} offset of partition _schemas-0 ❌ NO FURTHER LOGS - STUCK IN SEEK ### Analysis: 1. Consumer successfully assigned partition 2. Consumer initiated seekToBeginning() 3. Consumer is waiting for ListOffsets response 4. 🔴 BLOCKED - timeout after 60 seconds ### Metadata Response Details: - Format: Metadata v7 (flexible) - Size: 117 bytes - Includes: 1 broker (nodeID=0x4fd297f2='O...'), _schemas topic, 1 partition - Response appears structurally correct ### Next Steps: 1. Decode full Metadata hex to verify all fields 2. Compare with real Kafka broker response 3. Check if missing critical fields blocking consumer state machine 4. Verify ListOffsets handler can receive requests --- .../kafka-client-loadtest/docker-compose.yml | 2 ++ weed/mq/kafka/protocol/handler.go | 20 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/test/kafka/kafka-client-loadtest/docker-compose.yml b/test/kafka/kafka-client-loadtest/docker-compose.yml index 00b3264eb..fa88117e9 100644 --- a/test/kafka/kafka-client-loadtest/docker-compose.yml +++ b/test/kafka/kafka-client-loadtest/docker-compose.yml @@ -62,6 +62,8 @@ services: SCHEMA_REGISTRY_KAFKASTORE_WRITE_TIMEOUT_MS: "60000" SCHEMA_REGISTRY_KAFKASTORE_INIT_RETRY_BACKOFF_MS: "5000" SCHEMA_REGISTRY_KAFKASTORE_CONSUMER_AUTO_OFFSET_RESET: "earliest" + # Enable debug logging for Kafka clients to trace consumer seek/fetch operations + SCHEMA_REGISTRY_LOG4J_LOGGERS: "org.apache.kafka.clients=DEBUG,org.apache.kafka.common=DEBUG,io.confluent.kafka.schemaregistry.storage=DEBUG" healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"] interval: 15s diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 509ed1330..2f967c272 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1716,6 +1716,16 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( response := buf.Bytes() + // Detailed logging for Metadata response + maxDisplay := len(response) + if maxDisplay > 50 { + maxDisplay = 50 + } + glog.Warningf("🟡 Metadata v3/v4 FINAL RESPONSE: size=%d bytes, first50bytes=%v", len(response), response[:maxDisplay]) + if len(response) > 100 { + glog.Warningf("🟡 Metadata v3/v4 RESPONSE HEX (first 100 bytes): %x", response[:100]) + } + return response, nil } @@ -1882,6 +1892,16 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, response := buf.Bytes() + // Detailed logging for Metadata response + maxDisplay := len(response) + if maxDisplay > 50 { + maxDisplay = 50 + } + glog.Warningf("🟡 Metadata v%d FINAL RESPONSE: size=%d bytes, first50bytes=%v", apiVersion, len(response), response[:maxDisplay]) + if len(response) > 100 { + glog.Warningf("🟡 Metadata v%d RESPONSE HEX (first 100 bytes): %x", apiVersion, response[:100]) + } + return response, nil }