You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

105 lines
2.5 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
5 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
5 months ago
5 months ago
5 months ago
5 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
  1. package topic
  2. import "sync"
  3. type LocalTopic struct {
  4. Topic
  5. Partitions []*LocalPartition
  6. partitionLock sync.RWMutex
  7. }
  8. func NewLocalTopic(topic Topic) *LocalTopic {
  9. return &LocalTopic{
  10. Topic: topic,
  11. Partitions: make([]*LocalPartition, 0),
  12. }
  13. }
  14. func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
  15. localTopic.partitionLock.RLock()
  16. defer localTopic.partitionLock.RUnlock()
  17. for _, localPartition := range localTopic.Partitions {
  18. if localPartition.Partition.Equals(partition) {
  19. return localPartition
  20. }
  21. }
  22. return nil
  23. }
  24. func (localTopic *LocalTopic) removePartition(partition Partition) bool {
  25. localTopic.partitionLock.Lock()
  26. defer localTopic.partitionLock.Unlock()
  27. foundPartitionIndex := -1
  28. for i, localPartition := range localTopic.Partitions {
  29. if localPartition.Partition.Equals(partition) {
  30. foundPartitionIndex = i
  31. localPartition.Shutdown()
  32. break
  33. }
  34. }
  35. if foundPartitionIndex == -1 {
  36. return false
  37. }
  38. localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
  39. return true
  40. }
  41. func (localTopic *LocalTopic) addPartition(localPartition *LocalPartition) {
  42. localTopic.partitionLock.Lock()
  43. defer localTopic.partitionLock.Unlock()
  44. for _, partition := range localTopic.Partitions {
  45. if localPartition.Partition.Equals(partition.Partition) {
  46. return
  47. }
  48. }
  49. localTopic.Partitions = append(localTopic.Partitions, localPartition)
  50. }
  51. func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool {
  52. var wg sync.WaitGroup
  53. for _, localPartition := range localTopic.Partitions {
  54. if localPartition.UnixTimeNs != unixTsNs {
  55. continue
  56. }
  57. wg.Add(1)
  58. go func(localPartition *LocalPartition) {
  59. defer wg.Done()
  60. localPartition.closePublishers()
  61. }(localPartition)
  62. }
  63. wg.Wait()
  64. return true
  65. }
  66. func (localTopic *LocalTopic) closePartitionSubscribers(unixTsNs int64) bool {
  67. var wg sync.WaitGroup
  68. for _, localPartition := range localTopic.Partitions {
  69. if localPartition.UnixTimeNs != unixTsNs {
  70. continue
  71. }
  72. wg.Add(1)
  73. go func(localPartition *LocalPartition) {
  74. defer wg.Done()
  75. localPartition.closeSubscribers()
  76. }(localPartition)
  77. }
  78. wg.Wait()
  79. return true
  80. }
  81. func (localTopic *LocalTopic) WaitUntilNoPublishers() {
  82. for {
  83. var wg sync.WaitGroup
  84. for _, localPartition := range localTopic.Partitions {
  85. wg.Add(1)
  86. go func(localPartition *LocalPartition) {
  87. defer wg.Done()
  88. localPartition.WaitUntilNoPublishers()
  89. }(localPartition)
  90. }
  91. wg.Wait()
  92. if len(localTopic.Partitions) == 0 {
  93. return
  94. }
  95. }
  96. }