From 8955209e3b88d49e8c5a93b861853316010b2232 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 25 Feb 2025 10:34:40 -0800 Subject: [PATCH] subscriber process messages --- weed/mq/client/sub_client/subscribe.go | 12 ++++++------ weed/mq/client/sub_client/subscriber.go | 12 ++++-------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index cf2294891..d4dea3852 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -72,12 +72,12 @@ func (sub *TopicSubscriber) startProcessors() { executors := util.NewLimitedConcurrentExecutor(int(sub.SubscriberConfig.SlidingWindowSize)) onDataMessageFn := func(m *mq_pb.SubscribeMessageResponse_Data) { executors.Execute(func() { - processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) - if processErr == nil { - sub.PartitionOffsetChan <- KeyedOffset{ - Key: m.Data.Key, - Offset: m.Data.TsNs, - } + if sub.OnDataMessageFunc != nil { + sub.OnDataMessageFunc(m) + } + sub.PartitionOffsetChan <- KeyedOffset{ + Key: m.Data.Key, + Offset: m.Data.TsNs, } }) } diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 9a51ce01e..fa4335232 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" "sync" + "time" ) type SubscriberConfiguration struct { @@ -21,10 +22,10 @@ type ContentConfiguration struct { Topic topic.Topic Filter string PartitionOffsets []*schema_pb.PartitionOffset + DefaultStartTime time.Time } type OnDataMessageFn func(m *mq_pb.SubscribeMessageResponse_Data) -type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() type TopicSubscriber struct { @@ -32,8 +33,7 @@ type TopicSubscriber struct { ContentConfig *ContentConfiguration brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest - OnDataMessageFnnc OnDataMessageFn - OnEachMessageFunc OnEachMessageFunc + OnDataMessageFunc OnDataMessageFn OnCompletionFunc OnCompletionFunc bootstrapBrokers []string waitForMoreMessage bool @@ -55,12 +55,8 @@ func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfigu } } -func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) { - sub.OnEachMessageFunc = onEachMessageFn -} - func (sub *TopicSubscriber) SetOnDataMessageFn(fn OnDataMessageFn) { - sub.OnDataMessageFnnc = fn + sub.OnDataMessageFunc = fn } func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {