diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index b18865877..792794a4a 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -17,6 +17,8 @@ var ( namespace = flag.String("ns", "test", "namespace") t = flag.String("topic", "test", "topic") seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") ) @@ -28,6 +30,8 @@ func main() { ConsumerGroup: "test", ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + MaxPartitionCount: int32(*maxPartitionCount), + PerPartitionConcurrency: int32(*perPartitionConcurrency), } contentConfig := &sub_client.ContentConfiguration{ @@ -36,12 +40,8 @@ func main() { StartTime: time.Unix(1, 1), } - processorConfig := sub_client.ProcessorConfiguration{ - MaxPartitionCount: 3, - } - brokers := strings.Split(*seedBrokers, ",") - subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) + subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig) counter := 0 subscriber.SetEachMessageFunc(func(key, value []byte) error { diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index 7aa381c76..262da6c4d 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -19,6 +19,8 @@ var ( namespace = flag.String("ns", "test", "namespace") t = flag.String("topic", "test", "topic") seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") ) @@ -54,6 +56,8 @@ func main() { ConsumerGroup: "test", ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + MaxPartitionCount: int32(*maxPartitionCount), + PerPartitionConcurrency: int32(*perPartitionConcurrency), } contentConfig := &sub_client.ContentConfiguration{ @@ -62,13 +66,8 @@ func main() { StartTime: time.Unix(1, 1), } - processorConfig := sub_client.ProcessorConfiguration{ - MaxPartitionCount: 3, - PerPartitionConcurrency: 1, - } - brokers := strings.Split(*seedBrokers, ",") - subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) + subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig) counter := 0 subscriber.SetEachMessageFunc(func(key, value []byte) error { @@ -76,6 +75,7 @@ func main() { record := &schema_pb.RecordValue{} proto.Unmarshal(value, record) fmt.Printf("record: %v\n", record) + time.Sleep(1300 * time.Millisecond) return nil }) diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 84af3a19c..82b072f22 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -51,7 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId, Topic: sub.ContentConfig.Topic.ToPbTopic(), - MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount, + MaxPartitionCount: sub.SubscriberConfig.MaxPartitionCount, }, }, }); err != nil { @@ -95,7 +95,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig return fmt.Errorf("create subscribe client: %v", err) } - perPartitionConcurrency := sub.ProcessorConfig.PerPartitionConcurrency + perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency if perPartitionConcurrency <= 0 { perPartitionConcurrency = 1 } diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index c519ca18b..bb2b2302a 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -28,7 +28,7 @@ func (sub *TopicSubscriber) startProcessors() { // listen to the messages from the sub coordinator // start one processor per partition var wg sync.WaitGroup - semaphore := make(chan struct{}, sub.ProcessorConfig.MaxPartitionCount) + semaphore := make(chan struct{}, sub.SubscriberConfig.MaxPartitionCount) for assigned := range sub.brokerPartitionAssignmentChan { wg.Add(1) diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index cf41f2881..e00f125ad 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -13,6 +13,8 @@ type SubscriberConfiguration struct { ConsumerGroup string ConsumerGroupInstanceId string GrpcDialOption grpc.DialOption + MaxPartitionCount int32 // how many partitions to process concurrently + PerPartitionConcurrency int32 // how many messages to process concurrently per partition } type ContentConfiguration struct { @@ -21,18 +23,12 @@ type ContentConfiguration struct { StartTime time.Time } -type ProcessorConfiguration struct { - MaxPartitionCount int32 // how many partitions to process concurrently - PerPartitionConcurrency int32 // how many messages to process concurrently per partition -} - type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() type TopicSubscriber struct { SubscriberConfig *SubscriberConfiguration ContentConfig *ContentConfiguration - ProcessorConfig *ProcessorConfiguration brokerPartitionAssignmentChan chan *mq_pb.BrokerPartitionAssignment OnEachMessageFunc OnEachMessageFunc OnCompletionFunc OnCompletionFunc @@ -42,11 +38,10 @@ type TopicSubscriber struct { activeProcessorsLock sync.Mutex } -func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber { +func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { return &TopicSubscriber{ SubscriberConfig: subscriber, ContentConfig: content, - ProcessorConfig: &processor, brokerPartitionAssignmentChan: make(chan *mq_pb.BrokerPartitionAssignment, 1024), bootstrapBrokers: bootstrapBrokers, waitForMoreMessage: true,