From aa4a22ad477ac890d427f19c4a4d0cc0d9864eeb Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 16 Jul 2022 10:49:34 -0700 Subject: [PATCH] segment serde --- weed/mq/broker/brokder_grpc_admin.go | 31 +- weed/mq/broker/broker_segment_serde.go | 89 ++++++ weed/mq/broker/broker_server.go | 10 +- weed/mq/topic.go | 32 +- weed/pb/mq.proto | 10 + weed/pb/mq_pb/mq.pb.go | 407 ++++++++++++++++--------- 6 files changed, 411 insertions(+), 168 deletions(-) create mode 100644 weed/mq/broker/broker_segment_serde.go diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go index 31b5bb84e..0dfb69c50 100644 --- a/weed/mq/broker/brokder_grpc_admin.go +++ b/weed/mq/broker/brokder_grpc_admin.go @@ -53,16 +53,19 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques if err != nil { return ret, err } - // good if the segment is still on the brokers - isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) - if err != nil { - return ret, err - } - if isActive { - for _, broker := range existingBrokers { - ret.Brokers = append(ret.Brokers, string(broker)) + + if len(existingBrokers) > 0 { + // good if the segment is still on the brokers + isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) + if err != nil { + return ret, err + } + if isActive { + for _, broker := range existingBrokers { + ret.Brokers = append(ret.Brokers, string(broker)) + } + return ret, nil } - return ret, nil } // randomly pick up to 10 brokers, and find the ones with the lightest load @@ -72,7 +75,7 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques } // save the allocated brokers info for this segment on the filer - if err := broker.saveSegmentOnFiler(segment, selectedBrokers); err != nil { + if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil { return ret, err } @@ -82,10 +85,6 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques return ret, nil } -func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) { - return -} - func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) { var wg sync.WaitGroup @@ -206,7 +205,3 @@ func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddres wg.Wait() return } - -func (broker *MessageQueueBroker) saveSegmentOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) { - return -} diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go new file mode 100644 index 000000000..25fca005b --- /dev/null +++ b/weed/mq/broker/broker_segment_serde.go @@ -0,0 +1,89 @@ +package broker + +import ( + "bytes" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/mq" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "github.com/golang/protobuf/jsonpb" + "time" +) + +func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) { + info, found, err := broker.readSegmentOnFiler(segment) + if err != nil { + return + } + if !found { + return + } + for _, b := range info.Brokers { + brokers = append(brokers, pb.ServerAddress(b)) + } + + return +} + +func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) { + var nodes []string + for _, b := range brokers { + nodes = append(nodes, string(b)) + } + broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{ + Segment: segment.ToPbSegment(), + StartTsNs: time.Now().UnixNano(), + Brokers: nodes, + StopTsNs: 0, + PreviousSegments: nil, + NextSegments: nil, + }) + return +} + +func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) { + dir, name := segment.DirAndName() + + found, err = filer_pb.Exists(broker, dir, name, false) + if !found || err != nil { + return + } + + err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + // read filer conf first + data, err := filer.ReadInsideFiler(client, dir, name) + if err != nil { + return fmt.Errorf("ReadEntry: %v", err) + } + + // parse into filer conf object + info = &mq_pb.SegmentInfo{} + if err = jsonpb.Unmarshal(bytes.NewReader(data), info); err != nil { + return err + } + found = true + return nil + }) + + return +} + +func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) { + dir, name := segment.DirAndName() + + var buf bytes.Buffer + filer.ProtoToText(&buf, info) + + err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + // read filer conf first + err := filer.SaveInsideFiler(client, dir, name, buf.Bytes()) + if err != nil { + return fmt.Errorf("save segment info: %v", err) + } + return nil + }) + + return +} diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index f940b00c3..b95db0ecf 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -80,9 +80,15 @@ func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress { return broker.currentFiler } -func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { +func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) + return pb.WithFilerClient(streamingMode, broker.GetFiler(), broker.grpcDialOption, fn) + +} + +func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string { + + return location.Url } diff --git a/weed/mq/topic.go b/weed/mq/topic.go index 87621fca7..12a749133 100644 --- a/weed/mq/topic.go +++ b/weed/mq/topic.go @@ -1,6 +1,8 @@ package mq import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "time" ) @@ -13,9 +15,9 @@ type Topic struct { } type Partition struct { - RangeStart int - RangeStop int // exclusive - RingSize int + RangeStart int32 + RangeStop int32 // exclusive + RingSize int32 } type Segment struct { @@ -32,5 +34,29 @@ func FromPbSegment(segment *mq_pb.Segment) *Segment { Name: segment.Topic, }, Id: segment.Id, + Partition: Partition{ + RangeStart: segment.Partition.RangeStart, + RangeStop: segment.Partition.RangeStop, + RingSize: segment.Partition.RingSize, + }, } } + +func (segment *Segment) ToPbSegment() *mq_pb.Segment { + return &mq_pb.Segment{ + Namespace: string(segment.Topic.Namespace), + Topic: segment.Topic.Name, + Id: segment.Id, + Partition: &mq_pb.Partition{ + RingSize: segment.Partition.RingSize, + RangeStart: segment.Partition.RangeStart, + RangeStop: segment.Partition.RangeStop, + }, + } +} + +func (segment *Segment) DirAndName() (dir string, name string) { + dir = fmt.Sprintf("%s/%s/%s", filer.TopicsDir, segment.Topic.Namespace, segment.Topic.Name) + name = fmt.Sprintf("%4d.segment", segment.Id) + return +} diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto index 1f91170fd..2cbe79839 100644 --- a/weed/pb/mq.proto +++ b/weed/pb/mq.proto @@ -21,6 +21,16 @@ service SeaweedMessaging { } +////////////////////////////////////////////////// +message SegmentInfo { + Segment segment = 1; + int64 start_ts_ns = 2; + repeated string brokers = 3; + int64 stop_ts_ns = 4; + repeated int32 previous_segments = 5; + repeated int32 next_segments = 6; +} + ////////////////////////////////////////////////// message FindBrokerLeaderRequest { diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go index e305ba8d0..3323179ee 100644 --- a/weed/pb/mq_pb/mq.pb.go +++ b/weed/pb/mq_pb/mq.pb.go @@ -20,6 +20,94 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +////////////////////////////////////////////////// +type SegmentInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Segment *Segment `protobuf:"bytes,1,opt,name=segment,proto3" json:"segment,omitempty"` + StartTsNs int64 `protobuf:"varint,2,opt,name=start_ts_ns,json=startTsNs,proto3" json:"start_ts_ns,omitempty"` + Brokers []string `protobuf:"bytes,3,rep,name=brokers,proto3" json:"brokers,omitempty"` + StopTsNs int64 `protobuf:"varint,4,opt,name=stop_ts_ns,json=stopTsNs,proto3" json:"stop_ts_ns,omitempty"` + PreviousSegments []int32 `protobuf:"varint,5,rep,packed,name=previous_segments,json=previousSegments,proto3" json:"previous_segments,omitempty"` + NextSegments []int32 `protobuf:"varint,6,rep,packed,name=next_segments,json=nextSegments,proto3" json:"next_segments,omitempty"` +} + +func (x *SegmentInfo) Reset() { + *x = SegmentInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SegmentInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SegmentInfo) ProtoMessage() {} + +func (x *SegmentInfo) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SegmentInfo.ProtoReflect.Descriptor instead. +func (*SegmentInfo) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{0} +} + +func (x *SegmentInfo) GetSegment() *Segment { + if x != nil { + return x.Segment + } + return nil +} + +func (x *SegmentInfo) GetStartTsNs() int64 { + if x != nil { + return x.StartTsNs + } + return 0 +} + +func (x *SegmentInfo) GetBrokers() []string { + if x != nil { + return x.Brokers + } + return nil +} + +func (x *SegmentInfo) GetStopTsNs() int64 { + if x != nil { + return x.StopTsNs + } + return 0 +} + +func (x *SegmentInfo) GetPreviousSegments() []int32 { + if x != nil { + return x.PreviousSegments + } + return nil +} + +func (x *SegmentInfo) GetNextSegments() []int32 { + if x != nil { + return x.NextSegments + } + return nil +} + type FindBrokerLeaderRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -31,7 +119,7 @@ type FindBrokerLeaderRequest struct { func (x *FindBrokerLeaderRequest) Reset() { *x = FindBrokerLeaderRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[0] + mi := &file_mq_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -44,7 +132,7 @@ func (x *FindBrokerLeaderRequest) String() string { func (*FindBrokerLeaderRequest) ProtoMessage() {} func (x *FindBrokerLeaderRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[0] + mi := &file_mq_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -57,7 +145,7 @@ func (x *FindBrokerLeaderRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use FindBrokerLeaderRequest.ProtoReflect.Descriptor instead. func (*FindBrokerLeaderRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{0} + return file_mq_proto_rawDescGZIP(), []int{1} } func (x *FindBrokerLeaderRequest) GetFilerGroup() string { @@ -78,7 +166,7 @@ type FindBrokerLeaderResponse struct { func (x *FindBrokerLeaderResponse) Reset() { *x = FindBrokerLeaderResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[1] + mi := &file_mq_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -91,7 +179,7 @@ func (x *FindBrokerLeaderResponse) String() string { func (*FindBrokerLeaderResponse) ProtoMessage() {} func (x *FindBrokerLeaderResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[1] + mi := &file_mq_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -104,7 +192,7 @@ func (x *FindBrokerLeaderResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use FindBrokerLeaderResponse.ProtoReflect.Descriptor instead. func (*FindBrokerLeaderResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{1} + return file_mq_proto_rawDescGZIP(), []int{2} } func (x *FindBrokerLeaderResponse) GetBroker() string { @@ -127,7 +215,7 @@ type Partition struct { func (x *Partition) Reset() { *x = Partition{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[2] + mi := &file_mq_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -140,7 +228,7 @@ func (x *Partition) String() string { func (*Partition) ProtoMessage() {} func (x *Partition) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[2] + mi := &file_mq_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -153,7 +241,7 @@ func (x *Partition) ProtoReflect() protoreflect.Message { // Deprecated: Use Partition.ProtoReflect.Descriptor instead. func (*Partition) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{2} + return file_mq_proto_rawDescGZIP(), []int{3} } func (x *Partition) GetRingSize() int32 { @@ -191,7 +279,7 @@ type Segment struct { func (x *Segment) Reset() { *x = Segment{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[3] + mi := &file_mq_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -204,7 +292,7 @@ func (x *Segment) String() string { func (*Segment) ProtoMessage() {} func (x *Segment) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[3] + mi := &file_mq_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -217,7 +305,7 @@ func (x *Segment) ProtoReflect() protoreflect.Message { // Deprecated: Use Segment.ProtoReflect.Descriptor instead. func (*Segment) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{3} + return file_mq_proto_rawDescGZIP(), []int{4} } func (x *Segment) GetNamespace() string { @@ -259,7 +347,7 @@ type AssignSegmentBrokersRequest struct { func (x *AssignSegmentBrokersRequest) Reset() { *x = AssignSegmentBrokersRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[4] + mi := &file_mq_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -272,7 +360,7 @@ func (x *AssignSegmentBrokersRequest) String() string { func (*AssignSegmentBrokersRequest) ProtoMessage() {} func (x *AssignSegmentBrokersRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[4] + mi := &file_mq_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -285,7 +373,7 @@ func (x *AssignSegmentBrokersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AssignSegmentBrokersRequest.ProtoReflect.Descriptor instead. func (*AssignSegmentBrokersRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{4} + return file_mq_proto_rawDescGZIP(), []int{5} } func (x *AssignSegmentBrokersRequest) GetSegment() *Segment { @@ -306,7 +394,7 @@ type AssignSegmentBrokersResponse struct { func (x *AssignSegmentBrokersResponse) Reset() { *x = AssignSegmentBrokersResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[5] + mi := &file_mq_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -319,7 +407,7 @@ func (x *AssignSegmentBrokersResponse) String() string { func (*AssignSegmentBrokersResponse) ProtoMessage() {} func (x *AssignSegmentBrokersResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[5] + mi := &file_mq_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -332,7 +420,7 @@ func (x *AssignSegmentBrokersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use AssignSegmentBrokersResponse.ProtoReflect.Descriptor instead. func (*AssignSegmentBrokersResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{5} + return file_mq_proto_rawDescGZIP(), []int{6} } func (x *AssignSegmentBrokersResponse) GetBrokers() []string { @@ -353,7 +441,7 @@ type CheckSegmentStatusRequest struct { func (x *CheckSegmentStatusRequest) Reset() { *x = CheckSegmentStatusRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[6] + mi := &file_mq_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -366,7 +454,7 @@ func (x *CheckSegmentStatusRequest) String() string { func (*CheckSegmentStatusRequest) ProtoMessage() {} func (x *CheckSegmentStatusRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[6] + mi := &file_mq_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -379,7 +467,7 @@ func (x *CheckSegmentStatusRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CheckSegmentStatusRequest.ProtoReflect.Descriptor instead. func (*CheckSegmentStatusRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{6} + return file_mq_proto_rawDescGZIP(), []int{7} } func (x *CheckSegmentStatusRequest) GetSegment() *Segment { @@ -400,7 +488,7 @@ type CheckSegmentStatusResponse struct { func (x *CheckSegmentStatusResponse) Reset() { *x = CheckSegmentStatusResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[7] + mi := &file_mq_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -413,7 +501,7 @@ func (x *CheckSegmentStatusResponse) String() string { func (*CheckSegmentStatusResponse) ProtoMessage() {} func (x *CheckSegmentStatusResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[7] + mi := &file_mq_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -426,7 +514,7 @@ func (x *CheckSegmentStatusResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CheckSegmentStatusResponse.ProtoReflect.Descriptor instead. func (*CheckSegmentStatusResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{7} + return file_mq_proto_rawDescGZIP(), []int{8} } func (x *CheckSegmentStatusResponse) GetIsActive() bool { @@ -445,7 +533,7 @@ type CheckBrokerLoadRequest struct { func (x *CheckBrokerLoadRequest) Reset() { *x = CheckBrokerLoadRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[8] + mi := &file_mq_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -458,7 +546,7 @@ func (x *CheckBrokerLoadRequest) String() string { func (*CheckBrokerLoadRequest) ProtoMessage() {} func (x *CheckBrokerLoadRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[8] + mi := &file_mq_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -471,7 +559,7 @@ func (x *CheckBrokerLoadRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CheckBrokerLoadRequest.ProtoReflect.Descriptor instead. func (*CheckBrokerLoadRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{8} + return file_mq_proto_rawDescGZIP(), []int{9} } type CheckBrokerLoadResponse struct { @@ -486,7 +574,7 @@ type CheckBrokerLoadResponse struct { func (x *CheckBrokerLoadResponse) Reset() { *x = CheckBrokerLoadResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[9] + mi := &file_mq_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -499,7 +587,7 @@ func (x *CheckBrokerLoadResponse) String() string { func (*CheckBrokerLoadResponse) ProtoMessage() {} func (x *CheckBrokerLoadResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[9] + mi := &file_mq_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -512,7 +600,7 @@ func (x *CheckBrokerLoadResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CheckBrokerLoadResponse.ProtoReflect.Descriptor instead. func (*CheckBrokerLoadResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{9} + return file_mq_proto_rawDescGZIP(), []int{10} } func (x *CheckBrokerLoadResponse) GetMessageCount() int64 { @@ -533,87 +621,102 @@ var File_mq_proto protoreflect.FileDescriptor var file_mq_proto_rawDesc = []byte{ 0x0a, 0x08, 0x6d, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x22, 0x3a, 0x0a, 0x17, 0x46, 0x69, 0x6e, 0x64, - 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, - 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x47, - 0x72, 0x6f, 0x75, 0x70, 0x22, 0x32, 0x0a, 0x18, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x22, 0x68, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x69, 0x6e, 0x67, 0x5f, 0x73, 0x69, - 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x72, 0x69, 0x6e, 0x67, 0x53, 0x69, - 0x7a, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, - 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, - 0x61, 0x72, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x6f, - 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, - 0x6f, 0x70, 0x22, 0x84, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1c, - 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, - 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, - 0x69, 0x63, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, - 0x69, 0x64, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, - 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, - 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x4e, 0x0a, 0x1b, 0x41, 0x73, 0x73, - 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x65, 0x67, 0x6d, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x22, 0xe8, 0x01, 0x0a, 0x0b, 0x53, 0x65, 0x67, + 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, - 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x38, 0x0a, 0x1c, 0x41, 0x73, 0x73, - 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x62, 0x72, 0x6f, - 0x6b, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x62, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x73, 0x22, 0x4c, 0x0a, 0x19, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, - 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, - 0x74, 0x22, 0x39, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x18, 0x0a, 0x16, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x5f, 0x0a, 0x17, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, - 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x63, 0x6f, 0x75, - 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, - 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x62, 0x79, 0x74, - 0x65, 0x73, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x32, 0xb5, 0x03, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, - 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, - 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, - 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, - 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1e, 0x0a, 0x0b, 0x73, 0x74, 0x61, + 0x72, 0x74, 0x5f, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x73, 0x4e, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x62, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x73, 0x12, 0x1c, 0x0a, 0x0a, 0x73, 0x74, 0x6f, 0x70, 0x5f, 0x74, 0x73, 0x5f, 0x6e, + 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73, 0x4e, + 0x73, 0x12, 0x2b, 0x0a, 0x11, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x5f, 0x73, 0x65, + 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x05, 0x52, 0x10, 0x70, 0x72, + 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x23, + 0x0a, 0x0d, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x18, + 0x06, 0x20, 0x03, 0x28, 0x05, 0x52, 0x0c, 0x6e, 0x65, 0x78, 0x74, 0x53, 0x65, 0x67, 0x6d, 0x65, + 0x6e, 0x74, 0x73, 0x22, 0x3a, 0x0a, 0x17, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, + 0x0a, 0x0b, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0a, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x22, + 0x32, 0x0a, 0x18, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x62, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x22, 0x68, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x69, 0x6e, 0x67, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x08, 0x72, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x1f, 0x0a, + 0x0b, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x0a, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x1d, + 0x0a, 0x0a, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x6f, 0x70, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x09, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, 0x6f, 0x70, 0x22, 0x84, 0x01, + 0x0a, 0x07, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, + 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, + 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x35, 0x0a, + 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x4e, 0x0a, 0x1b, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, + 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, + 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x38, 0x0a, 0x1c, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, + 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x22, 0x4c, + 0x0a, 0x19, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x73, + 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d, + 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x39, 0x0a, 0x1a, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, + 0x5f, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, + 0x73, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x18, 0x0a, 0x16, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x22, 0x5f, 0x0a, 0x17, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x75, 0x6e, + 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x75, + 0x6e, 0x74, 0x32, 0xb5, 0x03, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, + 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, - 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, - 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, - 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, - 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, - 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, - 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, - 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, - 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, - 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, - 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, + 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, + 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, + 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x68, 0x65, 0x63, + 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, + 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, + 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, + 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -628,36 +731,38 @@ func file_mq_proto_rawDescGZIP() []byte { return file_mq_proto_rawDescData } -var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_mq_proto_goTypes = []interface{}{ - (*FindBrokerLeaderRequest)(nil), // 0: messaging_pb.FindBrokerLeaderRequest - (*FindBrokerLeaderResponse)(nil), // 1: messaging_pb.FindBrokerLeaderResponse - (*Partition)(nil), // 2: messaging_pb.Partition - (*Segment)(nil), // 3: messaging_pb.Segment - (*AssignSegmentBrokersRequest)(nil), // 4: messaging_pb.AssignSegmentBrokersRequest - (*AssignSegmentBrokersResponse)(nil), // 5: messaging_pb.AssignSegmentBrokersResponse - (*CheckSegmentStatusRequest)(nil), // 6: messaging_pb.CheckSegmentStatusRequest - (*CheckSegmentStatusResponse)(nil), // 7: messaging_pb.CheckSegmentStatusResponse - (*CheckBrokerLoadRequest)(nil), // 8: messaging_pb.CheckBrokerLoadRequest - (*CheckBrokerLoadResponse)(nil), // 9: messaging_pb.CheckBrokerLoadResponse + (*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo + (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest + (*FindBrokerLeaderResponse)(nil), // 2: messaging_pb.FindBrokerLeaderResponse + (*Partition)(nil), // 3: messaging_pb.Partition + (*Segment)(nil), // 4: messaging_pb.Segment + (*AssignSegmentBrokersRequest)(nil), // 5: messaging_pb.AssignSegmentBrokersRequest + (*AssignSegmentBrokersResponse)(nil), // 6: messaging_pb.AssignSegmentBrokersResponse + (*CheckSegmentStatusRequest)(nil), // 7: messaging_pb.CheckSegmentStatusRequest + (*CheckSegmentStatusResponse)(nil), // 8: messaging_pb.CheckSegmentStatusResponse + (*CheckBrokerLoadRequest)(nil), // 9: messaging_pb.CheckBrokerLoadRequest + (*CheckBrokerLoadResponse)(nil), // 10: messaging_pb.CheckBrokerLoadResponse } var file_mq_proto_depIdxs = []int32{ - 2, // 0: messaging_pb.Segment.partition:type_name -> messaging_pb.Partition - 3, // 1: messaging_pb.AssignSegmentBrokersRequest.segment:type_name -> messaging_pb.Segment - 3, // 2: messaging_pb.CheckSegmentStatusRequest.segment:type_name -> messaging_pb.Segment - 0, // 3: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 4, // 4: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest - 6, // 5: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest - 8, // 6: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest - 1, // 7: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 5, // 8: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse - 7, // 9: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse - 9, // 10: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse - 7, // [7:11] is the sub-list for method output_type - 3, // [3:7] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 4, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment + 3, // 1: messaging_pb.Segment.partition:type_name -> messaging_pb.Partition + 4, // 2: messaging_pb.AssignSegmentBrokersRequest.segment:type_name -> messaging_pb.Segment + 4, // 3: messaging_pb.CheckSegmentStatusRequest.segment:type_name -> messaging_pb.Segment + 1, // 4: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 5, // 5: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest + 7, // 6: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest + 9, // 7: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest + 2, // 8: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 6, // 9: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse + 8, // 10: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse + 10, // 11: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse + 8, // [8:12] is the sub-list for method output_type + 4, // [4:8] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_mq_proto_init() } @@ -667,7 +772,7 @@ func file_mq_proto_init() { } if !protoimpl.UnsafeEnabled { file_mq_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*FindBrokerLeaderRequest); i { + switch v := v.(*SegmentInfo); i { case 0: return &v.state case 1: @@ -679,7 +784,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*FindBrokerLeaderResponse); i { + switch v := v.(*FindBrokerLeaderRequest); i { case 0: return &v.state case 1: @@ -691,7 +796,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Partition); i { + switch v := v.(*FindBrokerLeaderResponse); i { case 0: return &v.state case 1: @@ -703,7 +808,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Segment); i { + switch v := v.(*Partition); i { case 0: return &v.state case 1: @@ -715,7 +820,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AssignSegmentBrokersRequest); i { + switch v := v.(*Segment); i { case 0: return &v.state case 1: @@ -727,7 +832,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AssignSegmentBrokersResponse); i { + switch v := v.(*AssignSegmentBrokersRequest); i { case 0: return &v.state case 1: @@ -739,7 +844,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CheckSegmentStatusRequest); i { + switch v := v.(*AssignSegmentBrokersResponse); i { case 0: return &v.state case 1: @@ -751,7 +856,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CheckSegmentStatusResponse); i { + switch v := v.(*CheckSegmentStatusRequest); i { case 0: return &v.state case 1: @@ -763,7 +868,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CheckBrokerLoadRequest); i { + switch v := v.(*CheckSegmentStatusResponse); i { case 0: return &v.state case 1: @@ -775,6 +880,18 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CheckBrokerLoadRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CheckBrokerLoadResponse); i { case 0: return &v.state @@ -793,7 +910,7 @@ func file_mq_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mq_proto_rawDesc, NumEnums: 0, - NumMessages: 10, + NumMessages: 11, NumExtensions: 0, NumServices: 1, },