Browse Source

rename Balancer to PubBalancer

pull/5890/head
chrislu 7 months ago
parent
commit
d5abffa42c
  1. 2
      weed/mq/broker/broker_server.go
  2. 4
      weed/mq/pub_balancer/balance.go
  3. 6
      weed/mq/pub_balancer/balance_action.go
  4. 18
      weed/mq/pub_balancer/balancer.go
  5. 2
      weed/mq/pub_balancer/lookup.go
  6. 2
      weed/mq/pub_balancer/repair.go
  7. 4
      weed/mq/sub_coordinator/consumer_group.go
  8. 4
      weed/mq/sub_coordinator/coordinator.go

2
weed/mq/broker/broker_server.go

@ -43,7 +43,7 @@ type MessageQueueBroker struct {
filers map[pb.ServerAddress]struct{}
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
Balancer *pub_balancer.Balancer
Balancer *pub_balancer.PubBalancer
lockAsBalancer *cluster.LiveLock
Coordinator *sub_coordinator.Coordinator
accessLock sync.Mutex

4
weed/mq/pub_balancer/balance.go

@ -54,12 +54,12 @@ type BalanceActionCreate struct {
// BalancePublishers check the stats of all brokers,
// and balance the publishers to the brokers.
func (balancer *Balancer) BalancePublishers() []BalanceAction {
func (balancer *PubBalancer) BalancePublishers() []BalanceAction {
action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
return []BalanceAction{action}
}
func (balancer *Balancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) {
func (balancer *PubBalancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) {
for _, action := range actions {
switch action.(type) {
case *BalanceActionMove:

6
weed/mq/pub_balancer/balance_action.go

@ -8,10 +8,10 @@ import (
"google.golang.org/grpc"
)
// Balancer <= PublisherToPubBalancer() <= Broker <=> Publish()
// ExecuteBalanceActionMove from Balancer => AssignTopicPartitions() => Broker => Publish()
// PubBalancer <= PublisherToPubBalancer() <= Broker <=> Publish()
// ExecuteBalanceActionMove from PubBalancer => AssignTopicPartitions() => Broker => Publish()
func (balancer *Balancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error {
func (balancer *PubBalancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error {
if _, found := balancer.Brokers.Get(move.SourceBroker); !found {
return fmt.Errorf("source broker %s not found", move.SourceBroker)
}

18
weed/mq/pub_balancer/balancer.go

@ -11,7 +11,7 @@ const (
LockBrokerBalancer = "broker_balancer"
)
// Balancer collects stats from all brokers.
// PubBalancer collects stats from all brokers.
//
// When publishers wants to create topics, it picks brokers to assign the topic partitions.
// When consumers wants to subscribe topics, it tells which brokers are serving the topic partitions.
@ -28,7 +28,7 @@ const (
//
// When a consumer instance is down, the broker will notice this and inform the balancer.
// The balancer will then tell the broker to send the partition to another standby consumer instance.
type Balancer struct {
type PubBalancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
// Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
@ -37,14 +37,14 @@ type Balancer struct {
OnRemoveBroker func(broker string, brokerStats *BrokerStats)
}
func NewBalancer() *Balancer {
return &Balancer{
func NewBalancer() *PubBalancer {
return &PubBalancer{
Brokers: cmap.New[*BrokerStats](),
TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
}
}
func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) {
func (balancer *PubBalancer) AddBroker(broker string) (brokerStats *BrokerStats) {
var found bool
brokerStats, found = balancer.Brokers.Get(broker)
if !found {
@ -58,7 +58,7 @@ func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) {
return brokerStats
}
func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
func (balancer *PubBalancer) RemoveBroker(broker string, stats *BrokerStats) {
balancer.Brokers.Remove(broker)
// update TopicToBrokers
@ -78,7 +78,7 @@ func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
balancer.OnRemoveBroker(broker, stats)
}
func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
func (balancer *PubBalancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
brokerStats.UpdateStats(receivedStats)
// update TopicToBrokers
@ -97,9 +97,9 @@ func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *Broke
}
// OnPubAddBroker is called when a broker is added for a publisher coordinator
func (balancer *Balancer) onPubAddBroker(broker string, brokerStats *BrokerStats) {
func (balancer *PubBalancer) onPubAddBroker(broker string, brokerStats *BrokerStats) {
}
// OnPubRemoveBroker is called when a broker is removed for a publisher coordinator
func (balancer *Balancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) {
func (balancer *PubBalancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) {
}

2
weed/mq/pub_balancer/lookup.go

@ -9,7 +9,7 @@ var (
ErrNoBroker = errors.New("no broker")
)
func (balancer *Balancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) {
func (balancer *PubBalancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) {
// find existing topic partition assignments
for brokerStatsItem := range balancer.Brokers.IterBuffered() {
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val

2
weed/mq/pub_balancer/repair.go

@ -8,7 +8,7 @@ import (
"sort"
)
func (balancer *Balancer) RepairTopics() []BalanceAction {
func (balancer *PubBalancer) RepairTopics() []BalanceAction {
action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
return []BalanceAction{action}
}

4
weed/mq/sub_coordinator/consumer_group.go

@ -22,11 +22,11 @@ type ConsumerGroup struct {
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
mapping *PartitionConsumerMapping
reBalanceTimer *time.Timer
pubBalancer *pub_balancer.Balancer
pubBalancer *pub_balancer.PubBalancer
filerClientAccessor *FilerClientAccessor
}
func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup {
func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.PubBalancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup {
return &ConsumerGroup{
topic: topic.FromPbTopic(t),
ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),

4
weed/mq/sub_coordinator/coordinator.go

@ -18,11 +18,11 @@ type TopicConsumerGroups struct {
type Coordinator struct {
// map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
balancer *pub_balancer.Balancer
balancer *pub_balancer.PubBalancer
FilerClientAccessor *FilerClientAccessor
}
func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
func NewCoordinator(balancer *pub_balancer.PubBalancer) *Coordinator {
return &Coordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](),
balancer: balancer,

Loading…
Cancel
Save