Browse Source

feat: implement Metadata API v5/v6/v7 for modern Kafka client compatibility

- Add HandleMetadataV5V6 with OfflineReplicas field (Kafka 1.0+)
- Add HandleMetadataV7 with LeaderEpoch field (Kafka 2.1+)
- Update routing to support v5-v7 versions
- Advertise Metadata max_version=7 for full modern client support
- Update validateAPIVersion to support Metadata v0-v7

This follows the recommended approach:
- Target Kafka 0.11+ as baseline (v3/v4)
- Support modern clients with v5/v6/v7
- Proper protocol version negotiation via ApiVersions
- Focus on core APIs: Produce/Fetch/Metadata/ListOffsets/FindCoordinator

Supports both kafka-go and Sarama for Kafka versions 0.11 through 2.1+
pull/7231/head
chrislu 2 months ago
parent
commit
42cbadba82
  1. 215
      weed/mq/kafka/protocol/handler.go

215
weed/mq/kafka/protocol/handler.go

@ -314,10 +314,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 3) // max version 3
// API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2)
// Advertise Metadata v4 for Kafka 0.11+ compatibility
// Advertise Metadata v7 for Kafka 2.1+ compatibility
response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
response = append(response, 0, 7) // max version 7
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 2) // API key 2
@ -773,6 +773,209 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
return response, nil
}
// HandleMetadataV5V6 implements Metadata API v5/v6 with OfflineReplicas field
// HandleMetadataV5V6 implements Metadata API v5/v6 with the OfflineReplicas field.
//
// Response layout (v5/v6): correlation_id(4) + throttle_time_ms(4) +
// brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY).
// Each partition entry is: error_code(2) + partition_index(4) + leader_id(4) +
// replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY); the
// trailing offline_replicas array is the v5+ addition (Kafka 1.0+).
//
// The gateway advertises itself as the only broker (node ID 1) and exposes a
// single partition (index 0) per topic, with itself as leader, sole replica,
// and sole ISR member. An empty topic list in the request means "all topics".
//
// The returned error is always nil; the signature matches the other Metadata
// handlers so the version router can treat them uniformly.
func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) ([]byte, error) {
	// Parse requested topics (empty means all).
	requestedTopics := h.parseMetadataTopics(requestBody)
	fmt.Printf("DEBUG: 🔍 METADATA v5/v6 REQUEST - Requested: %v (empty=all)\n", requestedTopics)

	// Resolve the topic set under the read lock. Requested topics that do not
	// exist are silently omitted; real Kafka would echo them back with
	// UNKNOWN_TOPIC_OR_PARTITION. NOTE(review): confirm clients tolerate the
	// omission before tightening this.
	h.topicsMu.RLock()
	var topicsToReturn []string
	if len(requestedTopics) == 0 {
		topicsToReturn = make([]string, 0, len(h.topics))
		for name := range h.topics {
			topicsToReturn = append(topicsToReturn, name)
		}
	} else {
		topicsToReturn = make([]string, 0, len(requestedTopics))
		for _, name := range requestedTopics {
			if _, exists := h.topics[name]; exists {
				topicsToReturn = append(topicsToReturn, name)
			}
		}
	}
	h.topicsMu.RUnlock()

	// Encode with explicit big-endian helpers instead of reflection-based
	// binary.Write calls: identical wire bytes, no per-field reflection.
	var buf bytes.Buffer
	writeInt16 := func(v int16) {
		var b [2]byte
		binary.BigEndian.PutUint16(b[:], uint16(v))
		buf.Write(b[:])
	}
	writeInt32 := func(v int32) {
		var b [4]byte
		binary.BigEndian.PutUint32(b[:], uint32(v))
		buf.Write(b[:])
	}
	// Kafka STRING: int16 length followed by the raw bytes.
	writeString := func(s string) {
		writeInt16(int16(len(s)))
		buf.WriteString(s)
	}

	writeInt32(int32(correlationID)) // correlation_id
	writeInt32(0)                    // throttle_time_ms (v3+): no throttling

	// Brokers array: exactly one broker — this gateway.
	writeInt32(1)                   // broker count
	writeInt32(1)                   // node_id
	writeString(h.brokerHost)       // host
	writeInt32(int32(h.brokerPort)) // port
	writeInt16(0)                   // rack (v1+): empty string, non-nullable

	writeInt16(-1) // cluster_id (v2+): NULLABLE_STRING, length -1 = null
	writeInt32(1)  // controller_id (v1+): this broker

	// Topics array.
	writeInt32(int32(len(topicsToReturn)))
	for _, topicName := range topicsToReturn {
		writeInt16(0)          // error_code
		writeString(topicName) // name
		buf.WriteByte(0)       // is_internal (v1+): false

		// Partitions array: a single partition per topic.
		writeInt32(1) // partition count
		writeInt16(0) // error_code
		writeInt32(0) // partition_index
		writeInt32(1) // leader_id
		writeInt32(1) // replica_nodes: array length 1
		writeInt32(1) // replica node_id 1
		writeInt32(1) // isr_nodes: array length 1
		writeInt32(1) // ISR node_id 1
		writeInt32(0) // offline_replicas (v5+): none
	}

	response := buf.Bytes()
	fmt.Printf("DEBUG: Advertising broker (v5/v6) at %s:%d\n", h.brokerHost, h.brokerPort)
	fmt.Printf("DEBUG: Metadata v5/v6 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
	return response, nil
}
// HandleMetadataV7 implements Metadata API v7 with LeaderEpoch field
func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([]byte, error) {
// Metadata v7 adds LeaderEpoch field to partitions
// v7 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
// Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) + leader_epoch(4) + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY)
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
fmt.Printf("DEBUG: 🔍 METADATA v7 REQUEST - Requested: %v (empty=all)\n", requestedTopics)
// Determine topics to return
h.topicsMu.RLock()
var topicsToReturn []string
if len(requestedTopics) == 0 {
topicsToReturn = make([]string, 0, len(h.topics))
for name := range h.topics {
topicsToReturn = append(topicsToReturn, name)
}
} else {
for _, name := range requestedTopics {
if _, exists := h.topics[name]; exists {
topicsToReturn = append(topicsToReturn, name)
}
}
}
h.topicsMu.RUnlock()
var buf bytes.Buffer
// Correlation ID (4 bytes)
binary.Write(&buf, binary.BigEndian, correlationID)
// ThrottleTimeMs (4 bytes) - v3+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
// Brokers array (4 bytes length + brokers)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker
// Broker 0
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID
// Host (STRING: 2 bytes length + data)
host := h.brokerHost
binary.Write(&buf, binary.BigEndian, int16(len(host)))
buf.WriteString(host)
// Port (4 bytes)
binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))
// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition
// Use -1 length to indicate null
binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
// ControllerID (4 bytes) - v1+ addition
binary.Write(&buf, binary.BigEndian, int32(1))
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
for _, topicName := range topicsToReturn {
// ErrorCode (2 bytes)
binary.Write(&buf, binary.BigEndian, int16(0))
// Name (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
buf.WriteString(topicName)
// IsInternal (1 byte) - v1+ addition
buf.WriteByte(0) // false
// Partitions array (4 bytes length + partitions)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
// Partition 0
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
// LeaderEpoch (4 bytes) - v7+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // Leader epoch 0
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// OfflineReplicas array (4 bytes length + nodes) - v5+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas
}
response := buf.Bytes()
fmt.Printf("DEBUG: Advertising broker (v7) at %s:%d\n", h.brokerHost, h.brokerPort)
fmt.Printf("DEBUG: Metadata v7 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
return response, nil
}
func (h *Handler) parseMetadataTopics(requestBody []byte) []string {
// Support both v0/v1 parsing: v1 payload starts directly with topics array length (int32),
// while older assumptions may have included a client_id string first.
@ -1247,7 +1450,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
supportedVersions := map[uint16][2]uint16{
18: {0, 3}, // ApiVersions: v0-v3
3: {0, 4}, // Metadata: v0-v4
3: {0, 7}, // Metadata: v0-v7
0: {0, 1}, // Produce: v0-v1
1: {0, 1}, // Fetch: v0-v1
2: {0, 5}, // ListOffsets: v0-v5
@ -1307,9 +1510,9 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques
case 3, 4:
return h.HandleMetadataV3V4(correlationID, requestBody)
case 5, 6:
// For now, use v3/v4 format for v5/v6 (missing offline_replicas)
// TODO: Implement proper v5/v6 formats with offline_replicas field
return h.HandleMetadataV3V4(correlationID, requestBody)
return h.HandleMetadataV5V6(correlationID, requestBody)
case 7:
return h.HandleMetadataV7(correlationID, requestBody)
default:
return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
}

Loading…
Cancel
Save