diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 901efdcac..59bf24aaf 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -314,10 +314,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
 	response = append(response, 0, 3) // max version 3
 
 	// API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2)
-	// Advertise Metadata v4 for Kafka 0.11+ compatibility
+	// Advertise Metadata v7 for Kafka 2.1+ compatibility
 	response = append(response, 0, 3) // API key 3
 	response = append(response, 0, 0) // min version 0
-	response = append(response, 0, 4) // max version 4
+	response = append(response, 0, 7) // max version 7
 
 	// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
 	response = append(response, 0, 2) // API key 2
@@ -773,6 +773,209 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
 	return response, nil
 }
 
+// HandleMetadataV5V6 implements Metadata API v5/v6 with OfflineReplicas field
+func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) ([]byte, error) {
+	// Metadata v5/v6 adds OfflineReplicas field to partitions
+	// v5/v6 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
+	// Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY)
+
+	// Parse requested topics (empty means all)
+	requestedTopics := h.parseMetadataTopics(requestBody)
+	fmt.Printf("DEBUG: 🔍 METADATA v5/v6 REQUEST - Requested: %v (empty=all)\n", requestedTopics)
+
+	// Determine topics to return
+	h.topicsMu.RLock()
+	var topicsToReturn []string
+	if len(requestedTopics) == 0 {
+		topicsToReturn = make([]string, 0, len(h.topics))
+		for name := range h.topics {
+			topicsToReturn = append(topicsToReturn, name)
+		}
+	} else {
+		for _, name := range requestedTopics {
+			if _, exists := h.topics[name]; exists {
+				topicsToReturn = append(topicsToReturn, name)
+			}
+		}
+	}
+	h.topicsMu.RUnlock()
+
+	var buf bytes.Buffer
+
+	// Correlation ID (4 bytes)
+	binary.Write(&buf, binary.BigEndian, correlationID)
+
+	// ThrottleTimeMs (4 bytes) - v3+ addition
+	binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
+
+	// Brokers array (4 bytes length + brokers)
+	binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker
+
+	// Broker 0
+	binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID
+
+	// Host (STRING: 2 bytes length + data)
+	host := h.brokerHost
+	binary.Write(&buf, binary.BigEndian, int16(len(host)))
+	buf.WriteString(host)
+
+	// Port (4 bytes)
+	binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))
+
+	// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
+	binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
+
+	// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition
+	// Use -1 length to indicate null
+	binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
+
+	// ControllerID (4 bytes) - v1+ addition
+	binary.Write(&buf, binary.BigEndian, int32(1))
+
+	// Topics array (4 bytes length + topics)
+	binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
+
+	for _, topicName := range topicsToReturn {
+		// ErrorCode (2 bytes)
+		binary.Write(&buf, binary.BigEndian, int16(0))
+
+		// Name (STRING: 2 bytes length + data)
+		binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
+		buf.WriteString(topicName)
+
+		// IsInternal (1 byte) - v1+ addition
+		buf.WriteByte(0) // false
+
+		// Partitions array (4 bytes length + partitions)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
+
+		// Partition 0
+		binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
+		binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
+		binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
+
+		// ReplicaNodes array (4 bytes length + nodes)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
+		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+
+		// IsrNodes array (4 bytes length + nodes)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
+		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+
+		// OfflineReplicas array (4 bytes length + nodes) - v5+ addition
+		binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas
+	}
+
+	response := buf.Bytes()
+	fmt.Printf("DEBUG: Advertising broker (v5/v6) at %s:%d\n", h.brokerHost, h.brokerPort)
+	fmt.Printf("DEBUG: Metadata v5/v6 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
+
+	return response, nil
+}
+
+// HandleMetadataV7 implements Metadata API v7 with LeaderEpoch field
+func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([]byte, error) {
+	// Metadata v7 adds LeaderEpoch field to partitions
+	// v7 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
+	// Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) + leader_epoch(4) + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY)
+
+	// Parse requested topics (empty means all)
+	requestedTopics := h.parseMetadataTopics(requestBody)
+	fmt.Printf("DEBUG: 🔍 METADATA v7 REQUEST - Requested: %v (empty=all)\n", requestedTopics)
+
+	// Determine topics to return
+	h.topicsMu.RLock()
+	var topicsToReturn []string
+	if len(requestedTopics) == 0 {
+		topicsToReturn = make([]string, 0, len(h.topics))
+		for name := range h.topics {
+			topicsToReturn = append(topicsToReturn, name)
+		}
+	} else {
+		for _, name := range requestedTopics {
+			if _, exists := h.topics[name]; exists {
+				topicsToReturn = append(topicsToReturn, name)
+			}
+		}
+	}
+	h.topicsMu.RUnlock()
+
+	var buf bytes.Buffer
+
+	// Correlation ID (4 bytes)
+	binary.Write(&buf, binary.BigEndian, correlationID)
+
+	// ThrottleTimeMs (4 bytes) - v3+ addition
+	binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
+
+	// Brokers array (4 bytes length + brokers)
+	binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker
+
+	// Broker 0
+	binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID
+
+	// Host (STRING: 2 bytes length + data)
+	host := h.brokerHost
+	binary.Write(&buf, binary.BigEndian, int16(len(host)))
+	buf.WriteString(host)
+
+	// Port (4 bytes)
+	binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))
+
+	// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
+	binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
+
+	// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition
+	// Use -1 length to indicate null
+	binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
+
+	// ControllerID (4 bytes) - v1+ addition
+	binary.Write(&buf, binary.BigEndian, int32(1))
+
+	// Topics array (4 bytes length + topics)
+	binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
+
+	for _, topicName := range topicsToReturn {
+		// ErrorCode (2 bytes)
+		binary.Write(&buf, binary.BigEndian, int16(0))
+
+		// Name (STRING: 2 bytes length + data)
+		binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
+		buf.WriteString(topicName)
+
+		// IsInternal (1 byte) - v1+ addition
+		buf.WriteByte(0) // false
+
+		// Partitions array (4 bytes length + partitions)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
+
+		// Partition 0
+		binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
+		binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
+		binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
+
+		// LeaderEpoch (4 bytes) - v7+ addition
+		binary.Write(&buf, binary.BigEndian, int32(0)) // Leader epoch 0
+
+		// ReplicaNodes array (4 bytes length + nodes)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
+		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+
+		// IsrNodes array (4 bytes length + nodes)
+		binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
+		binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+
+		// OfflineReplicas array (4 bytes length + nodes) - v5+ addition
+		binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas
+	}
+
+	response := buf.Bytes()
+	fmt.Printf("DEBUG: Advertising broker (v7) at %s:%d\n", h.brokerHost, h.brokerPort)
+	fmt.Printf("DEBUG: Metadata v7 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
+
+	return response, nil
+}
+
 func (h *Handler) parseMetadataTopics(requestBody []byte) []string {
 	// Support both v0/v1 parsing: v1 payload starts directly with topics array length (int32),
 	// while older assumptions may have included a client_id string first.
@@ -1247,7 +1450,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
 func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
 	supportedVersions := map[uint16][2]uint16{
 		18: {0, 3}, // ApiVersions: v0-v3
-		3:  {0, 4}, // Metadata: v0-v4
+		3:  {0, 7}, // Metadata: v0-v7
 		0:  {0, 1}, // Produce: v0-v1
 		1:  {0, 1}, // Fetch: v0-v1
 		2:  {0, 5}, // ListOffsets: v0-v5
@@ -1307,9 +1510,9 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques
 	case 3, 4:
 		return h.HandleMetadataV3V4(correlationID, requestBody)
 	case 5, 6:
-		// For now, use v3/v4 format for v5/v6 (missing offline_replicas)
-		// TODO: Implement proper v5/v6 formats with offline_replicas field
-		return h.HandleMetadataV3V4(correlationID, requestBody)
+		return h.HandleMetadataV5V6(correlationID, requestBody)
+	case 7:
+		return h.HandleMetadataV7(correlationID, requestBody)
 	default:
 		return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
 	}
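
Not part of the patch: the sketch below is a minimal, self-contained Go illustration of the per-partition wire layout that the new HandleMetadataV7 handler writes (error_code, partition_index, leader_id, leader_epoch, replicas, ISR, offline replicas). It encodes one partition entry the same way the diff does and reads the fixed-size prefix back. The names encodePartitionV7 and the chosen values are illustrative only; it uses nothing beyond bytes and encoding/binary from the standard library.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodePartitionV7 writes one Metadata v7 partition entry:
// error_code(2) + partition_index(4) + leader_id(4) + leader_epoch(4) +
// replica_nodes(ARRAY of int32) + isr_nodes(ARRAY of int32) + offline_replicas(ARRAY of int32).
// Values mirror the single-broker defaults used in the diff.
func encodePartitionV7(buf *bytes.Buffer) {
	binary.Write(buf, binary.BigEndian, int16(0)) // ErrorCode
	binary.Write(buf, binary.BigEndian, int32(0)) // PartitionIndex
	binary.Write(buf, binary.BigEndian, int32(1)) // LeaderID
	binary.Write(buf, binary.BigEndian, int32(0)) // LeaderEpoch (v7+ addition)
	binary.Write(buf, binary.BigEndian, int32(1)) // ReplicaNodes count
	binary.Write(buf, binary.BigEndian, int32(1)) // ReplicaNodes[0]
	binary.Write(buf, binary.BigEndian, int32(1)) // IsrNodes count
	binary.Write(buf, binary.BigEndian, int32(1)) // IsrNodes[0]
	binary.Write(buf, binary.BigEndian, int32(0)) // OfflineReplicas count (empty)
}

func main() {
	var buf bytes.Buffer
	encodePartitionV7(&buf)

	// Decode the fixed-size prefix to confirm the field order and sizes.
	r := bytes.NewReader(buf.Bytes())
	var errCode int16
	var partition, leader, leaderEpoch, replicaCount int32
	binary.Read(r, binary.BigEndian, &errCode)
	binary.Read(r, binary.BigEndian, &partition)
	binary.Read(r, binary.BigEndian, &leader)
	binary.Read(r, binary.BigEndian, &leaderEpoch)
	binary.Read(r, binary.BigEndian, &replicaCount)

	fmt.Printf("error=%d partition=%d leader=%d epoch=%d replicas=%d (entry is %d bytes)\n",
		errCode, partition, leader, leaderEpoch, replicaCount, buf.Len())
}
```

The v5/v6 partition entry is identical except that the 4-byte leader_epoch field is absent, which is exactly the difference between HandleMetadataV5V6 and HandleMetadataV7 in the diff above.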