diff --git a/weed/command/mq_agent.go b/weed/command/mq_agent.go new file mode 100644 index 000000000..3c7b7bbd3 --- /dev/null +++ b/weed/command/mq_agent.go @@ -0,0 +1,74 @@ +package command + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/agent" + "google.golang.org/grpc/reflection" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +var ( + mqAgentOptions MessageQueueAgentOptions +) + +type MessageQueueAgentOptions struct { + brokers []pb.ServerAddress + brokersString *string + filerGroup *string + ip *string + port *int +} + +func init() { + cmdMqAgent.Run = runMqAgent // break init cycle + mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers") + mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "localhost", "message queue agent host address") + mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port") +} + +var cmdMqAgent = &Command{ + UsageLine: "mq.agent [-port=6377] [-master=]", + Short: " start a message queue agent", + Long: `start a message queue agent + + The agent runs on local server to accept gRPC calls to write or read messages. + The messages are sent to message queue brokers. + +`, +} + +func runMqAgent(cmd *Command, args []string) bool { + + util.LoadSecurityConfiguration() + + mqAgentOptions.brokers = pb.ServerAddresses(*mqAgentOptions.brokersString).ToAddresses() + + return mqAgentOptions.startQueueAgent() + +} + +func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool { + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_agent") + + agentServer := agent.NewMessageQueueAgent(&agent.MessageQueueAgentOptions{ + SeedBrokers: mqAgentOpt.brokers, + }, grpcDialOption) + + // start grpc listener + grpcL, _, err := util.NewIpAndLocalListeners(*mqAgentOpt.ip, *mqAgentOpt.port, 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err) + } + grpcS := pb.NewGrpcServer() + mq_pb.RegisterSeaweedMessagingServer(grpcS, agentServer) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml index 113e5b016..d70e730d5 100644 --- a/weed/command/scaffold/security.toml +++ b/weed/command/scaffold/security.toml @@ -89,6 +89,11 @@ cert = "" key = "" allowed_commonNames = "" # comma-separated SSL certificate common names +[grpc.msg_agent] +cert = "" +key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names + # use this for any place needs a grpc client # i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload" [grpc.client] diff --git a/weed/mq/agent/agent_server.go b/weed/mq/agent/agent_server.go new file mode 100644 index 000000000..cd7c098b8 --- /dev/null +++ b/weed/mq/agent/agent_server.go @@ -0,0 +1,29 @@ +package agent + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" +) + +type MessageQueueAgentOptions struct { + SeedBrokers []pb.ServerAddress +} + +type MessageQueueAgent struct { + mq_pb.UnimplementedSeaweedMessagingServer + option *MessageQueueAgentOptions + brokers []pb.ServerAddress + grpcDialOption grpc.DialOption +} + +func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent { + + // check masters to list all brokers + + return &MessageQueueAgent{ + option: option, + brokers: []pb.ServerAddress{}, + grpcDialOption: grpcDialOption, + } +} diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto index 23284b767..09701ab60 100644 --- a/weed/pb/mq.proto +++ b/weed/pb/mq.proto @@ -26,6 +26,7 @@ service SeaweedMessaging { rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) { } rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) { + // implemented by message queue agent and broker } rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) { } @@ -44,8 +45,10 @@ service SeaweedMessaging { // data plane for each topic partition rpc PublishMessage (stream PublishMessageRequest) returns (stream PublishMessageResponse) { + // implemented by message queue agent and broker } rpc SubscribeMessage (stream SubscribeMessageRequest) returns (stream SubscribeMessageResponse) { + // implemented by message queue agent and broker } // The lead broker asks a follower broker to follow itself rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) {