From 0f35b3a4eab7785501b47239f7bf5866df672f6c Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 2 May 2024 08:48:51 -0700 Subject: [PATCH] add example to publish a record --- .../cmd/weed_pub_record/publisher_record.go | 126 ++++++++++++++++++ weed/mq/client/pub_client/publish.go | 9 +- 2 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 weed/mq/client/cmd/weed_pub_record/publisher_record.go diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go new file mode 100644 index 000000000..8e8e6b21c --- /dev/null +++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go @@ -0,0 +1,126 @@ +package main + +import ( + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "log" + "strings" + "sync" + "time" +) + +var ( + messageCount = flag.Int("n", 1000, "message count") + concurrency = flag.Int("c", 4, "concurrent publishers") + partitionCount = flag.Int("p", 6, "partition count") + + clientName = flag.String("client", "c1", "client name") + + namespace = flag.String("ns", "test", "namespace") + t = flag.String("t", "test", "t") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") +) + +func doPublish(publisher *pub_client.TopicPublisher, id int) { + startTime := time.Now() + for i := 0; i < *messageCount / *concurrency; i++ { + // Simulate publishing a message + myRecord := genMyRecord(i) + if err := publisher.PublishRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil { + fmt.Println(err) + break + } + time.Sleep(time.Second) + // println("Published", string(key), string(value)) + } + if err := publisher.FinishPublish(); err != nil { + fmt.Println(err) + } + elapsed := time.Since(startTime) + log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) +} + +type MyRecord struct { + Key []byte + Field1 []byte + Field2 string + Field3 int + Field4 int64 + Field5 float32 + Field6 float64 + Field7 bool +} + +func genMyRecord(id int) *MyRecord { + return &MyRecord{ + Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)), + Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)), + Field2: fmt.Sprintf("field2-%s-%d", *clientName, id), + Field3: id, + Field4: int64(id), + Field5: float32(id), + Field6: float64(id), + Field7: id%2 == 0, + } +} + +func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { + return schema.RecordBegin(). + SetBytes("key", r.Key). + SetBytes("field1", r.Field1). + SetString("field2", r.Field2). + SetInt("field3", int32(r.Field3)). + SetLong("field4", r.Field4). + SetFloat("field5", r.Field5). + SetDouble("field6", r.Field6). + SetBool("field7", r.Field7). + RecordEnd() +} + +func main() { + flag.Parse() + + recordType := schema.RecordTypeBegin(). + WithField("key", schema.TypeBytes). + WithField("field1", schema.TypeBytes). + WithField("field2", schema.TypeString). + WithField("field3", schema.TypeInteger). + WithField("field4", schema.TypeLong). + WithField("field5", schema.TypeFloat). + WithField("field6", schema.TypeDouble). + WithField("field7", schema.TypeBoolean). + RecordTypeEnd() + + config := &pub_client.PublisherConfiguration{ + Topic: topic.NewTopic(*namespace, *t), + PartitionCount: int32(*partitionCount), + Brokers: strings.Split(*seedBrokers, ","), + PublisherName: *clientName, + RecordType: recordType, + } + publisher := pub_client.NewTopicPublisher(config) + + startTime := time.Now() + + var wg sync.WaitGroup + // Start multiple publishers + for i := 0; i < *concurrency; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + doPublish(publisher, id) + }(i) + } + + // Wait for all publishers to finish + wg.Wait() + elapsed := time.Since(startTime) + publisher.Shutdown() + + log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) + +} diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index 92a1a1599..0c162d6a0 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -11,6 +11,13 @@ import ( ) func (p *TopicPublisher) Publish(key, value []byte) error { + if p.config.RecordType != nil { + return fmt.Errorf("record type is set, use PublishRecord instead") + } + return p.doPublish(key, value) +} + +func (p *TopicPublisher) doPublish(key, value []byte) error { hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount if hashKey < 0 { hashKey = -hashKey @@ -34,7 +41,7 @@ func (p *TopicPublisher) PublishRecord(key []byte, recordValue *schema_pb.Record return fmt.Errorf("failed to marshal record value: %v", err) } - return p.Publish(key, value) + return p.doPublish(key, value) } func (p *TopicPublisher) FinishPublish() error {