|
|
@ -20,11 +20,15 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin |
|
|
|
|
|
|
|
// process init message
|
|
|
|
initMessage := req.GetInit() |
|
|
|
brokerStats := balancer.NewBrokerStats() |
|
|
|
var brokerStats *balancer.BrokerStats |
|
|
|
if initMessage != nil { |
|
|
|
broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats) |
|
|
|
var found bool |
|
|
|
brokerStats, found = broker.Balancer.Brokers.Get(initMessage.Broker) |
|
|
|
if !found { |
|
|
|
brokerStats = balancer.NewBrokerStats() |
|
|
|
broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats) |
|
|
|
} |
|
|
|
} else { |
|
|
|
// TODO fix this
|
|
|
|
return status.Errorf(codes.InvalidArgument, "balancer init message is empty") |
|
|
|
} |
|
|
|
defer func() { |
|
|
|