diff --git a/weed/mq/agent/agent_grpc_sub_session.go b/weed/mq/agent/agent_grpc_sub_session.go deleted file mode 100644 index b01fbcc65..000000000 --- a/weed/mq/agent/agent_grpc_sub_session.go +++ /dev/null @@ -1,46 +0,0 @@ -package agent - -import ( - "context" - "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "math/rand/v2" -) - -func (a *MessageQueueAgent) StartSubscribeSession(ctx context.Context, req *mq_agent_pb.StartSubscribeSessionRequest) (*mq_agent_pb.StartSubscribeSessionResponse, error) { - sessionId := rand.Int64() - - subscriberConfig := &sub_client.SubscriberConfiguration{ - ConsumerGroup: req.ConsumerGroup, - ConsumerGroupInstanceId: req.ConsumerGroupInstanceId, - GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), - MaxPartitionCount: req.MaxSubscribedPartitions, - SlidingWindowSize: req.SlidingWindowSize, - } - - contentConfig := &sub_client.ContentConfiguration{ - Topic: topic.FromPbTopic(req.Topic), - Filter: req.Filter, - PartitionOffsets: req.PartitionOffsets, - } - - topicSubscriber := sub_client.NewTopicSubscriber( - a.brokersList(), - subscriberConfig, - contentConfig, - make(chan sub_client.KeyedOffset, 1024), - ) - - a.subscribersLock.Lock() - a.subscribers[SessionId(sessionId)] = &SessionEntry[*sub_client.TopicSubscriber]{ - entry: topicSubscriber, - } - a.subscribersLock.Unlock() - - return &mq_agent_pb.StartSubscribeSessionResponse{ - SessionId: sessionId, - }, nil -} diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go index df438c05e..285ad88d1 100644 --- a/weed/mq/agent/agent_grpc_subscribe.go +++ b/weed/mq/agent/agent_grpc_subscribe.go @@ -1,13 +1,16 @@ package agent import ( - "fmt" + "context" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" ) @@ -18,22 +21,12 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA if err != nil { return err } - sessionId := SessionId(initMessage.SessionId) - a.subscribersLock.RLock() - subscriberEntry, found := a.subscribers[sessionId] - a.subscribersLock.RUnlock() - if !found { - return fmt.Errorf("subscribe session id %d not found", sessionId) - } - defer func() { - a.subscribersLock.Lock() - delete(a.subscribers, sessionId) - a.subscribersLock.Unlock() - }() + + subscriber := a.handleInitSubscribeRecordRequest(stream.Context(), initMessage.Init) var lastErr error - executors := util.NewLimitedConcurrentExecutor(int(subscriberEntry.entry.SubscriberConfig.SlidingWindowSize)) - subscriberEntry.entry.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { + executors := util.NewLimitedConcurrentExecutor(int(subscriber.SubscriberConfig.SlidingWindowSize)) + subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { executors.Execute(func() { record := &schema_pb.RecordValue{} err := proto.Unmarshal(m.Data.Value, record) @@ -58,9 +51,9 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA }) go func() { - subErr := subscriberEntry.entry.Subscribe() + subErr := subscriber.Subscribe() if subErr != nil { - glog.V(0).Infof("subscriber %d subscribe: %v", sessionId, subErr) + glog.V(0).Infof("subscriber %s subscribe: %v", subscriber.SubscriberConfig.String(), subErr) if lastErr == nil { lastErr = subErr } @@ -70,14 +63,41 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA for { m, err := stream.Recv() if err != nil { - glog.V(0).Infof("subscriber %d receive: %v", sessionId, err) + glog.V(0).Infof("subscriber %d receive: %v", subscriber.SubscriberConfig.String(), err) return err } if m != nil { - subscriberEntry.entry.PartitionOffsetChan <- sub_client.KeyedOffset{ + subscriber.PartitionOffsetChan <- sub_client.KeyedOffset{ Key: m.AckKey, Offset: m.AckSequence, } } } } + +func (a *MessageQueueAgent) handleInitSubscribeRecordRequest(ctx context.Context, req *mq_agent_pb.SubscribeRecordRequest_InitSubscribeRecordRequest) *sub_client.TopicSubscriber { + + subscriberConfig := &sub_client.SubscriberConfiguration{ + ConsumerGroup: req.ConsumerGroup, + ConsumerGroupInstanceId: req.ConsumerGroupInstanceId, + GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + MaxPartitionCount: req.MaxSubscribedPartitions, + SlidingWindowSize: req.SlidingWindowSize, + } + + contentConfig := &sub_client.ContentConfiguration{ + Topic: topic.FromPbTopic(req.Topic), + Filter: req.Filter, + PartitionOffsets: req.PartitionOffsets, + } + + topicSubscriber := sub_client.NewTopicSubscriber( + ctx, + a.brokersList(), + subscriberConfig, + contentConfig, + make(chan sub_client.KeyedOffset, 1024), + ) + + return topicSubscriber +} diff --git a/weed/mq/client/agent_client/subscribe_session.go b/weed/mq/client/agent_client/subscribe_session.go index 6105ff73d..32273ce4f 100644 --- a/weed/mq/client/agent_client/subscribe_session.go +++ b/weed/mq/client/agent_client/subscribe_session.go @@ -32,7 +32,7 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri } agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn) - resp, err := agentClient.StartSubscribeSession(context.Background(), &mq_agent_pb.StartSubscribeSessionRequest{ + initRequest := &mq_agent_pb.SubscribeRecordRequest_InitSubscribeRecordRequest{ ConsumerGroup: option.ConsumerGroup, ConsumerGroupInstanceId: option.ConsumerGroupInstanceId, Topic: &schema_pb.Topic{ @@ -42,12 +42,6 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri MaxSubscribedPartitions: option.MaxSubscribedPartitions, Filter: option.Filter, SlidingWindowSize: option.SlidingWindowSize, - }) - if err != nil { - return nil, err - } - if resp.Error != "" { - return nil, fmt.Errorf("start subscribe session: %v", resp.Error) } stream, err := agentClient.SubscribeRecord(context.Background()) @@ -56,7 +50,7 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri } if err = stream.Send(&mq_agent_pb.SubscribeRecordRequest{ - SessionId: resp.SessionId, + Init: initRequest, }); err != nil { return nil, fmt.Errorf("send session id: %v", err) } diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index 9bc6ec983..328f07c11 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -42,7 +43,7 @@ func main() { } brokers := strings.Split(*seedBrokers, ",") - subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024)) + subscriber := sub_client.NewTopicSubscriber(context.Background(), brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024)) counter := 0 executors := util.NewLimitedConcurrentExecutor(int(subscriberConfig.SlidingWindowSize)) diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index ee35e2486..d286a62ca 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -71,7 +72,7 @@ func main() { } brokers := strings.Split(*seedBrokers, ",") - subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024)) + subscriber := sub_client.NewTopicSubscriber(context.Background(), brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024)) counter := 0 executors := util.NewLimitedConcurrentExecutor(int(subscriberConfig.SlidingWindowSize)) diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index d05ddb960..e88aaca2f 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -1,7 +1,6 @@ package sub_client import ( - "context" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -12,10 +11,17 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { waitTime := 1 * time.Second for { for _, broker := range sub.bootstrapBrokers { + + select { + case <-sub.ctx.Done(): + return + default: + } + // lookup topic brokers var brokerLeader string err := pb.WithBrokerGrpcClient(false, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - resp, err := client.FindBrokerLeader(context.Background(), &mq_pb.FindBrokerLeaderRequest{}) + resp, err := client.FindBrokerLeader(sub.ctx, &mq_pb.FindBrokerLeaderRequest{}) if err != nil { return err } @@ -30,10 +36,8 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { // connect to the balancer pb.WithBrokerGrpcClient(true, brokerLeader, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.SubscriberToSubCoordinator(ctx) + stream, err := client.SubscriberToSubCoordinator(sub.ctx) if err != nil { glog.V(0).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err) return err @@ -58,6 +62,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { go func() { for reply := range sub.brokerPartitionAssignmentAckChan { + + select { + case <-sub.ctx.Done(): + return + default: + } + glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply) if err := stream.Send(reply); err != nil { glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err) @@ -73,6 +84,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { glog.V(0).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err) return err } + + select { + case <-sub.ctx.Done(): + return nil + default: + } + sub.brokerPartitionAssignmentChan <- resp glog.V(0).Infof("Received assignment: %+v", resp) } diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index b22e0aa09..bbad9636d 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -2,6 +2,7 @@ package sub_client import ( "context" + "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -62,6 +63,9 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig go func() { for { select { + case <-sub.ctx.Done(): + subscribeClient.CloseSend() + return case <-stopCh: subscribeClient.CloseSend() return @@ -86,12 +90,24 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) resp, err := subscribeClient.Recv() if err != nil { + if errors.Is(err, io.EOF) { + return nil + } return fmt.Errorf("subscribe recv: %v", err) } if resp.Message == nil { glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) continue } + + select { + case <-sub.ctx.Done(): + return nil + case <-stopCh: + return nil + default: + } + switch m := resp.Message.(type) { case *mq_pb.SubscribeMessageResponse_Data: if m.Data.Ctrl != nil { diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 8f4a47c62..707b5581f 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -1,6 +1,7 @@ package sub_client import ( + "context" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -17,6 +18,10 @@ type SubscriberConfiguration struct { SlidingWindowSize int32 // how many messages to process concurrently per partition } +func (s *SubscriberConfiguration) String() string { + return "ClientId: " + s.ClientId + ", ConsumerGroup: " + s.ConsumerGroup + ", ConsumerGroupInstanceId: " + s.ConsumerGroupInstanceId +} + type ContentConfiguration struct { Topic topic.Topic Filter string @@ -27,6 +32,7 @@ type OnDataMessageFn func(m *mq_pb.SubscribeMessageResponse_Data) type OnCompletionFunc func() type TopicSubscriber struct { + ctx context.Context SubscriberConfig *SubscriberConfiguration ContentConfig *ContentConfiguration brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse @@ -39,8 +45,9 @@ type TopicSubscriber struct { PartitionOffsetChan chan KeyedOffset } -func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber { +func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber { return &TopicSubscriber{ + ctx: ctx, SubscriberConfig: subscriber, ContentConfig: content, brokerPartitionAssignmentChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1024), diff --git a/weed/pb/mq_agent.proto b/weed/pb/mq_agent.proto index 605f2a31c..e238cdf53 100644 --- a/weed/pb/mq_agent.proto +++ b/weed/pb/mq_agent.proto @@ -21,10 +21,6 @@ service SeaweedMessagingAgent { } // Subscribing - rpc StartSubscribeSession (StartSubscribeSessionRequest) returns (StartSubscribeSessionResponse) { - } - rpc CloseSubscribeSession (CloseSubscribeSessionRequest) returns (CloseSubscribeSessionResponse) { - } rpc SubscribeRecord (stream SubscribeRecordRequest) returns (stream SubscribeRecordResponse) { } @@ -59,28 +55,17 @@ message PublishRecordResponse { string error = 2; } ////////////////////////////////////////////////// -message StartSubscribeSessionRequest { - string consumer_group = 1; - string consumer_group_instance_id = 2; - schema_pb.Topic topic = 4; - repeated schema_pb.PartitionOffset partition_offsets = 5; - string filter = 6; - int32 max_subscribed_partitions = 8; - int32 sliding_window_size = 9; -} -message StartSubscribeSessionResponse { - string error = 1; - int64 session_id = 2; -} -message CloseSubscribeSessionRequest { - int64 session_id = 1; -} -message CloseSubscribeSessionResponse { - string error = 1; -} -////////////////////////////////////////////////// message SubscribeRecordRequest { - int64 session_id = 1; // session_id is required for the first record + message InitSubscribeRecordRequest { + string consumer_group = 1; + string consumer_group_instance_id = 2; + schema_pb.Topic topic = 4; + repeated schema_pb.PartitionOffset partition_offsets = 5; + string filter = 6; + int32 max_subscribed_partitions = 8; + int32 sliding_window_size = 9; + } + InitSubscribeRecordRequest init = 1; int64 ack_sequence = 2; bytes ack_key = 3; } diff --git a/weed/pb/mq_agent_pb/mq_agent.pb.go b/weed/pb/mq_agent_pb/mq_agent.pb.go index 9b280c076..fbaad42f7 100644 --- a/weed/pb/mq_agent_pb/mq_agent.pb.go +++ b/weed/pb/mq_agent_pb/mq_agent.pb.go @@ -362,22 +362,18 @@ func (x *PublishRecordResponse) GetError() string { } // //////////////////////////////////////////////// -type StartSubscribeSessionRequest struct { +type SubscribeRecordRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"` - ConsumerGroupInstanceId string `protobuf:"bytes,2,opt,name=consumer_group_instance_id,json=consumerGroupInstanceId,proto3" json:"consumer_group_instance_id,omitempty"` - Topic *schema_pb.Topic `protobuf:"bytes,4,opt,name=topic,proto3" json:"topic,omitempty"` - PartitionOffsets []*schema_pb.PartitionOffset `protobuf:"bytes,5,rep,name=partition_offsets,json=partitionOffsets,proto3" json:"partition_offsets,omitempty"` - Filter string `protobuf:"bytes,6,opt,name=filter,proto3" json:"filter,omitempty"` - MaxSubscribedPartitions int32 `protobuf:"varint,8,opt,name=max_subscribed_partitions,json=maxSubscribedPartitions,proto3" json:"max_subscribed_partitions,omitempty"` - SlidingWindowSize int32 `protobuf:"varint,9,opt,name=sliding_window_size,json=slidingWindowSize,proto3" json:"sliding_window_size,omitempty"` + Init *SubscribeRecordRequest_InitSubscribeRecordRequest `protobuf:"bytes,1,opt,name=init,proto3" json:"init,omitempty"` + AckSequence int64 `protobuf:"varint,2,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"` + AckKey []byte `protobuf:"bytes,3,opt,name=ack_key,json=ackKey,proto3" json:"ack_key,omitempty"` } -func (x *StartSubscribeSessionRequest) Reset() { - *x = StartSubscribeSessionRequest{} +func (x *SubscribeRecordRequest) Reset() { + *x = SubscribeRecordRequest{} if protoimpl.UnsafeEnabled { mi := &file_mq_agent_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -385,13 +381,13 @@ func (x *StartSubscribeSessionRequest) Reset() { } } -func (x *StartSubscribeSessionRequest) String() string { +func (x *SubscribeRecordRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*StartSubscribeSessionRequest) ProtoMessage() {} +func (*SubscribeRecordRequest) ProtoMessage() {} -func (x *StartSubscribeSessionRequest) ProtoReflect() protoreflect.Message { +func (x *SubscribeRecordRequest) ProtoReflect() protoreflect.Message { mi := &file_mq_agent_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -403,71 +399,47 @@ func (x *StartSubscribeSessionRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use StartSubscribeSessionRequest.ProtoReflect.Descriptor instead. -func (*StartSubscribeSessionRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use SubscribeRecordRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRecordRequest) Descriptor() ([]byte, []int) { return file_mq_agent_proto_rawDescGZIP(), []int{6} } -func (x *StartSubscribeSessionRequest) GetConsumerGroup() string { - if x != nil { - return x.ConsumerGroup - } - return "" -} - -func (x *StartSubscribeSessionRequest) GetConsumerGroupInstanceId() string { - if x != nil { - return x.ConsumerGroupInstanceId - } - return "" -} - -func (x *StartSubscribeSessionRequest) GetTopic() *schema_pb.Topic { +func (x *SubscribeRecordRequest) GetInit() *SubscribeRecordRequest_InitSubscribeRecordRequest { if x != nil { - return x.Topic + return x.Init } return nil } -func (x *StartSubscribeSessionRequest) GetPartitionOffsets() []*schema_pb.PartitionOffset { - if x != nil { - return x.PartitionOffsets - } - return nil -} - -func (x *StartSubscribeSessionRequest) GetFilter() string { - if x != nil { - return x.Filter - } - return "" -} - -func (x *StartSubscribeSessionRequest) GetMaxSubscribedPartitions() int32 { +func (x *SubscribeRecordRequest) GetAckSequence() int64 { if x != nil { - return x.MaxSubscribedPartitions + return x.AckSequence } return 0 } -func (x *StartSubscribeSessionRequest) GetSlidingWindowSize() int32 { +func (x *SubscribeRecordRequest) GetAckKey() []byte { if x != nil { - return x.SlidingWindowSize + return x.AckKey } - return 0 + return nil } -type StartSubscribeSessionResponse struct { +type SubscribeRecordResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` - SessionId int64 `protobuf:"varint,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Value *schema_pb.RecordValue `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + TsNs int64 `protobuf:"varint,4,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` + Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` + IsEndOfStream bool `protobuf:"varint,6,opt,name=is_end_of_stream,json=isEndOfStream,proto3" json:"is_end_of_stream,omitempty"` + IsEndOfTopic bool `protobuf:"varint,7,opt,name=is_end_of_topic,json=isEndOfTopic,proto3" json:"is_end_of_topic,omitempty"` } -func (x *StartSubscribeSessionResponse) Reset() { - *x = StartSubscribeSessionResponse{} +func (x *SubscribeRecordResponse) Reset() { + *x = SubscribeRecordResponse{} if protoimpl.UnsafeEnabled { mi := &file_mq_agent_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -475,13 +447,13 @@ func (x *StartSubscribeSessionResponse) Reset() { } } -func (x *StartSubscribeSessionResponse) String() string { +func (x *SubscribeRecordResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*StartSubscribeSessionResponse) ProtoMessage() {} +func (*SubscribeRecordResponse) ProtoMessage() {} -func (x *StartSubscribeSessionResponse) ProtoReflect() protoreflect.Message { +func (x *SubscribeRecordResponse) ProtoReflect() protoreflect.Message { mi := &file_mq_agent_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -493,214 +465,84 @@ func (x *StartSubscribeSessionResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use StartSubscribeSessionResponse.ProtoReflect.Descriptor instead. -func (*StartSubscribeSessionResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use SubscribeRecordResponse.ProtoReflect.Descriptor instead. +func (*SubscribeRecordResponse) Descriptor() ([]byte, []int) { return file_mq_agent_proto_rawDescGZIP(), []int{7} } -func (x *StartSubscribeSessionResponse) GetError() string { +func (x *SubscribeRecordResponse) GetKey() []byte { if x != nil { - return x.Error + return x.Key } - return "" + return nil } -func (x *StartSubscribeSessionResponse) GetSessionId() int64 { +func (x *SubscribeRecordResponse) GetValue() *schema_pb.RecordValue { if x != nil { - return x.SessionId - } - return 0 -} - -type CloseSubscribeSessionRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - SessionId int64 `protobuf:"varint,1,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` -} - -func (x *CloseSubscribeSessionRequest) Reset() { - *x = CloseSubscribeSessionRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_mq_agent_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CloseSubscribeSessionRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CloseSubscribeSessionRequest) ProtoMessage() {} - -func (x *CloseSubscribeSessionRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_agent_proto_msgTypes[8] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms + return x.Value } - return mi.MessageOf(x) -} - -// Deprecated: Use CloseSubscribeSessionRequest.ProtoReflect.Descriptor instead. -func (*CloseSubscribeSessionRequest) Descriptor() ([]byte, []int) { - return file_mq_agent_proto_rawDescGZIP(), []int{8} + return nil } -func (x *CloseSubscribeSessionRequest) GetSessionId() int64 { +func (x *SubscribeRecordResponse) GetTsNs() int64 { if x != nil { - return x.SessionId + return x.TsNs } return 0 } -type CloseSubscribeSessionResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` -} - -func (x *CloseSubscribeSessionResponse) Reset() { - *x = CloseSubscribeSessionResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_mq_agent_proto_msgTypes[9] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CloseSubscribeSessionResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CloseSubscribeSessionResponse) ProtoMessage() {} - -func (x *CloseSubscribeSessionResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_agent_proto_msgTypes[9] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use CloseSubscribeSessionResponse.ProtoReflect.Descriptor instead. -func (*CloseSubscribeSessionResponse) Descriptor() ([]byte, []int) { - return file_mq_agent_proto_rawDescGZIP(), []int{9} -} - -func (x *CloseSubscribeSessionResponse) GetError() string { +func (x *SubscribeRecordResponse) GetError() string { if x != nil { return x.Error } return "" } -// //////////////////////////////////////////////// -type SubscribeRecordRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - SessionId int64 `protobuf:"varint,1,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // session_id is required for the first record - AckSequence int64 `protobuf:"varint,2,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"` - AckKey []byte `protobuf:"bytes,3,opt,name=ack_key,json=ackKey,proto3" json:"ack_key,omitempty"` -} - -func (x *SubscribeRecordRequest) Reset() { - *x = SubscribeRecordRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_mq_agent_proto_msgTypes[10] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SubscribeRecordRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SubscribeRecordRequest) ProtoMessage() {} - -func (x *SubscribeRecordRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_agent_proto_msgTypes[10] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SubscribeRecordRequest.ProtoReflect.Descriptor instead. -func (*SubscribeRecordRequest) Descriptor() ([]byte, []int) { - return file_mq_agent_proto_rawDescGZIP(), []int{10} -} - -func (x *SubscribeRecordRequest) GetSessionId() int64 { - if x != nil { - return x.SessionId - } - return 0 -} - -func (x *SubscribeRecordRequest) GetAckSequence() int64 { +func (x *SubscribeRecordResponse) GetIsEndOfStream() bool { if x != nil { - return x.AckSequence + return x.IsEndOfStream } - return 0 + return false } -func (x *SubscribeRecordRequest) GetAckKey() []byte { +func (x *SubscribeRecordResponse) GetIsEndOfTopic() bool { if x != nil { - return x.AckKey + return x.IsEndOfTopic } - return nil + return false } -type SubscribeRecordResponse struct { +type SubscribeRecordRequest_InitSubscribeRecordRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Sequence int64 `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"` - Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` - Value *schema_pb.RecordValue `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` - TsNs int64 `protobuf:"varint,4,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` - Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` - IsEndOfStream bool `protobuf:"varint,6,opt,name=is_end_of_stream,json=isEndOfStream,proto3" json:"is_end_of_stream,omitempty"` - IsEndOfTopic bool `protobuf:"varint,7,opt,name=is_end_of_topic,json=isEndOfTopic,proto3" json:"is_end_of_topic,omitempty"` + ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"` + ConsumerGroupInstanceId string `protobuf:"bytes,2,opt,name=consumer_group_instance_id,json=consumerGroupInstanceId,proto3" json:"consumer_group_instance_id,omitempty"` + Topic *schema_pb.Topic `protobuf:"bytes,4,opt,name=topic,proto3" json:"topic,omitempty"` + PartitionOffsets []*schema_pb.PartitionOffset `protobuf:"bytes,5,rep,name=partition_offsets,json=partitionOffsets,proto3" json:"partition_offsets,omitempty"` + Filter string `protobuf:"bytes,6,opt,name=filter,proto3" json:"filter,omitempty"` + MaxSubscribedPartitions int32 `protobuf:"varint,8,opt,name=max_subscribed_partitions,json=maxSubscribedPartitions,proto3" json:"max_subscribed_partitions,omitempty"` + SlidingWindowSize int32 `protobuf:"varint,9,opt,name=sliding_window_size,json=slidingWindowSize,proto3" json:"sliding_window_size,omitempty"` } -func (x *SubscribeRecordResponse) Reset() { - *x = SubscribeRecordResponse{} +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) Reset() { + *x = SubscribeRecordRequest_InitSubscribeRecordRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_agent_proto_msgTypes[11] + mi := &file_mq_agent_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } -func (x *SubscribeRecordResponse) String() string { +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SubscribeRecordResponse) ProtoMessage() {} +func (*SubscribeRecordRequest_InitSubscribeRecordRequest) ProtoMessage() {} -func (x *SubscribeRecordResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_agent_proto_msgTypes[11] +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) ProtoReflect() protoreflect.Message { + mi := &file_mq_agent_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -711,58 +553,58 @@ func (x *SubscribeRecordResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SubscribeRecordResponse.ProtoReflect.Descriptor instead. -func (*SubscribeRecordResponse) Descriptor() ([]byte, []int) { - return file_mq_agent_proto_rawDescGZIP(), []int{11} +// Deprecated: Use SubscribeRecordRequest_InitSubscribeRecordRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRecordRequest_InitSubscribeRecordRequest) Descriptor() ([]byte, []int) { + return file_mq_agent_proto_rawDescGZIP(), []int{6, 0} } -func (x *SubscribeRecordResponse) GetSequence() int64 { +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) GetConsumerGroup() string { if x != nil { - return x.Sequence + return x.ConsumerGroup } - return 0 + return "" } -func (x *SubscribeRecordResponse) GetKey() []byte { +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) GetConsumerGroupInstanceId() string { if x != nil { - return x.Key + return x.ConsumerGroupInstanceId } - return nil + return "" } -func (x *SubscribeRecordResponse) GetValue() *schema_pb.RecordValue { +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) GetTopic() *schema_pb.Topic { if x != nil { - return x.Value + return x.Topic } return nil } -func (x *SubscribeRecordResponse) GetTsNs() int64 { +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) GetPartitionOffsets() []*schema_pb.PartitionOffset { if x != nil { - return x.TsNs + return x.PartitionOffsets } - return 0 + return nil } -func (x *SubscribeRecordResponse) GetError() string { +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) GetFilter() string { if x != nil { - return x.Error + return x.Filter } return "" } -func (x *SubscribeRecordResponse) GetIsEndOfStream() bool { +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) GetMaxSubscribedPartitions() int32 { if x != nil { - return x.IsEndOfStream + return x.MaxSubscribedPartitions } - return false + return 0 } -func (x *SubscribeRecordResponse) GetIsEndOfTopic() bool { +func (x *SubscribeRecordRequest_InitSubscribeRecordRequest) GetSlidingWindowSize() int32 { if x != nil { - return x.IsEndOfTopic + return x.SlidingWindowSize } - return false + return 0 } var File_mq_agent_proto protoreflect.FileDescriptor @@ -808,115 +650,89 @@ var file_mq_agent_proto_rawDesc = []byte{ 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xf7, 0x02, - 0x0a, 0x1c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, - 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, - 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x3b, 0x0a, 0x1a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, - 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, - 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x17, 0x63, 0x6f, 0x6e, 0x73, 0x75, - 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, - 0x49, 0x64, 0x12, 0x26, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, - 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x47, 0x0a, 0x11, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x18, - 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, - 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, - 0x74, 0x52, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, - 0x65, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x06, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x3a, 0x0a, 0x19, 0x6d, - 0x61, 0x78, 0x5f, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x64, 0x5f, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x05, 0x52, 0x17, - 0x6d, 0x61, 0x78, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x64, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2e, 0x0a, 0x13, 0x73, 0x6c, 0x69, 0x64, 0x69, - 0x6e, 0x67, 0x5f, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x09, - 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, 0x73, 0x6c, 0x69, 0x64, 0x69, 0x6e, 0x67, 0x57, 0x69, 0x6e, - 0x64, 0x6f, 0x77, 0x53, 0x69, 0x7a, 0x65, 0x22, 0x54, 0x0a, 0x1d, 0x53, 0x74, 0x61, 0x72, 0x74, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, - 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1d, - 0x0a, 0x0a, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x3d, 0x0a, - 0x1c, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, - 0x0a, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x35, 0x0a, 0x1d, - 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x65, - 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x22, 0x73, 0x0a, 0x16, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, - 0x0a, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, - 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, - 0x17, 0x0a, 0x07, 0x61, 0x63, 0x6b, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x06, 0x61, 0x63, 0x6b, 0x4b, 0x65, 0x79, 0x22, 0xf0, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, - 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, - 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, - 0x65, 0x79, 0x12, 0x2c, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x16, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, - 0x63, 0x6f, 0x72, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x12, 0x13, 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x04, 0x74, 0x73, 0x4e, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x27, 0x0a, 0x10, 0x69, - 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, - 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x53, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x12, 0x25, 0x0a, 0x0f, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, - 0x66, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, - 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x32, 0xa1, 0x05, 0x0a, 0x15, - 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x6c, 0x0a, 0x13, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x72, - 0x74, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, 0x62, 0x6c, 0x69, - 0x73, 0x68, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x6c, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, - 0x69, 0x73, 0x68, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, - 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, - 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x5e, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x12, 0x22, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, - 0x01, 0x12, 0x72, 0x0a, 0x15, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, - 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x53, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x62, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2a, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, - 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x64, 0x0a, 0x0f, 0x53, 0x75, 0x62, - 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x24, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, - 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, - 0x60, 0x0a, 0x12, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x5f, - 0x61, 0x67, 0x65, 0x6e, 0x74, 0x42, 0x16, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x51, 0x75, - 0x65, 0x75, 0x65, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x32, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, - 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, - 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x70, - 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xa1, 0x04, + 0x0a, 0x16, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, + 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x53, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, + 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, + 0x74, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x21, 0x0a, + 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, + 0x12, 0x17, 0x0a, 0x07, 0x61, 0x63, 0x6b, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x06, 0x61, 0x63, 0x6b, 0x4b, 0x65, 0x79, 0x1a, 0xf5, 0x02, 0x0a, 0x1a, 0x49, 0x6e, + 0x69, 0x74, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, + 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, + 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, + 0x3b, 0x0a, 0x1a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, + 0x70, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x17, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, + 0x75, 0x70, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x26, 0x0a, 0x05, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, + 0x6f, 0x70, 0x69, 0x63, 0x12, 0x47, 0x0a, 0x11, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x10, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x12, 0x16, 0x0a, + 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, + 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x3a, 0x0a, 0x19, 0x6d, 0x61, 0x78, 0x5f, 0x73, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x64, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x05, 0x52, 0x17, 0x6d, 0x61, 0x78, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x64, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x12, 0x2e, 0x0a, 0x13, 0x73, 0x6c, 0x69, 0x64, 0x69, 0x6e, 0x67, 0x5f, 0x77, 0x69, 0x6e, + 0x64, 0x6f, 0x77, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, + 0x73, 0x6c, 0x69, 0x64, 0x69, 0x6e, 0x67, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x53, 0x69, 0x7a, + 0x65, 0x22, 0xd4, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, + 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x2c, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, + 0x2e, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, + 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x13, 0x0a, + 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x74, 0x73, + 0x4e, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x27, 0x0a, 0x10, 0x69, 0x73, 0x5f, 0x65, + 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x0d, 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x25, 0x0a, 0x0f, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x74, + 0x6f, 0x70, 0x69, 0x63, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x45, 0x6e, + 0x64, 0x4f, 0x66, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x32, 0xb9, 0x03, 0x0a, 0x15, 0x53, 0x65, 0x61, + 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x41, 0x67, 0x65, + 0x6e, 0x74, 0x12, 0x6c, 0x0a, 0x13, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, 0x62, 0x6c, 0x69, + 0x73, 0x68, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, + 0x62, 0x6c, 0x69, 0x73, 0x68, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x53, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x6c, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, + 0x69, 0x73, 0x68, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5e, + 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, + 0x22, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, + 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x64, + 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, + 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x28, 0x01, 0x30, 0x01, 0x42, 0x60, 0x0a, 0x12, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, + 0x73, 0x2e, 0x6d, 0x71, 0x5f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x42, 0x16, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x51, 0x75, 0x65, 0x75, 0x65, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x50, 0x72, 0x6f, + 0x74, 0x6f, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, + 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, + 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x61, 0x67, + 0x65, 0x6e, 0x74, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -931,49 +747,43 @@ func file_mq_agent_proto_rawDescGZIP() []byte { return file_mq_agent_proto_rawDescData } -var file_mq_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_mq_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_mq_agent_proto_goTypes = []any{ - (*StartPublishSessionRequest)(nil), // 0: messaging_pb.StartPublishSessionRequest - (*StartPublishSessionResponse)(nil), // 1: messaging_pb.StartPublishSessionResponse - (*ClosePublishSessionRequest)(nil), // 2: messaging_pb.ClosePublishSessionRequest - (*ClosePublishSessionResponse)(nil), // 3: messaging_pb.ClosePublishSessionResponse - (*PublishRecordRequest)(nil), // 4: messaging_pb.PublishRecordRequest - (*PublishRecordResponse)(nil), // 5: messaging_pb.PublishRecordResponse - (*StartSubscribeSessionRequest)(nil), // 6: messaging_pb.StartSubscribeSessionRequest - (*StartSubscribeSessionResponse)(nil), // 7: messaging_pb.StartSubscribeSessionResponse - (*CloseSubscribeSessionRequest)(nil), // 8: messaging_pb.CloseSubscribeSessionRequest - (*CloseSubscribeSessionResponse)(nil), // 9: messaging_pb.CloseSubscribeSessionResponse - (*SubscribeRecordRequest)(nil), // 10: messaging_pb.SubscribeRecordRequest - (*SubscribeRecordResponse)(nil), // 11: messaging_pb.SubscribeRecordResponse - (*schema_pb.Topic)(nil), // 12: schema_pb.Topic - (*schema_pb.RecordType)(nil), // 13: schema_pb.RecordType - (*schema_pb.RecordValue)(nil), // 14: schema_pb.RecordValue - (*schema_pb.PartitionOffset)(nil), // 15: schema_pb.PartitionOffset + (*StartPublishSessionRequest)(nil), // 0: messaging_pb.StartPublishSessionRequest + (*StartPublishSessionResponse)(nil), // 1: messaging_pb.StartPublishSessionResponse + (*ClosePublishSessionRequest)(nil), // 2: messaging_pb.ClosePublishSessionRequest + (*ClosePublishSessionResponse)(nil), // 3: messaging_pb.ClosePublishSessionResponse + (*PublishRecordRequest)(nil), // 4: messaging_pb.PublishRecordRequest + (*PublishRecordResponse)(nil), // 5: messaging_pb.PublishRecordResponse + (*SubscribeRecordRequest)(nil), // 6: messaging_pb.SubscribeRecordRequest + (*SubscribeRecordResponse)(nil), // 7: messaging_pb.SubscribeRecordResponse + (*SubscribeRecordRequest_InitSubscribeRecordRequest)(nil), // 8: messaging_pb.SubscribeRecordRequest.InitSubscribeRecordRequest + (*schema_pb.Topic)(nil), // 9: schema_pb.Topic + (*schema_pb.RecordType)(nil), // 10: schema_pb.RecordType + (*schema_pb.RecordValue)(nil), // 11: schema_pb.RecordValue + (*schema_pb.PartitionOffset)(nil), // 12: schema_pb.PartitionOffset } var file_mq_agent_proto_depIdxs = []int32{ - 12, // 0: messaging_pb.StartPublishSessionRequest.topic:type_name -> schema_pb.Topic - 13, // 1: messaging_pb.StartPublishSessionRequest.record_type:type_name -> schema_pb.RecordType - 14, // 2: messaging_pb.PublishRecordRequest.value:type_name -> schema_pb.RecordValue - 12, // 3: messaging_pb.StartSubscribeSessionRequest.topic:type_name -> schema_pb.Topic - 15, // 4: messaging_pb.StartSubscribeSessionRequest.partition_offsets:type_name -> schema_pb.PartitionOffset - 14, // 5: messaging_pb.SubscribeRecordResponse.value:type_name -> schema_pb.RecordValue - 0, // 6: messaging_pb.SeaweedMessagingAgent.StartPublishSession:input_type -> messaging_pb.StartPublishSessionRequest - 2, // 7: messaging_pb.SeaweedMessagingAgent.ClosePublishSession:input_type -> messaging_pb.ClosePublishSessionRequest - 4, // 8: messaging_pb.SeaweedMessagingAgent.PublishRecord:input_type -> messaging_pb.PublishRecordRequest - 6, // 9: messaging_pb.SeaweedMessagingAgent.StartSubscribeSession:input_type -> messaging_pb.StartSubscribeSessionRequest - 8, // 10: messaging_pb.SeaweedMessagingAgent.CloseSubscribeSession:input_type -> messaging_pb.CloseSubscribeSessionRequest - 10, // 11: messaging_pb.SeaweedMessagingAgent.SubscribeRecord:input_type -> messaging_pb.SubscribeRecordRequest - 1, // 12: messaging_pb.SeaweedMessagingAgent.StartPublishSession:output_type -> messaging_pb.StartPublishSessionResponse - 3, // 13: messaging_pb.SeaweedMessagingAgent.ClosePublishSession:output_type -> messaging_pb.ClosePublishSessionResponse - 5, // 14: messaging_pb.SeaweedMessagingAgent.PublishRecord:output_type -> messaging_pb.PublishRecordResponse - 7, // 15: messaging_pb.SeaweedMessagingAgent.StartSubscribeSession:output_type -> messaging_pb.StartSubscribeSessionResponse - 9, // 16: messaging_pb.SeaweedMessagingAgent.CloseSubscribeSession:output_type -> messaging_pb.CloseSubscribeSessionResponse - 11, // 17: messaging_pb.SeaweedMessagingAgent.SubscribeRecord:output_type -> messaging_pb.SubscribeRecordResponse - 12, // [12:18] is the sub-list for method output_type - 6, // [6:12] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 9, // 0: messaging_pb.StartPublishSessionRequest.topic:type_name -> schema_pb.Topic + 10, // 1: messaging_pb.StartPublishSessionRequest.record_type:type_name -> schema_pb.RecordType + 11, // 2: messaging_pb.PublishRecordRequest.value:type_name -> schema_pb.RecordValue + 8, // 3: messaging_pb.SubscribeRecordRequest.init:type_name -> messaging_pb.SubscribeRecordRequest.InitSubscribeRecordRequest + 11, // 4: messaging_pb.SubscribeRecordResponse.value:type_name -> schema_pb.RecordValue + 9, // 5: messaging_pb.SubscribeRecordRequest.InitSubscribeRecordRequest.topic:type_name -> schema_pb.Topic + 12, // 6: messaging_pb.SubscribeRecordRequest.InitSubscribeRecordRequest.partition_offsets:type_name -> schema_pb.PartitionOffset + 0, // 7: messaging_pb.SeaweedMessagingAgent.StartPublishSession:input_type -> messaging_pb.StartPublishSessionRequest + 2, // 8: messaging_pb.SeaweedMessagingAgent.ClosePublishSession:input_type -> messaging_pb.ClosePublishSessionRequest + 4, // 9: messaging_pb.SeaweedMessagingAgent.PublishRecord:input_type -> messaging_pb.PublishRecordRequest + 6, // 10: messaging_pb.SeaweedMessagingAgent.SubscribeRecord:input_type -> messaging_pb.SubscribeRecordRequest + 1, // 11: messaging_pb.SeaweedMessagingAgent.StartPublishSession:output_type -> messaging_pb.StartPublishSessionResponse + 3, // 12: messaging_pb.SeaweedMessagingAgent.ClosePublishSession:output_type -> messaging_pb.ClosePublishSessionResponse + 5, // 13: messaging_pb.SeaweedMessagingAgent.PublishRecord:output_type -> messaging_pb.PublishRecordResponse + 7, // 14: messaging_pb.SeaweedMessagingAgent.SubscribeRecord:output_type -> messaging_pb.SubscribeRecordResponse + 11, // [11:15] is the sub-list for method output_type + 7, // [7:11] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_mq_agent_proto_init() } @@ -1055,7 +865,7 @@ func file_mq_agent_proto_init() { } } file_mq_agent_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*StartSubscribeSessionRequest); i { + switch v := v.(*SubscribeRecordRequest); i { case 0: return &v.state case 1: @@ -1067,7 +877,7 @@ func file_mq_agent_proto_init() { } } file_mq_agent_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*StartSubscribeSessionResponse); i { + switch v := v.(*SubscribeRecordResponse); i { case 0: return &v.state case 1: @@ -1079,43 +889,7 @@ func file_mq_agent_proto_init() { } } file_mq_agent_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := v.(*CloseSubscribeSessionRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_mq_agent_proto_msgTypes[9].Exporter = func(v any, i int) any { - switch v := v.(*CloseSubscribeSessionResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_mq_agent_proto_msgTypes[10].Exporter = func(v any, i int) any { - switch v := v.(*SubscribeRecordRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_mq_agent_proto_msgTypes[11].Exporter = func(v any, i int) any { - switch v := v.(*SubscribeRecordResponse); i { + switch v := v.(*SubscribeRecordRequest_InitSubscribeRecordRequest); i { case 0: return &v.state case 1: @@ -1133,7 +907,7 @@ func file_mq_agent_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mq_agent_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/mq_agent_pb/mq_agent_grpc.pb.go b/weed/pb/mq_agent_pb/mq_agent_grpc.pb.go index 241064181..5db13506e 100644 --- a/weed/pb/mq_agent_pb/mq_agent_grpc.pb.go +++ b/weed/pb/mq_agent_pb/mq_agent_grpc.pb.go @@ -19,12 +19,10 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - SeaweedMessagingAgent_StartPublishSession_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/StartPublishSession" - SeaweedMessagingAgent_ClosePublishSession_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/ClosePublishSession" - SeaweedMessagingAgent_PublishRecord_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/PublishRecord" - SeaweedMessagingAgent_StartSubscribeSession_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/StartSubscribeSession" - SeaweedMessagingAgent_CloseSubscribeSession_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/CloseSubscribeSession" - SeaweedMessagingAgent_SubscribeRecord_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/SubscribeRecord" + SeaweedMessagingAgent_StartPublishSession_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/StartPublishSession" + SeaweedMessagingAgent_ClosePublishSession_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/ClosePublishSession" + SeaweedMessagingAgent_PublishRecord_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/PublishRecord" + SeaweedMessagingAgent_SubscribeRecord_FullMethodName = "/messaging_pb.SeaweedMessagingAgent/SubscribeRecord" ) // SeaweedMessagingAgentClient is the client API for SeaweedMessagingAgent service. @@ -36,8 +34,6 @@ type SeaweedMessagingAgentClient interface { ClosePublishSession(ctx context.Context, in *ClosePublishSessionRequest, opts ...grpc.CallOption) (*ClosePublishSessionResponse, error) PublishRecord(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[PublishRecordRequest, PublishRecordResponse], error) // Subscribing - StartSubscribeSession(ctx context.Context, in *StartSubscribeSessionRequest, opts ...grpc.CallOption) (*StartSubscribeSessionResponse, error) - CloseSubscribeSession(ctx context.Context, in *CloseSubscribeSessionRequest, opts ...grpc.CallOption) (*CloseSubscribeSessionResponse, error) SubscribeRecord(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[SubscribeRecordRequest, SubscribeRecordResponse], error) } @@ -82,26 +78,6 @@ func (c *seaweedMessagingAgentClient) PublishRecord(ctx context.Context, opts .. // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type SeaweedMessagingAgent_PublishRecordClient = grpc.BidiStreamingClient[PublishRecordRequest, PublishRecordResponse] -func (c *seaweedMessagingAgentClient) StartSubscribeSession(ctx context.Context, in *StartSubscribeSessionRequest, opts ...grpc.CallOption) (*StartSubscribeSessionResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(StartSubscribeSessionResponse) - err := c.cc.Invoke(ctx, SeaweedMessagingAgent_StartSubscribeSession_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *seaweedMessagingAgentClient) CloseSubscribeSession(ctx context.Context, in *CloseSubscribeSessionRequest, opts ...grpc.CallOption) (*CloseSubscribeSessionResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(CloseSubscribeSessionResponse) - err := c.cc.Invoke(ctx, SeaweedMessagingAgent_CloseSubscribeSession_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *seaweedMessagingAgentClient) SubscribeRecord(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[SubscribeRecordRequest, SubscribeRecordResponse], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &SeaweedMessagingAgent_ServiceDesc.Streams[1], SeaweedMessagingAgent_SubscribeRecord_FullMethodName, cOpts...) @@ -124,8 +100,6 @@ type SeaweedMessagingAgentServer interface { ClosePublishSession(context.Context, *ClosePublishSessionRequest) (*ClosePublishSessionResponse, error) PublishRecord(grpc.BidiStreamingServer[PublishRecordRequest, PublishRecordResponse]) error // Subscribing - StartSubscribeSession(context.Context, *StartSubscribeSessionRequest) (*StartSubscribeSessionResponse, error) - CloseSubscribeSession(context.Context, *CloseSubscribeSessionRequest) (*CloseSubscribeSessionResponse, error) SubscribeRecord(grpc.BidiStreamingServer[SubscribeRecordRequest, SubscribeRecordResponse]) error mustEmbedUnimplementedSeaweedMessagingAgentServer() } @@ -146,12 +120,6 @@ func (UnimplementedSeaweedMessagingAgentServer) ClosePublishSession(context.Cont func (UnimplementedSeaweedMessagingAgentServer) PublishRecord(grpc.BidiStreamingServer[PublishRecordRequest, PublishRecordResponse]) error { return status.Errorf(codes.Unimplemented, "method PublishRecord not implemented") } -func (UnimplementedSeaweedMessagingAgentServer) StartSubscribeSession(context.Context, *StartSubscribeSessionRequest) (*StartSubscribeSessionResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method StartSubscribeSession not implemented") -} -func (UnimplementedSeaweedMessagingAgentServer) CloseSubscribeSession(context.Context, *CloseSubscribeSessionRequest) (*CloseSubscribeSessionResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method CloseSubscribeSession not implemented") -} func (UnimplementedSeaweedMessagingAgentServer) SubscribeRecord(grpc.BidiStreamingServer[SubscribeRecordRequest, SubscribeRecordResponse]) error { return status.Errorf(codes.Unimplemented, "method SubscribeRecord not implemented") } @@ -219,42 +187,6 @@ func _SeaweedMessagingAgent_PublishRecord_Handler(srv interface{}, stream grpc.S // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type SeaweedMessagingAgent_PublishRecordServer = grpc.BidiStreamingServer[PublishRecordRequest, PublishRecordResponse] -func _SeaweedMessagingAgent_StartSubscribeSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StartSubscribeSessionRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(SeaweedMessagingAgentServer).StartSubscribeSession(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: SeaweedMessagingAgent_StartSubscribeSession_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SeaweedMessagingAgentServer).StartSubscribeSession(ctx, req.(*StartSubscribeSessionRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _SeaweedMessagingAgent_CloseSubscribeSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(CloseSubscribeSessionRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(SeaweedMessagingAgentServer).CloseSubscribeSession(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: SeaweedMessagingAgent_CloseSubscribeSession_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SeaweedMessagingAgentServer).CloseSubscribeSession(ctx, req.(*CloseSubscribeSessionRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _SeaweedMessagingAgent_SubscribeRecord_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(SeaweedMessagingAgentServer).SubscribeRecord(&grpc.GenericServerStream[SubscribeRecordRequest, SubscribeRecordResponse]{ServerStream: stream}) } @@ -277,14 +209,6 @@ var SeaweedMessagingAgent_ServiceDesc = grpc.ServiceDesc{ MethodName: "ClosePublishSession", Handler: _SeaweedMessagingAgent_ClosePublishSession_Handler, }, - { - MethodName: "StartSubscribeSession", - Handler: _SeaweedMessagingAgent_StartSubscribeSession_Handler, - }, - { - MethodName: "CloseSubscribeSession", - Handler: _SeaweedMessagingAgent_CloseSubscribeSession_Handler, - }, }, Streams: []grpc.StreamDesc{ {