From d45b1d058db19d7adbe258147434a391eb1ceb82 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 20 May 2024 11:07:54 -0700 Subject: [PATCH] minor --- .../sub_client/connect_to_sub_coordinator.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 118d91bac..53ac27418 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -95,6 +95,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig return fmt.Errorf("create subscribe client: %v", err) } + perPartitionConcurrency := sub.ProcessorConfig.PerPartitionConcurrency + if perPartitionConcurrency <= 0 { + perPartitionConcurrency = 1 + } + if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ Init: &mq_pb.SubscribeMessageRequest_InitMessage{ @@ -107,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }, Filter: sub.ContentConfig.Filter, FollowerBroker: assigned.FollowerBroker, - Concurrency: sub.ProcessorConfig.PerPartitionConcurrency, + Concurrency: perPartitionConcurrency, }, }, }); err != nil { @@ -124,12 +129,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig defer func() { close(partitionOffsetChan) }() - - perPartitionConcurrency := int(sub.ProcessorConfig.PerPartitionConcurrency) - if perPartitionConcurrency <= 0 { - perPartitionConcurrency = 1 - } - executors := util.NewLimitedConcurrentExecutor(perPartitionConcurrency) + executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency)) go func() { for ack := range partitionOffsetChan {