diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 640ccca10..e381fa84c 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -43,7 +43,7 @@ type MessageQueueBroker struct { filers map[pb.ServerAddress]struct{} currentFiler pb.ServerAddress localTopicManager *topic.LocalTopicManager - Balancer *pub_balancer.Balancer + Balancer *pub_balancer.PubBalancer lockAsBalancer *cluster.LiveLock Coordinator *sub_coordinator.Coordinator accessLock sync.Mutex diff --git a/weed/mq/pub_balancer/balance.go b/weed/mq/pub_balancer/balance.go index 87fc5739b..b4f1e20cd 100644 --- a/weed/mq/pub_balancer/balance.go +++ b/weed/mq/pub_balancer/balance.go @@ -54,12 +54,12 @@ type BalanceActionCreate struct { // BalancePublishers check the stats of all brokers, // and balance the publishers to the brokers. -func (balancer *Balancer) BalancePublishers() []BalanceAction { +func (balancer *PubBalancer) BalancePublishers() []BalanceAction { action := BalanceTopicPartitionOnBrokers(balancer.Brokers) return []BalanceAction{action} } -func (balancer *Balancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) { +func (balancer *PubBalancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) { for _, action := range actions { switch action.(type) { case *BalanceActionMove: diff --git a/weed/mq/pub_balancer/balance_action.go b/weed/mq/pub_balancer/balance_action.go index c29ec3469..a2d888b2a 100644 --- a/weed/mq/pub_balancer/balance_action.go +++ b/weed/mq/pub_balancer/balance_action.go @@ -8,10 +8,10 @@ import ( "google.golang.org/grpc" ) -// Balancer <= PublisherToPubBalancer() <= Broker <=> Publish() -// ExecuteBalanceActionMove from Balancer => AssignTopicPartitions() => Broker => Publish() +// PubBalancer <= PublisherToPubBalancer() <= Broker <=> Publish() +// ExecuteBalanceActionMove from PubBalancer => AssignTopicPartitions() => Broker => Publish() -func (balancer *Balancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error { +func (balancer *PubBalancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error { if _, found := balancer.Brokers.Get(move.SourceBroker); !found { return fmt.Errorf("source broker %s not found", move.SourceBroker) } diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index 5b5562d2c..1c8e31558 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -11,7 +11,7 @@ const ( LockBrokerBalancer = "broker_balancer" ) -// Balancer collects stats from all brokers. +// PubBalancer collects stats from all brokers. // // When publishers wants to create topics, it picks brokers to assign the topic partitions. // When consumers wants to subscribe topics, it tells which brokers are serving the topic partitions. @@ -28,7 +28,7 @@ const ( // // When a consumer instance is down, the broker will notice this and inform the balancer. // The balancer will then tell the broker to send the partition to another standby consumer instance. -type Balancer struct { +type PubBalancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address // Collected from all brokers when they connect to the broker leader TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name @@ -37,14 +37,14 @@ type Balancer struct { OnRemoveBroker func(broker string, brokerStats *BrokerStats) } -func NewBalancer() *Balancer { - return &Balancer{ +func NewBalancer() *PubBalancer { + return &PubBalancer{ Brokers: cmap.New[*BrokerStats](), TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](), } } -func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) { +func (balancer *PubBalancer) AddBroker(broker string) (brokerStats *BrokerStats) { var found bool brokerStats, found = balancer.Brokers.Get(broker) if !found { @@ -58,7 +58,7 @@ func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) { return brokerStats } -func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) { +func (balancer *PubBalancer) RemoveBroker(broker string, stats *BrokerStats) { balancer.Brokers.Remove(broker) // update TopicToBrokers @@ -78,7 +78,7 @@ func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) { balancer.OnRemoveBroker(broker, stats) } -func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) { +func (balancer *PubBalancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) { brokerStats.UpdateStats(receivedStats) // update TopicToBrokers @@ -97,9 +97,9 @@ func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *Broke } // OnPubAddBroker is called when a broker is added for a publisher coordinator -func (balancer *Balancer) onPubAddBroker(broker string, brokerStats *BrokerStats) { +func (balancer *PubBalancer) onPubAddBroker(broker string, brokerStats *BrokerStats) { } // OnPubRemoveBroker is called when a broker is removed for a publisher coordinator -func (balancer *Balancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) { +func (balancer *PubBalancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) { } diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index 052932c04..423b38ecb 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -9,7 +9,7 @@ var ( ErrNoBroker = errors.New("no broker") ) -func (balancer *Balancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) { +func (balancer *PubBalancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) { // find existing topic partition assignments for brokerStatsItem := range balancer.Brokers.IterBuffered() { broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go index 0f307c9eb..d16715406 100644 --- a/weed/mq/pub_balancer/repair.go +++ b/weed/mq/pub_balancer/repair.go @@ -8,7 +8,7 @@ import ( "sort" ) -func (balancer *Balancer) RepairTopics() []BalanceAction { +func (balancer *PubBalancer) RepairTopics() []BalanceAction { action := BalanceTopicPartitionOnBrokers(balancer.Brokers) return []BalanceAction{action} } diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 72a8e9aad..1e5c7c79c 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -22,11 +22,11 @@ type ConsumerGroup struct { ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance] mapping *PartitionConsumerMapping reBalanceTimer *time.Timer - pubBalancer *pub_balancer.Balancer + pubBalancer *pub_balancer.PubBalancer filerClientAccessor *FilerClientAccessor } -func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup { +func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.PubBalancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup { return &ConsumerGroup{ topic: topic.FromPbTopic(t), ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](), diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index 4bb726f26..6ab7166d4 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -18,11 +18,11 @@ type TopicConsumerGroups struct { type Coordinator struct { // map topic name to consumer groups TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] - balancer *pub_balancer.Balancer + balancer *pub_balancer.PubBalancer FilerClientAccessor *FilerClientAccessor } -func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator { +func NewCoordinator(balancer *pub_balancer.PubBalancer) *Coordinator { return &Coordinator{ TopicSubscribers: cmap.New[*TopicConsumerGroups](), balancer: balancer,