Browse Source

subscriber process messages

mq
chrislu 7 days ago
parent
commit
8955209e3b
  1. 6
      weed/mq/client/sub_client/subscribe.go
  2. 12
      weed/mq/client/sub_client/subscriber.go

6
weed/mq/client/sub_client/subscribe.go

@ -72,13 +72,13 @@ func (sub *TopicSubscriber) startProcessors() {
executors := util.NewLimitedConcurrentExecutor(int(sub.SubscriberConfig.SlidingWindowSize))
onDataMessageFn := func(m *mq_pb.SubscribeMessageResponse_Data) {
executors.Execute(func() {
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr == nil {
if sub.OnDataMessageFunc != nil {
sub.OnDataMessageFunc(m)
}
sub.PartitionOffsetChan <- KeyedOffset{
Key: m.Data.Key,
Offset: m.Data.TsNs,
}
}
})
}

12
weed/mq/client/sub_client/subscriber.go

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"google.golang.org/grpc"
"sync"
"time"
)
type SubscriberConfiguration struct {
@ -21,10 +22,10 @@ type ContentConfiguration struct {
Topic topic.Topic
Filter string
PartitionOffsets []*schema_pb.PartitionOffset
DefaultStartTime time.Time
}
type OnDataMessageFn func(m *mq_pb.SubscribeMessageResponse_Data)
type OnEachMessageFunc func(key, value []byte) (err error)
type OnCompletionFunc func()
type TopicSubscriber struct {
@ -32,8 +33,7 @@ type TopicSubscriber struct {
ContentConfig *ContentConfiguration
brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse
brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest
OnDataMessageFnnc OnDataMessageFn
OnEachMessageFunc OnEachMessageFunc
OnDataMessageFunc OnDataMessageFn
OnCompletionFunc OnCompletionFunc
bootstrapBrokers []string
waitForMoreMessage bool
@ -55,12 +55,8 @@ func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfigu
}
}
func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
sub.OnEachMessageFunc = onEachMessageFn
}
func (sub *TopicSubscriber) SetOnDataMessageFn(fn OnDataMessageFn) {
sub.OnDataMessageFnnc = fn
sub.OnDataMessageFunc = fn
}
func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {

Loading…
Cancel
Save