From 94742b8ace98c3c0c067f33cc70eec26e12de0ab Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 May 2024 18:17:23 -0700 Subject: [PATCH] add subscribe follower --- weed/mq/broker/broker_grpc_sub_follow.go | 76 ++ weed/mq/broker/subscriber_progress.go | 10 + weed/pb/mq.proto | 23 +- weed/pb/mq_pb/mq.pb.go | 852 +++++++++++++++++------ weed/pb/mq_pb/mq_grpc.pb.go | 106 ++- 5 files changed, 823 insertions(+), 244 deletions(-) create mode 100644 weed/mq/broker/broker_grpc_sub_follow.go create mode 100644 weed/mq/broker/subscriber_progress.go diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go new file mode 100644 index 000000000..48c625c1c --- /dev/null +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -0,0 +1,76 @@ +package broker + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "io" + "time" +) + + +func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) { + var req *mq_pb.SubscribeFollowMeRequest + req, err = stream.Recv() + if err != nil { + return err + } + initMessage := req.GetInit() + if initMessage == nil { + return fmt.Errorf("missing init message") + } + + // create an in-memory offset + subscriberProgress := &SubscriberProgress{ + Topic: topic.FromPbTopic(initMessage.Topic), + Partition: topic.FromPbPartition(initMessage.Partition), + ConsumerGroup: "consumer_group", + } + + // follow each published messages + for { + // receive a message + req, err = stream.Recv() + if err != nil { + if err == io.EOF { + err = nil + break + } + glog.V(0).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err) + break + } + + // Process the received message + if ackMessage := req.GetAck(); ackMessage != nil { + subscriberProgress.Offset = ackMessage.TsNs + println("offset", subscriberProgress.Offset) + } else if closeMessage := req.GetClose(); closeMessage != nil { + glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) + return nil + } else { + glog.Errorf("unknown message: %v", req) + } + } + + t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) + + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) + partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) + partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop) + offsetFileName := fmt.Sprintf("%s.offset", subscriberProgress.ConsumerGroup) + + offsetBytes := make([]byte, 8) + util.Uint64toBytes(offsetBytes, uint64(subscriberProgress.Offset)) + + err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes) + }) + + glog.V(0).Infof("shut down follower for %v %v", t, p) + + return err +} diff --git a/weed/mq/broker/subscriber_progress.go b/weed/mq/broker/subscriber_progress.go new file mode 100644 index 000000000..9c8c055e0 --- /dev/null +++ b/weed/mq/broker/subscriber_progress.go @@ -0,0 +1,10 @@ +package broker + +import "github.com/seaweedfs/seaweedfs/weed/mq/topic" + +type SubscriberProgress struct { + topic.Topic + topic.Partition + ConsumerGroup string + Offset int64 +} diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto index a6bc89ad2..5b3ab6b5e 100644 --- a/weed/pb/mq.proto +++ b/weed/pb/mq.proto @@ -45,11 +45,13 @@ service SeaweedMessaging { // data plane for each topic partition rpc PublishMessage (stream PublishMessageRequest) returns (stream PublishMessageResponse) { } - rpc SubscribeMessage (SubscribeMessageRequest) returns (stream SubscribeMessageResponse) { + rpc SubscribeMessage (stream SubscribeMessageRequest) returns (stream SubscribeMessageResponse) { } // The lead broker asks a follower broker to follow itself rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) { } + rpc SubscribeFollowMe (stream SubscribeFollowMeRequest) returns (SubscribeFollowMeResponse) { + } } ////////////////////////////////////////////////// @@ -262,6 +264,25 @@ message SubscribeMessageResponse { DataMessage data = 2; } } +message SubscribeFollowMeRequest { + message InitMessage { + Topic topic = 1; + Partition partition = 2; + } + message AckMessage { + int64 ts_ns = 1; + } + message CloseMessage { + } + oneof message { + InitMessage init = 1; + AckMessage ack = 2; + CloseMessage close = 3; + } +} +message SubscribeFollowMeResponse { + int64 ack_ts_ns = 1; +} message ClosePublishersRequest { Topic topic = 1; int64 unix_time_ns = 2; diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go index 6a4fb8e1e..9b52a6c75 100644 --- a/weed/pb/mq_pb/mq.pb.go +++ b/weed/pb/mq_pb/mq.pb.go @@ -1954,6 +1954,148 @@ func (*SubscribeMessageResponse_Ctrl) isSubscribeMessageResponse_Message() {} func (*SubscribeMessageResponse_Data) isSubscribeMessageResponse_Message() {} +type SubscribeFollowMeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Message: + // + // *SubscribeFollowMeRequest_Init + // *SubscribeFollowMeRequest_Ack + // *SubscribeFollowMeRequest_Close + Message isSubscribeFollowMeRequest_Message `protobuf_oneof:"message"` +} + +func (x *SubscribeFollowMeRequest) Reset() { + *x = SubscribeFollowMeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[31] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeFollowMeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeFollowMeRequest) ProtoMessage() {} + +func (x *SubscribeFollowMeRequest) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[31] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeFollowMeRequest.ProtoReflect.Descriptor instead. +func (*SubscribeFollowMeRequest) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{31} +} + +func (m *SubscribeFollowMeRequest) GetMessage() isSubscribeFollowMeRequest_Message { + if m != nil { + return m.Message + } + return nil +} + +func (x *SubscribeFollowMeRequest) GetInit() *SubscribeFollowMeRequest_InitMessage { + if x, ok := x.GetMessage().(*SubscribeFollowMeRequest_Init); ok { + return x.Init + } + return nil +} + +func (x *SubscribeFollowMeRequest) GetAck() *SubscribeFollowMeRequest_AckMessage { + if x, ok := x.GetMessage().(*SubscribeFollowMeRequest_Ack); ok { + return x.Ack + } + return nil +} + +func (x *SubscribeFollowMeRequest) GetClose() *SubscribeFollowMeRequest_CloseMessage { + if x, ok := x.GetMessage().(*SubscribeFollowMeRequest_Close); ok { + return x.Close + } + return nil +} + +type isSubscribeFollowMeRequest_Message interface { + isSubscribeFollowMeRequest_Message() +} + +type SubscribeFollowMeRequest_Init struct { + Init *SubscribeFollowMeRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3,oneof"` +} + +type SubscribeFollowMeRequest_Ack struct { + Ack *SubscribeFollowMeRequest_AckMessage `protobuf:"bytes,2,opt,name=ack,proto3,oneof"` +} + +type SubscribeFollowMeRequest_Close struct { + Close *SubscribeFollowMeRequest_CloseMessage `protobuf:"bytes,3,opt,name=close,proto3,oneof"` +} + +func (*SubscribeFollowMeRequest_Init) isSubscribeFollowMeRequest_Message() {} + +func (*SubscribeFollowMeRequest_Ack) isSubscribeFollowMeRequest_Message() {} + +func (*SubscribeFollowMeRequest_Close) isSubscribeFollowMeRequest_Message() {} + +type SubscribeFollowMeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AckTsNs int64 `protobuf:"varint,1,opt,name=ack_ts_ns,json=ackTsNs,proto3" json:"ack_ts_ns,omitempty"` +} + +func (x *SubscribeFollowMeResponse) Reset() { + *x = SubscribeFollowMeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[32] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeFollowMeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeFollowMeResponse) ProtoMessage() {} + +func (x *SubscribeFollowMeResponse) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[32] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeFollowMeResponse.ProtoReflect.Descriptor instead. +func (*SubscribeFollowMeResponse) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{32} +} + +func (x *SubscribeFollowMeResponse) GetAckTsNs() int64 { + if x != nil { + return x.AckTsNs + } + return 0 +} + type ClosePublishersRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1966,7 +2108,7 @@ type ClosePublishersRequest struct { func (x *ClosePublishersRequest) Reset() { *x = ClosePublishersRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[31] + mi := &file_mq_proto_msgTypes[33] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1979,7 +2121,7 @@ func (x *ClosePublishersRequest) String() string { func (*ClosePublishersRequest) ProtoMessage() {} func (x *ClosePublishersRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[31] + mi := &file_mq_proto_msgTypes[33] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1992,7 +2134,7 @@ func (x *ClosePublishersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ClosePublishersRequest.ProtoReflect.Descriptor instead. func (*ClosePublishersRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{31} + return file_mq_proto_rawDescGZIP(), []int{33} } func (x *ClosePublishersRequest) GetTopic() *Topic { @@ -2018,7 +2160,7 @@ type ClosePublishersResponse struct { func (x *ClosePublishersResponse) Reset() { *x = ClosePublishersResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[32] + mi := &file_mq_proto_msgTypes[34] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2031,7 +2173,7 @@ func (x *ClosePublishersResponse) String() string { func (*ClosePublishersResponse) ProtoMessage() {} func (x *ClosePublishersResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[32] + mi := &file_mq_proto_msgTypes[34] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2044,7 +2186,7 @@ func (x *ClosePublishersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ClosePublishersResponse.ProtoReflect.Descriptor instead. func (*ClosePublishersResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{32} + return file_mq_proto_rawDescGZIP(), []int{34} } type CloseSubscribersRequest struct { @@ -2059,7 +2201,7 @@ type CloseSubscribersRequest struct { func (x *CloseSubscribersRequest) Reset() { *x = CloseSubscribersRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[33] + mi := &file_mq_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2072,7 +2214,7 @@ func (x *CloseSubscribersRequest) String() string { func (*CloseSubscribersRequest) ProtoMessage() {} func (x *CloseSubscribersRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[33] + mi := &file_mq_proto_msgTypes[35] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2085,7 +2227,7 @@ func (x *CloseSubscribersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CloseSubscribersRequest.ProtoReflect.Descriptor instead. func (*CloseSubscribersRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{33} + return file_mq_proto_rawDescGZIP(), []int{35} } func (x *CloseSubscribersRequest) GetTopic() *Topic { @@ -2111,7 +2253,7 @@ type CloseSubscribersResponse struct { func (x *CloseSubscribersResponse) Reset() { *x = CloseSubscribersResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[34] + mi := &file_mq_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2124,7 +2266,7 @@ func (x *CloseSubscribersResponse) String() string { func (*CloseSubscribersResponse) ProtoMessage() {} func (x *CloseSubscribersResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[34] + mi := &file_mq_proto_msgTypes[36] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2137,7 +2279,7 @@ func (x *CloseSubscribersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CloseSubscribersResponse.ProtoReflect.Descriptor instead. func (*CloseSubscribersResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{34} + return file_mq_proto_rawDescGZIP(), []int{36} } type PublisherToPubBalancerRequest_InitMessage struct { @@ -2151,7 +2293,7 @@ type PublisherToPubBalancerRequest_InitMessage struct { func (x *PublisherToPubBalancerRequest_InitMessage) Reset() { *x = PublisherToPubBalancerRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[36] + mi := &file_mq_proto_msgTypes[38] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2164,7 +2306,7 @@ func (x *PublisherToPubBalancerRequest_InitMessage) String() string { func (*PublisherToPubBalancerRequest_InitMessage) ProtoMessage() {} func (x *PublisherToPubBalancerRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[36] + mi := &file_mq_proto_msgTypes[38] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2201,7 +2343,7 @@ type SubscriberToSubCoordinatorRequest_InitMessage struct { func (x *SubscriberToSubCoordinatorRequest_InitMessage) Reset() { *x = SubscriberToSubCoordinatorRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[37] + mi := &file_mq_proto_msgTypes[39] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2214,7 +2356,7 @@ func (x *SubscriberToSubCoordinatorRequest_InitMessage) String() string { func (*SubscriberToSubCoordinatorRequest_InitMessage) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[37] + mi := &file_mq_proto_msgTypes[39] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2270,7 +2412,7 @@ type SubscriberToSubCoordinatorRequest_AckMessage struct { func (x *SubscriberToSubCoordinatorRequest_AckMessage) Reset() { *x = SubscriberToSubCoordinatorRequest_AckMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[38] + mi := &file_mq_proto_msgTypes[40] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2283,7 +2425,7 @@ func (x *SubscriberToSubCoordinatorRequest_AckMessage) String() string { func (*SubscriberToSubCoordinatorRequest_AckMessage) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest_AckMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[38] + mi := &file_mq_proto_msgTypes[40] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2325,7 +2467,7 @@ type SubscriberToSubCoordinatorResponse_Assignment struct { func (x *SubscriberToSubCoordinatorResponse_Assignment) Reset() { *x = SubscriberToSubCoordinatorResponse_Assignment{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[39] + mi := &file_mq_proto_msgTypes[41] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2338,7 +2480,7 @@ func (x *SubscriberToSubCoordinatorResponse_Assignment) String() string { func (*SubscriberToSubCoordinatorResponse_Assignment) ProtoMessage() {} func (x *SubscriberToSubCoordinatorResponse_Assignment) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[39] + mi := &file_mq_proto_msgTypes[41] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2383,7 +2525,7 @@ type PublishMessageRequest_InitMessage struct { func (x *PublishMessageRequest_InitMessage) Reset() { *x = PublishMessageRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[40] + mi := &file_mq_proto_msgTypes[42] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2396,7 +2538,7 @@ func (x *PublishMessageRequest_InitMessage) String() string { func (*PublishMessageRequest_InitMessage) ProtoMessage() {} func (x *PublishMessageRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[40] + mi := &file_mq_proto_msgTypes[42] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2459,7 +2601,7 @@ type PublishFollowMeRequest_InitMessage struct { func (x *PublishFollowMeRequest_InitMessage) Reset() { *x = PublishFollowMeRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[41] + mi := &file_mq_proto_msgTypes[43] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2472,7 +2614,7 @@ func (x *PublishFollowMeRequest_InitMessage) String() string { func (*PublishFollowMeRequest_InitMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[41] + mi := &file_mq_proto_msgTypes[43] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2513,7 +2655,7 @@ type PublishFollowMeRequest_FlushMessage struct { func (x *PublishFollowMeRequest_FlushMessage) Reset() { *x = PublishFollowMeRequest_FlushMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[42] + mi := &file_mq_proto_msgTypes[44] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2526,7 +2668,7 @@ func (x *PublishFollowMeRequest_FlushMessage) String() string { func (*PublishFollowMeRequest_FlushMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_FlushMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[42] + mi := &file_mq_proto_msgTypes[44] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2558,7 +2700,7 @@ type PublishFollowMeRequest_CloseMessage struct { func (x *PublishFollowMeRequest_CloseMessage) Reset() { *x = PublishFollowMeRequest_CloseMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[43] + mi := &file_mq_proto_msgTypes[45] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2571,7 +2713,7 @@ func (x *PublishFollowMeRequest_CloseMessage) String() string { func (*PublishFollowMeRequest_CloseMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[43] + mi := &file_mq_proto_msgTypes[45] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2604,7 +2746,7 @@ type SubscribeMessageRequest_InitMessage struct { func (x *SubscribeMessageRequest_InitMessage) Reset() { *x = SubscribeMessageRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[44] + mi := &file_mq_proto_msgTypes[46] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2617,7 +2759,7 @@ func (x *SubscribeMessageRequest_InitMessage) String() string { func (*SubscribeMessageRequest_InitMessage) ProtoMessage() {} func (x *SubscribeMessageRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[44] + mi := &file_mq_proto_msgTypes[46] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2693,7 +2835,7 @@ type SubscribeMessageRequest_AckMessage struct { func (x *SubscribeMessageRequest_AckMessage) Reset() { *x = SubscribeMessageRequest_AckMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[45] + mi := &file_mq_proto_msgTypes[47] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2706,7 +2848,7 @@ func (x *SubscribeMessageRequest_AckMessage) String() string { func (*SubscribeMessageRequest_AckMessage) ProtoMessage() {} func (x *SubscribeMessageRequest_AckMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[45] + mi := &file_mq_proto_msgTypes[47] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2742,7 +2884,7 @@ type SubscribeMessageResponse_SubscribeCtrlMessage struct { func (x *SubscribeMessageResponse_SubscribeCtrlMessage) Reset() { *x = SubscribeMessageResponse_SubscribeCtrlMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[46] + mi := &file_mq_proto_msgTypes[48] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2755,7 +2897,7 @@ func (x *SubscribeMessageResponse_SubscribeCtrlMessage) String() string { func (*SubscribeMessageResponse_SubscribeCtrlMessage) ProtoMessage() {} func (x *SubscribeMessageResponse_SubscribeCtrlMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[46] + mi := &file_mq_proto_msgTypes[48] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2792,6 +2934,146 @@ func (x *SubscribeMessageResponse_SubscribeCtrlMessage) GetIsEndOfTopic() bool { return false } +type SubscribeFollowMeRequest_InitMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` +} + +func (x *SubscribeFollowMeRequest_InitMessage) Reset() { + *x = SubscribeFollowMeRequest_InitMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[49] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeFollowMeRequest_InitMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeFollowMeRequest_InitMessage) ProtoMessage() {} + +func (x *SubscribeFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[49] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeFollowMeRequest_InitMessage.ProtoReflect.Descriptor instead. +func (*SubscribeFollowMeRequest_InitMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{31, 0} +} + +func (x *SubscribeFollowMeRequest_InitMessage) GetTopic() *Topic { + if x != nil { + return x.Topic + } + return nil +} + +func (x *SubscribeFollowMeRequest_InitMessage) GetPartition() *Partition { + if x != nil { + return x.Partition + } + return nil +} + +type SubscribeFollowMeRequest_AckMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TsNs int64 `protobuf:"varint,1,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` +} + +func (x *SubscribeFollowMeRequest_AckMessage) Reset() { + *x = SubscribeFollowMeRequest_AckMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[50] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeFollowMeRequest_AckMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeFollowMeRequest_AckMessage) ProtoMessage() {} + +func (x *SubscribeFollowMeRequest_AckMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[50] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeFollowMeRequest_AckMessage.ProtoReflect.Descriptor instead. +func (*SubscribeFollowMeRequest_AckMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{31, 1} +} + +func (x *SubscribeFollowMeRequest_AckMessage) GetTsNs() int64 { + if x != nil { + return x.TsNs + } + return 0 +} + +type SubscribeFollowMeRequest_CloseMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SubscribeFollowMeRequest_CloseMessage) Reset() { + *x = SubscribeFollowMeRequest_CloseMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[51] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeFollowMeRequest_CloseMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeFollowMeRequest_CloseMessage) ProtoMessage() {} + +func (x *SubscribeFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[51] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeFollowMeRequest_CloseMessage.ProtoReflect.Descriptor instead. +func (*SubscribeFollowMeRequest_CloseMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{31, 2} +} + var File_mq_proto protoreflect.FileDescriptor var file_mq_proto_rawDesc = []byte{ @@ -3132,120 +3414,157 @@ var file_mq_proto_rawDesc = []byte{ 0x61, 0x6d, 0x12, 0x25, 0x0a, 0x0f, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x22, 0x65, 0x0a, 0x16, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, - 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, + 0x73, 0x61, 0x67, 0x65, 0x22, 0xa7, 0x03, 0x0a, 0x18, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x48, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x32, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x45, 0x0a, 0x03, 0x61, + 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x2e, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x03, 0x61, + 0x63, 0x6b, 0x12, 0x4b, 0x0a, 0x05, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x33, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, + 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x05, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x1a, + 0x6f, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, - 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x20, 0x0a, 0x0c, 0x75, 0x6e, 0x69, - 0x78, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x0a, 0x75, 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x4e, 0x73, 0x22, 0x19, 0x0a, 0x17, 0x43, - 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x66, 0x0a, 0x17, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, - 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x1a, 0x21, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x13, + 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x74, + 0x73, 0x4e, 0x73, 0x1a, 0x0e, 0x0a, 0x0c, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x37, + 0x0a, 0x19, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, + 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x09, 0x61, + 0x63, 0x6b, 0x5f, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, + 0x61, 0x63, 0x6b, 0x54, 0x73, 0x4e, 0x73, 0x22, 0x65, 0x0a, 0x16, 0x43, 0x6c, 0x6f, 0x73, 0x65, + 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x20, 0x0a, 0x0c, 0x75, 0x6e, 0x69, 0x78, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x0a, 0x75, 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x4e, 0x73, 0x22, 0x1a, - 0x0a, 0x18, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x4c, 0x0a, 0x18, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x53, 0x74, 0x61, - 0x72, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, - 0x53, 0x54, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, - 0x5f, 0x49, 0x4e, 0x5f, 0x4d, 0x45, 0x4d, 0x4f, 0x52, 0x59, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, - 0x4c, 0x41, 0x54, 0x45, 0x53, 0x54, 0x10, 0x02, 0x32, 0xde, 0x0a, 0x0a, 0x10, 0x53, 0x65, 0x61, - 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, - 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x79, 0x0a, 0x16, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x54, - 0x6f, 0x50, 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x12, 0x2b, 0x2e, 0x6d, + 0x28, 0x03, 0x52, 0x0a, 0x75, 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x4e, 0x73, 0x22, 0x19, + 0x0a, 0x17, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x66, 0x0a, 0x17, 0x43, 0x6c, 0x6f, + 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, + 0x20, 0x0a, 0x0c, 0x75, 0x6e, 0x69, 0x78, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x6e, 0x73, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x4e, + 0x73, 0x22, 0x1a, 0x0a, 0x18, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x4c, 0x0a, + 0x18, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, + 0x53, 0x74, 0x61, 0x72, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x45, 0x41, 0x52, + 0x4c, 0x49, 0x45, 0x53, 0x54, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x45, 0x41, 0x52, 0x4c, 0x49, + 0x45, 0x53, 0x54, 0x5f, 0x49, 0x4e, 0x5f, 0x4d, 0x45, 0x4d, 0x4f, 0x52, 0x59, 0x10, 0x01, 0x12, + 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x54, 0x45, 0x53, 0x54, 0x10, 0x02, 0x32, 0xca, 0x0b, 0x0a, 0x10, + 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x79, 0x0a, 0x16, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x65, 0x72, 0x54, 0x6f, 0x50, 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x12, + 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, + 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x54, 0x6f, 0x50, 0x75, 0x62, 0x42, 0x61, 0x6c, + 0x61, 0x6e, 0x63, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x54, 0x6f, 0x50, 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, - 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, - 0x65, 0x72, 0x54, 0x6f, 0x50, 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5a, 0x0a, - 0x0d, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x22, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x61, - 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0a, 0x4c, 0x69, 0x73, - 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, - 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0e, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x23, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, + 0x12, 0x5a, 0x0a, 0x0d, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, + 0x73, 0x12, 0x22, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, + 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0a, + 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x1f, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x5d, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, + 0x63, 0x12, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, + 0x0a, 0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, + 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, + 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, + 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, + 0x0f, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, + 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, + 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, + 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x63, 0x0a, 0x10, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x72, 0x73, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x85, 0x01, 0x0a, 0x1a, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, + 0x74, 0x6f, 0x72, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, + 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, + 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x61, 0x0a, 0x0e, + 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x23, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, + 0x62, 0x6c, 0x69, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, - 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, - 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x4c, - 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, - 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, - 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, - 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, - 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, - 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, - 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, - 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, - 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x10, - 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, - 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x85, 0x01, 0x0a, 0x1a, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, - 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, - 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, - 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, - 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x61, 0x0a, 0x0e, 0x50, 0x75, 0x62, - 0x6c, 0x69, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x23, 0x2e, 0x6d, 0x65, + 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, + 0x67, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x64, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, + 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, - 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x65, 0x0a, 0x10, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x30, 0x01, 0x12, 0x64, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, - 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, - 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, - 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x4f, 0x0a, 0x0c, 0x73, 0x65, 0x61, - 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x11, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, - 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, - 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x68, + 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, + 0x77, 0x4d, 0x65, 0x12, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x46, 0x6f, 0x6c, 0x6c, + 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x42, 0x4f, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, + 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x11, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, + 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, + 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -3261,7 +3580,7 @@ func file_mq_proto_rawDescGZIP() []byte { } var file_mq_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 47) +var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 52) var file_mq_proto_goTypes = []interface{}{ (PartitionOffsetStartType)(0), // 0: messaging_pb.PartitionOffsetStartType (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest @@ -3295,38 +3614,43 @@ var file_mq_proto_goTypes = []interface{}{ (*PublishFollowMeResponse)(nil), // 29: messaging_pb.PublishFollowMeResponse (*SubscribeMessageRequest)(nil), // 30: messaging_pb.SubscribeMessageRequest (*SubscribeMessageResponse)(nil), // 31: messaging_pb.SubscribeMessageResponse - (*ClosePublishersRequest)(nil), // 32: messaging_pb.ClosePublishersRequest - (*ClosePublishersResponse)(nil), // 33: messaging_pb.ClosePublishersResponse - (*CloseSubscribersRequest)(nil), // 34: messaging_pb.CloseSubscribersRequest - (*CloseSubscribersResponse)(nil), // 35: messaging_pb.CloseSubscribersResponse - nil, // 36: messaging_pb.BrokerStats.StatsEntry - (*PublisherToPubBalancerRequest_InitMessage)(nil), // 37: messaging_pb.PublisherToPubBalancerRequest.InitMessage - (*SubscriberToSubCoordinatorRequest_InitMessage)(nil), // 38: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage - (*SubscriberToSubCoordinatorRequest_AckMessage)(nil), // 39: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage - (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 40: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment - (*PublishMessageRequest_InitMessage)(nil), // 41: messaging_pb.PublishMessageRequest.InitMessage - (*PublishFollowMeRequest_InitMessage)(nil), // 42: messaging_pb.PublishFollowMeRequest.InitMessage - (*PublishFollowMeRequest_FlushMessage)(nil), // 43: messaging_pb.PublishFollowMeRequest.FlushMessage - (*PublishFollowMeRequest_CloseMessage)(nil), // 44: messaging_pb.PublishFollowMeRequest.CloseMessage - (*SubscribeMessageRequest_InitMessage)(nil), // 45: messaging_pb.SubscribeMessageRequest.InitMessage - (*SubscribeMessageRequest_AckMessage)(nil), // 46: messaging_pb.SubscribeMessageRequest.AckMessage - (*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 47: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage - (*schema_pb.RecordType)(nil), // 48: schema_pb.RecordType + (*SubscribeFollowMeRequest)(nil), // 32: messaging_pb.SubscribeFollowMeRequest + (*SubscribeFollowMeResponse)(nil), // 33: messaging_pb.SubscribeFollowMeResponse + (*ClosePublishersRequest)(nil), // 34: messaging_pb.ClosePublishersRequest + (*ClosePublishersResponse)(nil), // 35: messaging_pb.ClosePublishersResponse + (*CloseSubscribersRequest)(nil), // 36: messaging_pb.CloseSubscribersRequest + (*CloseSubscribersResponse)(nil), // 37: messaging_pb.CloseSubscribersResponse + nil, // 38: messaging_pb.BrokerStats.StatsEntry + (*PublisherToPubBalancerRequest_InitMessage)(nil), // 39: messaging_pb.PublisherToPubBalancerRequest.InitMessage + (*SubscriberToSubCoordinatorRequest_InitMessage)(nil), // 40: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage + (*SubscriberToSubCoordinatorRequest_AckMessage)(nil), // 41: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage + (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 42: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment + (*PublishMessageRequest_InitMessage)(nil), // 43: messaging_pb.PublishMessageRequest.InitMessage + (*PublishFollowMeRequest_InitMessage)(nil), // 44: messaging_pb.PublishFollowMeRequest.InitMessage + (*PublishFollowMeRequest_FlushMessage)(nil), // 45: messaging_pb.PublishFollowMeRequest.FlushMessage + (*PublishFollowMeRequest_CloseMessage)(nil), // 46: messaging_pb.PublishFollowMeRequest.CloseMessage + (*SubscribeMessageRequest_InitMessage)(nil), // 47: messaging_pb.SubscribeMessageRequest.InitMessage + (*SubscribeMessageRequest_AckMessage)(nil), // 48: messaging_pb.SubscribeMessageRequest.AckMessage + (*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 49: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage + (*SubscribeFollowMeRequest_InitMessage)(nil), // 50: messaging_pb.SubscribeFollowMeRequest.InitMessage + (*SubscribeFollowMeRequest_AckMessage)(nil), // 51: messaging_pb.SubscribeFollowMeRequest.AckMessage + (*SubscribeFollowMeRequest_CloseMessage)(nil), // 52: messaging_pb.SubscribeFollowMeRequest.CloseMessage + (*schema_pb.RecordType)(nil), // 53: schema_pb.RecordType } var file_mq_proto_depIdxs = []int32{ 3, // 0: messaging_pb.Offset.topic:type_name -> messaging_pb.Topic 6, // 1: messaging_pb.Offset.partition_offsets:type_name -> messaging_pb.PartitionOffset 4, // 2: messaging_pb.PartitionOffset.partition:type_name -> messaging_pb.Partition 0, // 3: messaging_pb.PartitionOffset.start_type:type_name -> messaging_pb.PartitionOffsetStartType - 36, // 4: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry + 38, // 4: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry 3, // 5: messaging_pb.TopicPartitionStats.topic:type_name -> messaging_pb.Topic 4, // 6: messaging_pb.TopicPartitionStats.partition:type_name -> messaging_pb.Partition - 37, // 7: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage + 39, // 7: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage 7, // 8: messaging_pb.PublisherToPubBalancerRequest.stats:type_name -> messaging_pb.BrokerStats 3, // 9: messaging_pb.ConfigureTopicRequest.topic:type_name -> messaging_pb.Topic - 48, // 10: messaging_pb.ConfigureTopicRequest.record_type:type_name -> schema_pb.RecordType + 53, // 10: messaging_pb.ConfigureTopicRequest.record_type:type_name -> schema_pb.RecordType 19, // 11: messaging_pb.ConfigureTopicResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 48, // 12: messaging_pb.ConfigureTopicResponse.record_type:type_name -> schema_pb.RecordType + 53, // 12: messaging_pb.ConfigureTopicResponse.record_type:type_name -> schema_pb.RecordType 3, // 13: messaging_pb.ListTopicsResponse.topics:type_name -> messaging_pb.Topic 3, // 14: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> messaging_pb.Topic 3, // 15: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> messaging_pb.Topic @@ -3334,63 +3658,70 @@ var file_mq_proto_depIdxs = []int32{ 4, // 17: messaging_pb.BrokerPartitionAssignment.partition:type_name -> messaging_pb.Partition 3, // 18: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> messaging_pb.Topic 19, // 19: messaging_pb.AssignTopicPartitionsRequest.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 38, // 20: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage - 39, // 21: messaging_pb.SubscriberToSubCoordinatorRequest.ack:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage - 40, // 22: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment + 40, // 20: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage + 41, // 21: messaging_pb.SubscriberToSubCoordinatorRequest.ack:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage + 42, // 22: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment 24, // 23: messaging_pb.DataMessage.ctrl:type_name -> messaging_pb.ControlMessage - 41, // 24: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage + 43, // 24: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage 25, // 25: messaging_pb.PublishMessageRequest.data:type_name -> messaging_pb.DataMessage - 42, // 26: messaging_pb.PublishFollowMeRequest.init:type_name -> messaging_pb.PublishFollowMeRequest.InitMessage + 44, // 26: messaging_pb.PublishFollowMeRequest.init:type_name -> messaging_pb.PublishFollowMeRequest.InitMessage 25, // 27: messaging_pb.PublishFollowMeRequest.data:type_name -> messaging_pb.DataMessage - 43, // 28: messaging_pb.PublishFollowMeRequest.flush:type_name -> messaging_pb.PublishFollowMeRequest.FlushMessage - 44, // 29: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage - 45, // 30: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage - 46, // 31: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage - 47, // 32: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage + 45, // 28: messaging_pb.PublishFollowMeRequest.flush:type_name -> messaging_pb.PublishFollowMeRequest.FlushMessage + 46, // 29: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage + 47, // 30: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage + 48, // 31: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage + 49, // 32: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage 25, // 33: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage - 3, // 34: messaging_pb.ClosePublishersRequest.topic:type_name -> messaging_pb.Topic - 3, // 35: messaging_pb.CloseSubscribersRequest.topic:type_name -> messaging_pb.Topic - 8, // 36: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats - 3, // 37: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 4, // 38: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage.partition:type_name -> messaging_pb.Partition - 19, // 39: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 3, // 40: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 4, // 41: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> messaging_pb.Partition - 3, // 42: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 4, // 43: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> messaging_pb.Partition - 3, // 44: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 6, // 45: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> messaging_pb.PartitionOffset - 1, // 46: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 9, // 47: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest - 11, // 48: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest - 15, // 49: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest - 13, // 50: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest - 17, // 51: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest - 20, // 52: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest - 32, // 53: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest - 34, // 54: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest - 22, // 55: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest - 26, // 56: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest - 30, // 57: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest - 28, // 58: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest - 2, // 59: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 10, // 60: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse - 12, // 61: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse - 16, // 62: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse - 14, // 63: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse - 18, // 64: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse - 21, // 65: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse - 33, // 66: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse - 35, // 67: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse - 23, // 68: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse - 27, // 69: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse - 31, // 70: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse - 29, // 71: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse - 59, // [59:72] is the sub-list for method output_type - 46, // [46:59] is the sub-list for method input_type - 46, // [46:46] is the sub-list for extension type_name - 46, // [46:46] is the sub-list for extension extendee - 0, // [0:46] is the sub-list for field type_name + 50, // 34: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage + 51, // 35: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage + 52, // 36: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage + 3, // 37: messaging_pb.ClosePublishersRequest.topic:type_name -> messaging_pb.Topic + 3, // 38: messaging_pb.CloseSubscribersRequest.topic:type_name -> messaging_pb.Topic + 8, // 39: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats + 3, // 40: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 41: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage.partition:type_name -> messaging_pb.Partition + 19, // 42: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment + 3, // 43: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 44: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 3, // 45: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 46: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 3, // 47: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 6, // 48: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> messaging_pb.PartitionOffset + 3, // 49: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 50: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 1, // 51: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 9, // 52: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest + 11, // 53: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest + 15, // 54: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest + 13, // 55: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest + 17, // 56: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest + 20, // 57: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest + 34, // 58: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest + 36, // 59: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest + 22, // 60: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest + 26, // 61: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest + 30, // 62: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest + 28, // 63: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest + 32, // 64: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest + 2, // 65: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 10, // 66: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse + 12, // 67: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse + 16, // 68: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse + 14, // 69: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse + 18, // 70: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse + 21, // 71: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse + 35, // 72: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse + 37, // 73: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse + 23, // 74: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse + 27, // 75: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse + 31, // 76: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse + 29, // 77: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse + 33, // 78: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse + 65, // [65:79] is the sub-list for method output_type + 51, // [51:65] is the sub-list for method input_type + 51, // [51:51] is the sub-list for extension type_name + 51, // [51:51] is the sub-list for extension extendee + 0, // [0:51] is the sub-list for field type_name } func init() { file_mq_proto_init() } @@ -3772,7 +4103,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[31].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ClosePublishersRequest); i { + switch v := v.(*SubscribeFollowMeRequest); i { case 0: return &v.state case 1: @@ -3784,7 +4115,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[32].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ClosePublishersResponse); i { + switch v := v.(*SubscribeFollowMeResponse); i { case 0: return &v.state case 1: @@ -3796,7 +4127,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[33].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CloseSubscribersRequest); i { + switch v := v.(*ClosePublishersRequest); i { case 0: return &v.state case 1: @@ -3808,7 +4139,19 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[34].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CloseSubscribersResponse); i { + switch v := v.(*ClosePublishersResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[35].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CloseSubscribersRequest); i { case 0: return &v.state case 1: @@ -3820,6 +4163,18 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[36].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CloseSubscribersResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[38].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PublisherToPubBalancerRequest_InitMessage); i { case 0: return &v.state @@ -3831,7 +4186,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[37].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[39].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscriberToSubCoordinatorRequest_InitMessage); i { case 0: return &v.state @@ -3843,7 +4198,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[38].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[40].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscriberToSubCoordinatorRequest_AckMessage); i { case 0: return &v.state @@ -3855,7 +4210,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[39].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[41].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscriberToSubCoordinatorResponse_Assignment); i { case 0: return &v.state @@ -3867,7 +4222,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[40].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[42].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PublishMessageRequest_InitMessage); i { case 0: return &v.state @@ -3879,7 +4234,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[41].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[43].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PublishFollowMeRequest_InitMessage); i { case 0: return &v.state @@ -3891,7 +4246,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[42].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[44].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PublishFollowMeRequest_FlushMessage); i { case 0: return &v.state @@ -3903,7 +4258,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[43].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[45].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PublishFollowMeRequest_CloseMessage); i { case 0: return &v.state @@ -3915,7 +4270,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[44].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[46].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeMessageRequest_InitMessage); i { case 0: return &v.state @@ -3927,7 +4282,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[45].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[47].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeMessageRequest_AckMessage); i { case 0: return &v.state @@ -3939,7 +4294,7 @@ func file_mq_proto_init() { return nil } } - file_mq_proto_msgTypes[46].Exporter = func(v interface{}, i int) interface{} { + file_mq_proto_msgTypes[48].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeMessageResponse_SubscribeCtrlMessage); i { case 0: return &v.state @@ -3951,6 +4306,42 @@ func file_mq_proto_init() { return nil } } + file_mq_proto_msgTypes[49].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeFollowMeRequest_InitMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[50].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeFollowMeRequest_AckMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[51].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeFollowMeRequest_CloseMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_mq_proto_msgTypes[8].OneofWrappers = []interface{}{ (*PublisherToPubBalancerRequest_Init)(nil), @@ -3981,13 +4372,18 @@ func file_mq_proto_init() { (*SubscribeMessageResponse_Ctrl)(nil), (*SubscribeMessageResponse_Data)(nil), } + file_mq_proto_msgTypes[31].OneofWrappers = []interface{}{ + (*SubscribeFollowMeRequest_Init)(nil), + (*SubscribeFollowMeRequest_Ack)(nil), + (*SubscribeFollowMeRequest_Close)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mq_proto_rawDesc, NumEnums: 1, - NumMessages: 47, + NumMessages: 52, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go index 0028f341e..2123415a8 100644 --- a/weed/pb/mq_pb/mq_grpc.pb.go +++ b/weed/pb/mq_pb/mq_grpc.pb.go @@ -32,6 +32,7 @@ const ( SeaweedMessaging_PublishMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishMessage" SeaweedMessaging_SubscribeMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeMessage" SeaweedMessaging_PublishFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishFollowMe" + SeaweedMessaging_SubscribeFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeFollowMe" ) // SeaweedMessagingClient is the client API for SeaweedMessaging service. @@ -55,9 +56,10 @@ type SeaweedMessagingClient interface { SubscriberToSubCoordinator(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscriberToSubCoordinatorClient, error) // data plane for each topic partition PublishMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishMessageClient, error) - SubscribeMessage(ctx context.Context, in *SubscribeMessageRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error) + SubscribeMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error) // The lead broker asks a follower broker to follow itself PublishFollowMe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishFollowMeClient, error) + SubscribeFollowMe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeFollowMeClient, error) } type seaweedMessagingClient struct { @@ -233,22 +235,17 @@ func (x *seaweedMessagingPublishMessageClient) Recv() (*PublishMessageResponse, return m, nil } -func (c *seaweedMessagingClient) SubscribeMessage(ctx context.Context, in *SubscribeMessageRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error) { +func (c *seaweedMessagingClient) SubscribeMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error) { stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[3], SeaweedMessaging_SubscribeMessage_FullMethodName, opts...) if err != nil { return nil, err } x := &seaweedMessagingSubscribeMessageClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } return x, nil } type SeaweedMessaging_SubscribeMessageClient interface { + Send(*SubscribeMessageRequest) error Recv() (*SubscribeMessageResponse, error) grpc.ClientStream } @@ -257,6 +254,10 @@ type seaweedMessagingSubscribeMessageClient struct { grpc.ClientStream } +func (x *seaweedMessagingSubscribeMessageClient) Send(m *SubscribeMessageRequest) error { + return x.ClientStream.SendMsg(m) +} + func (x *seaweedMessagingSubscribeMessageClient) Recv() (*SubscribeMessageResponse, error) { m := new(SubscribeMessageResponse) if err := x.ClientStream.RecvMsg(m); err != nil { @@ -296,6 +297,40 @@ func (x *seaweedMessagingPublishFollowMeClient) Recv() (*PublishFollowMeResponse return m, nil } +func (c *seaweedMessagingClient) SubscribeFollowMe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeFollowMeClient, error) { + stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[5], SeaweedMessaging_SubscribeFollowMe_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &seaweedMessagingSubscribeFollowMeClient{stream} + return x, nil +} + +type SeaweedMessaging_SubscribeFollowMeClient interface { + Send(*SubscribeFollowMeRequest) error + CloseAndRecv() (*SubscribeFollowMeResponse, error) + grpc.ClientStream +} + +type seaweedMessagingSubscribeFollowMeClient struct { + grpc.ClientStream +} + +func (x *seaweedMessagingSubscribeFollowMeClient) Send(m *SubscribeFollowMeRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *seaweedMessagingSubscribeFollowMeClient) CloseAndRecv() (*SubscribeFollowMeResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(SubscribeFollowMeResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // SeaweedMessagingServer is the server API for SeaweedMessaging service. // All implementations must embed UnimplementedSeaweedMessagingServer // for forward compatibility @@ -317,9 +352,10 @@ type SeaweedMessagingServer interface { SubscriberToSubCoordinator(SeaweedMessaging_SubscriberToSubCoordinatorServer) error // data plane for each topic partition PublishMessage(SeaweedMessaging_PublishMessageServer) error - SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error + SubscribeMessage(SeaweedMessaging_SubscribeMessageServer) error // The lead broker asks a follower broker to follow itself PublishFollowMe(SeaweedMessaging_PublishFollowMeServer) error + SubscribeFollowMe(SeaweedMessaging_SubscribeFollowMeServer) error mustEmbedUnimplementedSeaweedMessagingServer() } @@ -360,12 +396,15 @@ func (UnimplementedSeaweedMessagingServer) SubscriberToSubCoordinator(SeaweedMes func (UnimplementedSeaweedMessagingServer) PublishMessage(SeaweedMessaging_PublishMessageServer) error { return status.Errorf(codes.Unimplemented, "method PublishMessage not implemented") } -func (UnimplementedSeaweedMessagingServer) SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error { +func (UnimplementedSeaweedMessagingServer) SubscribeMessage(SeaweedMessaging_SubscribeMessageServer) error { return status.Errorf(codes.Unimplemented, "method SubscribeMessage not implemented") } func (UnimplementedSeaweedMessagingServer) PublishFollowMe(SeaweedMessaging_PublishFollowMeServer) error { return status.Errorf(codes.Unimplemented, "method PublishFollowMe not implemented") } +func (UnimplementedSeaweedMessagingServer) SubscribeFollowMe(SeaweedMessaging_SubscribeFollowMeServer) error { + return status.Errorf(codes.Unimplemented, "method SubscribeFollowMe not implemented") +} func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {} // UnsafeSeaweedMessagingServer may be embedded to opt out of forward compatibility for this service. @@ -602,15 +641,12 @@ func (x *seaweedMessagingPublishMessageServer) Recv() (*PublishMessageRequest, e } func _SeaweedMessaging_SubscribeMessage_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(SubscribeMessageRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(SeaweedMessagingServer).SubscribeMessage(m, &seaweedMessagingSubscribeMessageServer{stream}) + return srv.(SeaweedMessagingServer).SubscribeMessage(&seaweedMessagingSubscribeMessageServer{stream}) } type SeaweedMessaging_SubscribeMessageServer interface { Send(*SubscribeMessageResponse) error + Recv() (*SubscribeMessageRequest, error) grpc.ServerStream } @@ -622,6 +658,14 @@ func (x *seaweedMessagingSubscribeMessageServer) Send(m *SubscribeMessageRespons return x.ServerStream.SendMsg(m) } +func (x *seaweedMessagingSubscribeMessageServer) Recv() (*SubscribeMessageRequest, error) { + m := new(SubscribeMessageRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func _SeaweedMessaging_PublishFollowMe_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(SeaweedMessagingServer).PublishFollowMe(&seaweedMessagingPublishFollowMeServer{stream}) } @@ -648,6 +692,32 @@ func (x *seaweedMessagingPublishFollowMeServer) Recv() (*PublishFollowMeRequest, return m, nil } +func _SeaweedMessaging_SubscribeFollowMe_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SeaweedMessagingServer).SubscribeFollowMe(&seaweedMessagingSubscribeFollowMeServer{stream}) +} + +type SeaweedMessaging_SubscribeFollowMeServer interface { + SendAndClose(*SubscribeFollowMeResponse) error + Recv() (*SubscribeFollowMeRequest, error) + grpc.ServerStream +} + +type seaweedMessagingSubscribeFollowMeServer struct { + grpc.ServerStream +} + +func (x *seaweedMessagingSubscribeFollowMeServer) SendAndClose(m *SubscribeFollowMeResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *seaweedMessagingSubscribeFollowMeServer) Recv() (*SubscribeFollowMeRequest, error) { + m := new(SubscribeFollowMeRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -711,6 +781,7 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ StreamName: "SubscribeMessage", Handler: _SeaweedMessaging_SubscribeMessage_Handler, ServerStreams: true, + ClientStreams: true, }, { StreamName: "PublishFollowMe", @@ -718,6 +789,11 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "SubscribeFollowMe", + Handler: _SeaweedMessaging_SubscribeFollowMe_Handler, + ClientStreams: true, + }, }, Metadata: "mq.proto", }