diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index a6dd6f022..5f2e5b813 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -270,9 +270,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 3) // max version 3 // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) + // TEMP: Limit to v1 to test if issue is v7-specific response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 7) // max version 7 + response = append(response, 0, 1) // max version 1 (instead of 7) // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -360,7 +361,7 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by // Use "localhost" for simplicity - kafka-go should be able to connect back // The port issue is more likely the problem than the host host := "localhost" - + response = append(response, 0, byte(len(host))) response = append(response, []byte(host)...) @@ -377,9 +378,8 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by // Controller ID (4 bytes) - -1 (no controller) response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) - // Cluster authorized operations (4 bytes) - For Metadata v7+ - // -1 = not supported/null - response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) + // TEMP: Removed v7+ fields to test with Metadata v1 + // Cluster authorized operations removed for v1 compatibility // Parse topics from request (for metadata discovery) requestedTopics := h.parseMetadataTopics(requestBody) @@ -423,12 +423,9 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by binary.BigEndian.PutUint16(topicNameLen, uint16(len(topicNameBytes))) response = append(response, topicNameLen...) response = append(response, topicNameBytes...) - - // Topic UUID (16 bytes) - For Metadata v7+, using null UUID (all zeros) - response = append(response, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - - // Is internal topic (1 byte) - false - response = append(response, 0) + + // TEMP: Removed v7+ fields for v1 compatibility + // Topic UUID and is_internal_topic removed // Partitions array length (4 bytes) - 1 partition response = append(response, 0, 0, 0, 1) @@ -440,10 +437,8 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by response = append(response, 0, 0, 0, 0) // leader_id = 0 (this broker) response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // replicas = [0] response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // isr = [0] - - // Topic authorized operations (4 bytes) - For Metadata v7+ - // -1 = not supported/null - response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) + + // TEMP: Removed v7+ topic authorized operations for v1 compatibility } fmt.Printf("DEBUG: Metadata response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)