diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 118d91bac..53ac27418 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -95,6 +95,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig return fmt.Errorf("create subscribe client: %v", err) } + perPartitionConcurrency := sub.ProcessorConfig.PerPartitionConcurrency + if perPartitionConcurrency <= 0 { + perPartitionConcurrency = 1 + } + if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ Init: &mq_pb.SubscribeMessageRequest_InitMessage{ @@ -107,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }, Filter: sub.ContentConfig.Filter, FollowerBroker: assigned.FollowerBroker, - Concurrency: sub.ProcessorConfig.PerPartitionConcurrency, + Concurrency: perPartitionConcurrency, }, }, }); err != nil { @@ -124,12 +129,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig defer func() { close(partitionOffsetChan) }() - - perPartitionConcurrency := int(sub.ProcessorConfig.PerPartitionConcurrency) - if perPartitionConcurrency <= 0 { - perPartitionConcurrency = 1 - } - executors := util.NewLimitedConcurrentExecutor(perPartitionConcurrency) + executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency)) go func() { for ack := range partitionOffsetChan {