diff --git a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go deleted file mode 100644 index 801374244..000000000 --- a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go +++ /dev/null @@ -1,81 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" - "log" - "strings" - "sync" - "time" -) - -var ( - messageCount = flag.Int("n", 1000, "message count") - concurrency = flag.Int("c", 4, "concurrent publishers") - partitionCount = flag.Int("p", 6, "partition count") - - clientName = flag.String("client", "c1", "client name") - - namespace = flag.String("ns", "test", "namespace") - t = flag.String("t", "test", "t") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") -) - -func doPublish(publisher *pub_client.TopicPublisher, id int) { - startTime := time.Now() - for i := 0; i < *messageCount / *concurrency; i++ { - // Simulate publishing a message - key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i)) - value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i)) - if err := publisher.Publish(key, value); err != nil { - fmt.Println(err) - break - } - time.Sleep(time.Second) - // println("Published", string(key), string(value)) - } - if err := publisher.FinishPublish(); err != nil { - fmt.Println(err) - } - elapsed := time.Since(startTime) - log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) -} - -func main() { - flag.Parse() - util_http.InitGlobalHttpClient() - - config := &pub_client.PublisherConfiguration{ - Topic: topic.NewTopic(*namespace, *t), - PartitionCount: int32(*partitionCount), - Brokers: strings.Split(*seedBrokers, ","), - PublisherName: *clientName, - } - publisher, err := pub_client.NewTopicPublisher(config) - if err != nil { - log.Fatalf("Failed to create publisher: %v", err) - } - - startTime := time.Now() - - var wg sync.WaitGroup - // Start multiple publishers - for i := 0; i < *concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - doPublish(publisher, id) - }(i) - } - - // Wait for all publishers to finish - wg.Wait() - elapsed := time.Since(startTime) - publisher.Shutdown() - - log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) - -} diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go deleted file mode 100644 index 1efc2ea33..000000000 --- a/weed/mq/client/cmd/weed_pub_record/publisher_record.go +++ /dev/null @@ -1,141 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" - "github.com/seaweedfs/seaweedfs/weed/mq/schema" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" - "log" - "strings" - "sync" - "sync/atomic" - "time" -) - -var ( - messageCount = flag.Int("n", 1000, "message count") - messageDelay = flag.Duration("d", time.Second, "delay between messages") - concurrency = flag.Int("c", 4, "concurrent publishers") - partitionCount = flag.Int("p", 6, "partition count") - - clientName = flag.String("client", "c1", "client name") - - namespace = flag.String("ns", "test", "namespace") - t = flag.String("t", "test", "t") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - - counter int32 -) - -func doPublish(publisher *pub_client.TopicPublisher, id int) { - startTime := time.Now() - for { - i := atomic.AddInt32(&counter, 1) - if i > int32(*messageCount) { - break - } - // Simulate publishing a message - myRecord := genMyRecord(int32(i)) - if err := publisher.PublishRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil { - fmt.Println(err) - break - } - if *messageDelay > 0 { - time.Sleep(*messageDelay) - fmt.Printf("sent %+v\n", string(myRecord.Key)) - } - } - if err := publisher.FinishPublish(); err != nil { - fmt.Println(err) - } - elapsed := time.Since(startTime) - log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) -} - -type MyRecord struct { - Key []byte - Field1 []byte - Field2 string - Field3 int32 - Field4 int64 - Field5 float32 - Field6 float64 - Field7 bool -} - -func genMyRecord(id int32) *MyRecord { - return &MyRecord{ - Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)), - Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)), - Field2: fmt.Sprintf("field2-%s-%d", *clientName, id), - Field3: id, - Field4: int64(id), - Field5: float32(id), - Field6: float64(id), - Field7: id%2 == 0, - } -} - -func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { - return schema.RecordBegin(). - SetBytes("key", r.Key). - SetBytes("field1", r.Field1). - SetString("field2", r.Field2). - SetInt32("field3", r.Field3). - SetInt64("field4", r.Field4). - SetFloat("field5", r.Field5). - SetDouble("field6", r.Field6). - SetBool("field7", r.Field7). - RecordEnd() -} - -func main() { - flag.Parse() - util_http.InitGlobalHttpClient() - - recordType := schema.RecordTypeBegin(). - WithField("key", schema.TypeBytes). - WithField("field1", schema.TypeBytes). - WithField("field2", schema.TypeString). - WithField("field3", schema.TypeInt32). - WithField("field4", schema.TypeInt64). - WithField("field5", schema.TypeFloat). - WithField("field6", schema.TypeDouble). - WithField("field7", schema.TypeBoolean). - RecordTypeEnd() - - config := &pub_client.PublisherConfiguration{ - Topic: topic.NewTopic(*namespace, *t), - PartitionCount: int32(*partitionCount), - Brokers: strings.Split(*seedBrokers, ","), - PublisherName: *clientName, - RecordType: recordType, - } - publisher, err := pub_client.NewTopicPublisher(config) - if err != nil { - log.Fatalf("Failed to create publisher: %v", err) - } - - startTime := time.Now() - - var wg sync.WaitGroup - // Start multiple publishers - for i := 0; i < *concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - doPublish(publisher, id) - }(i) - } - - // Wait for all publishers to finish - wg.Wait() - elapsed := time.Since(startTime) - publisher.Shutdown() - - log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) - -} diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go deleted file mode 100644 index 328f07c11..000000000 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ /dev/null @@ -1,65 +0,0 @@ -package main - -import ( - "context" - "flag" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "github.com/seaweedfs/seaweedfs/weed/util" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "strings" -) - -var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") - slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency") - - clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") -) - -func main() { - flag.Parse() - util_http.InitGlobalHttpClient() - - subscriberConfig := &sub_client.SubscriberConfiguration{ - ConsumerGroup: "test", - ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), - GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), - MaxPartitionCount: int32(*maxPartitionCount), - SlidingWindowSize: int32(*slidingWindowSize), - } - - contentConfig := &sub_client.ContentConfiguration{ - Topic: topic.NewTopic(*namespace, *t), - Filter: "", - } - - brokers := strings.Split(*seedBrokers, ",") - subscriber := sub_client.NewTopicSubscriber(context.Background(), brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024)) - - counter := 0 - executors := util.NewLimitedConcurrentExecutor(int(subscriberConfig.SlidingWindowSize)) - subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { - executors.Execute(func() { - counter++ - println(string(m.Data.Key), "=>", string(m.Data.Value), counter) - }) - }) - - subscriber.SetCompletionFunc(func() { - glog.V(0).Infof("done received %d messages", counter) - }) - - if err := subscriber.Subscribe(); err != nil { - fmt.Println(err) - } - -} diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go deleted file mode 100644 index d286a62ca..000000000 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ /dev/null @@ -1,100 +0,0 @@ -package main - -import ( - "context" - "flag" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" - "github.com/seaweedfs/seaweedfs/weed/util" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/proto" - "strings" - "time" -) - -var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") - slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency") - timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") - - clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") -) - -type MyRecord struct { - Key []byte - Field1 []byte - Field2 string - Field3 int32 - Field4 int64 - Field5 float32 - Field6 float64 - Field7 bool -} - -func FromSchemaRecordValue(recordValue *schema_pb.RecordValue) *MyRecord { - return &MyRecord{ - Key: recordValue.Fields["key"].GetBytesValue(), - Field1: recordValue.Fields["field1"].GetBytesValue(), - Field2: recordValue.Fields["field2"].GetStringValue(), - Field3: recordValue.Fields["field3"].GetInt32Value(), - Field4: recordValue.Fields["field4"].GetInt64Value(), - Field5: recordValue.Fields["field5"].GetFloatValue(), - Field6: recordValue.Fields["field6"].GetDoubleValue(), - Field7: recordValue.Fields["field7"].GetBoolValue(), - } -} - -func main() { - flag.Parse() - util_http.InitGlobalHttpClient() - - subscriberConfig := &sub_client.SubscriberConfiguration{ - ConsumerGroup: "test", - ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), - GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), - MaxPartitionCount: int32(*maxPartitionCount), - SlidingWindowSize: int32(*slidingWindowSize), - } - - contentConfig := &sub_client.ContentConfiguration{ - Topic: topic.NewTopic(*namespace, *t), - Filter: "", - // StartTime: time.Now().Add(-*timeAgo), - } - - brokers := strings.Split(*seedBrokers, ",") - subscriber := sub_client.NewTopicSubscriber(context.Background(), brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024)) - - counter := 0 - executors := util.NewLimitedConcurrentExecutor(int(subscriberConfig.SlidingWindowSize)) - subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { - executors.Execute(func() { - counter++ - record := &schema_pb.RecordValue{} - err := proto.Unmarshal(m.Data.Value, record) - if err != nil { - fmt.Printf("unmarshal record value: %v\n", err) - } else { - fmt.Printf("%s %d: %v\n", string(m.Data.Key), len(m.Data.Value), record) - } - }) - }) - - subscriber.SetCompletionFunc(func() { - glog.V(0).Infof("done received %d messages", counter) - }) - - if err := subscriber.Subscribe(); err != nil { - fmt.Println(err) - } - -}