Browse Source

rename

pull/5890/head
chrislu 8 months ago
parent
commit
554ae09f82
  1. 6
      weed/mq/broker/broker_grpc_assign.go
  2. 4
      weed/mq/broker/broker_grpc_balance.go
  3. 6
      weed/mq/broker/broker_grpc_configure.go
  4. 2
      weed/mq/broker/broker_grpc_lookup.go
  5. 6
      weed/mq/broker/broker_grpc_pub_balancer.go
  6. 4
      weed/mq/broker/broker_grpc_sub_coordinator.go
  7. 20
      weed/mq/broker/broker_server.go
  8. 2
      weed/mq/broker/broker_topic_conf_read_write.go
  9. 2
      weed/mq/pub_balancer/pub_balancer.go
  10. 2
      weed/mq/sub_coordinator/sub_coordinator.go

6
weed/mq/broker/broker_grpc_assign.go

@ -79,11 +79,11 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context,
return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
}
}
brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
brokerStats, found := b.PubBalancer.Brokers.Get(bpa.LeaderBroker)
if !found {
brokerStats = pub_balancer.NewBrokerStats()
if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
if !b.PubBalancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
brokerStats, _ = b.PubBalancer.Brokers.Get(bpa.LeaderBroker)
}
}
brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)

4
weed/mq/broker/broker_grpc_balance.go

@ -20,8 +20,8 @@ func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.B
ret := &mq_pb.BalanceTopicsResponse{}
actions := b.Balancer.BalancePublishers()
err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption)
actions := b.PubBalancer.BalancePublishers()
err = b.PubBalancer.ExecuteBalanceAction(actions, b.grpcDialOption)
return ret, err
}

6
weed/mq/broker/broker_grpc_configure.go

@ -60,10 +60,10 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
}
resp = &mq_pb.ConfigureTopicResponse{}
if b.Balancer.Brokers.IsEmpty() {
if b.PubBalancer.Brokers.IsEmpty() {
return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
}
resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.PubBalancer.Brokers, request.PartitionCount)
resp.RecordType = request.RecordType
// save the topic configuration on filer
@ -71,7 +71,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return nil, fmt.Errorf("configure topic: %v", err)
}
b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)

2
weed/mq/broker/broker_grpc_lookup.go

@ -50,7 +50,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
ret := &mq_pb.ListTopicsResponse{}
knownTopics := make(map[string]struct{})
for brokerStatsItem := range b.Balancer.Brokers.IterBuffered() {
for brokerStatsItem := range b.PubBalancer.Brokers.IterBuffered() {
_, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
topicPartitionStat := topicPartitionStatsItem.Val

6
weed/mq/broker/broker_grpc_pub_balancer.go

@ -22,12 +22,12 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
initMessage := req.GetInit()
var brokerStats *pub_balancer.BrokerStats
if initMessage != nil {
brokerStats = b.Balancer.AddBroker(initMessage.Broker)
brokerStats = b.PubBalancer.AddBroker(initMessage.Broker)
} else {
return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
}
defer func() {
b.Balancer.RemoveBroker(initMessage.Broker, brokerStats)
b.PubBalancer.RemoveBroker(initMessage.Broker, brokerStats)
}()
// process stats message
@ -40,7 +40,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
if receivedStats := req.GetStats(); receivedStats != nil {
b.Balancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
b.PubBalancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
// glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
}
}

4
weed/mq/broker/broker_grpc_sub_coordinator.go

@ -23,13 +23,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
// process init message
initMessage := req.GetInit()
if initMessage != nil {
cgi = b.Coordinator.AddSubscriber(initMessage)
cgi = b.SubCoordinator.AddSubscriber(initMessage)
glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
} else {
return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
}
defer func() {
b.Coordinator.RemoveSubscriber(initMessage)
b.SubCoordinator.RemoveSubscriber(initMessage)
glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}()

20
weed/mq/broker/broker_server.go

@ -43,17 +43,17 @@ type MessageQueueBroker struct {
filers map[pb.ServerAddress]struct{}
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
Balancer *pub_balancer.PubBalancer
PubBalancer *pub_balancer.PubBalancer
lockAsBalancer *cluster.LiveLock
Coordinator *sub_coordinator.SubCoordinator
SubCoordinator *sub_coordinator.SubCoordinator
accessLock sync.Mutex
fca *sub_coordinator.FilerClientAccessor
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
pub_broker_balancer := pub_balancer.NewBalancer()
coordinator := sub_coordinator.NewCoordinator(pub_broker_balancer)
pubBalancer := pub_balancer.NewPubBalancer()
subCoordinator := sub_coordinator.NewSubCoordinator(pubBalancer)
mqBroker = &MessageQueueBroker{
option: option,
@ -61,20 +61,20 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
Balancer: pub_broker_balancer,
Coordinator: coordinator,
PubBalancer: pubBalancer,
SubCoordinator: subCoordinator,
}
fca := &sub_coordinator.FilerClientAccessor{
GetFiler: mqBroker.GetFiler,
GetGrpcDialOption: mqBroker.GetGrpcDialOption,
}
mqBroker.fca = fca
coordinator.FilerClientAccessor = fca
subCoordinator.FilerClientAccessor = fca
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker
pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker
pubBalancer.OnPartitionChange = mqBroker.SubCoordinator.OnPartitionChange
pubBalancer.OnAddBroker = mqBroker.SubCoordinator.OnSubAddBroker
pubBalancer.OnRemoveBroker = mqBroker.SubCoordinator.OnSubRemoveBroker
go mqBroker.MasterClient.KeepConnectedToMaster()

2
weed/mq/broker/broker_topic_conf_read_write.go

@ -52,7 +52,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
// also fix assignee broker if invalid
hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments)
hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments)
if hasChanges {
glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
if err = b.fca.SaveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {

2
weed/mq/pub_balancer/pub_balancer.go

@ -37,7 +37,7 @@ type PubBalancer struct {
OnRemoveBroker func(broker string, brokerStats *BrokerStats)
}
func NewBalancer() *PubBalancer {
func NewPubBalancer() *PubBalancer {
return &PubBalancer{
Brokers: cmap.New[*BrokerStats](),
TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),

2
weed/mq/sub_coordinator/sub_coordinator.go

@ -22,7 +22,7 @@ type SubCoordinator struct {
FilerClientAccessor *FilerClientAccessor
}
func NewCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator {
func NewSubCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator {
return &SubCoordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](),
balancer: balancer,

Loading…
Cancel
Save