|
@ -58,11 +58,23 @@ func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Pa |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats { |
|
|
func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats { |
|
|
stats := &mq_pb.BrokerStats{} |
|
|
|
|
|
|
|
|
stats := &mq_pb.BrokerStats{ |
|
|
|
|
|
Stats: make(map[string]*mq_pb.TopicPartitionStats), |
|
|
|
|
|
} |
|
|
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { |
|
|
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { |
|
|
for _, localPartition := range localTopic.Partitions { |
|
|
for _, localPartition := range localTopic.Partitions { |
|
|
stats.TopicPartitionCount++ |
|
|
|
|
|
stats.ConsumerCount += localPartition.ConsumerCount |
|
|
|
|
|
|
|
|
stats.Stats[topic] = &mq_pb.TopicPartitionStats{ |
|
|
|
|
|
Topic: &mq_pb.Topic{ |
|
|
|
|
|
Namespace: string(localTopic.Namespace), |
|
|
|
|
|
Name: localTopic.Name, |
|
|
|
|
|
}, |
|
|
|
|
|
Partition: &mq_pb.Partition{ |
|
|
|
|
|
RingSize: localPartition.RingSize, |
|
|
|
|
|
RangeStart: localPartition.RangeStart, |
|
|
|
|
|
RangeStop: localPartition.RangeStop, |
|
|
|
|
|
}, |
|
|
|
|
|
ConsumerCount: localPartition.ConsumerCount, |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
}) |
|
|
}) |
|
|
|
|
|
|
|
|