From b424a5256d0b0b53e7826d8ab10c97afaa1bc14a Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 23 Feb 2025 14:14:27 -0800 Subject: [PATCH] refactor --- .../cmd/agent_pub_record/agent_pub_record.go | 63 +++++-------------- weed/mq/client/cmd/example/my_record.go | 43 +++++++++++++ 2 files changed, 58 insertions(+), 48 deletions(-) create mode 100644 weed/mq/client/cmd/example/my_record.go diff --git a/weed/mq/client/cmd/agent_pub_record/agent_pub_record.go b/weed/mq/client/cmd/agent_pub_record/agent_pub_record.go index 4863456eb..c7b651fff 100644 --- a/weed/mq/client/cmd/agent_pub_record/agent_pub_record.go +++ b/weed/mq/client/cmd/agent_pub_record/agent_pub_record.go @@ -4,8 +4,8 @@ import ( "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client" + "github.com/seaweedfs/seaweedfs/weed/mq/client/cmd/example" "github.com/seaweedfs/seaweedfs/weed/mq/schema" - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "log" "sync" "sync/atomic" @@ -27,6 +27,19 @@ var ( counter int32 ) +func genMyRecord(id int32) *example.MyRecord { + return &example.MyRecord{ + Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)), + Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)), + Field2: fmt.Sprintf("field2-%s-%d", *clientName, id), + Field3: id, + Field4: int64(id), + Field5: float32(id), + Field6: float64(id), + Field7: id%2 == 0, + } +} + func doPublish(publisher *agent_client.PublishSession, id int) { startTime := time.Now() for { @@ -49,56 +62,10 @@ func doPublish(publisher *agent_client.PublishSession, id int) { log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) } -type MyRecord struct { - Key []byte - Field1 []byte - Field2 string - Field3 int32 - Field4 int64 - Field5 float32 - Field6 float64 - Field7 bool -} - -func genMyRecord(id int32) *MyRecord { - return &MyRecord{ - Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)), - Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)), - Field2: fmt.Sprintf("field2-%s-%d", *clientName, id), - Field3: id, - Field4: int64(id), - Field5: float32(id), - Field6: float64(id), - Field7: id%2 == 0, - } -} - -func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { - return schema.RecordBegin(). - SetBytes("key", r.Key). - SetBytes("field1", r.Field1). - SetString("field2", r.Field2). - SetInt32("field3", r.Field3). - SetInt64("field4", r.Field4). - SetFloat("field5", r.Field5). - SetDouble("field6", r.Field6). - SetBool("field7", r.Field7). - RecordEnd() -} - func main() { flag.Parse() - recordType := schema.RecordTypeBegin(). - WithField("key", schema.TypeBytes). - WithField("field1", schema.TypeBytes). - WithField("field2", schema.TypeString). - WithField("field3", schema.TypeInt32). - WithField("field4", schema.TypeInt64). - WithField("field5", schema.TypeFloat). - WithField("field6", schema.TypeDouble). - WithField("field7", schema.TypeBoolean). - RecordTypeEnd() + recordType := example.MyRecordType() session, err := agent_client.NewPublishSession(*agent, schema.NewSchema(*namespace, *t, recordType), *partitionCount, *clientName) if err != nil { diff --git a/weed/mq/client/cmd/example/my_record.go b/weed/mq/client/cmd/example/my_record.go new file mode 100644 index 000000000..2ca693438 --- /dev/null +++ b/weed/mq/client/cmd/example/my_record.go @@ -0,0 +1,43 @@ +package example + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +type MyRecord struct { + Key []byte + Field1 []byte + Field2 string + Field3 int32 + Field4 int64 + Field5 float32 + Field6 float64 + Field7 bool +} + +func MyRecordType() *schema_pb.RecordType { + return schema.RecordTypeBegin(). + WithField("key", schema.TypeBytes). + WithField("field1", schema.TypeBytes). + WithField("field2", schema.TypeString). + WithField("field3", schema.TypeInt32). + WithField("field4", schema.TypeInt64). + WithField("field5", schema.TypeFloat). + WithField("field6", schema.TypeDouble). + WithField("field7", schema.TypeBoolean). + RecordTypeEnd() +} + +func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { + return schema.RecordBegin(). + SetBytes("key", r.Key). + SetBytes("field1", r.Field1). + SetString("field2", r.Field2). + SetInt32("field3", r.Field3). + SetInt64("field4", r.Field4). + SetFloat("field5", r.Field5). + SetDouble("field6", r.Field6). + SetBool("field7", r.Field7). + RecordEnd() +}