diff --git a/weed/command/server.go b/weed/command/server.go index 02641bbe2..0ad126dbb 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -24,14 +24,15 @@ type ServerOptions struct { } var ( - serverOptions ServerOptions - masterOptions MasterOptions - filerOptions FilerOptions - s3Options S3Options - sftpOptions SftpOptions - iamOptions IamOptions - webdavOptions WebDavOption - mqBrokerOptions MessageQueueBrokerOptions + serverOptions ServerOptions + masterOptions MasterOptions + filerOptions FilerOptions + s3Options S3Options + sftpOptions SftpOptions + iamOptions IamOptions + webdavOptions WebDavOption + mqBrokerOptions MessageQueueBrokerOptions + mqAgentServerOptions MessageQueueAgentOptions ) func init() { @@ -78,6 +79,7 @@ var ( isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service") isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway") isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker") + isStartingMqAgent = cmdServer.Flag.Bool("mq.agent", false, "whether to start message queue agent") False = false ) @@ -191,6 +193,9 @@ func init() { mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port") + mqAgentServerOptions.brokersString = cmdServer.Flag.String("mq.agent.brokers", "localhost:17777", "comma-separated message queue brokers") + mqAgentServerOptions.port = cmdServer.Flag.Int("mq.agent.port", 16777, "message queue agent gRPC listen port") + } func runServer(cmd *Command, args []string) bool { @@ -219,6 +224,10 @@ func runServer(cmd *Command, args []string) bool { if *isStartingMqBroker { *isStartingFiler = true } + if *isStartingMqAgent { + *isStartingMqBroker = true + *isStartingFiler = true + } if *isStartingMasterServer { _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.portGrpc, *masterOptions.peers) @@ -258,6 +267,8 @@ func runServer(cmd *Command, args []string) bool { mqBrokerOptions.ip = serverIp mqBrokerOptions.masters = filerOptions.masters.GetInstancesAsMap() mqBrokerOptions.filerGroup = filerOptions.filerGroup + mqAgentServerOptions.ip = serverIp + mqAgentServerOptions.brokers = pb.ServerAddresses(*mqAgentServerOptions.brokersString).ToAddresses() // serverOptions.v.pulseSeconds = pulseSeconds // masterOptions.pulseSeconds = pulseSeconds @@ -346,6 +357,13 @@ func runServer(cmd *Command, args []string) bool { }() } + if *isStartingMqAgent { + go func() { + time.Sleep(2 * time.Second) + mqAgentServerOptions.startQueueAgent() + }() + } + // start volume server if *isStartingVolumeServer { minFreeSpaces := util.MustParseMinFreeSpace(*volumeMinFreeSpace, *volumeMinFreeSpacePercent)