diff --git a/weed/command/command.go b/weed/command/command.go index abd1b63e9..512cd6f8f 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -28,12 +28,12 @@ var Commands = []*Command{ cmdFilerSynchronize, cmdFix, cmdFuse, + cmdIam, cmdMaster, cmdMasterFollower, cmdMount, - cmdS3, - cmdIam, cmdMqBroker, + cmdS3, cmdScaffold, cmdServer, cmdShell, diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go index da7f59596..c87cccd2c 100644 --- a/weed/command/mq_broker.go +++ b/weed/command/mq_broker.go @@ -23,6 +23,8 @@ var ( ) type MessageQueueBrokerOptions struct { + masters *string + filerGroup *string filer *string ip *string port *int @@ -32,7 +34,9 @@ type MessageQueueBrokerOptions struct { func init() { cmdMqBroker.Run = runMqBroker // break init cycle + mqBrokerStandaloneOptions.masters = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers") mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address") + mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file") @@ -85,7 +89,9 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { } } - qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ + qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{ + Masters: pb.ServerAddresses(*mqBrokerOpt.masters).ToAddressMap(), + FilerGroup: *mqBrokerOpt.filerGroup, Filers: []pb.ServerAddress{filerAddress}, DefaultReplication: "", MaxMB: 0, diff --git a/weed/command/server.go b/weed/command/server.go index 7c14fd14f..2c363087c 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -225,6 +225,7 @@ func runServer(cmd *Command, args []string) bool { iamOptions.filer = &filerAddress webdavOptions.filer = &filerAddress mqBrokerOptions.filer = &filerAddress + mqBrokerOptions.filerGroup = filerOptions.filerGroup go stats_collect.StartMetricsServer(*serverMetricsHttpPort) diff --git a/weed/mq/broker/broker_grpc_server.go b/weed/mq/broker/broker_grpc_server.go index 9aa9b1908..2cb4187ae 100644 --- a/weed/mq/broker/broker_grpc_server.go +++ b/weed/mq/broker/broker_grpc_server.go @@ -9,11 +9,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) -func (broker *MessageBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) { +func (broker *MessageQueueBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) { panic("implement me") } -func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) { +func (broker *MessageQueueBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) { resp := &mq_pb.DeleteTopicResponse{} dir, entry := genTopicDirEntry(request.Namespace, request.Topic) if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { @@ -24,7 +24,7 @@ func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.Delet return resp, nil } -func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) { +func (broker *MessageQueueBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) { panic("implement me") } diff --git a/weed/mq/broker/broker_grpc_server_discovery.go b/weed/mq/broker/broker_grpc_server_discovery.go index 0c8d70e68..e276091a9 100644 --- a/weed/mq/broker/broker_grpc_server_discovery.go +++ b/weed/mq/broker/broker_grpc_server_discovery.go @@ -26,7 +26,7 @@ If one of the pub or sub connects very late, and the system topo changed quite a */ -func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) { +func (broker *MessageQueueBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) { t := &mq_pb.FindBrokerResponse{} var peers []string @@ -61,7 +61,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBr } -func (broker *MessageBroker) checkFilers() { +func (broker *MessageQueueBroker) checkFilers() { // contact a filer about masters var masters []pb.ServerAddress diff --git a/weed/mq/broker/broker_grpc_server_publish.go b/weed/mq/broker/broker_grpc_server_publish.go index 4ff9ad809..eb76dd5dc 100644 --- a/weed/mq/broker/broker_grpc_server_publish.go +++ b/weed/mq/broker/broker_grpc_server_publish.go @@ -13,7 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) -func (broker *MessageBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { +func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { // process initial request in, err := stream.Recv() diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 3fd01fb53..dbd854250 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -2,7 +2,9 @@ package broker import ( "context" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" "time" "google.golang.org/grpc" @@ -14,6 +16,8 @@ import ( ) type MessageQueueBrokerOption struct { + Masters map[string]pb.ServerAddress + FilerGroup string Filers []pb.ServerAddress DefaultReplication string MaxMB int @@ -26,23 +30,26 @@ type MessageQueueBroker struct { mq_pb.UnimplementedSeaweedMessagingServer option *MessageQueueBrokerOption grpcDialOption grpc.DialOption + MasterClient *wdclient.MasterClient topicManager *TopicManager } -func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageQueueBroker, err error) { +func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { - messageBroker = &MessageQueueBroker{ + mqBroker = &MessageQueueBroker{ option: option, grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), "", option.Masters), } - messageBroker.topicManager = NewTopicManager(messageBroker) + mqBroker.topicManager = NewTopicManager(mqBroker) - messageBroker.checkFilers() + mqBroker.checkFilers() - go messageBroker.keepConnectedToOneFiler() + go mqBroker.keepConnectedToOneFiler() + go mqBroker.MasterClient.KeepConnectedToMaster() - return messageBroker, nil + return mqBroker, nil } func (broker *MessageQueueBroker) keepConnectedToOneFiler() { diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index 21d3587fb..fc5f801b1 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -40,7 +40,9 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W } var filerNodes []*master_pb.ListClusterNodesResponse_ClusterNode + var mqBrokerNodes []*master_pb.ListClusterNodesResponse_ClusterNode + // get the list of filers err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, @@ -57,6 +59,30 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W return } + // get the list of message queue brokers + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.BrokerType, + FilerGroup: *commandEnv.option.FilerGroup, + }) + if err != nil { + return err + } + + mqBrokerNodes = resp.ClusterNodes + return err + }) + if err != nil { + return + } + + if len(mqBrokerNodes) > 0 { + fmt.Fprintf(writer, "* message queue brokers %d\n", len(mqBrokerNodes)) + for _, node := range mqBrokerNodes { + fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version) + } + } + fmt.Fprintf(writer, "* filers %d\n", len(filerNodes)) for _, node := range filerNodes { fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version)