From 7003ce7425d81684bc02585a8dd4cef79153115c Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 20 Mar 2024 12:25:40 -0700 Subject: [PATCH] publisher is able to fully send data and receive acks still needs to close the pipes cleanly --- weed/mq/broker/broker_grpc_pub.go | 198 ++++-- weed/mq/broker/broker_grpc_pub_follow.go | 45 +- weed/mq/client/pub_client/publish.go | 2 + weed/mq/client/pub_client/scheduler.go | 18 +- weed/pb/mq.proto | 22 +- weed/pb/mq_pb/mq.pb.go | 741 +++++++++++++++-------- weed/pb/mq_pb/mq_grpc.pb.go | 84 ++- 7 files changed, 775 insertions(+), 335 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 3b68db1af..673a7b1cf 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -7,11 +7,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" "google.golang.org/grpc/peer" "io" "math/rand" "net" - "sync/atomic" ) // PUB @@ -41,6 +41,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // 3. write to the filer var localTopicPartition *topic.LocalPartition + var isGenerated bool req, err := stream.Recv() if err != nil { return err @@ -49,62 +50,122 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // TODO check whether current broker should be the leader for the topic partition ackInterval := 1 initMessage := req.GetInit() - var t topic.Topic - var p topic.Partition - if initMessage != nil { - t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p) + if initMessage == nil { + response.Error = fmt.Sprintf("missing init message") + glog.Errorf("missing init message") + return stream.Send(response) + } + + // get or generate a local partition + t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) + localTopicPartition, isGenerated, err = b.GetOrGenLocalPartition(t, p) + if err != nil { + response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + return stream.Send(response) + } + ackInterval = int(initMessage.AckInterval) + + // connect to follower brokers + var followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient + var grpcConnection *grpc.ClientConn + if isGenerated && len(initMessage.FollowerBrokers) > 0 { + follower := initMessage.FollowerBrokers[0] + ctx := stream.Context() + grpcConnection, err = pb.GrpcDial(ctx, follower, true, b.grpcDialOption) if err != nil { - response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + response.Error = fmt.Sprintf("fail to dial %s: %v", follower, err) + glog.Errorf("fail to dial %s: %v", follower, err) return stream.Send(response) } - ackInterval = int(initMessage.AckInterval) - for _, follower := range initMessage.FollowerBrokers { - followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error { - _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{ + followerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + followerStream, err = followerClient.PublishFollowMe(ctx) + if err != nil { + response.Error = fmt.Sprintf("fail to create publish client: %v", err) + glog.Errorf("fail to create publish client: %v", err) + return stream.Send(response) + } + if err = followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Init{ + Init: &mq_pb.PublishFollowMeRequest_InitMessage{ Topic: initMessage.Topic, Partition: initMessage.Partition, - BrokerSelf: string(b.option.BrokerAddress()), - }) - return err - }) - if followErr != nil { - response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr) - glog.Errorf("follower %v failed: %v", follower, followErr) - return stream.Send(response) - } + }, + }, + }); err != nil { + return err } - stream.Send(response) - } else { - response.Error = fmt.Sprintf("missing init message") - glog.Errorf("missing init message") - return stream.Send(response) } + // process each published messages clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition) localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher()) ackCounter := 0 var ackSequence int64 - var isStopping int32 - respChan := make(chan *mq_pb.PublishMessageResponse, 128) defer func() { - atomic.StoreInt32(&isStopping, 1) - respChan <- &mq_pb.PublishMessageResponse{ - AckSequence: ackSequence, - } - close(respChan) - localTopicPartition.Publishers.RemovePublisher(clientName) - glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) - if localTopicPartition.MaybeShutdownLocalPartition() { - b.localTopicManager.RemoveTopicPartition(t, p) + if followerStream == nil { + // remove the publisher + localTopicPartition.Publishers.RemovePublisher(clientName) + glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) + if localTopicPartition.MaybeShutdownLocalPartition() { + b.localTopicManager.RemoveTopicPartition(t, p) + } } }() - go func() { - for resp := range respChan { - if err := stream.Send(resp); err != nil { - glog.Errorf("Error sending response %v: %v", resp, err) + + if followerStream != nil { + go func() { + defer func() { + println("stop receiving ack from follower") + + // remove the publisher + localTopicPartition.Publishers.RemovePublisher(clientName) + glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) + if localTopicPartition.MaybeShutdownLocalPartition() { + b.localTopicManager.RemoveTopicPartition(t, p) + } + println("closing grpcConnection to follower") + grpcConnection.Close() + }() + + for { + ack, err := followerStream.Recv() + if err != nil { + glog.Errorf("Error receiving response: %v", err) + return + } + ackSequence = ack.AckTsNs + println("recv ack", ack.AckTsNs) + if err := stream.Send(&mq_pb.PublishMessageResponse{ + AckSequence: ack.AckTsNs, + }); err != nil { + glog.Errorf("Error sending response %v: %v", ack, err) + return + } + } + }() + } + + // send a hello message + stream.Send(&mq_pb.PublishMessageResponse{}) + + var receivedSequence, acknowledgedSequence int64 + + defer func() { + if followerStream != nil { + //if err := followerStream.CloseSend(); err != nil { + // glog.Errorf("Error closing follower stream: %v", err) + //} + } else { + if acknowledgedSequence < receivedSequence { + acknowledgedSequence = receivedSequence + response := &mq_pb.PublishMessageResponse{ + AckSequence: acknowledgedSequence, + } + if err := stream.Send(response); err != nil { + glog.Errorf("Error sending response %v: %v", response, err) + } } } }() @@ -122,22 +183,57 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } // Process the received message - if dataMessage := req.GetData(); dataMessage != nil { - localTopicPartition.Publish(dataMessage) + dataMessage := req.GetData() + if dataMessage == nil { + continue } - ackCounter++ - ackSequence++ - if ackCounter >= ackInterval { - ackCounter = 0 - // send back the ack - response := &mq_pb.PublishMessageResponse{ - AckSequence: ackSequence, + // send to the local partition + localTopicPartition.Publish(dataMessage) + receivedSequence = dataMessage.TsNs + + // maybe send to the follower + if followerStream != nil { + println("recv", string(dataMessage.Key), dataMessage.TsNs) + if followErr := followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Data{ + Data: dataMessage, + }, + }); followErr != nil { + return followErr + } + } else { + ackCounter++ + if ackCounter >= ackInterval { + ackCounter = 0 + // send back the ack directly + acknowledgedSequence = receivedSequence + response := &mq_pb.PublishMessageResponse{ + AckSequence: acknowledgedSequence, + } + if err := stream.Send(response); err != nil { + glog.Errorf("Error sending response %v: %v", response, err) + } } - respChan <- response } } + if followerStream != nil { + // send close to the follower + if followErr := followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Close{ + Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, + }, + }); followErr != nil { + return followErr + } + println("closing follower stream") + + //if err := followerStream.CloseSend(); err != nil { + // glog.Errorf("Error closing follower stream: %v", err) + //} + } + glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition) return nil diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index f6a06f0b8..3e7977eba 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -1,12 +1,49 @@ package broker import ( - "context" + "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "io" ) -func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) { - glog.V(0).Infof("PublishFollowMe %v", request) - return &mq_pb.PublishFollowMeResponse{}, nil +func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) error { + req, err := stream.Recv() + if err != nil { + return err + } + initMessage := req.GetInit() + if initMessage == nil { + return fmt.Errorf("missing init message") + } + + // t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) + // follow each published messages + for { + // receive a message + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) + return err + } + + // Process the received message + if dataMessage := req.GetData(); dataMessage != nil { + // send back the ack + if err := stream.Send(&mq_pb.PublishFollowMeResponse{ + AckTsNs: dataMessage.TsNs, + }); err != nil { + // TODO save un-acked messages to disk + glog.Errorf("Error sending response %v: %v", dataMessage, err) + } + println("ack", string(dataMessage.Key), dataMessage.TsNs) + } else if closeMessage := req.GetClose(); closeMessage != nil { + glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) + break + } + } + return nil } diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index 1c5891049..df5b0ca06 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" + "time" ) func (p *TopicPublisher) Publish(key, value []byte) error { @@ -20,5 +21,6 @@ func (p *TopicPublisher) Publish(key, value []byte) error { return inputBuffer.Enqueue(&mq_pb.DataMessage{ Key: key, Value: value, + TsNs: time.Now().UnixNano(), }) } diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index e6caf896c..80a1ac9ef 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -150,6 +150,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro }); err != nil { return fmt.Errorf("send init message: %v", err) } + // process the hello message resp, err := stream.Recv() if err != nil { return fmt.Errorf("recv init response: %v", err) @@ -158,21 +159,25 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return fmt.Errorf("init response error: %v", resp.Error) } + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() for { ackResp, err := publishClient.Recv() if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Unknown && e.Message() == "EOF" { + log.Printf("publish to %s EOF", publishClient.Broker) return } publishClient.Err = err - fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) + log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) return } if ackResp.Error != "" { publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) - fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) + log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) return } if ackResp.AckSequence > 0 { @@ -193,11 +198,12 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro publishCounter++ } - if err := publishClient.CloseSend(); err != nil { - return fmt.Errorf("close send: %v", err) - } + // CloseSend would cancel the context on the server side + //if err := publishClient.CloseSend(); err != nil { + // return fmt.Errorf("close send: %v", err) + //} - time.Sleep(3 * time.Second) + wg.Wait() log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto index 12c9ccc93..b9a32651c 100644 --- a/weed/pb/mq.proto +++ b/weed/pb/mq.proto @@ -46,7 +46,7 @@ service SeaweedMessaging { rpc SubscribeMessage (SubscribeMessageRequest) returns (stream SubscribeMessageResponse) { } // The lead broker asks a follower broker to follow itself - rpc PublishFollowMe (PublishFollowMeRequest) returns (PublishFollowMeResponse) { + rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) { } } @@ -209,12 +209,24 @@ message PublishMessageResponse { bool should_close = 3; } message PublishFollowMeRequest { - Topic topic = 1; - Partition partition = 2; - string broker_self = 3; + message InitMessage { + Topic topic = 1; + Partition partition = 2; + } + message FlushMessage { + int64 ts_ns = 1; + } + message CloseMessage { + } + oneof message { + InitMessage init = 1; + DataMessage data = 2; + FlushMessage flush = 3; + CloseMessage close = 4; + } } message PublishFollowMeResponse { - string error = 1; + int64 ack_ts_ns = 1; } message SubscribeMessageRequest { message InitMessage { diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go index 91e7e42b0..1d316b937 100644 --- a/weed/pb/mq_pb/mq.pb.go +++ b/weed/pb/mq_pb/mq.pb.go @@ -1569,9 +1569,13 @@ type PublishFollowMeRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` - BrokerSelf string `protobuf:"bytes,3,opt,name=broker_self,json=brokerSelf,proto3" json:"broker_self,omitempty"` + // Types that are assignable to Message: + // + // *PublishFollowMeRequest_Init + // *PublishFollowMeRequest_Data + // *PublishFollowMeRequest_Flush + // *PublishFollowMeRequest_Close + Message isPublishFollowMeRequest_Message `protobuf_oneof:"message"` } func (x *PublishFollowMeRequest) Reset() { @@ -1606,33 +1610,75 @@ func (*PublishFollowMeRequest) Descriptor() ([]byte, []int) { return file_mq_proto_rawDescGZIP(), []int{26} } -func (x *PublishFollowMeRequest) GetTopic() *Topic { - if x != nil { - return x.Topic +func (m *PublishFollowMeRequest) GetMessage() isPublishFollowMeRequest_Message { + if m != nil { + return m.Message } return nil } -func (x *PublishFollowMeRequest) GetPartition() *Partition { - if x != nil { - return x.Partition +func (x *PublishFollowMeRequest) GetInit() *PublishFollowMeRequest_InitMessage { + if x, ok := x.GetMessage().(*PublishFollowMeRequest_Init); ok { + return x.Init } return nil } -func (x *PublishFollowMeRequest) GetBrokerSelf() string { - if x != nil { - return x.BrokerSelf +func (x *PublishFollowMeRequest) GetData() *DataMessage { + if x, ok := x.GetMessage().(*PublishFollowMeRequest_Data); ok { + return x.Data } - return "" + return nil } +func (x *PublishFollowMeRequest) GetFlush() *PublishFollowMeRequest_FlushMessage { + if x, ok := x.GetMessage().(*PublishFollowMeRequest_Flush); ok { + return x.Flush + } + return nil +} + +func (x *PublishFollowMeRequest) GetClose() *PublishFollowMeRequest_CloseMessage { + if x, ok := x.GetMessage().(*PublishFollowMeRequest_Close); ok { + return x.Close + } + return nil +} + +type isPublishFollowMeRequest_Message interface { + isPublishFollowMeRequest_Message() +} + +type PublishFollowMeRequest_Init struct { + Init *PublishFollowMeRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3,oneof"` +} + +type PublishFollowMeRequest_Data struct { + Data *DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"` +} + +type PublishFollowMeRequest_Flush struct { + Flush *PublishFollowMeRequest_FlushMessage `protobuf:"bytes,3,opt,name=flush,proto3,oneof"` +} + +type PublishFollowMeRequest_Close struct { + Close *PublishFollowMeRequest_CloseMessage `protobuf:"bytes,4,opt,name=close,proto3,oneof"` +} + +func (*PublishFollowMeRequest_Init) isPublishFollowMeRequest_Message() {} + +func (*PublishFollowMeRequest_Data) isPublishFollowMeRequest_Message() {} + +func (*PublishFollowMeRequest_Flush) isPublishFollowMeRequest_Message() {} + +func (*PublishFollowMeRequest_Close) isPublishFollowMeRequest_Message() {} + type PublishFollowMeResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + AckTsNs int64 `protobuf:"varint,1,opt,name=ack_ts_ns,json=ackTsNs,proto3" json:"ack_ts_ns,omitempty"` } func (x *PublishFollowMeResponse) Reset() { @@ -1667,11 +1713,11 @@ func (*PublishFollowMeResponse) Descriptor() ([]byte, []int) { return file_mq_proto_rawDescGZIP(), []int{27} } -func (x *PublishFollowMeResponse) GetError() string { +func (x *PublishFollowMeResponse) GetAckTsNs() int64 { if x != nil { - return x.Error + return x.AckTsNs } - return "" + return 0 } type SubscribeMessageRequest struct { @@ -2376,6 +2422,146 @@ func (x *PublishMessageRequest_InitMessage) GetFollowerBrokers() []string { return nil } +type PublishFollowMeRequest_InitMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` +} + +func (x *PublishFollowMeRequest_InitMessage) Reset() { + *x = PublishFollowMeRequest_InitMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[41] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishFollowMeRequest_InitMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishFollowMeRequest_InitMessage) ProtoMessage() {} + +func (x *PublishFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[41] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishFollowMeRequest_InitMessage.ProtoReflect.Descriptor instead. +func (*PublishFollowMeRequest_InitMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{26, 0} +} + +func (x *PublishFollowMeRequest_InitMessage) GetTopic() *Topic { + if x != nil { + return x.Topic + } + return nil +} + +func (x *PublishFollowMeRequest_InitMessage) GetPartition() *Partition { + if x != nil { + return x.Partition + } + return nil +} + +type PublishFollowMeRequest_FlushMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TsNs int64 `protobuf:"varint,1,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` +} + +func (x *PublishFollowMeRequest_FlushMessage) Reset() { + *x = PublishFollowMeRequest_FlushMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[42] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishFollowMeRequest_FlushMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishFollowMeRequest_FlushMessage) ProtoMessage() {} + +func (x *PublishFollowMeRequest_FlushMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[42] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishFollowMeRequest_FlushMessage.ProtoReflect.Descriptor instead. +func (*PublishFollowMeRequest_FlushMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{26, 1} +} + +func (x *PublishFollowMeRequest_FlushMessage) GetTsNs() int64 { + if x != nil { + return x.TsNs + } + return 0 +} + +type PublishFollowMeRequest_CloseMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *PublishFollowMeRequest_CloseMessage) Reset() { + *x = PublishFollowMeRequest_CloseMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[43] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishFollowMeRequest_CloseMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishFollowMeRequest_CloseMessage) ProtoMessage() {} + +func (x *PublishFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[43] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishFollowMeRequest_CloseMessage.ProtoReflect.Descriptor instead. +func (*PublishFollowMeRequest_CloseMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{26, 2} +} + type SubscribeMessageRequest_InitMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2392,7 +2578,7 @@ type SubscribeMessageRequest_InitMessage struct { func (x *SubscribeMessageRequest_InitMessage) Reset() { *x = SubscribeMessageRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[41] + mi := &file_mq_proto_msgTypes[44] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2405,7 +2591,7 @@ func (x *SubscribeMessageRequest_InitMessage) String() string { func (*SubscribeMessageRequest_InitMessage) ProtoMessage() {} func (x *SubscribeMessageRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[41] + mi := &file_mq_proto_msgTypes[44] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2474,7 +2660,7 @@ type SubscribeMessageRequest_AckMessage struct { func (x *SubscribeMessageRequest_AckMessage) Reset() { *x = SubscribeMessageRequest_AckMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[42] + mi := &file_mq_proto_msgTypes[45] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2487,7 +2673,7 @@ func (x *SubscribeMessageRequest_AckMessage) String() string { func (*SubscribeMessageRequest_AckMessage) ProtoMessage() {} func (x *SubscribeMessageRequest_AckMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[42] + mi := &file_mq_proto_msgTypes[45] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2523,7 +2709,7 @@ type SubscribeMessageResponse_CtrlMessage struct { func (x *SubscribeMessageResponse_CtrlMessage) Reset() { *x = SubscribeMessageResponse_CtrlMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[43] + mi := &file_mq_proto_msgTypes[46] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2536,7 +2722,7 @@ func (x *SubscribeMessageResponse_CtrlMessage) String() string { func (*SubscribeMessageResponse_CtrlMessage) ProtoMessage() {} func (x *SubscribeMessageResponse_CtrlMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[43] + mi := &file_mq_proto_msgTypes[46] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2817,180 +3003,200 @@ var file_mq_proto_rawDesc = []byte{ 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x5f, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x73, 0x68, 0x6f, 0x75, 0x6c, - 0x64, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x22, 0x9b, 0x01, 0x0a, 0x16, 0x50, 0x75, 0x62, 0x6c, 0x69, + 0x64, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x22, 0xd8, 0x03, 0x0a, 0x16, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09, - 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, 0x73, 0x65, - 0x6c, 0x66, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x53, 0x65, 0x6c, 0x66, 0x22, 0x2f, 0x0a, 0x17, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, - 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xdf, 0x03, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x74, 0x12, 0x46, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, + 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, + 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x49, 0x0a, 0x05, 0x66, 0x6c, + 0x75, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x46, 0x6c, 0x75, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x05, + 0x66, 0x6c, 0x75, 0x73, 0x68, 0x12, 0x49, 0x0a, 0x05, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, + 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x05, 0x63, 0x6c, 0x6f, 0x73, 0x65, + 0x1a, 0x6f, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, + 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x1a, 0x23, 0x0a, 0x0c, 0x46, 0x6c, 0x75, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x04, 0x74, 0x73, 0x4e, 0x73, 0x1a, 0x0e, 0x0a, 0x0c, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x22, 0x35, 0x0a, 0x17, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, + 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x09, + 0x61, 0x63, 0x6b, 0x5f, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x07, 0x61, 0x63, 0x6b, 0x54, 0x73, 0x4e, 0x73, 0x22, 0xdf, 0x03, 0x0a, 0x17, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x47, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x44, 0x0a, + 0x03, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x47, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, + 0x74, 0x2e, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x03, + 0x61, 0x63, 0x6b, 0x1a, 0xff, 0x01, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, + 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, + 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, + 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x63, + 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, + 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x12, 0x48, 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x0f, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, + 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, + 0x69, 0x6c, 0x74, 0x65, 0x72, 0x1a, 0x28, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x42, + 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x95, 0x02, 0x0a, 0x18, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x44, 0x0a, 0x03, 0x61, 0x63, - 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x41, - 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x03, 0x61, 0x63, 0x6b, - 0x1a, 0xff, 0x01, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, - 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, - 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x73, 0x75, - 0x6d, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, - 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6c, 0x69, 0x65, - 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69, - 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x12, 0x48, 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x66, - 0x66, 0x73, 0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x69, - 0x6c, 0x74, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, - 0x65, 0x72, 0x1a, 0x28, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x42, 0x09, 0x0a, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x95, 0x02, 0x0a, 0x18, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x48, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, - 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, - 0x73, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, - 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x12, 0x27, 0x0a, 0x10, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, - 0x66, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, - 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x25, 0x0a, - 0x0f, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x54, - 0x6f, 0x70, 0x69, 0x63, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, - 0x65, 0x0a, 0x16, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, - 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, - 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, - 0x6f, 0x70, 0x69, 0x63, 0x12, 0x20, 0x0a, 0x0c, 0x75, 0x6e, 0x69, 0x78, 0x5f, 0x74, 0x69, 0x6d, - 0x65, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, 0x6e, 0x69, 0x78, - 0x54, 0x69, 0x6d, 0x65, 0x4e, 0x73, 0x22, 0x19, 0x0a, 0x17, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, - 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x66, 0x0a, 0x17, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x48, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x74, + 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74, 0x72, + 0x6c, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, + 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x1a, 0x73, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x27, 0x0a, 0x10, 0x69, 0x73, 0x5f, 0x65, 0x6e, + 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x0d, 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x12, 0x25, 0x0a, 0x0f, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x45, 0x6e, 0x64, + 0x4f, 0x66, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x22, 0x65, 0x0a, 0x16, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, + 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x20, 0x0a, 0x0c, 0x75, 0x6e, 0x69, 0x78, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, - 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x4e, 0x73, 0x22, 0x1a, 0x0a, 0x18, 0x43, 0x6c, 0x6f, - 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x4c, 0x0a, 0x18, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x53, 0x74, 0x61, 0x72, 0x74, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, 0x10, 0x00, 0x12, - 0x16, 0x0a, 0x12, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, 0x5f, 0x49, 0x4e, 0x5f, 0x4d, - 0x45, 0x4d, 0x4f, 0x52, 0x59, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x54, 0x45, 0x53, - 0x54, 0x10, 0x02, 0x32, 0xda, 0x0a, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, - 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, - 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, - 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x79, 0x0a, - 0x16, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x54, 0x6f, 0x50, 0x75, 0x62, 0x42, - 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x4e, 0x73, 0x22, 0x19, 0x0a, 0x17, 0x43, 0x6c, 0x6f, + 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x66, 0x0a, 0x17, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x20, 0x0a, 0x0c, 0x75, 0x6e, + 0x69, 0x78, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x0a, 0x75, 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x4e, 0x73, 0x22, 0x1a, 0x0a, 0x18, + 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x4c, 0x0a, 0x18, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x53, 0x74, 0x61, 0x72, 0x74, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, + 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, 0x5f, 0x49, + 0x4e, 0x5f, 0x4d, 0x45, 0x4d, 0x4f, 0x52, 0x59, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, + 0x54, 0x45, 0x53, 0x54, 0x10, 0x02, 0x32, 0xde, 0x0a, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, + 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, + 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, + 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, + 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x79, 0x0a, 0x16, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x54, 0x6f, 0x50, + 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, + 0x68, 0x65, 0x72, 0x54, 0x6f, 0x50, 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, - 0x54, 0x6f, 0x50, 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x54, 0x6f, 0x50, - 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5a, 0x0a, 0x0d, 0x42, 0x61, 0x6c, 0x61, - 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x22, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, - 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x6c, - 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, - 0x63, 0x73, 0x12, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, - 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, - 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, - 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, - 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, - 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, - 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, - 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, - 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, - 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, - 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x10, 0x43, 0x6c, 0x6f, 0x73, 0x65, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x12, 0x25, 0x2e, 0x6d, 0x65, + 0x54, 0x6f, 0x50, 0x75, 0x62, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x72, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5a, 0x0a, 0x0d, 0x42, + 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x22, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x6c, 0x61, + 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, + 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0e, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x23, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x4c, 0x6f, 0x6f, + 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, + 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, + 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, + 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x6c, 0x6f, 0x73, + 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x85, 0x01, 0x0a, - 0x1a, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, - 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x2f, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, - 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, - 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x28, 0x01, 0x30, 0x01, 0x12, 0x61, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, - 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x65, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, - 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x60, - 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, - 0x65, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, - 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x42, 0x4f, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, - 0x42, 0x11, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, - 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, - 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x10, 0x43, 0x6c, + 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x12, 0x25, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, + 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x85, 0x01, 0x0a, 0x1a, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, + 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x2f, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, + 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, + 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x61, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, + 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x23, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, + 0x62, 0x6c, 0x69, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x65, 0x0a, 0x10, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, + 0x01, 0x12, 0x64, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, + 0x6f, 0x77, 0x4d, 0x65, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, + 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, + 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x4f, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, + 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x11, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, + 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, + 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -3006,7 +3212,7 @@ func file_mq_proto_rawDescGZIP() []byte { } var file_mq_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 44) +var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 47) var file_mq_proto_goTypes = []interface{}{ (PartitionOffsetStartType)(0), // 0: messaging_pb.PartitionOffsetStartType (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest @@ -3050,9 +3256,12 @@ var file_mq_proto_goTypes = []interface{}{ (*SubscriberToSubCoordinatorResponse_AssignedPartition)(nil), // 39: messaging_pb.SubscriberToSubCoordinatorResponse.AssignedPartition (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 40: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment (*PublishMessageRequest_InitMessage)(nil), // 41: messaging_pb.PublishMessageRequest.InitMessage - (*SubscribeMessageRequest_InitMessage)(nil), // 42: messaging_pb.SubscribeMessageRequest.InitMessage - (*SubscribeMessageRequest_AckMessage)(nil), // 43: messaging_pb.SubscribeMessageRequest.AckMessage - (*SubscribeMessageResponse_CtrlMessage)(nil), // 44: messaging_pb.SubscribeMessageResponse.CtrlMessage + (*PublishFollowMeRequest_InitMessage)(nil), // 42: messaging_pb.PublishFollowMeRequest.InitMessage + (*PublishFollowMeRequest_FlushMessage)(nil), // 43: messaging_pb.PublishFollowMeRequest.FlushMessage + (*PublishFollowMeRequest_CloseMessage)(nil), // 44: messaging_pb.PublishFollowMeRequest.CloseMessage + (*SubscribeMessageRequest_InitMessage)(nil), // 45: messaging_pb.SubscribeMessageRequest.InitMessage + (*SubscribeMessageRequest_AckMessage)(nil), // 46: messaging_pb.SubscribeMessageRequest.AckMessage + (*SubscribeMessageResponse_CtrlMessage)(nil), // 47: messaging_pb.SubscribeMessageResponse.CtrlMessage } var file_mq_proto_depIdxs = []int32{ 3, // 0: messaging_pb.Offset.topic:type_name -> messaging_pb.Topic @@ -3078,54 +3287,58 @@ var file_mq_proto_depIdxs = []int32{ 40, // 20: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment 41, // 21: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage 24, // 22: messaging_pb.PublishMessageRequest.data:type_name -> messaging_pb.DataMessage - 3, // 23: messaging_pb.PublishFollowMeRequest.topic:type_name -> messaging_pb.Topic - 4, // 24: messaging_pb.PublishFollowMeRequest.partition:type_name -> messaging_pb.Partition - 42, // 25: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage - 43, // 26: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage - 44, // 27: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.CtrlMessage - 24, // 28: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage - 3, // 29: messaging_pb.ClosePublishersRequest.topic:type_name -> messaging_pb.Topic - 3, // 30: messaging_pb.CloseSubscribersRequest.topic:type_name -> messaging_pb.Topic - 8, // 31: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats - 3, // 32: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 4, // 33: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage.partition:type_name -> messaging_pb.Partition - 4, // 34: messaging_pb.SubscriberToSubCoordinatorResponse.AssignedPartition.partition:type_name -> messaging_pb.Partition - 39, // 35: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.assigned_partitions:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.AssignedPartition - 3, // 36: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 4, // 37: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> messaging_pb.Partition - 3, // 38: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic - 6, // 39: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> messaging_pb.PartitionOffset - 1, // 40: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 9, // 41: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest - 11, // 42: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest - 15, // 43: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest - 13, // 44: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest - 17, // 45: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest - 20, // 46: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest - 31, // 47: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest - 33, // 48: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest - 22, // 49: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest - 25, // 50: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest - 29, // 51: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest - 27, // 52: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest - 2, // 53: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 10, // 54: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse - 12, // 55: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse - 16, // 56: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse - 14, // 57: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse - 18, // 58: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse - 21, // 59: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse - 32, // 60: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse - 34, // 61: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse - 23, // 62: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse - 26, // 63: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse - 30, // 64: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse - 28, // 65: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse - 53, // [53:66] is the sub-list for method output_type - 40, // [40:53] is the sub-list for method input_type - 40, // [40:40] is the sub-list for extension type_name - 40, // [40:40] is the sub-list for extension extendee - 0, // [0:40] is the sub-list for field type_name + 42, // 23: messaging_pb.PublishFollowMeRequest.init:type_name -> messaging_pb.PublishFollowMeRequest.InitMessage + 24, // 24: messaging_pb.PublishFollowMeRequest.data:type_name -> messaging_pb.DataMessage + 43, // 25: messaging_pb.PublishFollowMeRequest.flush:type_name -> messaging_pb.PublishFollowMeRequest.FlushMessage + 44, // 26: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage + 45, // 27: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage + 46, // 28: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage + 47, // 29: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.CtrlMessage + 24, // 30: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage + 3, // 31: messaging_pb.ClosePublishersRequest.topic:type_name -> messaging_pb.Topic + 3, // 32: messaging_pb.CloseSubscribersRequest.topic:type_name -> messaging_pb.Topic + 8, // 33: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats + 3, // 34: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 35: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage.partition:type_name -> messaging_pb.Partition + 4, // 36: messaging_pb.SubscriberToSubCoordinatorResponse.AssignedPartition.partition:type_name -> messaging_pb.Partition + 39, // 37: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.assigned_partitions:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.AssignedPartition + 3, // 38: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 39: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 3, // 40: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 41: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 3, // 42: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 6, // 43: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> messaging_pb.PartitionOffset + 1, // 44: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 9, // 45: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest + 11, // 46: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest + 15, // 47: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest + 13, // 48: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest + 17, // 49: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest + 20, // 50: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest + 31, // 51: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest + 33, // 52: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest + 22, // 53: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest + 25, // 54: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest + 29, // 55: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest + 27, // 56: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest + 2, // 57: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 10, // 58: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse + 12, // 59: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse + 16, // 60: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse + 14, // 61: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse + 18, // 62: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse + 21, // 63: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse + 32, // 64: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse + 34, // 65: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse + 23, // 66: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse + 26, // 67: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse + 30, // 68: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse + 28, // 69: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse + 57, // [57:70] is the sub-list for method output_type + 44, // [44:57] is the sub-list for method input_type + 44, // [44:44] is the sub-list for extension type_name + 44, // [44:44] is the sub-list for extension extendee + 0, // [0:44] is the sub-list for field type_name } func init() { file_mq_proto_init() } @@ -3615,7 +3828,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[41].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeMessageRequest_InitMessage); i { + switch v := v.(*PublishFollowMeRequest_InitMessage); i { case 0: return &v.state case 1: @@ -3627,7 +3840,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[42].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeMessageRequest_AckMessage); i { + switch v := v.(*PublishFollowMeRequest_FlushMessage); i { case 0: return &v.state case 1: @@ -3639,6 +3852,42 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[43].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishFollowMeRequest_CloseMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[44].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeMessageRequest_InitMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[45].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeMessageRequest_AckMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[46].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeMessageResponse_CtrlMessage); i { case 0: return &v.state @@ -3666,6 +3915,12 @@ func file_mq_proto_init() { (*PublishMessageRequest_Init)(nil), (*PublishMessageRequest_Data)(nil), } + file_mq_proto_msgTypes[26].OneofWrappers = []interface{}{ + (*PublishFollowMeRequest_Init)(nil), + (*PublishFollowMeRequest_Data)(nil), + (*PublishFollowMeRequest_Flush)(nil), + (*PublishFollowMeRequest_Close)(nil), + } file_mq_proto_msgTypes[28].OneofWrappers = []interface{}{ (*SubscribeMessageRequest_Init)(nil), (*SubscribeMessageRequest_Ack)(nil), @@ -3680,7 +3935,7 @@ func file_mq_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mq_proto_rawDesc, NumEnums: 1, - NumMessages: 44, + NumMessages: 47, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go index 356c5c3d4..0028f341e 100644 --- a/weed/pb/mq_pb/mq_grpc.pb.go +++ b/weed/pb/mq_pb/mq_grpc.pb.go @@ -57,7 +57,7 @@ type SeaweedMessagingClient interface { PublishMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishMessageClient, error) SubscribeMessage(ctx context.Context, in *SubscribeMessageRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error) // The lead broker asks a follower broker to follow itself - PublishFollowMe(ctx context.Context, in *PublishFollowMeRequest, opts ...grpc.CallOption) (*PublishFollowMeResponse, error) + PublishFollowMe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishFollowMeClient, error) } type seaweedMessagingClient struct { @@ -265,13 +265,35 @@ func (x *seaweedMessagingSubscribeMessageClient) Recv() (*SubscribeMessageRespon return m, nil } -func (c *seaweedMessagingClient) PublishFollowMe(ctx context.Context, in *PublishFollowMeRequest, opts ...grpc.CallOption) (*PublishFollowMeResponse, error) { - out := new(PublishFollowMeResponse) - err := c.cc.Invoke(ctx, SeaweedMessaging_PublishFollowMe_FullMethodName, in, out, opts...) +func (c *seaweedMessagingClient) PublishFollowMe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishFollowMeClient, error) { + stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[4], SeaweedMessaging_PublishFollowMe_FullMethodName, opts...) if err != nil { return nil, err } - return out, nil + x := &seaweedMessagingPublishFollowMeClient{stream} + return x, nil +} + +type SeaweedMessaging_PublishFollowMeClient interface { + Send(*PublishFollowMeRequest) error + Recv() (*PublishFollowMeResponse, error) + grpc.ClientStream +} + +type seaweedMessagingPublishFollowMeClient struct { + grpc.ClientStream +} + +func (x *seaweedMessagingPublishFollowMeClient) Send(m *PublishFollowMeRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *seaweedMessagingPublishFollowMeClient) Recv() (*PublishFollowMeResponse, error) { + m := new(PublishFollowMeResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // SeaweedMessagingServer is the server API for SeaweedMessaging service. @@ -297,7 +319,7 @@ type SeaweedMessagingServer interface { PublishMessage(SeaweedMessaging_PublishMessageServer) error SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error // The lead broker asks a follower broker to follow itself - PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error) + PublishFollowMe(SeaweedMessaging_PublishFollowMeServer) error mustEmbedUnimplementedSeaweedMessagingServer() } @@ -341,8 +363,8 @@ func (UnimplementedSeaweedMessagingServer) PublishMessage(SeaweedMessaging_Publi func (UnimplementedSeaweedMessagingServer) SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error { return status.Errorf(codes.Unimplemented, "method SubscribeMessage not implemented") } -func (UnimplementedSeaweedMessagingServer) PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method PublishFollowMe not implemented") +func (UnimplementedSeaweedMessagingServer) PublishFollowMe(SeaweedMessaging_PublishFollowMeServer) error { + return status.Errorf(codes.Unimplemented, "method PublishFollowMe not implemented") } func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {} @@ -600,22 +622,30 @@ func (x *seaweedMessagingSubscribeMessageServer) Send(m *SubscribeMessageRespons return x.ServerStream.SendMsg(m) } -func _SeaweedMessaging_PublishFollowMe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PublishFollowMeRequest) - if err := dec(in); err != nil { +func _SeaweedMessaging_PublishFollowMe_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SeaweedMessagingServer).PublishFollowMe(&seaweedMessagingPublishFollowMeServer{stream}) +} + +type SeaweedMessaging_PublishFollowMeServer interface { + Send(*PublishFollowMeResponse) error + Recv() (*PublishFollowMeRequest, error) + grpc.ServerStream +} + +type seaweedMessagingPublishFollowMeServer struct { + grpc.ServerStream +} + +func (x *seaweedMessagingPublishFollowMeServer) Send(m *PublishFollowMeResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *seaweedMessagingPublishFollowMeServer) Recv() (*PublishFollowMeRequest, error) { + m := new(PublishFollowMeRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(SeaweedMessagingServer).PublishFollowMe(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: SeaweedMessaging_PublishFollowMe_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SeaweedMessagingServer).PublishFollowMe(ctx, req.(*PublishFollowMeRequest)) - } - return interceptor(ctx, in, info, handler) + return m, nil } // SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service. @@ -657,10 +687,6 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ MethodName: "CloseSubscribers", Handler: _SeaweedMessaging_CloseSubscribers_Handler, }, - { - MethodName: "PublishFollowMe", - Handler: _SeaweedMessaging_PublishFollowMe_Handler, - }, }, Streams: []grpc.StreamDesc{ { @@ -686,6 +712,12 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ Handler: _SeaweedMessaging_SubscribeMessage_Handler, ServerStreams: true, }, + { + StreamName: "PublishFollowMe", + Handler: _SeaweedMessaging_PublishFollowMe_Handler, + ServerStreams: true, + ClientStreams: true, + }, }, Metadata: "mq.proto", }