From 2a458972373856cc1b31b0b7433b2aaa000bb700 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 17 Apr 2020 02:29:00 -0700 Subject: [PATCH] broker: read cipher value from filer --- weed/command/msg_broker.go | 13 ++++++++----- weed/messaging/msg_broker_server.go | 7 +++---- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index f77582f03..36d164800 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -22,8 +22,8 @@ var ( ) type QueueOptions struct { - filer *string - port *int + filer *string + port *int } func init() { @@ -59,14 +59,16 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool { return false } - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") + cipher := false for { err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - _, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) } + cipher = resp.Cipher return nil }) if err != nil { @@ -83,7 +85,8 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool { DefaultReplication: "", MaxMB: 0, Port: *msgBrokerOpt.port, - }) + Cipher: cipher, + }, grpcDialOption) // start grpc listener grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) diff --git a/weed/messaging/msg_broker_server.go b/weed/messaging/msg_broker_server.go index 9174ca4cf..bc842eeea 100644 --- a/weed/messaging/msg_broker_server.go +++ b/weed/messaging/msg_broker_server.go @@ -10,8 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" ) type MessageBrokerOption struct { @@ -19,6 +17,7 @@ type MessageBrokerOption struct { DefaultReplication string MaxMB int Port int + Cipher bool } type MessageBroker struct { @@ -26,11 +25,11 @@ type MessageBroker struct { grpcDialOption grpc.DialOption } -func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) { +func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { messageBroker = &MessageBroker{ option: option, - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"), + grpcDialOption: grpcDialOption, } go messageBroker.loopForEver()