You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							62 lines
						
					
					
						
							1.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							62 lines
						
					
					
						
							1.8 KiB
						
					
					
				
								package sub_client
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
							 | 
						|
									"google.golang.org/grpc"
							 | 
						|
									"time"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								type SubscriberConfiguration struct {
							 | 
						|
									ClientId                string
							 | 
						|
									ConsumerGroup           string
							 | 
						|
									ConsumerGroupInstanceId string
							 | 
						|
									GroupMinimumPeers       int32
							 | 
						|
									GroupMaximumPeers int32
							 | 
						|
									BootstrapServers  []string
							 | 
						|
									GrpcDialOption    grpc.DialOption
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								type ContentConfiguration struct {
							 | 
						|
									Namespace string
							 | 
						|
									Topic     string
							 | 
						|
									Filter    string
							 | 
						|
									StartTime time.Time
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								type ProcessorConfiguration struct {
							 | 
						|
									ConcurrentPartitionLimit int // how many partitions to process concurrently
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error)
							 | 
						|
								type OnCompletionFunc func()
							 | 
						|
								
							 | 
						|
								type TopicSubscriber struct {
							 | 
						|
									SubscriberConfig           *SubscriberConfiguration
							 | 
						|
									ContentConfig              *ContentConfiguration
							 | 
						|
									ProcessorConfig            *ProcessorConfiguration
							 | 
						|
									brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
							 | 
						|
									OnEachMessageFunc          OnEachMessageFunc
							 | 
						|
									OnCompletionFunc           OnCompletionFunc
							 | 
						|
									bootstrapBrokers           []string
							 | 
						|
									waitForMoreMessage         bool
							 | 
						|
									alreadyProcessedTsNs       int64
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber {
							 | 
						|
									return &TopicSubscriber{
							 | 
						|
										SubscriberConfig:     subscriber,
							 | 
						|
										ContentConfig:        content,
							 | 
						|
										ProcessorConfig:      &processor,
							 | 
						|
										bootstrapBrokers:     bootstrapBrokers,
							 | 
						|
										waitForMoreMessage:   true,
							 | 
						|
										alreadyProcessedTsNs: content.StartTime.UnixNano(),
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
							 | 
						|
									sub.OnEachMessageFunc = onEachMessageFn
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {
							 | 
						|
									sub.OnCompletionFunc = onCompletionFn
							 | 
						|
								}
							 |