Browse Source

rename to SeaweedMQ

pull/3379/head
chrislu 3 years ago
parent
commit
7df440e36f
  1. 2
      weed/command/command.go
  2. 42
      weed/command/mq_broker.go
  3. 16
      weed/command/server.go

2
weed/command/command.go

@ -33,7 +33,7 @@ var Commands = []*Command{
cmdMount, cmdMount,
cmdS3, cmdS3,
cmdIam, cmdIam,
cmdMsgBroker,
cmdMqBroker,
cmdScaffold, cmdScaffold,
cmdServer, cmdServer,
cmdShell, cmdShell,

42
weed/command/msg_broker.go → weed/command/mq_broker.go

@ -19,10 +19,10 @@ import (
) )
var ( var (
messageBrokerStandaloneOptions MessageBrokerOptions
mqBrokerStandaloneOptions MessageQueueBrokerOptions
) )
type MessageBrokerOptions struct {
type MessageQueueBrokerOptions struct {
filer *string filer *string
ip *string ip *string
port *int port *int
@ -31,16 +31,16 @@ type MessageBrokerOptions struct {
} }
func init() { func init() {
cmdMsgBroker.Run = runMsgBroker // break init cycle
messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port")
messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file")
messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file")
cmdMqBroker.Run = runMqBroker // break init cycle
mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address")
mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file")
mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file")
} }
var cmdMsgBroker = &Command{
UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
var cmdMqBroker = &Command{
UsageLine: "mq.broker [-port=17777] [-filer=<ip:port>]",
Short: "start a message queue broker", Short: "start a message queue broker",
Long: `start a message queue broker Long: `start a message queue broker
@ -50,19 +50,19 @@ var cmdMsgBroker = &Command{
`, `,
} }
func runMsgBroker(cmd *Command, args []string) bool {
func runMqBroker(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false) util.LoadConfiguration("security", false)
return messageBrokerStandaloneOptions.startQueueServer()
return mqBrokerStandaloneOptions.startQueueServer()
} }
func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile)
filerAddress := pb.ServerAddress(*msgBrokerOpt.filer)
filerAddress := pb.ServerAddress(*mqBrokerOpt.filer)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
cipher := false cipher := false
@ -77,10 +77,10 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
return nil return nil
}) })
if err != nil { if err != nil {
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second) time.Sleep(time.Second)
} else { } else {
glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
break break
} }
} }
@ -89,15 +89,15 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
Filers: []pb.ServerAddress{filerAddress}, Filers: []pb.ServerAddress{filerAddress},
DefaultReplication: "", DefaultReplication: "",
MaxMB: 0, MaxMB: 0,
Ip: *msgBrokerOpt.ip,
Port: *msgBrokerOpt.port,
Ip: *mqBrokerOpt.ip,
Port: *mqBrokerOpt.port,
Cipher: cipher, Cipher: cipher,
}, grpcDialOption) }, grpcDialOption)
// start grpc listener // start grpc listener
grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0)
grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0)
if err != nil { if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
} }
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)

16
weed/command/server.go

@ -30,7 +30,7 @@ var (
s3Options S3Options s3Options S3Options
iamOptions IamOptions iamOptions IamOptions
webdavOptions WebDavOption webdavOptions WebDavOption
msgBrokerOptions MessageBrokerOptions
mqBrokerOptions MessageQueueBrokerOptions
) )
func init() { func init() {
@ -74,7 +74,7 @@ var (
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service") isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service")
isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway") isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway")
isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker")
serverWhiteList []string serverWhiteList []string
@ -155,7 +155,7 @@ func init() {
webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port")
} }
@ -179,7 +179,7 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingWebDav { if *isStartingWebDav {
*isStartingFiler = true *isStartingFiler = true
} }
if *isStartingMsgBroker {
if *isStartingMqBroker {
*isStartingFiler = true *isStartingFiler = true
} }
@ -208,7 +208,7 @@ func runServer(cmd *Command, args []string) bool {
serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack serverOptions.v.rack = serverRack
msgBrokerOptions.ip = serverIp
mqBrokerOptions.ip = serverIp
// serverOptions.v.pulseSeconds = pulseSeconds // serverOptions.v.pulseSeconds = pulseSeconds
// masterOptions.pulseSeconds = pulseSeconds // masterOptions.pulseSeconds = pulseSeconds
@ -224,7 +224,7 @@ func runServer(cmd *Command, args []string) bool {
s3Options.filer = &filerAddress s3Options.filer = &filerAddress
iamOptions.filer = &filerAddress iamOptions.filer = &filerAddress
webdavOptions.filer = &filerAddress webdavOptions.filer = &filerAddress
msgBrokerOptions.filer = &filerAddress
mqBrokerOptions.filer = &filerAddress
go stats_collect.StartMetricsServer(*serverMetricsHttpPort) go stats_collect.StartMetricsServer(*serverMetricsHttpPort)
@ -276,10 +276,10 @@ func runServer(cmd *Command, args []string) bool {
}() }()
} }
if *isStartingMsgBroker {
if *isStartingMqBroker {
go func() { go func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
msgBrokerOptions.startQueueServer()
mqBrokerOptions.startQueueServer()
}() }()
} }

Loading…
Cancel
Save