diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 3ca10d258..5686f8dd9 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -55,15 +55,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis var p topic.Partition if initMessage != nil { t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) - if localTopicPartition == nil { - localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p) - // if not created, return error - if err != nil { - response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err) - glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err) - return stream.Send(response) - } + localTopicPartition, err = b.loadLocalTopicPartition(t, p) + if err != nil { + response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err) + glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err) + return stream.Send(response) } ackInterval = int(initMessage.AckInterval) stream.Send(response) @@ -148,6 +144,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return nil } +func (b *MessageQueueBroker) loadLocalTopicPartition(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { + localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) + if localTopicPartition == nil { + localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p) + } + return localTopicPartition, err +} + func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { self := b.option.BrokerAddress() diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go index 418921ba3..4edceb8a0 100644 --- a/weed/mq/broker/broker_grpc_pub_balancer.go +++ b/weed/mq/broker/broker_grpc_pub_balancer.go @@ -21,12 +21,12 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin initMessage := req.GetInit() var brokerStats *pub_balancer.BrokerStats if initMessage != nil { - brokerStats = b.Balancer.OnBrokerConnected(initMessage.Broker) + brokerStats = b.Balancer.AddBroker(initMessage.Broker) } else { return status.Errorf(codes.InvalidArgument, "balancer init message is empty") } defer func() { - b.Balancer.OnBrokerDisconnected(initMessage.Broker, brokerStats) + b.Balancer.RemoveBroker(initMessage.Broker, brokerStats) }() // process stats message diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 9ef277f4d..31a6e10ef 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -67,6 +67,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial } mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange + pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnAddBroker + pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnRemoveBroker go mqBroker.MasterClient.KeepConnectedToMaster() diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index 0bcbdd51b..d44c68720 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -33,6 +33,8 @@ type Balancer struct { // Collected from all brokers when they connect to the broker leader TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) + OnAddBroker func(broker string, brokerStats *BrokerStats) + OnRemoveBroker func(broker string, brokerStats *BrokerStats) } func NewBalancer() *Balancer { @@ -42,7 +44,7 @@ func NewBalancer() *Balancer { } } -func (balancer *Balancer) OnBrokerConnected(broker string) (brokerStats *BrokerStats) { +func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) { var found bool brokerStats, found = balancer.Brokers.Get(broker) if !found { @@ -51,10 +53,11 @@ func (balancer *Balancer) OnBrokerConnected(broker string) (brokerStats *BrokerS brokerStats, _ = balancer.Brokers.Get(broker) } } + balancer.OnAddBroker(broker, brokerStats) return brokerStats } -func (balancer *Balancer) OnBrokerDisconnected(broker string, stats *BrokerStats) { +func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) { balancer.Brokers.Remove(broker) // update TopicToBrokers @@ -65,6 +68,7 @@ func (balancer *Balancer) OnBrokerDisconnected(broker string, stats *BrokerStats } partitionSlotToBrokerList.RemoveBroker(broker) } + balancer.OnRemoveBroker(broker, stats) } func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) { diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index 269c12e66..1ad539cfb 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -99,3 +99,11 @@ func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb cg.OnPartitionListChange(assignments) } } + +func (c *Coordinator) OnAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) { + +} + +func (c *Coordinator) OnRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) { + +}