Browse Source

combine msgBroker into weed server command

pull/1329/head
Chris Lu 5 years ago
parent
commit
2bfd810912
  1. 10
      weed/command/msg_broker.go
  2. 24
      weed/command/server.go

10
weed/command/msg_broker.go

@ -20,10 +20,10 @@ import (
) )
var ( var (
messageBrokerStandaloneOptions QueueOptions
messageBrokerStandaloneOptions MessageBrokerOptions
) )
type QueueOptions struct {
type MessageBrokerOptions struct {
filer *string filer *string
ip *string ip *string
port *int port *int
@ -41,8 +41,8 @@ func init() {
} }
var cmdMsgBroker = &Command{ var cmdMsgBroker = &Command{
UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]",
Short: "<WIP> start a message queue broker",
UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
Short: "start a message queue broker",
Long: `start a message queue broker Long: `start a message queue broker
The broker can accept gRPC calls to write or read messages. The messages are stored via filer. The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
@ -59,7 +59,7 @@ func runMsgBroker(cmd *Command, args []string) bool {
} }
func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)

24
weed/command/server.go

@ -18,10 +18,11 @@ type ServerOptions struct {
} }
var ( var (
serverOptions ServerOptions
masterOptions MasterOptions
filerOptions FilerOptions
s3Options S3Options
serverOptions ServerOptions
masterOptions MasterOptions
filerOptions FilerOptions
s3Options S3Options
msgBrokerOptions MessageBrokerOptions
) )
func init() { func init() {
@ -57,6 +58,7 @@ var (
pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
serverWhiteList []string serverWhiteList []string
) )
@ -98,6 +100,8 @@ func init() {
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file") s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
} }
func runServer(cmd *Command, args []string) bool { func runServer(cmd *Command, args []string) bool {
@ -117,6 +121,9 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingS3 { if *isStartingS3 {
*isStartingFiler = true *isStartingFiler = true
} }
if *isStartingMsgBroker {
*isStartingFiler = true
}
_, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
peers := strings.Join(peerList, ",") peers := strings.Join(peerList, ",")
@ -133,6 +140,7 @@ func runServer(cmd *Command, args []string) bool {
serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack serverOptions.v.rack = serverRack
msgBrokerOptions.ip = serverIp
serverOptions.v.pulseSeconds = pulseSeconds serverOptions.v.pulseSeconds = pulseSeconds
masterOptions.pulseSeconds = pulseSeconds masterOptions.pulseSeconds = pulseSeconds
@ -145,6 +153,7 @@ func runServer(cmd *Command, args []string) bool {
filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port) filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port)
s3Options.filer = &filerAddress s3Options.filer = &filerAddress
msgBrokerOptions.filer = &filerAddress
if *filerOptions.defaultReplicaPlacement == "" { if *filerOptions.defaultReplicaPlacement == "" {
*filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication
@ -188,6 +197,13 @@ func runServer(cmd *Command, args []string) bool {
}() }()
} }
if *isStartingMsgBroker {
go func() {
time.Sleep(2 * time.Second)
msgBrokerOptions.startQueueServer()
}()
}
// start volume server // start volume server
{ {
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption) go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)

Loading…
Cancel
Save