From a4da25db6d8f8c96894ffcc32a1dac6427d9766e Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 10 Sep 2025 14:56:58 -0700
Subject: [PATCH] mq(kafka): CRITICAL Discovery - Issue not v7-specific, persists in Metadata v1
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

MAJOR BREAKTHROUGH:
❌ Same 'Unknown Topic Or Partition' error occurs with Metadata v1
✅ This proves issue is NOT related to v7-specific fields
✅ kafka-go correctly negotiates down from v7 → v1

EVIDENCE:
- Response size: 120 bytes (v7) → 95 bytes (v1) ✅
- Version negotiation: API 3 v1 requested ✅
- Same error pattern: kafka-go validates → rejects → retries ❌

HYPOTHESIS IDENTIFIED:
🎯 Port/Address Mismatch Issue:
- kafka-go connects to gateway on random port (:60364)
- Metadata response advertises broker at localhost:9092
- kafka-go may be trying to validate broker reachability

CURRENT STATUS:
The issue is fundamental to our Metadata response format, not version-specific.
kafka-go likely validates that advertised brokers are reachable before
proceeding to Produce operations.

NEXT:
Fix broker address in Metadata to match actual gateway listening port.

--- weed/mq/kafka/protocol/handler.go | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index a6dd6f022..5f2e5b813 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -270,9 +270,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 3) // max version 3 // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) + // TEMP: Limit to v1 to test if issue is v7-specific response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 7) // max version 7 + response = append(response, 0, 1) // max version 1 (instead of 7) // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -360,7 +361,7 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by // Use "localhost" for simplicity - kafka-go should be able to connect back // The port issue is more likely the problem than the host host := "localhost" - + response = append(response, 0, byte(len(host))) response = append(response, []byte(host)...) 
@@ -377,9 +378,8 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by // Controller ID (4 bytes) - -1 (no controller) response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) - // Cluster authorized operations (4 bytes) - For Metadata v7+ - // -1 = not supported/null - response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) + // TEMP: Removed v7+ fields to test with Metadata v1 + // Cluster authorized operations removed for v1 compatibility // Parse topics from request (for metadata discovery) requestedTopics := h.parseMetadataTopics(requestBody) @@ -423,12 +423,9 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by binary.BigEndian.PutUint16(topicNameLen, uint16(len(topicNameBytes))) response = append(response, topicNameLen...) response = append(response, topicNameBytes...) - - // Topic UUID (16 bytes) - For Metadata v7+, using null UUID (all zeros) - response = append(response, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - - // Is internal topic (1 byte) - false - response = append(response, 0) + + // TEMP: Removed v7+ fields for v1 compatibility + // Topic UUID and is_internal_topic removed // Partitions array length (4 bytes) - 1 partition response = append(response, 0, 0, 0, 1) @@ -440,10 +437,8 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by response = append(response, 0, 0, 0, 0) // leader_id = 0 (this broker) response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // replicas = [0] response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // isr = [0] - - // Topic authorized operations (4 bytes) - For Metadata v7+ - // -1 = not supported/null - response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) + + // TEMP: Removed v7+ topic authorized operations for v1 compatibility } fmt.Printf("DEBUG: Metadata response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)