You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

54 lines
1.5 KiB

  1. package topic
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. )
  5. // LocalTopicManager manages topics on local broker
  6. type LocalTopicManager struct {
  7. topics cmap.ConcurrentMap[string, *LocalTopic]
  8. }
  9. // NewLocalTopicManager creates a new LocalTopicManager
  10. func NewLocalTopicManager() *LocalTopicManager {
  11. return &LocalTopicManager{
  12. topics: cmap.New[*LocalTopic](),
  13. }
  14. }
  15. // AddTopic adds a topic to the local topic manager
  16. func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
  17. localTopic, ok := manager.topics.Get(topic.String())
  18. if !ok {
  19. localTopic = &LocalTopic{
  20. Topic: topic,
  21. Partitions: make([]*LocalPartition, 0),
  22. }
  23. }
  24. if localTopic.findPartition(localPartition.Partition) != nil {
  25. return
  26. }
  27. localTopic.Partitions = append(localTopic.Partitions, localPartition)
  28. }
  29. // GetTopic gets a topic from the local topic manager
  30. func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
  31. localTopic, ok := manager.topics.Get(topic.String())
  32. if !ok {
  33. return nil
  34. }
  35. return localTopic.findPartition(partition)
  36. }
  37. // RemoveTopic removes a topic from the local topic manager
  38. func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
  39. manager.topics.Remove(topic.String())
  40. }
  41. func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
  42. localTopic, ok := manager.topics.Get(topic.String())
  43. if !ok {
  44. return false
  45. }
  46. return localTopic.removePartition(partition)
  47. }