You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

127 lines
4.0 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
  1. package pub_balancer
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  7. "math/rand"
  8. "time"
  9. )
  10. func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) {
  11. // divide the ring into partitions
  12. now := time.Now().UnixNano()
  13. rangeSize := MaxPartitionCount / partitionCount
  14. for i := int32(0); i < partitionCount; i++ {
  15. assignment := &mq_pb.BrokerPartitionAssignment{
  16. Partition: &schema_pb.Partition{
  17. RingSize: MaxPartitionCount,
  18. RangeStart: int32(i * rangeSize),
  19. RangeStop: int32((i + 1) * rangeSize),
  20. UnixTimeNs: now,
  21. },
  22. }
  23. if i == partitionCount-1 {
  24. assignment.Partition.RangeStop = MaxPartitionCount
  25. }
  26. assignments = append(assignments, assignment)
  27. }
  28. EnsureAssignmentsToActiveBrokers(brokers, 1, assignments)
  29. glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
  30. return
  31. }
  32. // randomly pick n brokers, which may contain duplicates
  33. // TODO pick brokers based on the broker stats
  34. func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string {
  35. candidates := make([]string, 0, brokers.Count())
  36. for brokerStatsItem := range brokers.IterBuffered() {
  37. candidates = append(candidates, brokerStatsItem.Key)
  38. }
  39. pickedBrokers := make([]string, 0, count)
  40. for i := int32(0); i < count; i++ {
  41. p := rand.Intn(len(candidates))
  42. pickedBrokers = append(pickedBrokers, candidates[p])
  43. }
  44. return pickedBrokers
  45. }
  46. // reservoir sampling select N brokers from the active brokers, with exclusion of the excluded broker
  47. func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBroker string) []string {
  48. pickedBrokers := make([]string, 0, count)
  49. for i, broker := range brokers {
  50. if broker == excludedBroker {
  51. continue
  52. }
  53. if len(pickedBrokers) < count {
  54. pickedBrokers = append(pickedBrokers, broker)
  55. } else {
  56. j := rand.Intn(i + 1)
  57. if j < count {
  58. pickedBrokers[j] = broker
  59. }
  60. }
  61. }
  62. // shuffle the picked brokers
  63. count = len(pickedBrokers)
  64. for i := 0; i < count; i++ {
  65. j := rand.Intn(count)
  66. pickedBrokers[i], pickedBrokers[j] = pickedBrokers[j], pickedBrokers[i]
  67. }
  68. return pickedBrokers
  69. }
  70. // EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
  71. func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
  72. glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments)
  73. candidates := make([]string, 0, activeBrokers.Count())
  74. for brokerStatsItem := range activeBrokers.IterBuffered() {
  75. candidates = append(candidates, brokerStatsItem.Key)
  76. }
  77. for _, assignment := range assignments {
  78. // count how many brokers are needed
  79. count := 0
  80. if assignment.LeaderBroker == "" {
  81. count++
  82. } else if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
  83. assignment.LeaderBroker = ""
  84. count++
  85. }
  86. if assignment.FollowerBroker == "" {
  87. count++
  88. } else if _, found := activeBrokers.Get(assignment.FollowerBroker); !found {
  89. assignment.FollowerBroker = ""
  90. count++
  91. }
  92. if count > 0 {
  93. pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBroker)
  94. i := 0
  95. if assignment.LeaderBroker == "" {
  96. if i < len(pickedBrokers) {
  97. assignment.LeaderBroker = pickedBrokers[i]
  98. i++
  99. hasChanges = true
  100. }
  101. }
  102. if assignment.FollowerBroker == "" {
  103. if i < len(pickedBrokers) {
  104. assignment.FollowerBroker = pickedBrokers[i]
  105. i++
  106. hasChanges = true
  107. }
  108. }
  109. }
  110. }
  111. glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges)
  112. return
  113. }