diff --git a/weed/mq/client/agent_client/subscribe_session.go b/weed/mq/client/agent_client/subscribe_session.go index 79f117219..6105ff73d 100644 --- a/weed/mq/client/agent_client/subscribe_session.go +++ b/weed/mq/client/agent_client/subscribe_session.go @@ -16,7 +16,7 @@ type SubscribeOption struct { Topic topic.Topic Filter string MaxSubscribedPartitions int32 - PerPartitionConcurrency int32 + SlidingWindowSize int32 } type SubscribeSession struct { @@ -41,6 +41,7 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri }, MaxSubscribedPartitions: option.MaxSubscribedPartitions, Filter: option.Filter, + SlidingWindowSize: option.SlidingWindowSize, }) if err != nil { return nil, err diff --git a/weed/mq/client/cmd/agent_sub_record/agent_sub_record.go b/weed/mq/client/cmd/agent_sub_record/agent_sub_record.go index c17938515..75318bd72 100644 --- a/weed/mq/client/cmd/agent_sub_record/agent_sub_record.go +++ b/weed/mq/client/cmd/agent_sub_record/agent_sub_record.go @@ -14,12 +14,12 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - agent = flag.String("agent", "localhost:16777", "mq agent address") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") - perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") - timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + agent = flag.String("agent", "localhost:16777", "mq agent address") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency") + timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") ) @@ -34,7 +34,7 @@ func main() { Topic: topic.NewTopic(*namespace, *t), Filter: "", MaxSubscribedPartitions: int32(*maxPartitionCount), - PerPartitionConcurrency: int32(*perPartitionConcurrency), + SlidingWindowSize: int32(*slidingWindowSize), }) if err != nil { log.Printf("new subscribe session: %v", err) diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index f925aa1e1..9bc6ec983 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -15,11 +15,11 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") - perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") ) @@ -33,7 +33,7 @@ func main() { ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), MaxPartitionCount: int32(*maxPartitionCount), - SlidingWindowSize: int32(*perPartitionConcurrency), + SlidingWindowSize: int32(*slidingWindowSize), } contentConfig := &sub_client.ContentConfiguration{ diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index dee289f19..ee35e2486 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -18,12 +18,12 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") - perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") - timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency") + timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") ) @@ -61,7 +61,7 @@ func main() { ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), MaxPartitionCount: int32(*maxPartitionCount), - SlidingWindowSize: int32(*perPartitionConcurrency), + SlidingWindowSize: int32(*slidingWindowSize), } contentConfig := &sub_client.ContentConfiguration{