|  |  | @ -19,10 +19,10 @@ import ( | 
			
		
	
		
			
				
					|  |  |  | ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | var ( | 
			
		
	
		
			
				
					|  |  |  | 	messageBrokerStandaloneOptions MessageBrokerOptions | 
			
		
	
		
			
				
					|  |  |  | 	mqBrokerStandaloneOptions MessageQueueBrokerOptions | 
			
		
	
		
			
				
					|  |  |  | ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | type MessageBrokerOptions struct { | 
			
		
	
		
			
				
					|  |  |  | type MessageQueueBrokerOptions struct { | 
			
		
	
		
			
				
					|  |  |  | 	filer      *string | 
			
		
	
		
			
				
					|  |  |  | 	ip         *string | 
			
		
	
		
			
				
					|  |  |  | 	port       *int | 
			
		
	
	
		
			
				
					|  |  | @ -31,16 +31,16 @@ type MessageBrokerOptions struct { | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func init() { | 
			
		
	
		
			
				
					|  |  |  | 	cmdMsgBroker.Run = runMsgBroker // break init cycle
 | 
			
		
	
		
			
				
					|  |  |  | 	messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") | 
			
		
	
		
			
				
					|  |  |  | 	messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") | 
			
		
	
		
			
				
					|  |  |  | 	messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port") | 
			
		
	
		
			
				
					|  |  |  | 	messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file") | 
			
		
	
		
			
				
					|  |  |  | 	messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file") | 
			
		
	
		
			
				
					|  |  |  | 	cmdMqBroker.Run = runMqBroker // break init cycle
 | 
			
		
	
		
			
				
					|  |  |  | 	mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address") | 
			
		
	
		
			
				
					|  |  |  | 	mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") | 
			
		
	
		
			
				
					|  |  |  | 	mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") | 
			
		
	
		
			
				
					|  |  |  | 	mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file") | 
			
		
	
		
			
				
					|  |  |  | 	mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file") | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | var cmdMsgBroker = &Command{ | 
			
		
	
		
			
				
					|  |  |  | 	UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]", | 
			
		
	
		
			
				
					|  |  |  | var cmdMqBroker = &Command{ | 
			
		
	
		
			
				
					|  |  |  | 	UsageLine: "mq.broker [-port=17777] [-filer=<ip:port>]", | 
			
		
	
		
			
				
					|  |  |  | 	Short:     "start a message queue broker", | 
			
		
	
		
			
				
					|  |  |  | 	Long: `start a message queue broker | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  | @ -50,19 +50,19 @@ var cmdMsgBroker = &Command{ | 
			
		
	
		
			
				
					|  |  |  | `, | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func runMsgBroker(cmd *Command, args []string) bool { | 
			
		
	
		
			
				
					|  |  |  | func runMqBroker(cmd *Command, args []string) bool { | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	util.LoadConfiguration("security", false) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	return messageBrokerStandaloneOptions.startQueueServer() | 
			
		
	
		
			
				
					|  |  |  | 	return mqBrokerStandaloneOptions.startQueueServer() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { | 
			
		
	
		
			
				
					|  |  |  | func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) | 
			
		
	
		
			
				
					|  |  |  | 	grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	filerAddress := pb.ServerAddress(*msgBrokerOpt.filer) | 
			
		
	
		
			
				
					|  |  |  | 	filerAddress := pb.ServerAddress(*mqBrokerOpt.filer) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") | 
			
		
	
		
			
				
					|  |  |  | 	cipher := false | 
			
		
	
	
		
			
				
					|  |  | @ -77,10 +77,10 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { | 
			
		
	
		
			
				
					|  |  |  | 			return nil | 
			
		
	
		
			
				
					|  |  |  | 		}) | 
			
		
	
		
			
				
					|  |  |  | 		if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 			glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) | 
			
		
	
		
			
				
					|  |  |  | 			glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress()) | 
			
		
	
		
			
				
					|  |  |  | 			time.Sleep(time.Second) | 
			
		
	
		
			
				
					|  |  |  | 		} else { | 
			
		
	
		
			
				
					|  |  |  | 			glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) | 
			
		
	
		
			
				
					|  |  |  | 			glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress()) | 
			
		
	
		
			
				
					|  |  |  | 			break | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
	
		
			
				
					|  |  | @ -89,15 +89,15 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { | 
			
		
	
		
			
				
					|  |  |  | 		Filers:             []pb.ServerAddress{filerAddress}, | 
			
		
	
		
			
				
					|  |  |  | 		DefaultReplication: "", | 
			
		
	
		
			
				
					|  |  |  | 		MaxMB:              0, | 
			
		
	
		
			
				
					|  |  |  | 		Ip:                 *msgBrokerOpt.ip, | 
			
		
	
		
			
				
					|  |  |  | 		Port:               *msgBrokerOpt.port, | 
			
		
	
		
			
				
					|  |  |  | 		Ip:                 *mqBrokerOpt.ip, | 
			
		
	
		
			
				
					|  |  |  | 		Port:               *mqBrokerOpt.port, | 
			
		
	
		
			
				
					|  |  |  | 		Cipher:             cipher, | 
			
		
	
		
			
				
					|  |  |  | 	}, grpcDialOption) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// start grpc listener
 | 
			
		
	
		
			
				
					|  |  |  | 	grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0) | 
			
		
	
		
			
				
					|  |  |  | 	grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) | 
			
		
	
		
			
				
					|  |  |  | 		glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) | 
			
		
	
		
			
				
					|  |  |  | 	messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) |