|
@ -53,7 +53,7 @@ func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string) { |
|
|
cg.reBalanceTimer = nil |
|
|
cg.reBalanceTimer = nil |
|
|
} |
|
|
} |
|
|
cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() { |
|
|
cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() { |
|
|
cg.RebalanceConsumberGroupInstances(nil, reason) |
|
|
cg.BalanceConsumerGroupInstances(nil, reason) |
|
|
cg.reBalanceTimer = nil |
|
|
cg.reBalanceTimer = nil |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
@ -66,10 +66,10 @@ func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartit |
|
|
for _, assignment := range assignments { |
|
|
for _, assignment := range assignments { |
|
|
partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker) |
|
|
partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker) |
|
|
} |
|
|
} |
|
|
cg.RebalanceConsumberGroupInstances(partitionSlotToBrokerList, "partition list change") |
|
|
cg.BalanceConsumerGroupInstances(partitionSlotToBrokerList, "partition list change") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) { |
|
|
func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) { |
|
|
glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason) |
|
|
glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason) |
|
|
|
|
|
|
|
|
// collect current topic partitions
|
|
|
// collect current topic partitions
|
|
|
xxxxxxxxxx