4 changed files with 0 additions and 387 deletions
-
81weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
-
141weed/mq/client/cmd/weed_pub_record/publisher_record.go
-
65weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
-
100weed/mq/client/cmd/weed_sub_record/subscriber_record.go
@ -1,81 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"flag" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" |
|||
"log" |
|||
"strings" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
var ( |
|||
messageCount = flag.Int("n", 1000, "message count") |
|||
concurrency = flag.Int("c", 4, "concurrent publishers") |
|||
partitionCount = flag.Int("p", 6, "partition count") |
|||
|
|||
clientName = flag.String("client", "c1", "client name") |
|||
|
|||
namespace = flag.String("ns", "test", "namespace") |
|||
t = flag.String("t", "test", "t") |
|||
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") |
|||
) |
|||
|
|||
func doPublish(publisher *pub_client.TopicPublisher, id int) { |
|||
startTime := time.Now() |
|||
for i := 0; i < *messageCount / *concurrency; i++ { |
|||
// Simulate publishing a message
|
|||
key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i)) |
|||
value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i)) |
|||
if err := publisher.Publish(key, value); err != nil { |
|||
fmt.Println(err) |
|||
break |
|||
} |
|||
time.Sleep(time.Second) |
|||
// println("Published", string(key), string(value))
|
|||
} |
|||
if err := publisher.FinishPublish(); err != nil { |
|||
fmt.Println(err) |
|||
} |
|||
elapsed := time.Since(startTime) |
|||
log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) |
|||
} |
|||
|
|||
func main() { |
|||
flag.Parse() |
|||
util_http.InitGlobalHttpClient() |
|||
|
|||
config := &pub_client.PublisherConfiguration{ |
|||
Topic: topic.NewTopic(*namespace, *t), |
|||
PartitionCount: int32(*partitionCount), |
|||
Brokers: strings.Split(*seedBrokers, ","), |
|||
PublisherName: *clientName, |
|||
} |
|||
publisher, err := pub_client.NewTopicPublisher(config) |
|||
if err != nil { |
|||
log.Fatalf("Failed to create publisher: %v", err) |
|||
} |
|||
|
|||
startTime := time.Now() |
|||
|
|||
var wg sync.WaitGroup |
|||
// Start multiple publishers
|
|||
for i := 0; i < *concurrency; i++ { |
|||
wg.Add(1) |
|||
go func(id int) { |
|||
defer wg.Done() |
|||
doPublish(publisher, id) |
|||
}(i) |
|||
} |
|||
|
|||
// Wait for all publishers to finish
|
|||
wg.Wait() |
|||
elapsed := time.Since(startTime) |
|||
publisher.Shutdown() |
|||
|
|||
log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) |
|||
|
|||
} |
@ -1,141 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"flag" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/schema" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" |
|||
"log" |
|||
"strings" |
|||
"sync" |
|||
"sync/atomic" |
|||
"time" |
|||
) |
|||
|
|||
var ( |
|||
messageCount = flag.Int("n", 1000, "message count") |
|||
messageDelay = flag.Duration("d", time.Second, "delay between messages") |
|||
concurrency = flag.Int("c", 4, "concurrent publishers") |
|||
partitionCount = flag.Int("p", 6, "partition count") |
|||
|
|||
clientName = flag.String("client", "c1", "client name") |
|||
|
|||
namespace = flag.String("ns", "test", "namespace") |
|||
t = flag.String("t", "test", "t") |
|||
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") |
|||
|
|||
counter int32 |
|||
) |
|||
|
|||
func doPublish(publisher *pub_client.TopicPublisher, id int) { |
|||
startTime := time.Now() |
|||
for { |
|||
i := atomic.AddInt32(&counter, 1) |
|||
if i > int32(*messageCount) { |
|||
break |
|||
} |
|||
// Simulate publishing a message
|
|||
myRecord := genMyRecord(int32(i)) |
|||
if err := publisher.PublishRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil { |
|||
fmt.Println(err) |
|||
break |
|||
} |
|||
if *messageDelay > 0 { |
|||
time.Sleep(*messageDelay) |
|||
fmt.Printf("sent %+v\n", string(myRecord.Key)) |
|||
} |
|||
} |
|||
if err := publisher.FinishPublish(); err != nil { |
|||
fmt.Println(err) |
|||
} |
|||
elapsed := time.Since(startTime) |
|||
log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) |
|||
} |
|||
|
|||
type MyRecord struct { |
|||
Key []byte |
|||
Field1 []byte |
|||
Field2 string |
|||
Field3 int32 |
|||
Field4 int64 |
|||
Field5 float32 |
|||
Field6 float64 |
|||
Field7 bool |
|||
} |
|||
|
|||
func genMyRecord(id int32) *MyRecord { |
|||
return &MyRecord{ |
|||
Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)), |
|||
Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)), |
|||
Field2: fmt.Sprintf("field2-%s-%d", *clientName, id), |
|||
Field3: id, |
|||
Field4: int64(id), |
|||
Field5: float32(id), |
|||
Field6: float64(id), |
|||
Field7: id%2 == 0, |
|||
} |
|||
} |
|||
|
|||
func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { |
|||
return schema.RecordBegin(). |
|||
SetBytes("key", r.Key). |
|||
SetBytes("field1", r.Field1). |
|||
SetString("field2", r.Field2). |
|||
SetInt32("field3", r.Field3). |
|||
SetInt64("field4", r.Field4). |
|||
SetFloat("field5", r.Field5). |
|||
SetDouble("field6", r.Field6). |
|||
SetBool("field7", r.Field7). |
|||
RecordEnd() |
|||
} |
|||
|
|||
func main() { |
|||
flag.Parse() |
|||
util_http.InitGlobalHttpClient() |
|||
|
|||
recordType := schema.RecordTypeBegin(). |
|||
WithField("key", schema.TypeBytes). |
|||
WithField("field1", schema.TypeBytes). |
|||
WithField("field2", schema.TypeString). |
|||
WithField("field3", schema.TypeInt32). |
|||
WithField("field4", schema.TypeInt64). |
|||
WithField("field5", schema.TypeFloat). |
|||
WithField("field6", schema.TypeDouble). |
|||
WithField("field7", schema.TypeBoolean). |
|||
RecordTypeEnd() |
|||
|
|||
config := &pub_client.PublisherConfiguration{ |
|||
Topic: topic.NewTopic(*namespace, *t), |
|||
PartitionCount: int32(*partitionCount), |
|||
Brokers: strings.Split(*seedBrokers, ","), |
|||
PublisherName: *clientName, |
|||
RecordType: recordType, |
|||
} |
|||
publisher, err := pub_client.NewTopicPublisher(config) |
|||
if err != nil { |
|||
log.Fatalf("Failed to create publisher: %v", err) |
|||
} |
|||
|
|||
startTime := time.Now() |
|||
|
|||
var wg sync.WaitGroup |
|||
// Start multiple publishers
|
|||
for i := 0; i < *concurrency; i++ { |
|||
wg.Add(1) |
|||
go func(id int) { |
|||
defer wg.Done() |
|||
doPublish(publisher, id) |
|||
}(i) |
|||
} |
|||
|
|||
// Wait for all publishers to finish
|
|||
wg.Wait() |
|||
elapsed := time.Since(startTime) |
|||
publisher.Shutdown() |
|||
|
|||
log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) |
|||
|
|||
} |
@ -1,65 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"flag" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" |
|||
"google.golang.org/grpc" |
|||
"google.golang.org/grpc/credentials/insecure" |
|||
"strings" |
|||
) |
|||
|
|||
var ( |
|||
namespace = flag.String("ns", "test", "namespace") |
|||
t = flag.String("topic", "test", "topic") |
|||
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") |
|||
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") |
|||
slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency") |
|||
|
|||
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") |
|||
) |
|||
|
|||
func main() { |
|||
flag.Parse() |
|||
util_http.InitGlobalHttpClient() |
|||
|
|||
subscriberConfig := &sub_client.SubscriberConfiguration{ |
|||
ConsumerGroup: "test", |
|||
ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), |
|||
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), |
|||
MaxPartitionCount: int32(*maxPartitionCount), |
|||
SlidingWindowSize: int32(*slidingWindowSize), |
|||
} |
|||
|
|||
contentConfig := &sub_client.ContentConfiguration{ |
|||
Topic: topic.NewTopic(*namespace, *t), |
|||
Filter: "", |
|||
} |
|||
|
|||
brokers := strings.Split(*seedBrokers, ",") |
|||
subscriber := sub_client.NewTopicSubscriber(context.Background(), brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024)) |
|||
|
|||
counter := 0 |
|||
executors := util.NewLimitedConcurrentExecutor(int(subscriberConfig.SlidingWindowSize)) |
|||
subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { |
|||
executors.Execute(func() { |
|||
counter++ |
|||
println(string(m.Data.Key), "=>", string(m.Data.Value), counter) |
|||
}) |
|||
}) |
|||
|
|||
subscriber.SetCompletionFunc(func() { |
|||
glog.V(0).Infof("done received %d messages", counter) |
|||
}) |
|||
|
|||
if err := subscriber.Subscribe(); err != nil { |
|||
fmt.Println(err) |
|||
} |
|||
|
|||
} |
@ -1,100 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"flag" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" |
|||
"google.golang.org/grpc" |
|||
"google.golang.org/grpc/credentials/insecure" |
|||
"google.golang.org/protobuf/proto" |
|||
"strings" |
|||
"time" |
|||
) |
|||
|
|||
var ( |
|||
namespace = flag.String("ns", "test", "namespace") |
|||
t = flag.String("topic", "test", "topic") |
|||
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") |
|||
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") |
|||
slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency") |
|||
timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") |
|||
|
|||
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") |
|||
) |
|||
|
|||
type MyRecord struct { |
|||
Key []byte |
|||
Field1 []byte |
|||
Field2 string |
|||
Field3 int32 |
|||
Field4 int64 |
|||
Field5 float32 |
|||
Field6 float64 |
|||
Field7 bool |
|||
} |
|||
|
|||
func FromSchemaRecordValue(recordValue *schema_pb.RecordValue) *MyRecord { |
|||
return &MyRecord{ |
|||
Key: recordValue.Fields["key"].GetBytesValue(), |
|||
Field1: recordValue.Fields["field1"].GetBytesValue(), |
|||
Field2: recordValue.Fields["field2"].GetStringValue(), |
|||
Field3: recordValue.Fields["field3"].GetInt32Value(), |
|||
Field4: recordValue.Fields["field4"].GetInt64Value(), |
|||
Field5: recordValue.Fields["field5"].GetFloatValue(), |
|||
Field6: recordValue.Fields["field6"].GetDoubleValue(), |
|||
Field7: recordValue.Fields["field7"].GetBoolValue(), |
|||
} |
|||
} |
|||
|
|||
func main() { |
|||
flag.Parse() |
|||
util_http.InitGlobalHttpClient() |
|||
|
|||
subscriberConfig := &sub_client.SubscriberConfiguration{ |
|||
ConsumerGroup: "test", |
|||
ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), |
|||
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), |
|||
MaxPartitionCount: int32(*maxPartitionCount), |
|||
SlidingWindowSize: int32(*slidingWindowSize), |
|||
} |
|||
|
|||
contentConfig := &sub_client.ContentConfiguration{ |
|||
Topic: topic.NewTopic(*namespace, *t), |
|||
Filter: "", |
|||
// StartTime: time.Now().Add(-*timeAgo),
|
|||
} |
|||
|
|||
brokers := strings.Split(*seedBrokers, ",") |
|||
subscriber := sub_client.NewTopicSubscriber(context.Background(), brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024)) |
|||
|
|||
counter := 0 |
|||
executors := util.NewLimitedConcurrentExecutor(int(subscriberConfig.SlidingWindowSize)) |
|||
subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { |
|||
executors.Execute(func() { |
|||
counter++ |
|||
record := &schema_pb.RecordValue{} |
|||
err := proto.Unmarshal(m.Data.Value, record) |
|||
if err != nil { |
|||
fmt.Printf("unmarshal record value: %v\n", err) |
|||
} else { |
|||
fmt.Printf("%s %d: %v\n", string(m.Data.Key), len(m.Data.Value), record) |
|||
} |
|||
}) |
|||
}) |
|||
|
|||
subscriber.SetCompletionFunc(func() { |
|||
glog.V(0).Infof("done received %d messages", counter) |
|||
}) |
|||
|
|||
if err := subscriber.Subscribe(); err != nil { |
|||
fmt.Println(err) |
|||
} |
|||
|
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue