Browse Source

Remove correlation ID from Metadata v1 response for kafka-go compatibility

PARTIAL FIX: Remove correlation ID from response struct for kafka-go transport layer

## Root Cause Analysis:
- kafka-go handles correlation ID at transport layer (protocol/roundtrip.go)
- kafka-go ReadResponse() reads correlation ID separately from response struct
- Our Metadata responses included correlation ID in struct, causing parsing errors
- Sarama vs kafka-go handle correlation IDs differently

## Changes:
- Removed correlation ID from Metadata v1 response struct
- Added comment explaining kafka-go transport layer handling
- Response size reduced from 92 to 88 bytes (4 bytes = correlation ID)

## Status:
-  Correlation ID issue partially fixed
-  kafka-go still fails with 'multiple Read calls return no data or error'
-  Still uses v1 instead of negotiated v4 (suggests ApiVersions parsing issue)

## Next Steps:
- Investigate remaining Metadata v1 format issues
- Check if other response fields have format problems
- May need to fix ApiVersions response format to enable proper version negotiation

This is progress toward full kafka-go compatibility.
pull/7231/head
chrislu 2 months ago
parent
commit
edeb922749
  1. 8
      weed/mq/kafka/protocol/handler.go

8
weed/mq/kafka/protocol/handler.go

@ -420,7 +420,7 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4)
response = append(response, 0, 0, 0, 0) // node_id = 0
response = append(response, 0, 0, 0, 1) // node_id = 1 (consistent with partitions)
// Use dynamic broker address set by the server
host := h.brokerHost
@ -531,10 +531,8 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
// Build response using same approach as v0 but with v1 additions
response := make([]byte, 0, 256)
// Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// NOTE: Correlation ID is handled by transport layer in kafka-go
// Do NOT include it in the response struct
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)

Loading…
Cancel
Save