diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 7e1b9ca5e..77b313c2b 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1609,15 +1609,15 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 } } @@ -1723,15 +1723,15 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 } }