diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 9b62f616e..a1279c204 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -71,8 +71,6 @@ func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartit func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) { glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason) - now := time.Now().UnixNano() - // collect current topic partitions partitionSlotToBrokerList := knownPartitionSlotToBrokerList if partitionSlotToBrokerList == nil { @@ -104,7 +102,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr if !found { partitionSlots = make([]*PartitionSlotToConsumerInstance, 0) } - consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots, now) + consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots) assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(partitionSlots)) for i, partitionSlot := range partitionSlots { assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{ @@ -112,7 +110,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr RangeStop: partitionSlot.RangeStop, RangeStart: partitionSlot.RangeStart, RingSize: partitionSlotToBrokerList.RingSize, - UnixTimeNs: now, + UnixTimeNs: partitionSlot.UnixTimeNs, }, Broker: partitionSlot.Broker, } diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go index 7f02253f6..fa0e3761f 100644 --- a/weed/mq/sub_coordinator/partition_list.go +++ b/weed/mq/sub_coordinator/partition_list.go @@ -23,10 +23,10 @@ func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *Part } } -func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance, unixTimeNs int64) []*topic.Partition { +func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition { partitions := make([]*topic.Partition, 0, len(slots)) for _, slot := range slots { - partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, unixTimeNs)) + partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, slot.UnixTimeNs)) } return partitions }