
feat: implement Metadata API v2, v3/v4 for Kafka 0.11+ compatibility

- Add HandleMetadataV2 with ClusterID field (nullable string)
- Add HandleMetadataV3V4 with ThrottleTimeMs field for Kafka 0.11+ support
- Update handleMetadata routing to support v2-v6 versions
- Advertise Metadata max_version=4 in ApiVersions response
- Update validateAPIVersion to support Metadata v0-v4

This enables compatibility with:
- kafka-go: negotiates v1-v6, will use v4
- Sarama: expects v3/v4 for Kafka 0.11+ compatibility
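
A quick client-side check of the compatibility claim above (a minimal sketch, not part of this commit; the gateway address localhost:9092 and plaintext transport are assumptions):

package main

import (
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial the Kafka gateway; kafka-go negotiates API versions via ApiVersions,
	// so with max_version=4 advertised it should settle on Metadata v4.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	// ReadPartitions issues a Metadata request for all topics.
	partitions, err := conn.ReadPartitions()
	if err != nil {
		log.Fatalf("metadata: %v", err)
	}
	for _, p := range partitions {
		fmt.Printf("topic=%s partition=%d leader=%s:%d\n", p.Topic, p.ID, p.Leader.Host, p.Leader.Port)
	}
}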
pull/7231/head
chrislu · 2 months ago · commit 335f503450
1 changed file: weed/mq/kafka/protocol/handler.go (207 lines changed)
@@ -314,10 +314,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
 	response = append(response, 0, 3) // max version 3
 
 	// API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2)
-	// Advertise Metadata v1 as required by kafka-go ReadPartitions
+	// Advertise Metadata v4 for Kafka 0.11+ compatibility
 	response = append(response, 0, 3) // API key 3
 	response = append(response, 0, 0) // min version 0
-	response = append(response, 0, 1) // max version 1
+	response = append(response, 0, 4) // max version 4
 
 	// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
 	response = append(response, 0, 2) // API key 2
@@ -584,6 +584,195 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) {
 	return response, nil
 }
+
+// HandleMetadataV2 implements Metadata API v2 with ClusterID field
+func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]byte, error) {
+	// Metadata v2 adds ClusterID field (nullable string)
+	// v2 response layout: correlation_id(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
+
+	// Parse requested topics (empty means all)
+	requestedTopics := h.parseMetadataTopics(requestBody)
+	fmt.Printf("DEBUG: 🔍 METADATA v2 REQUEST - Requested: %v (empty=all)\n", requestedTopics)
+
+	// Determine topics to return
+	h.topicsMu.RLock()
+	var topicsToReturn []string
+	if len(requestedTopics) == 0 {
+		topicsToReturn = make([]string, 0, len(h.topics))
+		for name := range h.topics {
+			topicsToReturn = append(topicsToReturn, name)
+		}
+	} else {
+		for _, name := range requestedTopics {
+			if _, exists := h.topics[name]; exists {
+				topicsToReturn = append(topicsToReturn, name)
+			}
+		}
+	}
+	h.topicsMu.RUnlock()
+
+	var buf bytes.Buffer
+
+	// Correlation ID (4 bytes)
+	binary.Write(&buf, binary.BigEndian, correlationID)
+
+	// Brokers array (4 bytes length + brokers)
+	binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker
+
+	// Broker 0
+	binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID
+
+	// Host (STRING: 2 bytes length + data)
+	host := h.brokerHost
+	binary.Write(&buf, binary.BigEndian, int16(len(host)))
+	buf.WriteString(host)
+
+	// Port (4 bytes)
+	binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))
+
+	// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
+	binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
+
+	// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2 addition
+	// Use -1 length to indicate null
+	binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
+
+	// ControllerID (4 bytes) - v1+ addition
+	binary.Write(&buf, binary.BigEndian, int32(1))
+
+	// Topics array (4 bytes length + topics)
+	binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
+
+	for _, topicName := range topicsToReturn {
+		// ErrorCode (2 bytes)
+		binary.Write(&buf, binary.BigEndian, int16(0))
+
+		// Name (STRING: 2 bytes length + data)
+		binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
+		buf.WriteString(topicName)
+
+		// IsInternal (1 byte) - v1+ addition
+		buf.WriteByte(0) // false
+
+		// Partitions array (4 bytes length + partitions)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
+
+		// Partition 0
+		binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
+		binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
+		binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
+
+		// ReplicaNodes array (4 bytes length + nodes)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
+		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+
+		// IsrNodes array (4 bytes length + nodes)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
+		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+	}
+
+	response := buf.Bytes()
+
+	fmt.Printf("DEBUG: Advertising broker (v2) at %s:%d\n", h.brokerHost, h.brokerPort)
+	fmt.Printf("DEBUG: Metadata v2 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
+
+	return response, nil
+}
+
+// HandleMetadataV3V4 implements Metadata API v3/v4 with ThrottleTimeMs field
+func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ([]byte, error) {
+	// Metadata v3/v4 adds ThrottleTimeMs field at the beginning
+	// v3/v4 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
+
+	// Parse requested topics (empty means all)
+	requestedTopics := h.parseMetadataTopics(requestBody)
+	fmt.Printf("DEBUG: 🔍 METADATA v3/v4 REQUEST - Requested: %v (empty=all)\n", requestedTopics)
+
+	// Determine topics to return
+	h.topicsMu.RLock()
+	var topicsToReturn []string
+	if len(requestedTopics) == 0 {
+		topicsToReturn = make([]string, 0, len(h.topics))
+		for name := range h.topics {
+			topicsToReturn = append(topicsToReturn, name)
+		}
+	} else {
+		for _, name := range requestedTopics {
+			if _, exists := h.topics[name]; exists {
+				topicsToReturn = append(topicsToReturn, name)
+			}
+		}
+	}
+	h.topicsMu.RUnlock()
+
+	var buf bytes.Buffer
+
+	// Correlation ID (4 bytes)
+	binary.Write(&buf, binary.BigEndian, correlationID)
+
+	// ThrottleTimeMs (4 bytes) - v3+ addition
+	binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
+
+	// Brokers array (4 bytes length + brokers)
+	binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker
+
+	// Broker 0
+	binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID
+
+	// Host (STRING: 2 bytes length + data)
+	host := h.brokerHost
+	binary.Write(&buf, binary.BigEndian, int16(len(host)))
+	buf.WriteString(host)
+
+	// Port (4 bytes)
+	binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))
+
+	// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
+	binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
+
+	// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition
+	// Use -1 length to indicate null
+	binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
+
+	// ControllerID (4 bytes) - v1+ addition
+	binary.Write(&buf, binary.BigEndian, int32(1))
+
+	// Topics array (4 bytes length + topics)
+	binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
+
+	for _, topicName := range topicsToReturn {
+		// ErrorCode (2 bytes)
+		binary.Write(&buf, binary.BigEndian, int16(0))
+
+		// Name (STRING: 2 bytes length + data)
+		binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
+		buf.WriteString(topicName)
+
+		// IsInternal (1 byte) - v1+ addition
+		buf.WriteByte(0) // false
+
+		// Partitions array (4 bytes length + partitions)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
+
+		// Partition 0
+		binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
+		binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
+		binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
+
+		// ReplicaNodes array (4 bytes length + nodes)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
+		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+
+		// IsrNodes array (4 bytes length + nodes)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
+		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+	}
+
+	response := buf.Bytes()
+
+	fmt.Printf("DEBUG: Advertising broker (v3/v4) at %s:%d\n", h.brokerHost, h.brokerPort)
+	fmt.Printf("DEBUG: Metadata v3/v4 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
+
+	return response, nil
+}
 
 func (h *Handler) parseMetadataTopics(requestBody []byte) []string {
 	// Support both v0/v1 parsing: v1 payload starts directly with topics array length (int32),
 	// while older assumptions may have included a client_id string first.
@@ -1058,7 +1247,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
 func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
 	supportedVersions := map[uint16][2]uint16{
 		18: {0, 3}, // ApiVersions: v0-v3
-		3:  {0, 1}, // Metadata: v0-v1
+		3:  {0, 4}, // Metadata: v0-v4
 		0:  {0, 1}, // Produce: v0-v1
 		1:  {0, 1}, // Fetch: v0-v1
 		2:  {0, 5}, // ListOffsets: v0-v5
@@ -1113,10 +1302,14 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
 		return h.HandleMetadataV0(correlationID, requestBody)
 	case 1:
 		return h.HandleMetadataV1(correlationID, requestBody)
-	case 2, 3, 4, 5, 6:
-		// For now, use v1 format for higher versions (kafka-go compatibility)
-		// TODO: Implement proper v2-v6 formats with additional fields
-		return h.HandleMetadataV1(correlationID, requestBody)
+	case 2:
+		return h.HandleMetadataV2(correlationID, requestBody)
+	case 3, 4:
+		return h.HandleMetadataV3V4(correlationID, requestBody)
+	case 5, 6:
+		// For now, use v3/v4 format for v5/v6 (missing offline_replicas)
+		// TODO: Implement proper v5/v6 formats with offline_replicas field
+		return h.HandleMetadataV3V4(correlationID, requestBody)
 	default:
 		return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
 	}
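
For reference, a standalone sketch (stdlib only, not part of this commit) that writes and then re-reads the leading fields of the v3/v4 layout built by HandleMetadataV3V4 above; the correlation ID, host, and port here are arbitrary placeholders:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode the header portion the same way HandleMetadataV3V4 does:
	// correlation_id, throttle_time_ms, one-broker array, null cluster_id,
	// controller_id, empty topic array.
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, uint32(7)) // correlation_id (placeholder)
	binary.Write(&buf, binary.BigEndian, int32(0))  // throttle_time_ms (v3+)
	binary.Write(&buf, binary.BigEndian, int32(1))  // brokers: 1 entry
	binary.Write(&buf, binary.BigEndian, int32(1))  // node_id
	host := "localhost"                             // placeholder host
	binary.Write(&buf, binary.BigEndian, int16(len(host)))
	buf.WriteString(host)
	binary.Write(&buf, binary.BigEndian, int32(9092)) // port (placeholder)
	binary.Write(&buf, binary.BigEndian, int16(0))    // rack: empty string
	binary.Write(&buf, binary.BigEndian, int16(-1))   // cluster_id: null (-1)
	binary.Write(&buf, binary.BigEndian, int32(1))    // controller_id
	binary.Write(&buf, binary.BigEndian, int32(0))    // topics: 0 entries

	// Decode it back field by field to confirm the ordering.
	r := bytes.NewReader(buf.Bytes())
	var corr uint32
	var throttle, brokers int32
	binary.Read(r, binary.BigEndian, &corr)
	binary.Read(r, binary.BigEndian, &throttle)
	binary.Read(r, binary.BigEndian, &brokers)
	fmt.Printf("correlation_id=%d throttle_time_ms=%d brokers=%d\n", corr, throttle, brokers)
}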
