From bfe15f970b1a112cc9f2b85b27d698d6ed88ed8e Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 10:10:38 -0700 Subject: [PATCH] Fix kafka-go compatibility: - ApiVersions v0 response: remove unsupported throttle_time field - Metadata v1: include correlation ID (kafka-go transport expects it after size) - Metadata v1: ensure broker/partition IDs consistent and format correct Validated: - TestMetadataV6Debug passes (kafka-go ReadPartitions works) - Sarama simple producer unaffected Root cause: correlation ID handling differences and extra footer in ApiVersions. --- weed/mq/kafka/protocol/handler.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 36dc37b0e..6a13d2968 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -308,7 +308,7 @@ func (h *Handler) HandleConn(conn net.Conn) error { func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { // Build ApiVersions response manually - // Response format: correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys + throttle_time(4) + // Response format (v0): correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys response := make([]byte, 0, 64) @@ -396,9 +396,6 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // min version 0 response = append(response, 0, 4) // max version 4 - // Throttle time (4 bytes, 0 = no throttling) - response = append(response, 0, 0, 0, 0) - return response, nil } @@ -531,8 +528,10 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] // Build response using same approach as v0 but with v1 additions response := make([]byte, 0, 256) - // NOTE: Correlation ID is handled by transport layer in kafka-go - // Do NOT include it in the response struct + // Correlation ID (4 bytes) + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, correlationID) + response = append(response, correlationIDBytes...) // Brokers array length (4 bytes) - 1 broker (this gateway) response = append(response, 0, 0, 0, 1)