You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

63 lines
1.4 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
10 months ago
  1. package topic
  2. import "sync"
  3. type LocalPartitionSubscribers struct {
  4. Subscribers map[string]*LocalSubscriber
  5. SubscribersLock sync.RWMutex
  6. }
  7. type LocalSubscriber struct {
  8. stopCh chan struct{}
  9. }
  10. func NewLocalSubscriber() *LocalSubscriber {
  11. return &LocalSubscriber{
  12. stopCh: make(chan struct{}, 1),
  13. }
  14. }
  15. func (p *LocalSubscriber) SignalShutdown() {
  16. close(p.stopCh)
  17. }
  18. func NewLocalPartitionSubscribers() *LocalPartitionSubscribers {
  19. return &LocalPartitionSubscribers{
  20. Subscribers: make(map[string]*LocalSubscriber),
  21. }
  22. }
  23. func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, Subscriber *LocalSubscriber) {
  24. p.SubscribersLock.Lock()
  25. defer p.SubscribersLock.Unlock()
  26. p.Subscribers[clientName] = Subscriber
  27. }
  28. func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string) {
  29. p.SubscribersLock.Lock()
  30. defer p.SubscribersLock.Unlock()
  31. delete(p.Subscribers, clientName)
  32. }
  33. func (p *LocalPartitionSubscribers) SignalShutdown() {
  34. p.SubscribersLock.RLock()
  35. defer p.SubscribersLock.RUnlock()
  36. for _, Subscriber := range p.Subscribers {
  37. Subscriber.SignalShutdown()
  38. }
  39. }
  40. func (p *LocalPartitionSubscribers) IsEmpty() bool {
  41. p.SubscribersLock.RLock()
  42. defer p.SubscribersLock.RUnlock()
  43. return len(p.Subscribers) == 0
  44. }
  45. func (p *LocalPartitionSubscribers) Size() int {
  46. p.SubscribersLock.RLock()
  47. defer p.SubscribersLock.RUnlock()
  48. return len(p.Subscribers)
  49. }