Browse Source

refactor

pull/5637/head
chrislu 12 months ago
parent
commit
34839237ab
  1. 4
      weed/mq/broker/broker_server.go
  2. 10
      weed/mq/pub_balancer/balancer.go
  3. 6
      weed/mq/sub_coordinator/coordinator.go

4
weed/mq/broker/broker_server.go

@ -67,8 +67,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
}
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnAddBroker
pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnRemoveBroker
pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker
pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker
go mqBroker.MasterClient.KeepConnectedToMaster()

10
weed/mq/pub_balancer/balancer.go

@ -53,6 +53,7 @@ func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) {
brokerStats, _ = balancer.Brokers.Get(broker)
}
}
balancer.onPubAddBroker(broker, brokerStats)
balancer.OnAddBroker(broker, brokerStats)
return brokerStats
}
@ -68,6 +69,7 @@ func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
}
partitionSlotToBrokerList.RemoveBroker(broker)
}
balancer.onPubRemoveBroker(broker, stats)
balancer.OnRemoveBroker(broker, stats)
}
@ -88,3 +90,11 @@ func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *Broke
partitionSlotToBrokerList.AddBroker(partition, broker)
}
}
// OnPubAddBroker is called when a broker is added for a publisher coordinator
func (balancer *Balancer) onPubAddBroker(broker string, brokerStats *BrokerStats) {
}
// OnPubRemoveBroker is called when a broker is removed for a publisher coordinator
func (balancer *Balancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) {
}

6
weed/mq/sub_coordinator/coordinator.go

@ -100,10 +100,12 @@ func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb
}
}
func (c *Coordinator) OnAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
// OnSubAddBroker is called when a broker is added to the balancer
func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
}
func (c *Coordinator) OnRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
// OnSubRemoveBroker is called when a broker is removed from the balancer
func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
}
Loading…
Cancel
Save