diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 444e98bb5..7e1b9ca5e 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1886,7 +1886,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // LeaderEpoch (4 bytes) - v7+ addition if apiVersion >= 7 { @@ -1895,11 +1895,11 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // OfflineReplicas array (4 bytes length + nodes) - v5+ addition binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas