You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

92 lines
2.3 KiB

  1. package coordinator
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  4. "sync"
  5. )
  6. func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) {
  7. cg.MinimumActiveInstances = min
  8. cg.MaximumActiveInstances = max
  9. }
  10. func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance {
  11. cgi := &ConsumerGroupInstance{
  12. ClientId: clientId,
  13. }
  14. cg.ConsumerGroupInstances.Set(clientId, cgi)
  15. return cgi
  16. }
  17. func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) {
  18. cg.ConsumerGroupInstances.Remove(clientId)
  19. }
  20. func (cg *ConsumerGroup) CoordinateIfNeeded() {
  21. emptyInstanceCount, activeInstanceCount := int32(0), int32(0)
  22. for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
  23. if cgi.Val.Partition == nil {
  24. // this consumer group instance is not assigned a partition
  25. // need to assign one
  26. emptyInstanceCount++
  27. } else {
  28. activeInstanceCount++
  29. }
  30. }
  31. var delta int32
  32. if emptyInstanceCount > 0 {
  33. if cg.MinimumActiveInstances <= 0 {
  34. // need to assign more partitions
  35. delta = emptyInstanceCount
  36. } else if activeInstanceCount < cg.MinimumActiveInstances && activeInstanceCount+emptyInstanceCount >= cg.MinimumActiveInstances {
  37. // need to assign more partitions
  38. delta = cg.MinimumActiveInstances - activeInstanceCount
  39. }
  40. }
  41. if cg.MaximumActiveInstances > 0 {
  42. if activeInstanceCount > cg.MaximumActiveInstances {
  43. // need to remove some partitions
  44. delta = cg.MaximumActiveInstances - activeInstanceCount
  45. }
  46. }
  47. if delta == 0 {
  48. return
  49. }
  50. cg.doCoordinate(activeInstanceCount + delta)
  51. }
  52. func (cg *ConsumerGroup) doCoordinate(target int32) {
  53. // stop existing instances from processing
  54. var wg sync.WaitGroup
  55. for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
  56. if cgi.Val.Partition != nil {
  57. wg.Add(1)
  58. go func(cgi *ConsumerGroupInstance) {
  59. defer wg.Done()
  60. // stop processing
  61. // flush internal state
  62. // wait for all messages to be processed
  63. // close the connection
  64. }(cgi.Val)
  65. }
  66. }
  67. wg.Wait()
  68. partitions := topic.SplitPartitions(target)
  69. // assign partitions to new instances
  70. i := 0
  71. for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
  72. cgi.Val.Partition = partitions[i]
  73. i++
  74. wg.Add(1)
  75. go func(cgi *ConsumerGroupInstance) {
  76. defer wg.Done()
  77. // start processing
  78. // start consuming from the last offset
  79. }(cgi.Val)
  80. }
  81. wg.Wait()
  82. }