From 8ef2cb5b166daae3491fe629338c4d47cbbf47cf Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 18:37:37 -0700 Subject: [PATCH] fix: Use actual broker nodeID in partition metadata for Metadata responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem Metadata responses were hardcoding partition leader and replica nodeIDs to 1, but the actual broker's nodeID is different (0x4fd297f2 / 1339201522). This caused Java clients to get confused: 1. Client reads: "Broker is at nodeID=0x4fd297f2" 2. Client reads: "Partition leader is nodeID=1" 3. Client looks for broker with nodeID=1 → not found 4. Client can't determine leader → retries Metadata request 5. Same wrong response → infinite retry loop until timeout ## Solution Use the actual broker's nodeID consistently: - LeaderID: nodeID (was int32(1)) - ReplicaNodes: [nodeID] (was [1]) - IsrNodes: [nodeID] (was [1]) Now the response is consistent: - Broker: nodeID = 0x4fd297f2 - Partition leader: nodeID = 0x4fd297f2 - Replicas: [0x4fd297f2] - ISR: [0x4fd297f2] ## Impact With both fixes (hostname + nodeID): - Schema Registry consumer won't get stuck - Consumer can proceed to JoinGroup/SyncGroup/Fetch - Producer can send Noop record - Schema Registry initialization completes successfully --- weed/mq/kafka/protocol/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 444e98bb5..7e1b9ca5e 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1886,7 +1886,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + binary.Write(&buf, 
binary.BigEndian, nodeID) // LeaderID // LeaderEpoch (4 bytes) - v7+ addition if apiVersion >= 7 { @@ -1895,11 +1895,11 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // Broker nodeID // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // Broker nodeID // OfflineReplicas array (4 bytes length + nodes) - v5+ addition binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas