chrislu 6 days ago
parent
commit
db8f1e29ad
  1. 3
      weed/mq/client/agent_client/subscribe_session.go
  2. 14
      weed/mq/client/cmd/agent_sub_record/agent_sub_record.go
  3. 12
      weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
  4. 14
      weed/mq/client/cmd/weed_sub_record/subscriber_record.go

3
weed/mq/client/agent_client/subscribe_session.go

@ -16,7 +16,7 @@ type SubscribeOption struct {
Topic topic.Topic Topic topic.Topic
Filter string Filter string
MaxSubscribedPartitions int32 MaxSubscribedPartitions int32
PerPartitionConcurrency int32
SlidingWindowSize int32
} }
type SubscribeSession struct { type SubscribeSession struct {
@ -41,6 +41,7 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri
}, },
MaxSubscribedPartitions: option.MaxSubscribedPartitions, MaxSubscribedPartitions: option.MaxSubscribedPartitions,
Filter: option.Filter, Filter: option.Filter,
SlidingWindowSize: option.SlidingWindowSize,
}) })
if err != nil { if err != nil {
return nil, err return nil, err

14
weed/mq/client/cmd/agent_sub_record/agent_sub_record.go

@ -14,12 +14,12 @@ import (
) )
var ( var (
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
agent = flag.String("agent", "localhost:16777", "mq agent address")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
agent = flag.String("agent", "localhost:16777", "mq agent address")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency")
timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
) )
@ -34,7 +34,7 @@ func main() {
Topic: topic.NewTopic(*namespace, *t), Topic: topic.NewTopic(*namespace, *t),
Filter: "", Filter: "",
MaxSubscribedPartitions: int32(*maxPartitionCount), MaxSubscribedPartitions: int32(*maxPartitionCount),
PerPartitionConcurrency: int32(*perPartitionConcurrency),
SlidingWindowSize: int32(*slidingWindowSize),
}) })
if err != nil { if err != nil {
log.Printf("new subscribe session: %v", err) log.Printf("new subscribe session: %v", err)

12
weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go

@ -15,11 +15,11 @@ import (
) )
var ( var (
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
) )
@ -33,7 +33,7 @@ func main() {
ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
MaxPartitionCount: int32(*maxPartitionCount), MaxPartitionCount: int32(*maxPartitionCount),
SlidingWindowSize: int32(*perPartitionConcurrency),
SlidingWindowSize: int32(*slidingWindowSize),
} }
contentConfig := &sub_client.ContentConfiguration{ contentConfig := &sub_client.ContentConfiguration{

14
weed/mq/client/cmd/weed_sub_record/subscriber_record.go

@ -18,12 +18,12 @@ import (
) )
var ( var (
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency")
timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
) )
@ -61,7 +61,7 @@ func main() {
ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
MaxPartitionCount: int32(*maxPartitionCount), MaxPartitionCount: int32(*maxPartitionCount),
SlidingWindowSize: int32(*perPartitionConcurrency),
SlidingWindowSize: int32(*slidingWindowSize),
} }
contentConfig := &sub_client.ContentConfiguration{ contentConfig := &sub_client.ContentConfiguration{

Loading…
Cancel
Save