diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 36dc37b0e..6a13d2968 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -308,7 +308,7 @@ func (h *Handler) HandleConn(conn net.Conn) error { func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { // Build ApiVersions response manually - // Response format: correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys + throttle_time(4) + // Response format (v0): correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys response := make([]byte, 0, 64) @@ -396,9 +396,6 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // min version 0 response = append(response, 0, 4) // max version 4 - // Throttle time (4 bytes, 0 = no throttling) - response = append(response, 0, 0, 0, 0) - return response, nil } @@ -531,8 +528,10 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] // Build response using same approach as v0 but with v1 additions response := make([]byte, 0, 256) - // NOTE: Correlation ID is handled by transport layer in kafka-go - // Do NOT include it in the response struct + // Correlation ID (4 bytes) + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, correlationID) + response = append(response, correlationIDBytes...) // Brokers array length (4 bytes) - 1 broker (this gateway) response = append(response, 0, 0, 0, 1)