You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

126 lines
3.9 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
11 months ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package pub_balancer
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "math/rand"
  7. "time"
  8. )
  9. func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) {
  10. // divide the ring into partitions
  11. now := time.Now().UnixNano()
  12. rangeSize := MaxPartitionCount / partitionCount
  13. for i := int32(0); i < partitionCount; i++ {
  14. assignment := &mq_pb.BrokerPartitionAssignment{
  15. Partition: &mq_pb.Partition{
  16. RingSize: MaxPartitionCount,
  17. RangeStart: int32(i * rangeSize),
  18. RangeStop: int32((i + 1) * rangeSize),
  19. UnixTimeNs: now,
  20. },
  21. }
  22. if i == partitionCount-1 {
  23. assignment.Partition.RangeStop = MaxPartitionCount
  24. }
  25. assignments = append(assignments, assignment)
  26. }
  27. EnsureAssignmentsToActiveBrokers(brokers, 1, assignments)
  28. glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
  29. return
  30. }
  31. // randomly pick n brokers, which may contain duplicates
  32. // TODO pick brokers based on the broker stats
  33. func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string {
  34. candidates := make([]string, 0, brokers.Count())
  35. for brokerStatsItem := range brokers.IterBuffered() {
  36. candidates = append(candidates, brokerStatsItem.Key)
  37. }
  38. pickedBrokers := make([]string, 0, count)
  39. for i := int32(0); i < count; i++ {
  40. p := rand.Intn(len(candidates))
  41. pickedBrokers = append(pickedBrokers, candidates[p])
  42. }
  43. return pickedBrokers
  44. }
  45. // reservoir sampling select N brokers from the active brokers, with exclusion of the excluded broker
  46. func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBroker string) []string {
  47. pickedBrokers := make([]string, 0, count)
  48. for i, broker := range brokers {
  49. if broker == excludedBroker {
  50. continue
  51. }
  52. if len(pickedBrokers) < count {
  53. pickedBrokers = append(pickedBrokers, broker)
  54. } else {
  55. j := rand.Intn(i + 1)
  56. if j < count {
  57. pickedBrokers[j] = broker
  58. }
  59. }
  60. }
  61. // shuffle the picked brokers
  62. count = len(pickedBrokers)
  63. for i := 0; i < count; i++ {
  64. j := rand.Intn(count)
  65. pickedBrokers[i], pickedBrokers[j] = pickedBrokers[j], pickedBrokers[i]
  66. }
  67. return pickedBrokers
  68. }
  69. // EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
  70. func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
  71. glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments)
  72. candidates := make([]string, 0, activeBrokers.Count())
  73. for brokerStatsItem := range activeBrokers.IterBuffered() {
  74. candidates = append(candidates, brokerStatsItem.Key)
  75. }
  76. for _, assignment := range assignments {
  77. // count how many brokers are needed
  78. count := 0
  79. if assignment.LeaderBroker == "" {
  80. count++
  81. } else if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
  82. assignment.LeaderBroker = ""
  83. count++
  84. }
  85. if assignment.FollowerBroker == "" {
  86. count++
  87. } else if _, found := activeBrokers.Get(assignment.FollowerBroker); !found {
  88. assignment.FollowerBroker = ""
  89. count++
  90. }
  91. if count > 0 {
  92. pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBroker)
  93. i := 0
  94. if assignment.LeaderBroker == "" {
  95. if i < len(pickedBrokers) {
  96. assignment.LeaderBroker = pickedBrokers[i]
  97. i++
  98. hasChanges = true
  99. }
  100. }
  101. if assignment.FollowerBroker == "" {
  102. if i < len(pickedBrokers) {
  103. assignment.FollowerBroker = pickedBrokers[i]
  104. i++
  105. hasChanges = true
  106. }
  107. }
  108. }
  109. }
  110. glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges)
  111. return
  112. }