From 4a3e8869bb49654f8af5932b3c3482760a416fcf Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 2 May 2024 09:56:11 -0700 Subject: [PATCH] subscriber can unmarshal the record --- .../cmd/weed_pub_record/publisher_record.go | 8 +- .../cmd/weed_sub_record/subscriber_record.go | 90 +++++++++++++++++++ 2 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 weed/mq/client/cmd/weed_sub_record/subscriber_record.go diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go index 1c9eeb370..d3efea25e 100644 --- a/weed/mq/client/cmd/weed_pub_record/publisher_record.go +++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go @@ -29,7 +29,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { startTime := time.Now() for i := 0; i < *messageCount / *concurrency; i++ { // Simulate publishing a message - myRecord := genMyRecord(i) + myRecord := genMyRecord(int32(i)) if err := publisher.PublishRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil { fmt.Println(err) break @@ -48,14 +48,14 @@ type MyRecord struct { Key []byte Field1 []byte Field2 string - Field3 int + Field3 int32 Field4 int64 Field5 float32 Field6 float64 Field7 bool } -func genMyRecord(id int) *MyRecord { +func genMyRecord(id int32) *MyRecord { return &MyRecord{ Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)), Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)), @@ -73,7 +73,7 @@ func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { SetBytes("key", r.Key). SetBytes("field1", r.Field1). SetString("field2", r.Field2). - SetInt32("field3", int32(r.Field3)). + SetInt32("field3", r.Field3). SetInt64("field4", r.Field4). SetFloat32("field5", r.Field5). SetFloat64("field6", r.Field6). diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go new file mode 100644 index 000000000..53eb4f15b --- /dev/null +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -0,0 +1,90 @@ +package main + +import ( + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/proto" + "strings" + "time" +) + +var ( + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + + clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") +) + +type MyRecord struct { + Key []byte + Field1 []byte + Field2 string + Field3 int32 + Field4 int64 + Field5 float32 + Field6 float64 + Field7 bool +} + +func FromSchemaRecordValue(recordValue *schema_pb.RecordValue) *MyRecord { + return &MyRecord{ + Key: recordValue.Fields["key"].GetBytesValue(), + Field1: recordValue.Fields["field1"].GetBytesValue(), + Field2: recordValue.Fields["field2"].GetStringValue(), + Field3: recordValue.Fields["field3"].GetInt32Value(), + Field4: recordValue.Fields["field4"].GetInt64Value(), + Field5: recordValue.Fields["field5"].GetFloatValue(), + Field6: recordValue.Fields["field6"].GetDoubleValue(), + Field7: recordValue.Fields["field7"].GetBoolValue(), + } +} + +func main() { + flag.Parse() + + subscriberConfig := &sub_client.SubscriberConfiguration{ + ClientId: fmt.Sprintf("client-%d", *clientId), + ConsumerGroup: "test", + ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), + GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + contentConfig := &sub_client.ContentConfiguration{ + Topic: topic.NewTopic(*namespace, *t), + Filter: "", + StartTime: time.Unix(1, 1), + } + + processorConfig := sub_client.ProcessorConfiguration{ + ConcurrentPartitionLimit: 3, + } + + brokers := strings.Split(*seedBrokers, ",") + subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) + + counter := 0 + subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) { + counter++ + record := &schema_pb.RecordValue{} + proto.Unmarshal(value, record) + fmt.Printf("record: %v\n", record) + return true, nil + }) + + subscriber.SetCompletionFunc(func() { + glog.V(0).Infof("done received %d messages", counter) + }) + + if err := subscriber.Subscribe(); err != nil { + fmt.Println(err) + } + +}