Browse Source

Fix kafka-go compatibility:

- ApiVersions v0 response: remove unsupported throttle_time field
- Metadata v1: include correlation ID (kafka-go transport expects it after size)
- Metadata v1: ensure broker/partition IDs consistent and format correct

Validated:
- TestMetadataV6Debug passes (kafka-go ReadPartitions works)
- Sarama simple producer unaffected

Root cause: correlation ID handling differences and extra footer in ApiVersions.
pull/7231/head
chrislu 2 months ago
parent
commit
bfe15f970b
  1. 11
      weed/mq/kafka/protocol/handler.go

11
weed/mq/kafka/protocol/handler.go

@ -308,7 +308,7 @@ func (h *Handler) HandleConn(conn net.Conn) error {
func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
// Build ApiVersions response manually // Build ApiVersions response manually
// Response format: correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys + throttle_time(4)
// Response format (v0): correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys
response := make([]byte, 0, 64) response := make([]byte, 0, 64)
@ -396,9 +396,6 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 0) // min version 0 response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4 response = append(response, 0, 4) // max version 4
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
return response, nil return response, nil
} }
@ -531,8 +528,10 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
// Build response using same approach as v0 but with v1 additions // Build response using same approach as v0 but with v1 additions
response := make([]byte, 0, 256) response := make([]byte, 0, 256)
// NOTE: Correlation ID is handled by transport layer in kafka-go
// Do NOT include it in the response struct
// Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Brokers array length (4 bytes) - 1 broker (this gateway) // Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1) response = append(response, 0, 0, 0, 1)

Loading…
Cancel
Save