|
@ -95,6 +95,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig |
|
|
return fmt.Errorf("create subscribe client: %v", err) |
|
|
return fmt.Errorf("create subscribe client: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
perPartitionConcurrency := sub.ProcessorConfig.PerPartitionConcurrency |
|
|
|
|
|
if perPartitionConcurrency <= 0 { |
|
|
|
|
|
perPartitionConcurrency = 1 |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ |
|
|
if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ |
|
|
Message: &mq_pb.SubscribeMessageRequest_Init{ |
|
|
Message: &mq_pb.SubscribeMessageRequest_Init{ |
|
|
Init: &mq_pb.SubscribeMessageRequest_InitMessage{ |
|
|
Init: &mq_pb.SubscribeMessageRequest_InitMessage{ |
|
@ -107,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig |
|
|
}, |
|
|
}, |
|
|
Filter: sub.ContentConfig.Filter, |
|
|
Filter: sub.ContentConfig.Filter, |
|
|
FollowerBroker: assigned.FollowerBroker, |
|
|
FollowerBroker: assigned.FollowerBroker, |
|
|
Concurrency: sub.ProcessorConfig.PerPartitionConcurrency, |
|
|
|
|
|
|
|
|
Concurrency: perPartitionConcurrency, |
|
|
}, |
|
|
}, |
|
|
}, |
|
|
}, |
|
|
}); err != nil { |
|
|
}); err != nil { |
|
@ -124,12 +129,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig |
|
|
defer func() { |
|
|
defer func() { |
|
|
close(partitionOffsetChan) |
|
|
close(partitionOffsetChan) |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
perPartitionConcurrency := int(sub.ProcessorConfig.PerPartitionConcurrency) |
|
|
|
|
|
if perPartitionConcurrency <= 0 { |
|
|
|
|
|
perPartitionConcurrency = 1 |
|
|
|
|
|
} |
|
|
|
|
|
executors := util.NewLimitedConcurrentExecutor(perPartitionConcurrency) |
|
|
|
|
|
|
|
|
executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency)) |
|
|
|
|
|
|
|
|
go func() { |
|
|
go func() { |
|
|
for ack := range partitionOffsetChan { |
|
|
for ack := range partitionOffsetChan { |
|
|