You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

204 lines
5.4 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
11 months ago
1 year ago
1 year ago
1 year ago
1 year ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
5 months ago
11 months ago
10 months ago
11 months ago
5 months ago
11 months ago
10 months ago
11 months ago
5 months ago
11 months ago
10 months ago
11 months ago
5 months ago
11 months ago
10 months ago
5 months ago
10 months ago
10 months ago
10 months ago
5 months ago
11 months ago
  1. package pub_balancer
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "github.com/stretchr/testify/assert"
  7. "testing"
  8. )
  9. func Test_allocateOneBroker(t *testing.T) {
  10. brokers := cmap.New[*BrokerStats]()
  11. brokers.SetIfAbsent("localhost:17777", &BrokerStats{
  12. TopicPartitionCount: 0,
  13. CpuUsagePercent: 0,
  14. })
  15. tests := []struct {
  16. name string
  17. args args
  18. wantAssignments []*mq_pb.BrokerPartitionAssignment
  19. }{
  20. {
  21. name: "test only one broker",
  22. args: args{
  23. brokers: brokers,
  24. partitionCount: 1,
  25. },
  26. wantAssignments: []*mq_pb.BrokerPartitionAssignment{
  27. {
  28. LeaderBroker: "localhost:17777",
  29. Partition: &mq_pb.Partition{
  30. RingSize: MaxPartitionCount,
  31. RangeStart: 0,
  32. RangeStop: MaxPartitionCount,
  33. },
  34. },
  35. },
  36. },
  37. }
  38. testThem(t, tests)
  39. }
  40. type args struct {
  41. brokers cmap.ConcurrentMap[string, *BrokerStats]
  42. partitionCount int32
  43. }
  44. func testThem(t *testing.T, tests []struct {
  45. name string
  46. args args
  47. wantAssignments []*mq_pb.BrokerPartitionAssignment
  48. }) {
  49. for _, tt := range tests {
  50. t.Run(tt.name, func(t *testing.T) {
  51. gotAssignments := AllocateTopicPartitions(tt.args.brokers, tt.args.partitionCount)
  52. assert.Equal(t, len(tt.wantAssignments), len(gotAssignments))
  53. for i, gotAssignment := range gotAssignments {
  54. assert.Equal(t, tt.wantAssignments[i].LeaderBroker, gotAssignment.LeaderBroker)
  55. assert.Equal(t, tt.wantAssignments[i].Partition.RangeStart, gotAssignment.Partition.RangeStart)
  56. assert.Equal(t, tt.wantAssignments[i].Partition.RangeStop, gotAssignment.Partition.RangeStop)
  57. assert.Equal(t, tt.wantAssignments[i].Partition.RingSize, gotAssignment.Partition.RingSize)
  58. }
  59. })
  60. }
  61. }
  62. func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
  63. type args struct {
  64. activeBrokers cmap.ConcurrentMap[string, *BrokerStats]
  65. followerCount int
  66. assignments []*mq_pb.BrokerPartitionAssignment
  67. }
  68. activeBrokers := cmap.New[*BrokerStats]()
  69. activeBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
  70. activeBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
  71. activeBrokers.SetIfAbsent("localhost:3", &BrokerStats{})
  72. activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{})
  73. activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{})
  74. activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{})
  75. lowActiveBrokers := cmap.New[*BrokerStats]()
  76. lowActiveBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
  77. lowActiveBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
  78. singleActiveBroker := cmap.New[*BrokerStats]()
  79. singleActiveBroker.SetIfAbsent("localhost:1", &BrokerStats{})
  80. tests := []struct {
  81. name string
  82. args args
  83. hasChanges bool
  84. }{
  85. {
  86. name: "test empty leader",
  87. args: args{
  88. activeBrokers: activeBrokers,
  89. followerCount: 1,
  90. assignments: []*mq_pb.BrokerPartitionAssignment{
  91. {
  92. LeaderBroker: "",
  93. Partition: &mq_pb.Partition{},
  94. FollowerBroker: "localhost:2",
  95. },
  96. },
  97. },
  98. hasChanges: true,
  99. },
  100. {
  101. name: "test empty follower",
  102. args: args{
  103. activeBrokers: activeBrokers,
  104. followerCount: 1,
  105. assignments: []*mq_pb.BrokerPartitionAssignment{
  106. {
  107. LeaderBroker: "localhost:1",
  108. Partition: &mq_pb.Partition{},
  109. FollowerBroker: "",
  110. },
  111. },
  112. },
  113. hasChanges: true,
  114. },
  115. {
  116. name: "test dead follower",
  117. args: args{
  118. activeBrokers: activeBrokers,
  119. followerCount: 1,
  120. assignments: []*mq_pb.BrokerPartitionAssignment{
  121. {
  122. LeaderBroker: "localhost:1",
  123. Partition: &mq_pb.Partition{},
  124. FollowerBroker: "localhost:200",
  125. },
  126. },
  127. },
  128. hasChanges: true,
  129. },
  130. {
  131. name: "test dead leader and follower",
  132. args: args{
  133. activeBrokers: activeBrokers,
  134. followerCount: 1,
  135. assignments: []*mq_pb.BrokerPartitionAssignment{
  136. {
  137. LeaderBroker: "localhost:100",
  138. Partition: &mq_pb.Partition{},
  139. FollowerBroker: "localhost:200",
  140. },
  141. },
  142. },
  143. hasChanges: true,
  144. },
  145. {
  146. name: "test low active brokers",
  147. args: args{
  148. activeBrokers: lowActiveBrokers,
  149. followerCount: 3,
  150. assignments: []*mq_pb.BrokerPartitionAssignment{
  151. {
  152. LeaderBroker: "localhost:1",
  153. Partition: &mq_pb.Partition{},
  154. FollowerBroker: "localhost:2",
  155. },
  156. },
  157. },
  158. hasChanges: false,
  159. },
  160. {
  161. name: "test low active brokers with one follower",
  162. args: args{
  163. activeBrokers: lowActiveBrokers,
  164. followerCount: 1,
  165. assignments: []*mq_pb.BrokerPartitionAssignment{
  166. {
  167. LeaderBroker: "localhost:1",
  168. Partition: &mq_pb.Partition{},
  169. },
  170. },
  171. },
  172. hasChanges: true,
  173. },
  174. {
  175. name: "test single active broker",
  176. args: args{
  177. activeBrokers: singleActiveBroker,
  178. followerCount: 3,
  179. assignments: []*mq_pb.BrokerPartitionAssignment{
  180. {
  181. LeaderBroker: "localhost:1",
  182. Partition: &mq_pb.Partition{},
  183. FollowerBroker: "localhost:2",
  184. },
  185. },
  186. },
  187. hasChanges: true,
  188. },
  189. }
  190. for _, tt := range tests {
  191. t.Run(tt.name, func(t *testing.T) {
  192. fmt.Printf("%v before %v\n", tt.name, tt.args.assignments)
  193. hasChanges := EnsureAssignmentsToActiveBrokers(tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments)
  194. assert.Equalf(t, tt.hasChanges, hasChanges, "EnsureAssignmentsToActiveBrokers(%v, %v, %v)", tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments)
  195. fmt.Printf("%v after %v\n", tt.name, tt.args.assignments)
  196. })
  197. }
  198. }