You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							61 lines
						
					
					
						
							1.6 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							61 lines
						
					
					
						
							1.6 KiB
						
					
					
				
								package pub_client
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"fmt"
							 | 
						|
									"github.com/golang/protobuf/proto"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util"
							 | 
						|
									"time"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) Publish(key, value []byte) error {
							 | 
						|
									if p.config.RecordType != nil {
							 | 
						|
										return fmt.Errorf("record type is set, use PublishRecord instead")
							 | 
						|
									}
							 | 
						|
									return p.doPublish(key, value)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) doPublish(key, value []byte) error {
							 | 
						|
									hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
							 | 
						|
									if hashKey < 0 {
							 | 
						|
										hashKey = -hashKey
							 | 
						|
									}
							 | 
						|
									inputBuffer, found := p.partition2Buffer.Floor(hashKey+1, hashKey+1)
							 | 
						|
									if !found {
							 | 
						|
										return fmt.Errorf("no input buffer found for key %d", hashKey)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return inputBuffer.Enqueue(&mq_pb.DataMessage{
							 | 
						|
										Key:   key,
							 | 
						|
										Value: value,
							 | 
						|
										TsNs:  time.Now().UnixNano(),
							 | 
						|
									})
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) PublishRecord(key []byte, recordValue *schema_pb.RecordValue) error {
							 | 
						|
									// serialize record value
							 | 
						|
									value, err := proto.Marshal(recordValue)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to marshal record value: %v", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return p.doPublish(key, value)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) FinishPublish() error {
							 | 
						|
									if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
							 | 
						|
										for _, inputBuffer := range inputBuffers {
							 | 
						|
											inputBuffer.Enqueue(&mq_pb.DataMessage{
							 | 
						|
												TsNs: time.Now().UnixNano(),
							 | 
						|
												Ctrl: &mq_pb.ControlMessage{
							 | 
						|
													IsClose:       true,
							 | 
						|
													PublisherName: p.config.PublisherName,
							 | 
						|
												},
							 | 
						|
											})
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 |