diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index f4938d539..882e7ddf9 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -53,7 +53,7 @@ func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string) { cg.reBalanceTimer = nil } cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() { - cg.RebalanceConsumberGroupInstances(nil, reason) + cg.BalanceConsumerGroupInstances(nil, reason) cg.reBalanceTimer = nil }) } @@ -66,10 +66,10 @@ func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartit for _, assignment := range assignments { partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker) } - cg.RebalanceConsumberGroupInstances(partitionSlotToBrokerList, "partition list change") + cg.BalanceConsumerGroupInstances(partitionSlotToBrokerList, "partition list change") } -func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) { +func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) { glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason) // collect current topic partitions