Browse Source

refactor

mq
chrislu 1 week ago
parent
commit
b424a5256d
  1. 63
      weed/mq/client/cmd/agent_pub_record/agent_pub_record.go
  2. 43
      weed/mq/client/cmd/example/my_record.go

63
weed/mq/client/cmd/agent_pub_record/agent_pub_record.go

@ -4,8 +4,8 @@ import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/client/cmd/example"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"log"
"sync"
"sync/atomic"
@ -27,6 +27,19 @@ var (
counter int32
)
func genMyRecord(id int32) *example.MyRecord {
return &example.MyRecord{
Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)),
Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)),
Field2: fmt.Sprintf("field2-%s-%d", *clientName, id),
Field3: id,
Field4: int64(id),
Field5: float32(id),
Field6: float64(id),
Field7: id%2 == 0,
}
}
func doPublish(publisher *agent_client.PublishSession, id int) {
startTime := time.Now()
for {
@ -49,56 +62,10 @@ func doPublish(publisher *agent_client.PublishSession, id int) {
log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
}
type MyRecord struct {
Key []byte
Field1 []byte
Field2 string
Field3 int32
Field4 int64
Field5 float32
Field6 float64
Field7 bool
}
func genMyRecord(id int32) *MyRecord {
return &MyRecord{
Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)),
Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)),
Field2: fmt.Sprintf("field2-%s-%d", *clientName, id),
Field3: id,
Field4: int64(id),
Field5: float32(id),
Field6: float64(id),
Field7: id%2 == 0,
}
}
func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
return schema.RecordBegin().
SetBytes("key", r.Key).
SetBytes("field1", r.Field1).
SetString("field2", r.Field2).
SetInt32("field3", r.Field3).
SetInt64("field4", r.Field4).
SetFloat("field5", r.Field5).
SetDouble("field6", r.Field6).
SetBool("field7", r.Field7).
RecordEnd()
}
func main() {
flag.Parse()
recordType := schema.RecordTypeBegin().
WithField("key", schema.TypeBytes).
WithField("field1", schema.TypeBytes).
WithField("field2", schema.TypeString).
WithField("field3", schema.TypeInt32).
WithField("field4", schema.TypeInt64).
WithField("field5", schema.TypeFloat).
WithField("field6", schema.TypeDouble).
WithField("field7", schema.TypeBoolean).
RecordTypeEnd()
recordType := example.MyRecordType()
session, err := agent_client.NewPublishSession(*agent, schema.NewSchema(*namespace, *t, recordType), *partitionCount, *clientName)
if err != nil {

43
weed/mq/client/cmd/example/my_record.go

@ -0,0 +1,43 @@
package example
import (
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
type MyRecord struct {
Key []byte
Field1 []byte
Field2 string
Field3 int32
Field4 int64
Field5 float32
Field6 float64
Field7 bool
}
func MyRecordType() *schema_pb.RecordType {
return schema.RecordTypeBegin().
WithField("key", schema.TypeBytes).
WithField("field1", schema.TypeBytes).
WithField("field2", schema.TypeString).
WithField("field3", schema.TypeInt32).
WithField("field4", schema.TypeInt64).
WithField("field5", schema.TypeFloat).
WithField("field6", schema.TypeDouble).
WithField("field7", schema.TypeBoolean).
RecordTypeEnd()
}
func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
return schema.RecordBegin().
SetBytes("key", r.Key).
SetBytes("field1", r.Field1).
SetString("field2", r.Field2).
SetInt32("field3", r.Field3).
SetInt64("field4", r.Field4).
SetFloat("field5", r.Field5).
SetDouble("field6", r.Field6).
SetBool("field7", r.Field7).
RecordEnd()
}
Loading…
Cancel
Save