You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

162 lines
4.7 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
11 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package pub_balancer
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "math/rand"
  7. "time"
  8. )
  9. func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) {
  10. // divide the ring into partitions
  11. now := time.Now().UnixNano()
  12. rangeSize := MaxPartitionCount / partitionCount
  13. for i := int32(0); i < partitionCount; i++ {
  14. assignment := &mq_pb.BrokerPartitionAssignment{
  15. Partition: &mq_pb.Partition{
  16. RingSize: MaxPartitionCount,
  17. RangeStart: int32(i * rangeSize),
  18. RangeStop: int32((i + 1) * rangeSize),
  19. UnixTimeNs: now,
  20. },
  21. }
  22. if i == partitionCount-1 {
  23. assignment.Partition.RangeStop = MaxPartitionCount
  24. }
  25. assignments = append(assignments, assignment)
  26. }
  27. // pick the brokers
  28. pickedBrokers := pickBrokers(brokers, partitionCount)
  29. // assign the partitions to brokers
  30. for i, assignment := range assignments {
  31. assignment.LeaderBroker = pickedBrokers[i]
  32. }
  33. glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
  34. return
  35. }
  36. // randomly pick n brokers, which may contain duplicates
  37. // TODO pick brokers based on the broker stats
  38. func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string {
  39. candidates := make([]string, 0, brokers.Count())
  40. for brokerStatsItem := range brokers.IterBuffered() {
  41. candidates = append(candidates, brokerStatsItem.Key)
  42. }
  43. pickedBrokers := make([]string, 0, count)
  44. for i := int32(0); i < count; i++ {
  45. p := rand.Intn(len(candidates))
  46. pickedBrokers = append(pickedBrokers, candidates[p])
  47. }
  48. return pickedBrokers
  49. }
  50. // reservoir sampling select N brokers from the active brokers, with exclusion of the excluded brokers
  51. func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBrokers []string) []string {
  52. // convert the excluded brokers to a map
  53. excludedBrokerMap := make(map[string]bool)
  54. for _, broker := range excludedBrokers {
  55. excludedBrokerMap[broker] = true
  56. }
  57. if excludedLeadBroker != "" {
  58. excludedBrokerMap[excludedLeadBroker] = true
  59. }
  60. pickedBrokers := make([]string, 0, count)
  61. for i, broker := range brokers {
  62. if _, found := excludedBrokerMap[broker]; found {
  63. continue
  64. }
  65. if len(pickedBrokers) < count {
  66. pickedBrokers = append(pickedBrokers, broker)
  67. } else {
  68. j := rand.Intn(i + 1)
  69. if j < count {
  70. pickedBrokers[j] = broker
  71. }
  72. }
  73. }
  74. // shuffle the picked brokers
  75. count = len(pickedBrokers)
  76. for i := 0; i < count; i++ {
  77. j := rand.Intn(count)
  78. pickedBrokers[i], pickedBrokers[j] = pickedBrokers[j], pickedBrokers[i]
  79. }
  80. return pickedBrokers
  81. }
  82. // EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
  83. func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
  84. candidates := make([]string, 0, activeBrokers.Count())
  85. for brokerStatsItem := range activeBrokers.IterBuffered() {
  86. candidates = append(candidates, brokerStatsItem.Key)
  87. }
  88. for _, assignment := range assignments {
  89. // count how many brokers are needed
  90. count := 0
  91. if assignment.LeaderBroker == "" {
  92. count++
  93. } else if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
  94. assignment.LeaderBroker = ""
  95. count++
  96. }
  97. for i:=0; i<followerCount; i++ {
  98. if i >= len(assignment.FollowerBrokers) {
  99. count++
  100. continue
  101. }
  102. if assignment.FollowerBrokers[i] == "" {
  103. count++
  104. } else if _, found := activeBrokers.Get(assignment.FollowerBrokers[i]); !found {
  105. assignment.FollowerBrokers[i] = ""
  106. count++
  107. }
  108. }
  109. if count > 0 {
  110. pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers)
  111. i := 0
  112. if assignment.LeaderBroker == "" {
  113. assignment.LeaderBroker = pickedBrokers[i]
  114. i++
  115. hasChanges = true
  116. }
  117. hasEmptyFollowers := false
  118. j := 0
  119. for ; j<len(assignment.FollowerBrokers); j++ {
  120. if assignment.FollowerBrokers[j] == "" {
  121. hasChanges = true
  122. if i < len(pickedBrokers) {
  123. assignment.FollowerBrokers[j] = pickedBrokers[i]
  124. i++
  125. } else {
  126. hasEmptyFollowers = true
  127. }
  128. }
  129. }
  130. if hasEmptyFollowers {
  131. var followerBrokers []string
  132. for _, follower := range assignment.FollowerBrokers {
  133. if follower != "" {
  134. followerBrokers = append(followerBrokers, follower)
  135. }
  136. }
  137. assignment.FollowerBrokers = followerBrokers
  138. }
  139. if i < len(pickedBrokers) {
  140. assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...)
  141. hasChanges = true
  142. }
  143. }
  144. }
  145. return
  146. }