From a7d30d0705fa37966583c95f3f7ddfa150f909b8 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 20 May 2024 12:32:12 -0700 Subject: [PATCH] refactor --- .../sub_client/connect_to_sub_coordinator.go | 104 ---------------- .../mq/client/sub_client/on_each_partition.go | 112 ++++++++++++++++++ 2 files changed, 112 insertions(+), 104 deletions(-) create mode 100644 weed/mq/client/sub_client/on_each_partition.go diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 82b072f22..a73161fea 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -2,12 +2,9 @@ package sub_client import ( "context" - "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "github.com/seaweedfs/seaweedfs/weed/util" - "io" "time" ) @@ -85,104 +82,3 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { time.Sleep(waitTime) } } - -func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error { - // connect to the partition broker - return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - - subscribeClient, err := client.SubscribeMessage(context.Background()) - if err != nil { - return fmt.Errorf("create subscribe client: %v", err) - } - - perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency - if perPartitionConcurrency <= 0 { - perPartitionConcurrency = 1 - } - - if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Init{ - Init: &mq_pb.SubscribeMessageRequest_InitMessage{ - ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, - ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, - Topic: sub.ContentConfig.Topic.ToPbTopic(), - PartitionOffset: &mq_pb.PartitionOffset{ - Partition: assigned.Partition, - StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, - }, - Filter: sub.ContentConfig.Filter, - FollowerBroker: assigned.FollowerBroker, - Concurrency: perPartitionConcurrency, - }, - }, - }); err != nil { - glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) - } - - glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker) - - if sub.OnCompletionFunc != nil { - defer sub.OnCompletionFunc() - } - - type KeyedOffset struct { - Key []byte - Offset int64 - } - - partitionOffsetChan := make(chan KeyedOffset, 1024) - defer func() { - close(partitionOffsetChan) - }() - executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency)) - - go func() { - for ack := range partitionOffsetChan { - subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Ack{ - Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ - Key: ack.Key, - Sequence: ack.Offset, - }, - }, - }) - } - subscribeClient.CloseSend() - }() - - var lastErr error - - for lastErr == nil { - // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) - resp, err := subscribeClient.Recv() - if err != nil { - return fmt.Errorf("subscribe recv: %v", err) - } - if resp.Message == nil { - glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) - continue - } - switch m := resp.Message.(type) { - case *mq_pb.SubscribeMessageResponse_Data: - executors.Execute(func() { - processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) - if processErr == nil { - partitionOffsetChan <- KeyedOffset{ - Key: m.Data.Key, - Offset: m.Data.TsNs, - } - } else { - lastErr = processErr - } - }) - case *mq_pb.SubscribeMessageResponse_Ctrl: - // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) - if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { - return io.EOF - } - } - } - - return lastErr - }) -} diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go new file mode 100644 index 000000000..792376a69 --- /dev/null +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -0,0 +1,112 @@ +package sub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "io" +) + +func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error { + // connect to the partition broker + return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + + subscribeClient, err := client.SubscribeMessage(context.Background()) + if err != nil { + return fmt.Errorf("create subscribe client: %v", err) + } + + perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency + if perPartitionConcurrency <= 0 { + perPartitionConcurrency = 1 + } + + if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Init{ + Init: &mq_pb.SubscribeMessageRequest_InitMessage{ + ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, + ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, + Topic: sub.ContentConfig.Topic.ToPbTopic(), + PartitionOffset: &mq_pb.PartitionOffset{ + Partition: assigned.Partition, + StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, + }, + Filter: sub.ContentConfig.Filter, + FollowerBroker: assigned.FollowerBroker, + Concurrency: perPartitionConcurrency, + }, + }, + }); err != nil { + glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) + } + + glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker) + + if sub.OnCompletionFunc != nil { + defer sub.OnCompletionFunc() + } + + type KeyedOffset struct { + Key []byte + Offset int64 + } + + partitionOffsetChan := make(chan KeyedOffset, 1024) + defer func() { + close(partitionOffsetChan) + }() + executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency)) + + go func() { + for ack := range partitionOffsetChan { + subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Ack{ + Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ + Key: ack.Key, + Sequence: ack.Offset, + }, + }, + }) + } + subscribeClient.CloseSend() + }() + + var lastErr error + + for lastErr == nil { + // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + resp, err := subscribeClient.Recv() + if err != nil { + return fmt.Errorf("subscribe recv: %v", err) + } + if resp.Message == nil { + glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + continue + } + switch m := resp.Message.(type) { + case *mq_pb.SubscribeMessageResponse_Data: + executors.Execute(func() { + processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) + if processErr == nil { + partitionOffsetChan <- KeyedOffset{ + Key: m.Data.Key, + Offset: m.Data.TsNs, + } + } else { + lastErr = processErr + } + }) + case *mq_pb.SubscribeMessageResponse_Ctrl: + // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) + if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { + return io.EOF + } + } + } + + return lastErr + }) +}