|
|
package pub_client
import ( "fmt" "github.com/golang/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/util" "time" )
func (p *TopicPublisher) Publish(key, value []byte) error { if p.config.RecordType != nil { return fmt.Errorf("record type is set, use PublishRecord instead") } return p.doPublish(key, value) }
func (p *TopicPublisher) doPublish(key, value []byte) error { hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount if hashKey < 0 { hashKey = -hashKey } inputBuffer, found := p.partition2Buffer.Floor(hashKey+1, hashKey+1) if !found { return fmt.Errorf("no input buffer found for key %d", hashKey) }
return inputBuffer.Enqueue(&mq_pb.DataMessage{ Key: key, Value: value, TsNs: time.Now().UnixNano(), }) }
func (p *TopicPublisher) PublishRecord(key []byte, recordValue *schema_pb.RecordValue) error { // serialize record value
value, err := proto.Marshal(recordValue) if err != nil { return fmt.Errorf("failed to marshal record value: %v", err) }
return p.doPublish(key, value) }
func (p *TopicPublisher) FinishPublish() error { if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { for _, inputBuffer := range inputBuffers { inputBuffer.Enqueue(&mq_pb.DataMessage{ TsNs: time.Now().UnixNano(), Ctrl: &mq_pb.ControlMessage{ IsClose: true, }, }) } }
return nil }
|