Browse Source

ensure latest stats are reported

pull/4866/head
chrislu 1 year ago
parent
commit
c7e05e4e71
  1. 4
      weed/mq/broker/broker_grpc_balancer.go
  2. 6
      weed/mq/broker/broker_grpc_create.go
  3. 2
      weed/mq/broker/broker_stats.go
  4. 19
      weed/mq/topic/local_manager.go

4
weed/mq/broker/broker_grpc_balancer.go

@ -26,7 +26,9 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin
brokerStats, found = broker.Balancer.Brokers.Get(initMessage.Broker) brokerStats, found = broker.Balancer.Brokers.Get(initMessage.Broker)
if !found { if !found {
brokerStats = balancer.NewBrokerStats() brokerStats = balancer.NewBrokerStats()
broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats)
if !broker.Balancer.Brokers.SetIfAbsent(initMessage.Broker, brokerStats) {
brokerStats, _ = broker.Balancer.Brokers.Get(initMessage.Broker)
}
} }
} else { } else {
return status.Errorf(codes.InvalidArgument, "balancer init message is empty") return status.Errorf(codes.InvalidArgument, "balancer init message is empty")

6
weed/mq/broker/broker_grpc_create.go

@ -43,7 +43,9 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p
brokerStats, found := broker.Balancer.Brokers.Get(bpa.LeaderBroker) brokerStats, found := broker.Balancer.Brokers.Get(bpa.LeaderBroker)
if !found { if !found {
brokerStats = balancer.NewBrokerStats() brokerStats = balancer.NewBrokerStats()
broker.Balancer.Brokers.Set(bpa.LeaderBroker, brokerStats)
if !broker.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
brokerStats, _ = broker.Balancer.Brokers.Get(bpa.LeaderBroker)
}
} }
brokerStats.RegisterAssignment(request.Topic, bpa.Partition) brokerStats.RegisterAssignment(request.Topic, bpa.Partition)
return nil return nil
@ -52,6 +54,8 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p
} }
} }
// TODO revert if some error happens in the middle of the assignments
return ret, err return ret, err
} }

2
weed/mq/broker/broker_stats.go

@ -61,7 +61,7 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
if err != nil { if err != nil {
return fmt.Errorf("send stats message: %v", err) return fmt.Errorf("send stats message: %v", err)
} }
glog.V(4).Infof("sent stats: %+v", stats)
glog.V(3).Infof("sent stats: %+v", stats)
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond) time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
} }

19
weed/mq/topic/local_manager.go

@ -28,7 +28,9 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition
Partitions: make([]*LocalPartition, 0), Partitions: make([]*LocalPartition, 0),
} }
} }
manager.topics.SetIfAbsent(topic.String(), localTopic)
if !manager.topics.SetIfAbsent(topic.String(), localTopic) {
localTopic, _ = manager.topics.Get(topic.String())
}
if localTopic.findPartition(localPartition.Partition) != nil { if localTopic.findPartition(localPartition.Partition) != nil {
return return
} }
@ -61,6 +63,15 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
stats := &mq_pb.BrokerStats{ stats := &mq_pb.BrokerStats{
Stats: make(map[string]*mq_pb.TopicPartitionStats), Stats: make(map[string]*mq_pb.TopicPartitionStats),
} }
// collect current broker's cpu usage
// this needs to be in front, so the following stats can be more accurate
usages, err := cpu.Percent(duration, false)
if err == nil && len(usages) > 0 {
stats.CpuUsagePercent = int32(usages[0])
}
// collect current broker's topics and partitions
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
for _, localPartition := range localTopic.Partitions { for _, localPartition := range localTopic.Partitions {
topicPartition := &TopicPartition{ topicPartition := &TopicPartition{
@ -85,12 +96,6 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
} }
}) })
// collect current broker's cpu usage
usages, err := cpu.Percent(duration, false)
if err == nil && len(usages) > 0 {
stats.CpuUsagePercent = int32(usages[0])
}
return stats return stats
} }
Loading…
Cancel
Save