You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

77 lines
2.2 KiB

1 year ago
1 year ago
  1. package topic
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. "github.com/shirou/gopsutil/v3/cpu"
  6. "time"
  7. )
  8. // LocalTopicManager manages topics on local broker
  9. type LocalTopicManager struct {
  10. topics cmap.ConcurrentMap[string, *LocalTopic]
  11. }
  12. // NewLocalTopicManager creates a new LocalTopicManager
  13. func NewLocalTopicManager() *LocalTopicManager {
  14. return &LocalTopicManager{
  15. topics: cmap.New[*LocalTopic](),
  16. }
  17. }
  18. // AddTopic adds a topic to the local topic manager
  19. func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
  20. localTopic, ok := manager.topics.Get(topic.String())
  21. if !ok {
  22. localTopic = &LocalTopic{
  23. Topic: topic,
  24. Partitions: make([]*LocalPartition, 0),
  25. }
  26. }
  27. manager.topics.SetIfAbsent(topic.String(), localTopic)
  28. if localTopic.findPartition(localPartition.Partition) != nil {
  29. return
  30. }
  31. localTopic.Partitions = append(localTopic.Partitions, localPartition)
  32. }
  33. // GetTopic gets a topic from the local topic manager
  34. func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
  35. localTopic, ok := manager.topics.Get(topic.String())
  36. if !ok {
  37. return nil
  38. }
  39. return localTopic.findPartition(partition)
  40. }
  41. // RemoveTopic removes a topic from the local topic manager
  42. func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
  43. manager.topics.Remove(topic.String())
  44. }
  45. func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
  46. localTopic, ok := manager.topics.Get(topic.String())
  47. if !ok {
  48. return false
  49. }
  50. return localTopic.removePartition(partition)
  51. }
  52. func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats {
  53. stats := &mq_pb.BrokerStats{}
  54. manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
  55. for _, localPartition := range localTopic.Partitions {
  56. stats.TopicPartitionCount++
  57. stats.ConsumerCount += localPartition.ConsumerCount
  58. }
  59. })
  60. // collect current broker's cpu usage
  61. usages, err := cpu.Percent(duration, false)
  62. if err == nil && len(usages) > 0 {
  63. stats.CpuUsagePercent = int32(usages[0])
  64. }
  65. return stats
  66. }