Browse Source

scaffolding messaging

pull/1293/head
Chris Lu 5 years ago
parent
commit
ce4b369be2
  1. 6
      weed/command/msg_broker.go
  2. 2
      weed/pb/Makefile
  3. 76
      weed/pb/messaging.proto
  4. 601
      weed/pb/messaging_pb/messaging.pb.go
  5. 66
      weed/pb/queue.proto
  6. 516
      weed/pb/queue_pb/queue.pb.go
  7. 10
      weed/server/msg_broker_grpc_server.go

6
weed/command/msg_broker.go

@ -10,7 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
weed_server "github.com/chrislusf/seaweedfs/weed/server" weed_server "github.com/chrislusf/seaweedfs/weed/server"
@ -25,14 +25,12 @@ var (
type QueueOptions struct { type QueueOptions struct {
filer *string filer *string
port *int port *int
defaultTtl *string
} }
func init() { func init() {
cmdMsgBroker.Run = runMsgBroker // break init cycle cmdMsgBroker.Run = runMsgBroker // break init cycle
messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port") messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port")
messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
} }
var cmdMsgBroker = &Command{ var cmdMsgBroker = &Command{
@ -94,7 +92,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
} }
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
reflection.Register(grpcS) reflection.Register(grpcS)
grpcS.Serve(grpcL) grpcS.Serve(grpcL)

2
weed/pb/Makefile

@ -7,6 +7,6 @@ gen:
protoc volume_server.proto --go_out=plugins=grpc:./volume_server_pb protoc volume_server.proto --go_out=plugins=grpc:./volume_server_pb
protoc filer.proto --go_out=plugins=grpc:./filer_pb protoc filer.proto --go_out=plugins=grpc:./filer_pb
protoc iam.proto --go_out=plugins=grpc:./iam_pb protoc iam.proto --go_out=plugins=grpc:./iam_pb
protoc queue.proto --go_out=plugins=grpc:./queue_pb
protoc messaging.proto --go_out=plugins=grpc:./messaging_pb
# protoc filer.proto --java_out=../../other/java/client/src/main/java # protoc filer.proto --java_out=../../other/java/client/src/main/java
cp filer.proto ../../other/java/client/src/main/proto cp filer.proto ../../other/java/client/src/main/proto

76
weed/pb/messaging.proto

@ -0,0 +1,76 @@
syntax = "proto3";
package messaging_pb;
option java_package = "seaweedfs.client";
option java_outer_classname = "MessagingProto";
//////////////////////////////////////////////////
service SeaweedMessaging {
rpc Subscribe (SubscribeRequest) returns (stream Message) {
}
rpc Publish (stream PublishRequest) returns (stream PublishResponse) {
}
rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {
}
rpc GetTopicConfiguration (GetTopicConfigurationRequest) returns (GetTopicConfigurationResponse) {
}
}
//////////////////////////////////////////////////
message SubscribeRequest {
string namespace = 1;
string topic = 2;
int32 partition = 3;
enum StartPosition {
LATEST = 0; // Start at the newest message
EARLIEST = 1; // Start at the oldest message
TIMESTAMP = 2; // Start after a specified timestamp, exclusive
}
StartPosition startPosition = 4; // Where to begin consuming from
int64 timestampNs = 5; // timestamp in nano seconds
}
message Message {
int64 timestamp = 1 [jstype = JS_STRING]; // When the message was received by the broker
bytes key = 2; // Message key
bytes value = 3; // Message payload
map<string, bytes> headers = 4; // Message headers
}
message PublishRequest {
string namespace = 1; // only needed on the initial request
string topic = 2; // only needed on the initial request
int32 partition = 4;
bytes key = 5; // Message key
bytes value = 6; // Message payload
map<string, bytes> headers = 7; // Message headers
}
message PublishResponse {
int32 partition_count = 1;
}
message ConfigureTopicRequest {
string namespace = 1;
string topic = 2;
int32 partition_count = 3;
string collection = 4;
}
message ConfigureTopicResponse {
}
message GetTopicConfigurationRequest {
string namespace = 1;
string topic = 2;
}
message GetTopicConfigurationResponse {
int32 partitions = 3;
}

601
weed/pb/messaging_pb/messaging.pb.go

@ -0,0 +1,601 @@
// Code generated by protoc-gen-go.
// source: messaging.proto
// DO NOT EDIT!
/*
Package messaging_pb is a generated protocol buffer package.
It is generated from these files:
messaging.proto
It has these top-level messages:
SubscribeRequest
Message
PublishRequest
PublishResponse
ConfigureTopicRequest
ConfigureTopicResponse
GetTopicConfigurationRequest
GetTopicConfigurationResponse
*/
package messaging_pb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type SubscribeRequest_StartPosition int32
const (
SubscribeRequest_LATEST SubscribeRequest_StartPosition = 0
SubscribeRequest_EARLIEST SubscribeRequest_StartPosition = 1
SubscribeRequest_TIMESTAMP SubscribeRequest_StartPosition = 2
)
var SubscribeRequest_StartPosition_name = map[int32]string{
0: "LATEST",
1: "EARLIEST",
2: "TIMESTAMP",
}
var SubscribeRequest_StartPosition_value = map[string]int32{
"LATEST": 0,
"EARLIEST": 1,
"TIMESTAMP": 2,
}
func (x SubscribeRequest_StartPosition) String() string {
return proto.EnumName(SubscribeRequest_StartPosition_name, int32(x))
}
func (SubscribeRequest_StartPosition) EnumDescriptor() ([]byte, []int) {
return fileDescriptor0, []int{0, 0}
}
type SubscribeRequest struct {
Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
Partition int32 `protobuf:"varint,3,opt,name=partition" json:"partition,omitempty"`
StartPosition SubscribeRequest_StartPosition `protobuf:"varint,4,opt,name=startPosition,enum=messaging_pb.SubscribeRequest_StartPosition" json:"startPosition,omitempty"`
TimestampNs int64 `protobuf:"varint,5,opt,name=timestampNs" json:"timestampNs,omitempty"`
}
func (m *SubscribeRequest) Reset() { *m = SubscribeRequest{} }
func (m *SubscribeRequest) String() string { return proto.CompactTextString(m) }
func (*SubscribeRequest) ProtoMessage() {}
func (*SubscribeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *SubscribeRequest) GetNamespace() string {
if m != nil {
return m.Namespace
}
return ""
}
func (m *SubscribeRequest) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
func (m *SubscribeRequest) GetPartition() int32 {
if m != nil {
return m.Partition
}
return 0
}
func (m *SubscribeRequest) GetStartPosition() SubscribeRequest_StartPosition {
if m != nil {
return m.StartPosition
}
return SubscribeRequest_LATEST
}
func (m *SubscribeRequest) GetTimestampNs() int64 {
if m != nil {
return m.TimestampNs
}
return 0
}
type Message struct {
Timestamp int64 `protobuf:"varint,1,opt,name=timestamp" json:"timestamp,omitempty"`
Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
Headers map[string][]byte `protobuf:"bytes,4,rep,name=headers" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Message) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *Message) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func (m *Message) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *Message) GetHeaders() map[string][]byte {
if m != nil {
return m.Headers
}
return nil
}
type PublishRequest struct {
Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
Partition int32 `protobuf:"varint,4,opt,name=partition" json:"partition,omitempty"`
Key []byte `protobuf:"bytes,5,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,6,opt,name=value,proto3" json:"value,omitempty"`
Headers map[string][]byte `protobuf:"bytes,7,rep,name=headers" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (m *PublishRequest) Reset() { *m = PublishRequest{} }
func (m *PublishRequest) String() string { return proto.CompactTextString(m) }
func (*PublishRequest) ProtoMessage() {}
func (*PublishRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *PublishRequest) GetNamespace() string {
if m != nil {
return m.Namespace
}
return ""
}
func (m *PublishRequest) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
func (m *PublishRequest) GetPartition() int32 {
if m != nil {
return m.Partition
}
return 0
}
func (m *PublishRequest) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func (m *PublishRequest) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *PublishRequest) GetHeaders() map[string][]byte {
if m != nil {
return m.Headers
}
return nil
}
type PublishResponse struct {
PartitionCount int32 `protobuf:"varint,1,opt,name=partition_count,json=partitionCount" json:"partition_count,omitempty"`
}
func (m *PublishResponse) Reset() { *m = PublishResponse{} }
func (m *PublishResponse) String() string { return proto.CompactTextString(m) }
func (*PublishResponse) ProtoMessage() {}
func (*PublishResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *PublishResponse) GetPartitionCount() int32 {
if m != nil {
return m.PartitionCount
}
return 0
}
type ConfigureTopicRequest struct {
Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
PartitionCount int32 `protobuf:"varint,3,opt,name=partition_count,json=partitionCount" json:"partition_count,omitempty"`
Collection string `protobuf:"bytes,4,opt,name=collection" json:"collection,omitempty"`
}
func (m *ConfigureTopicRequest) Reset() { *m = ConfigureTopicRequest{} }
func (m *ConfigureTopicRequest) String() string { return proto.CompactTextString(m) }
func (*ConfigureTopicRequest) ProtoMessage() {}
func (*ConfigureTopicRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *ConfigureTopicRequest) GetNamespace() string {
if m != nil {
return m.Namespace
}
return ""
}
func (m *ConfigureTopicRequest) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
func (m *ConfigureTopicRequest) GetPartitionCount() int32 {
if m != nil {
return m.PartitionCount
}
return 0
}
func (m *ConfigureTopicRequest) GetCollection() string {
if m != nil {
return m.Collection
}
return ""
}
type ConfigureTopicResponse struct {
}
func (m *ConfigureTopicResponse) Reset() { *m = ConfigureTopicResponse{} }
func (m *ConfigureTopicResponse) String() string { return proto.CompactTextString(m) }
func (*ConfigureTopicResponse) ProtoMessage() {}
func (*ConfigureTopicResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
type GetTopicConfigurationRequest struct {
Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
}
func (m *GetTopicConfigurationRequest) Reset() { *m = GetTopicConfigurationRequest{} }
func (m *GetTopicConfigurationRequest) String() string { return proto.CompactTextString(m) }
func (*GetTopicConfigurationRequest) ProtoMessage() {}
func (*GetTopicConfigurationRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *GetTopicConfigurationRequest) GetNamespace() string {
if m != nil {
return m.Namespace
}
return ""
}
func (m *GetTopicConfigurationRequest) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
type GetTopicConfigurationResponse struct {
Partitions int32 `protobuf:"varint,3,opt,name=partitions" json:"partitions,omitempty"`
}
func (m *GetTopicConfigurationResponse) Reset() { *m = GetTopicConfigurationResponse{} }
func (m *GetTopicConfigurationResponse) String() string { return proto.CompactTextString(m) }
func (*GetTopicConfigurationResponse) ProtoMessage() {}
func (*GetTopicConfigurationResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *GetTopicConfigurationResponse) GetPartitions() int32 {
if m != nil {
return m.Partitions
}
return 0
}
func init() {
proto.RegisterType((*SubscribeRequest)(nil), "messaging_pb.SubscribeRequest")
proto.RegisterType((*Message)(nil), "messaging_pb.Message")
proto.RegisterType((*PublishRequest)(nil), "messaging_pb.PublishRequest")
proto.RegisterType((*PublishResponse)(nil), "messaging_pb.PublishResponse")
proto.RegisterType((*ConfigureTopicRequest)(nil), "messaging_pb.ConfigureTopicRequest")
proto.RegisterType((*ConfigureTopicResponse)(nil), "messaging_pb.ConfigureTopicResponse")
proto.RegisterType((*GetTopicConfigurationRequest)(nil), "messaging_pb.GetTopicConfigurationRequest")
proto.RegisterType((*GetTopicConfigurationResponse)(nil), "messaging_pb.GetTopicConfigurationResponse")
proto.RegisterEnum("messaging_pb.SubscribeRequest_StartPosition", SubscribeRequest_StartPosition_name, SubscribeRequest_StartPosition_value)
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for SeaweedMessaging service
type SeaweedMessagingClient interface {
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error)
Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error)
ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error)
GetTopicConfiguration(ctx context.Context, in *GetTopicConfigurationRequest, opts ...grpc.CallOption) (*GetTopicConfigurationResponse, error)
}
type seaweedMessagingClient struct {
cc *grpc.ClientConn
}
func NewSeaweedMessagingClient(cc *grpc.ClientConn) SeaweedMessagingClient {
return &seaweedMessagingClient{cc}
}
func (c *seaweedMessagingClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) {
stream, err := grpc.NewClientStream(ctx, &_SeaweedMessaging_serviceDesc.Streams[0], c.cc, "/messaging_pb.SeaweedMessaging/Subscribe", opts...)
if err != nil {
return nil, err
}
x := &seaweedMessagingSubscribeClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type SeaweedMessaging_SubscribeClient interface {
Recv() (*Message, error)
grpc.ClientStream
}
type seaweedMessagingSubscribeClient struct {
grpc.ClientStream
}
func (x *seaweedMessagingSubscribeClient) Recv() (*Message, error) {
m := new(Message)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *seaweedMessagingClient) Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) {
stream, err := grpc.NewClientStream(ctx, &_SeaweedMessaging_serviceDesc.Streams[1], c.cc, "/messaging_pb.SeaweedMessaging/Publish", opts...)
if err != nil {
return nil, err
}
x := &seaweedMessagingPublishClient{stream}
return x, nil
}
type SeaweedMessaging_PublishClient interface {
Send(*PublishRequest) error
Recv() (*PublishResponse, error)
grpc.ClientStream
}
type seaweedMessagingPublishClient struct {
grpc.ClientStream
}
func (x *seaweedMessagingPublishClient) Send(m *PublishRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *seaweedMessagingPublishClient) Recv() (*PublishResponse, error) {
m := new(PublishResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) {
out := new(ConfigureTopicResponse)
err := grpc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/ConfigureTopic", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in *GetTopicConfigurationRequest, opts ...grpc.CallOption) (*GetTopicConfigurationResponse, error) {
out := new(GetTopicConfigurationResponse)
err := grpc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/GetTopicConfiguration", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for SeaweedMessaging service
type SeaweedMessagingServer interface {
Subscribe(*SubscribeRequest, SeaweedMessaging_SubscribeServer) error
Publish(SeaweedMessaging_PublishServer) error
ConfigureTopic(context.Context, *ConfigureTopicRequest) (*ConfigureTopicResponse, error)
GetTopicConfiguration(context.Context, *GetTopicConfigurationRequest) (*GetTopicConfigurationResponse, error)
}
func RegisterSeaweedMessagingServer(s *grpc.Server, srv SeaweedMessagingServer) {
s.RegisterService(&_SeaweedMessaging_serviceDesc, srv)
}
func _SeaweedMessaging_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SeaweedMessagingServer).Subscribe(m, &seaweedMessagingSubscribeServer{stream})
}
type SeaweedMessaging_SubscribeServer interface {
Send(*Message) error
grpc.ServerStream
}
type seaweedMessagingSubscribeServer struct {
grpc.ServerStream
}
func (x *seaweedMessagingSubscribeServer) Send(m *Message) error {
return x.ServerStream.SendMsg(m)
}
func _SeaweedMessaging_Publish_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(SeaweedMessagingServer).Publish(&seaweedMessagingPublishServer{stream})
}
type SeaweedMessaging_PublishServer interface {
Send(*PublishResponse) error
Recv() (*PublishRequest, error)
grpc.ServerStream
}
type seaweedMessagingPublishServer struct {
grpc.ServerStream
}
func (x *seaweedMessagingPublishServer) Send(m *PublishResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *seaweedMessagingPublishServer) Recv() (*PublishRequest, error) {
m := new(PublishRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _SeaweedMessaging_ConfigureTopic_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ConfigureTopicRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedMessagingServer).ConfigureTopic(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/messaging_pb.SeaweedMessaging/ConfigureTopic",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMessagingServer).ConfigureTopic(ctx, req.(*ConfigureTopicRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedMessaging_GetTopicConfiguration_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetTopicConfigurationRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedMessagingServer).GetTopicConfiguration(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/messaging_pb.SeaweedMessaging/GetTopicConfiguration",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMessagingServer).GetTopicConfiguration(ctx, req.(*GetTopicConfigurationRequest))
}
return interceptor(ctx, in, info, handler)
}
var _SeaweedMessaging_serviceDesc = grpc.ServiceDesc{
ServiceName: "messaging_pb.SeaweedMessaging",
HandlerType: (*SeaweedMessagingServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ConfigureTopic",
Handler: _SeaweedMessaging_ConfigureTopic_Handler,
},
{
MethodName: "GetTopicConfiguration",
Handler: _SeaweedMessaging_GetTopicConfiguration_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Subscribe",
Handler: _SeaweedMessaging_Subscribe_Handler,
ServerStreams: true,
},
{
StreamName: "Publish",
Handler: _SeaweedMessaging_Publish_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "messaging.proto",
}
func init() { proto.RegisterFile("messaging.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 586 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x95, 0xdb, 0x8b, 0xda, 0x4e,
0x14, 0xc7, 0x9d, 0xc4, 0xcb, 0x2f, 0x67, 0xbd, 0x31, 0xfc, 0x2c, 0x41, 0x5c, 0x09, 0x69, 0xa1,
0xe9, 0x85, 0x20, 0xf6, 0x65, 0x91, 0x42, 0x51, 0xb1, 0xed, 0x82, 0x16, 0x19, 0x7d, 0x2d, 0x4b,
0xcc, 0xce, 0xba, 0xa1, 0x9a, 0xa4, 0x99, 0x49, 0xcb, 0xfe, 0x0d, 0x7d, 0xdd, 0x7f, 0xab, 0x7f,
0x51, 0x5f, 0x4a, 0xae, 0x26, 0x92, 0x95, 0xd2, 0xcb, 0x5b, 0xe6, 0x3b, 0x67, 0xce, 0xf7, 0x9c,
0xcf, 0x49, 0x26, 0xd0, 0xda, 0x53, 0xc6, 0x8c, 0xad, 0x65, 0x6f, 0x75, 0xd7, 0x73, 0xb8, 0x83,
0xeb, 0xa9, 0x70, 0xe5, 0x6e, 0xd4, 0x7b, 0x01, 0xda, 0x2b, 0x7f, 0xc3, 0x4c, 0xcf, 0xda, 0x50,
0x42, 0x3f, 0xfb, 0x94, 0x71, 0xdc, 0x03, 0xc9, 0x36, 0xf6, 0x94, 0xb9, 0x86, 0x49, 0x65, 0xa4,
0x20, 0x4d, 0x22, 0x07, 0x01, 0xff, 0x0f, 0x15, 0xee, 0xb8, 0x96, 0x29, 0x0b, 0xe1, 0x4e, 0xb4,
0x08, 0xce, 0xb8, 0x86, 0xc7, 0x2d, 0x6e, 0x39, 0xb6, 0x2c, 0x2a, 0x48, 0xab, 0x90, 0x83, 0x80,
0x09, 0x34, 0x18, 0x37, 0x3c, 0xbe, 0x74, 0x58, 0x14, 0x51, 0x56, 0x90, 0xd6, 0x1c, 0xbe, 0xd4,
0xb3, 0xc5, 0xe8, 0xc7, 0x85, 0xe8, 0xab, 0xec, 0x19, 0x92, 0x4f, 0x81, 0x15, 0x38, 0xe3, 0xd6,
0x9e, 0x32, 0x6e, 0xec, 0xdd, 0x0f, 0x4c, 0xae, 0x28, 0x48, 0x13, 0x49, 0x56, 0x52, 0x2f, 0xa0,
0x91, 0xcb, 0x80, 0x01, 0xaa, 0xf3, 0xf1, 0x7a, 0xb6, 0x5a, 0xb7, 0x4b, 0xb8, 0x0e, 0xff, 0xcd,
0xc6, 0x64, 0x7e, 0x19, 0xac, 0x10, 0x6e, 0x80, 0xb4, 0xbe, 0x5c, 0xcc, 0x56, 0xeb, 0xf1, 0x62,
0xd9, 0x16, 0xd4, 0xef, 0x08, 0x6a, 0x8b, 0xb0, 0x34, 0x8a, 0x15, 0x90, 0xd2, 0xa4, 0x21, 0x0d,
0x71, 0x22, 0x0c, 0x10, 0x39, 0x88, 0xb8, 0x0d, 0xe2, 0x27, 0x7a, 0x17, 0xf2, 0xa8, 0x93, 0xe0,
0x31, 0x60, 0xf4, 0xc5, 0xd8, 0xf9, 0x34, 0x24, 0x51, 0x27, 0xd1, 0x02, 0xbf, 0x86, 0xda, 0x2d,
0x35, 0xae, 0xa9, 0xc7, 0xe4, 0xb2, 0x22, 0x6a, 0x67, 0x43, 0x35, 0xdf, 0x7f, 0xec, 0xa8, 0xbf,
0x8f, 0x82, 0x66, 0x36, 0xf7, 0xee, 0x48, 0x72, 0xa4, 0x3b, 0x82, 0x7a, 0x76, 0x23, 0x71, 0x8d,
0xe6, 0x93, 0x77, 0x15, 0x32, 0xae, 0x23, 0xe1, 0x02, 0xa9, 0xdf, 0x04, 0x68, 0x2e, 0xfd, 0xcd,
0xce, 0x62, 0xb7, 0x7f, 0x6d, 0xc8, 0xe5, 0xe3, 0x21, 0xc7, 0x05, 0x55, 0x0a, 0x30, 0x54, 0xb3,
0x18, 0xa6, 0x07, 0x0c, 0xb5, 0x10, 0xc3, 0xb3, 0x3c, 0x86, 0x7c, 0xa1, 0xff, 0x80, 0xc6, 0x08,
0x5a, 0xa9, 0x07, 0x73, 0x1d, 0x9b, 0x51, 0xfc, 0x14, 0x5a, 0x69, 0x23, 0x57, 0xa6, 0xe3, 0xdb,
0x3c, 0x4c, 0x55, 0x21, 0xcd, 0x54, 0x9e, 0x06, 0xaa, 0x7a, 0x8f, 0xa0, 0x33, 0x75, 0xec, 0x1b,
0x6b, 0xeb, 0x7b, 0x74, 0x1d, 0x50, 0xf9, 0x13, 0xa0, 0x05, 0xb6, 0x62, 0x91, 0x2d, 0xee, 0x03,
0x98, 0xce, 0x6e, 0x47, 0xcd, 0x14, 0xbd, 0x44, 0x32, 0x8a, 0x2a, 0xc3, 0xa3, 0xe3, 0xaa, 0xa2,
0xce, 0x54, 0x02, 0xbd, 0x77, 0x94, 0x87, 0x5a, 0x12, 0x61, 0x84, 0x9f, 0xd3, 0xef, 0x97, 0xad,
0xbe, 0x81, 0xf3, 0x07, 0x72, 0xc6, 0x38, 0xfb, 0x00, 0x69, 0x03, 0x2c, 0x6e, 0x29, 0xa3, 0x0c,
0x7f, 0x04, 0xd7, 0x0e, 0x35, 0xbe, 0x52, 0x7a, 0xbd, 0x48, 0x46, 0x8f, 0xdf, 0x82, 0x94, 0xde,
0x00, 0xb8, 0x7f, 0xfa, 0x6a, 0xe8, 0x76, 0x0a, 0x3f, 0x1d, 0xb5, 0x34, 0x40, 0x78, 0x0e, 0xb5,
0x78, 0xbc, 0xb8, 0x77, 0xea, 0xcd, 0xea, 0x9e, 0x3f, 0xb0, 0x1b, 0x93, 0x2b, 0x69, 0x68, 0x80,
0xf0, 0x47, 0x68, 0xe6, 0xc9, 0xe2, 0xc7, 0xf9, 0x63, 0x85, 0x6f, 0x43, 0xf7, 0xc9, 0xe9, 0xa0,
0xc4, 0x02, 0x7b, 0xd0, 0x29, 0x44, 0x89, 0x9f, 0xe7, 0x13, 0x9c, 0x9a, 0x61, 0xf7, 0xc5, 0x2f,
0xc5, 0x26, 0x9e, 0x13, 0x15, 0xda, 0x2c, 0x82, 0x7f, 0xc3, 0x74, 0x73, 0x67, 0x51, 0x9b, 0x4f,
0x9a, 0xe9, 0x1c, 0x96, 0xc1, 0x6f, 0x62, 0x53, 0x0d, 0xff, 0x16, 0xaf, 0x7e, 0x06, 0x00, 0x00,
0xff, 0xff, 0x83, 0x4f, 0x5b, 0x91, 0x40, 0x06, 0x00, 0x00,
}

66
weed/pb/queue.proto

@ -1,66 +0,0 @@
syntax = "proto3";
package queue_pb;
option java_package = "seaweedfs.client";
option java_outer_classname = "QueueProto";
//////////////////////////////////////////////////
service SeaweedQueue {
rpc StreamWrite (stream WriteMessageRequest) returns (stream WriteMessageResponse) {
}
rpc StreamRead (ReadMessageRequest) returns (stream ReadMessageResponse) {
}
rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {
}
rpc DeleteTopic (DeleteTopicRequest) returns (DeleteTopicResponse) {
}
}
//////////////////////////////////////////////////
message WriteMessageRequest {
string topic = 1;
int64 event_ns = 2;
bytes partition_key = 3;
bytes data = 4;
}
message WriteMessageResponse {
string error = 1;
int64 ack_ns = 2;
}
message ReadMessageRequest {
string topic = 1;
int64 start_ns = 2;
}
message ReadMessageResponse {
string error = 1;
int64 event_ns = 2;
bytes data = 3;
}
message ConfigureTopicRequest {
string topic = 1;
int64 ttl_seconds = 2;
int32 partition_count = 3;
}
message ConfigureTopicResponse {
string error = 1;
}
message DeleteTopicRequest {
string topic = 1;
}
message DeleteTopicResponse {
string error = 1;
}

516
weed/pb/queue_pb/queue.pb.go

@ -1,516 +0,0 @@
// Code generated by protoc-gen-go.
// source: queue.proto
// DO NOT EDIT!
/*
Package queue_pb is a generated protocol buffer package.
It is generated from these files:
queue.proto
It has these top-level messages:
WriteMessageRequest
WriteMessageResponse
ReadMessageRequest
ReadMessageResponse
ConfigureTopicRequest
ConfigureTopicResponse
DeleteTopicRequest
DeleteTopicResponse
*/
package queue_pb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type WriteMessageRequest struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
EventNs int64 `protobuf:"varint,2,opt,name=event_ns,json=eventNs" json:"event_ns,omitempty"`
PartitionKey []byte `protobuf:"bytes,3,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"`
Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
}
func (m *WriteMessageRequest) Reset() { *m = WriteMessageRequest{} }
func (m *WriteMessageRequest) String() string { return proto.CompactTextString(m) }
func (*WriteMessageRequest) ProtoMessage() {}
func (*WriteMessageRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *WriteMessageRequest) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
func (m *WriteMessageRequest) GetEventNs() int64 {
if m != nil {
return m.EventNs
}
return 0
}
func (m *WriteMessageRequest) GetPartitionKey() []byte {
if m != nil {
return m.PartitionKey
}
return nil
}
func (m *WriteMessageRequest) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
type WriteMessageResponse struct {
Error string `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
AckNs int64 `protobuf:"varint,2,opt,name=ack_ns,json=ackNs" json:"ack_ns,omitempty"`
}
func (m *WriteMessageResponse) Reset() { *m = WriteMessageResponse{} }
func (m *WriteMessageResponse) String() string { return proto.CompactTextString(m) }
func (*WriteMessageResponse) ProtoMessage() {}
func (*WriteMessageResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *WriteMessageResponse) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func (m *WriteMessageResponse) GetAckNs() int64 {
if m != nil {
return m.AckNs
}
return 0
}
type ReadMessageRequest struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
StartNs int64 `protobuf:"varint,2,opt,name=start_ns,json=startNs" json:"start_ns,omitempty"`
}
func (m *ReadMessageRequest) Reset() { *m = ReadMessageRequest{} }
func (m *ReadMessageRequest) String() string { return proto.CompactTextString(m) }
func (*ReadMessageRequest) ProtoMessage() {}
func (*ReadMessageRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *ReadMessageRequest) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
func (m *ReadMessageRequest) GetStartNs() int64 {
if m != nil {
return m.StartNs
}
return 0
}
type ReadMessageResponse struct {
Error string `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
EventNs int64 `protobuf:"varint,2,opt,name=event_ns,json=eventNs" json:"event_ns,omitempty"`
Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
}
func (m *ReadMessageResponse) Reset() { *m = ReadMessageResponse{} }
func (m *ReadMessageResponse) String() string { return proto.CompactTextString(m) }
func (*ReadMessageResponse) ProtoMessage() {}
func (*ReadMessageResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *ReadMessageResponse) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func (m *ReadMessageResponse) GetEventNs() int64 {
if m != nil {
return m.EventNs
}
return 0
}
func (m *ReadMessageResponse) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
type ConfigureTopicRequest struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
TtlSeconds int64 `protobuf:"varint,2,opt,name=ttl_seconds,json=ttlSeconds" json:"ttl_seconds,omitempty"`
PartitionCount int32 `protobuf:"varint,3,opt,name=partition_count,json=partitionCount" json:"partition_count,omitempty"`
}
func (m *ConfigureTopicRequest) Reset() { *m = ConfigureTopicRequest{} }
func (m *ConfigureTopicRequest) String() string { return proto.CompactTextString(m) }
func (*ConfigureTopicRequest) ProtoMessage() {}
func (*ConfigureTopicRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *ConfigureTopicRequest) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
func (m *ConfigureTopicRequest) GetTtlSeconds() int64 {
if m != nil {
return m.TtlSeconds
}
return 0
}
func (m *ConfigureTopicRequest) GetPartitionCount() int32 {
if m != nil {
return m.PartitionCount
}
return 0
}
type ConfigureTopicResponse struct {
Error string `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
}
func (m *ConfigureTopicResponse) Reset() { *m = ConfigureTopicResponse{} }
func (m *ConfigureTopicResponse) String() string { return proto.CompactTextString(m) }
func (*ConfigureTopicResponse) ProtoMessage() {}
func (*ConfigureTopicResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *ConfigureTopicResponse) GetError() string {
if m != nil {
return m.Error
}
return ""
}
type DeleteTopicRequest struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
}
func (m *DeleteTopicRequest) Reset() { *m = DeleteTopicRequest{} }
func (m *DeleteTopicRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteTopicRequest) ProtoMessage() {}
func (*DeleteTopicRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *DeleteTopicRequest) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
type DeleteTopicResponse struct {
Error string `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
}
func (m *DeleteTopicResponse) Reset() { *m = DeleteTopicResponse{} }
func (m *DeleteTopicResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteTopicResponse) ProtoMessage() {}
func (*DeleteTopicResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *DeleteTopicResponse) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func init() {
proto.RegisterType((*WriteMessageRequest)(nil), "queue_pb.WriteMessageRequest")
proto.RegisterType((*WriteMessageResponse)(nil), "queue_pb.WriteMessageResponse")
proto.RegisterType((*ReadMessageRequest)(nil), "queue_pb.ReadMessageRequest")
proto.RegisterType((*ReadMessageResponse)(nil), "queue_pb.ReadMessageResponse")
proto.RegisterType((*ConfigureTopicRequest)(nil), "queue_pb.ConfigureTopicRequest")
proto.RegisterType((*ConfigureTopicResponse)(nil), "queue_pb.ConfigureTopicResponse")
proto.RegisterType((*DeleteTopicRequest)(nil), "queue_pb.DeleteTopicRequest")
proto.RegisterType((*DeleteTopicResponse)(nil), "queue_pb.DeleteTopicResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for SeaweedQueue service
type SeaweedQueueClient interface {
StreamWrite(ctx context.Context, opts ...grpc.CallOption) (SeaweedQueue_StreamWriteClient, error)
StreamRead(ctx context.Context, in *ReadMessageRequest, opts ...grpc.CallOption) (SeaweedQueue_StreamReadClient, error)
ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error)
DeleteTopic(ctx context.Context, in *DeleteTopicRequest, opts ...grpc.CallOption) (*DeleteTopicResponse, error)
}
type seaweedQueueClient struct {
cc *grpc.ClientConn
}
func NewSeaweedQueueClient(cc *grpc.ClientConn) SeaweedQueueClient {
return &seaweedQueueClient{cc}
}
func (c *seaweedQueueClient) StreamWrite(ctx context.Context, opts ...grpc.CallOption) (SeaweedQueue_StreamWriteClient, error) {
stream, err := grpc.NewClientStream(ctx, &_SeaweedQueue_serviceDesc.Streams[0], c.cc, "/queue_pb.SeaweedQueue/StreamWrite", opts...)
if err != nil {
return nil, err
}
x := &seaweedQueueStreamWriteClient{stream}
return x, nil
}
type SeaweedQueue_StreamWriteClient interface {
Send(*WriteMessageRequest) error
Recv() (*WriteMessageResponse, error)
grpc.ClientStream
}
type seaweedQueueStreamWriteClient struct {
grpc.ClientStream
}
func (x *seaweedQueueStreamWriteClient) Send(m *WriteMessageRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *seaweedQueueStreamWriteClient) Recv() (*WriteMessageResponse, error) {
m := new(WriteMessageResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *seaweedQueueClient) StreamRead(ctx context.Context, in *ReadMessageRequest, opts ...grpc.CallOption) (SeaweedQueue_StreamReadClient, error) {
stream, err := grpc.NewClientStream(ctx, &_SeaweedQueue_serviceDesc.Streams[1], c.cc, "/queue_pb.SeaweedQueue/StreamRead", opts...)
if err != nil {
return nil, err
}
x := &seaweedQueueStreamReadClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type SeaweedQueue_StreamReadClient interface {
Recv() (*ReadMessageResponse, error)
grpc.ClientStream
}
type seaweedQueueStreamReadClient struct {
grpc.ClientStream
}
func (x *seaweedQueueStreamReadClient) Recv() (*ReadMessageResponse, error) {
m := new(ReadMessageResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *seaweedQueueClient) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) {
out := new(ConfigureTopicResponse)
err := grpc.Invoke(ctx, "/queue_pb.SeaweedQueue/ConfigureTopic", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedQueueClient) DeleteTopic(ctx context.Context, in *DeleteTopicRequest, opts ...grpc.CallOption) (*DeleteTopicResponse, error) {
out := new(DeleteTopicResponse)
err := grpc.Invoke(ctx, "/queue_pb.SeaweedQueue/DeleteTopic", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for SeaweedQueue service
type SeaweedQueueServer interface {
StreamWrite(SeaweedQueue_StreamWriteServer) error
StreamRead(*ReadMessageRequest, SeaweedQueue_StreamReadServer) error
ConfigureTopic(context.Context, *ConfigureTopicRequest) (*ConfigureTopicResponse, error)
DeleteTopic(context.Context, *DeleteTopicRequest) (*DeleteTopicResponse, error)
}
func RegisterSeaweedQueueServer(s *grpc.Server, srv SeaweedQueueServer) {
s.RegisterService(&_SeaweedQueue_serviceDesc, srv)
}
func _SeaweedQueue_StreamWrite_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(SeaweedQueueServer).StreamWrite(&seaweedQueueStreamWriteServer{stream})
}
type SeaweedQueue_StreamWriteServer interface {
Send(*WriteMessageResponse) error
Recv() (*WriteMessageRequest, error)
grpc.ServerStream
}
type seaweedQueueStreamWriteServer struct {
grpc.ServerStream
}
func (x *seaweedQueueStreamWriteServer) Send(m *WriteMessageResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *seaweedQueueStreamWriteServer) Recv() (*WriteMessageRequest, error) {
m := new(WriteMessageRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _SeaweedQueue_StreamRead_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(ReadMessageRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SeaweedQueueServer).StreamRead(m, &seaweedQueueStreamReadServer{stream})
}
type SeaweedQueue_StreamReadServer interface {
Send(*ReadMessageResponse) error
grpc.ServerStream
}
type seaweedQueueStreamReadServer struct {
grpc.ServerStream
}
func (x *seaweedQueueStreamReadServer) Send(m *ReadMessageResponse) error {
return x.ServerStream.SendMsg(m)
}
func _SeaweedQueue_ConfigureTopic_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ConfigureTopicRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedQueueServer).ConfigureTopic(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/queue_pb.SeaweedQueue/ConfigureTopic",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedQueueServer).ConfigureTopic(ctx, req.(*ConfigureTopicRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedQueue_DeleteTopic_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteTopicRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedQueueServer).DeleteTopic(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/queue_pb.SeaweedQueue/DeleteTopic",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedQueueServer).DeleteTopic(ctx, req.(*DeleteTopicRequest))
}
return interceptor(ctx, in, info, handler)
}
var _SeaweedQueue_serviceDesc = grpc.ServiceDesc{
ServiceName: "queue_pb.SeaweedQueue",
HandlerType: (*SeaweedQueueServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ConfigureTopic",
Handler: _SeaweedQueue_ConfigureTopic_Handler,
},
{
MethodName: "DeleteTopic",
Handler: _SeaweedQueue_DeleteTopic_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamWrite",
Handler: _SeaweedQueue_StreamWrite_Handler,
ServerStreams: true,
ClientStreams: true,
},
{
StreamName: "StreamRead",
Handler: _SeaweedQueue_StreamRead_Handler,
ServerStreams: true,
},
},
Metadata: "queue.proto",
}
func init() { proto.RegisterFile("queue.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 429 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xae, 0x9b, 0xa6, 0x94, 0x49, 0x28, 0x68, 0xd2, 0xa2, 0x10, 0xd1, 0x36, 0x5a, 0x0e, 0x44,
0x20, 0x59, 0x15, 0xbc, 0x41, 0x03, 0x27, 0x68, 0x04, 0x0e, 0x08, 0x89, 0x8b, 0xb5, 0xb5, 0xa7,
0x95, 0x15, 0xb3, 0xeb, 0xee, 0x8e, 0xa9, 0x7a, 0xe2, 0x2d, 0x79, 0x1e, 0xe4, 0xb5, 0x5c, 0xdb,
0x34, 0xb1, 0x7a, 0xf3, 0xcc, 0x78, 0xe7, 0xfb, 0xd9, 0x6f, 0x61, 0x70, 0x9d, 0x53, 0x4e, 0x7e,
0x66, 0x34, 0x6b, 0xdc, 0x73, 0x45, 0x98, 0x5d, 0x88, 0x3f, 0x30, 0xfa, 0x61, 0x12, 0xa6, 0x73,
0xb2, 0x56, 0x5e, 0x51, 0x40, 0xd7, 0x39, 0x59, 0xc6, 0x03, 0xe8, 0xb3, 0xce, 0x92, 0x68, 0xec,
0x4d, 0xbd, 0xd9, 0xe3, 0xa0, 0x2c, 0xf0, 0x05, 0xec, 0xd1, 0x6f, 0x52, 0x1c, 0x2a, 0x3b, 0xde,
0x9e, 0x7a, 0xb3, 0x5e, 0xf0, 0xc8, 0xd5, 0x0b, 0x8b, 0xaf, 0xe0, 0x49, 0x26, 0x0d, 0x27, 0x9c,
0x68, 0x15, 0xae, 0xe8, 0x76, 0xdc, 0x9b, 0x7a, 0xb3, 0x61, 0x30, 0xbc, 0x6b, 0x7e, 0xa2, 0x5b,
0x44, 0xd8, 0x89, 0x25, 0xcb, 0xf1, 0x8e, 0x9b, 0xb9, 0x6f, 0x31, 0x87, 0x83, 0x36, 0x01, 0x9b,
0x69, 0x65, 0xa9, 0x60, 0x40, 0xc6, 0x68, 0x53, 0x31, 0x70, 0x05, 0x1e, 0xc2, 0xae, 0x8c, 0x56,
0x35, 0x7e, 0x5f, 0x46, 0xab, 0x85, 0x15, 0x1f, 0x01, 0x03, 0x92, 0xf1, 0x43, 0x45, 0x58, 0x96,
0xa6, 0x29, 0xc2, 0xd5, 0x0b, 0x2b, 0x7e, 0xc2, 0xa8, 0xb5, 0xa6, 0x93, 0x4a, 0x87, 0x19, 0x95,
0xce, 0x5e, 0x43, 0xe7, 0x0d, 0x1c, 0xce, 0xb5, 0xba, 0x4c, 0xae, 0x72, 0x43, 0xdf, 0x0a, 0x22,
0xdd, 0x2c, 0x4f, 0x60, 0xc0, 0x9c, 0x86, 0x96, 0x22, 0xad, 0xe2, 0x0a, 0x00, 0x98, 0xd3, 0x65,
0xd9, 0xc1, 0xd7, 0xf0, 0xb4, 0x36, 0x3c, 0xd2, 0xb9, 0x62, 0x07, 0xd7, 0x0f, 0xf6, 0xef, 0xda,
0xf3, 0xa2, 0x2b, 0x7c, 0x78, 0xfe, 0x3f, 0x70, 0x97, 0x2e, 0xf1, 0x06, 0xf0, 0x03, 0xa5, 0xc4,
0x0f, 0x60, 0x29, 0xde, 0xc2, 0xa8, 0xf5, 0x6f, 0xd7, 0xe2, 0x77, 0x7f, 0xb7, 0x61, 0xb8, 0x24,
0x79, 0x43, 0x14, 0x7f, 0x2d, 0xe2, 0x87, 0x01, 0x0c, 0x96, 0x6c, 0x48, 0xfe, 0x72, 0x01, 0xc0,
0x23, 0xbf, 0x4a, 0xa5, 0xbf, 0x26, 0x92, 0x93, 0xe3, 0x4d, 0xe3, 0x12, 0x54, 0x6c, 0xcd, 0xbc,
0x53, 0x0f, 0xcf, 0x01, 0xca, 0x9d, 0xc5, 0x45, 0xe2, 0xcb, 0xfa, 0xcc, 0xfd, 0x7c, 0x4c, 0x8e,
0x36, 0x4c, 0xab, 0x85, 0xa7, 0x1e, 0x7e, 0x87, 0xfd, 0xb6, 0x79, 0x78, 0x52, 0x1f, 0x5a, 0x7b,
0x9f, 0x93, 0xe9, 0xe6, 0x1f, 0xaa, 0xc5, 0xf8, 0x19, 0x06, 0x0d, 0xdf, 0x9a, 0x34, 0xef, 0x5b,
0xdf, 0xa4, 0xb9, 0xc6, 0x6c, 0xb1, 0x75, 0x76, 0x0c, 0xcf, 0x6c, 0xe9, 0xeb, 0xa5, 0xf5, 0xa3,
0x34, 0x21, 0xc5, 0x67, 0xe0, 0x2c, 0xfe, 0x52, 0xbc, 0xf6, 0x8b, 0x5d, 0xf7, 0xe8, 0xdf, 0xff,
0x0b, 0x00, 0x00, 0xff, 0xff, 0x7d, 0x3e, 0x14, 0xd8, 0x03, 0x04, 0x00, 0x00,
}

10
weed/server/msg_broker_grpc_server.go

@ -3,21 +3,21 @@ package weed_server
import ( import (
"context" "context"
"github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
) )
func (broker *MessageBroker) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) {
func (broker *MessageBroker) Subscribe(request *messaging_pb.SubscribeRequest, server messaging_pb.SeaweedMessaging_SubscribeServer) error {
panic("implement me") panic("implement me")
} }
func (broker *MessageBroker) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) {
func (broker *MessageBroker) Publish(server messaging_pb.SeaweedMessaging_PublishServer) error {
panic("implement me") panic("implement me")
} }
func (broker *MessageBroker) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error {
func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
panic("implement me") panic("implement me")
} }
func (broker *MessageBroker) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error {
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
panic("implement me") panic("implement me")
} }
Loading…
Cancel
Save