diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index fbb07b042..92a1a1599 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -2,8 +2,10 @@ package pub_client import ( "fmt" + "github.com/golang/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/util" "time" ) @@ -25,6 +27,16 @@ func (p *TopicPublisher) Publish(key, value []byte) error { }) } +func (p *TopicPublisher) PublishRecord(key []byte, recordValue *schema_pb.RecordValue) error { + // serialize record value + value, err := proto.Marshal(recordValue) + if err != nil { + return fmt.Errorf("failed to marshal record value: %v", err) + } + + return p.Publish(key, value) +} + func (p *TopicPublisher) FinishPublish() error { if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { for _, inputBuffer := range inputBuffers { diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 4dfce4030..04c2ebe08 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -17,6 +18,7 @@ type PublisherConfiguration struct { PartitionCount int32 Brokers []string PublisherName string // for debugging + RecordType *schema_pb.RecordType } type PublishClient struct { diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index a8d7079c9..c32343e0d 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -236,6 +236,7 @@ func (p *TopicPublisher) doConfigureTopic() (err error) { _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ Topic: p.config.Topic.ToPbTopic(), PartitionCount: p.config.PartitionCount, + RecordType: p.config.RecordType, // TODO schema upgrade }) return err })