Browse Source

scaffold message queue agent

add-message-queue-agent
chrislu 1 month ago
parent
commit
6cce8e5d9b
  1. 74
      weed/command/mq_agent.go
  2. 5
      weed/command/scaffold/security.toml
  3. 29
      weed/mq/agent/agent_server.go
  4. 3
      weed/pb/mq.proto

74
weed/command/mq_agent.go

@ -0,0 +1,74 @@
package command
import (
"github.com/seaweedfs/seaweedfs/weed/mq/agent"
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
)
var (
mqAgentOptions MessageQueueAgentOptions
)
type MessageQueueAgentOptions struct {
brokers []pb.ServerAddress
brokersString *string
filerGroup *string
ip *string
port *int
}
func init() {
cmdMqAgent.Run = runMqAgent // break init cycle
mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers")
mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "localhost", "message queue agent host address")
mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port")
}
var cmdMqAgent = &Command{
UsageLine: "mq.agent [-port=6377] [-master=<ip:port>]",
Short: "<WIP> start a message queue agent",
Long: `start a message queue agent
The agent runs on local server to accept gRPC calls to write or read messages.
The messages are sent to message queue brokers.
`,
}
func runMqAgent(cmd *Command, args []string) bool {
util.LoadSecurityConfiguration()
mqAgentOptions.brokers = pb.ServerAddresses(*mqAgentOptions.brokersString).ToAddresses()
return mqAgentOptions.startQueueAgent()
}
func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_agent")
agentServer := agent.NewMessageQueueAgent(&agent.MessageQueueAgentOptions{
SeedBrokers: mqAgentOpt.brokers,
}, grpcDialOption)
// start grpc listener
grpcL, _, err := util.NewIpAndLocalListeners(*mqAgentOpt.ip, *mqAgentOpt.port, 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err)
}
grpcS := pb.NewGrpcServer()
mq_pb.RegisterSeaweedMessagingServer(grpcS, agentServer)
reflection.Register(grpcS)
grpcS.Serve(grpcL)
return true
}

5
weed/command/scaffold/security.toml

@ -89,6 +89,11 @@ cert = ""
key = "" key = ""
allowed_commonNames = "" # comma-separated SSL certificate common names allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.msg_agent]
cert = ""
key = ""
allowed_commonNames = "" # comma-separated SSL certificate common names
# use this for any place needs a grpc client # use this for any place needs a grpc client
# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload" # i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
[grpc.client] [grpc.client]

29
weed/mq/agent/agent_server.go

@ -0,0 +1,29 @@
package agent
import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
)
type MessageQueueAgentOptions struct {
SeedBrokers []pb.ServerAddress
}
type MessageQueueAgent struct {
mq_pb.UnimplementedSeaweedMessagingServer
option *MessageQueueAgentOptions
brokers []pb.ServerAddress
grpcDialOption grpc.DialOption
}
func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
// check masters to list all brokers
return &MessageQueueAgent{
option: option,
brokers: []pb.ServerAddress{},
grpcDialOption: grpcDialOption,
}
}

3
weed/pb/mq.proto

@ -26,6 +26,7 @@ service SeaweedMessaging {
rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) { rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) {
} }
rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) { rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {
// implemented by message queue agent and broker
} }
rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) { rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) {
} }
@ -44,8 +45,10 @@ service SeaweedMessaging {
// data plane for each topic partition // data plane for each topic partition
rpc PublishMessage (stream PublishMessageRequest) returns (stream PublishMessageResponse) { rpc PublishMessage (stream PublishMessageRequest) returns (stream PublishMessageResponse) {
// implemented by message queue agent and broker
} }
rpc SubscribeMessage (stream SubscribeMessageRequest) returns (stream SubscribeMessageResponse) { rpc SubscribeMessage (stream SubscribeMessageRequest) returns (stream SubscribeMessageResponse) {
// implemented by message queue agent and broker
} }
// The lead broker asks a follower broker to follow itself // The lead broker asks a follower broker to follow itself
rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) { rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) {

Loading…
Cancel
Save