You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

250 lines
6.3 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
10 months ago
1 year ago
1 year ago
11 months ago
1 year ago
10 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
10 months ago
9 months ago
9 months ago
9 months ago
9 months ago
9 months ago
9 months ago
10 months ago
  1. package pub_balancer
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "github.com/stretchr/testify/assert"
  7. "testing"
  8. )
  9. func Test_allocateOneBroker(t *testing.T) {
  10. brokers := cmap.New[*BrokerStats]()
  11. brokers.SetIfAbsent("localhost:17777", &BrokerStats{
  12. TopicPartitionCount: 0,
  13. CpuUsagePercent: 0,
  14. })
  15. tests := []struct {
  16. name string
  17. args args
  18. wantAssignments []*mq_pb.BrokerPartitionAssignment
  19. }{
  20. {
  21. name: "test only one broker",
  22. args: args{
  23. brokers: brokers,
  24. partitionCount: 1,
  25. },
  26. wantAssignments: []*mq_pb.BrokerPartitionAssignment{
  27. {
  28. LeaderBroker: "localhost:17777",
  29. Partition: &mq_pb.Partition{
  30. RingSize: MaxPartitionCount,
  31. RangeStart: 0,
  32. RangeStop: MaxPartitionCount,
  33. },
  34. },
  35. },
  36. },
  37. }
  38. testThem(t, tests)
  39. }
  40. type args struct {
  41. brokers cmap.ConcurrentMap[string, *BrokerStats]
  42. partitionCount int32
  43. }
  44. func testThem(t *testing.T, tests []struct {
  45. name string
  46. args args
  47. wantAssignments []*mq_pb.BrokerPartitionAssignment
  48. }) {
  49. for _, tt := range tests {
  50. t.Run(tt.name, func(t *testing.T) {
  51. gotAssignments := AllocateTopicPartitions(tt.args.brokers, tt.args.partitionCount)
  52. assert.Equal(t, len(tt.wantAssignments), len(gotAssignments))
  53. for i, gotAssignment := range gotAssignments {
  54. assert.Equal(t, tt.wantAssignments[i].LeaderBroker, gotAssignment.LeaderBroker)
  55. assert.Equal(t, tt.wantAssignments[i].Partition.RangeStart, gotAssignment.Partition.RangeStart)
  56. assert.Equal(t, tt.wantAssignments[i].Partition.RangeStop, gotAssignment.Partition.RangeStop)
  57. assert.Equal(t, tt.wantAssignments[i].Partition.RingSize, gotAssignment.Partition.RingSize)
  58. }
  59. })
  60. }
  61. }
  62. func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
  63. type args struct {
  64. activeBrokers cmap.ConcurrentMap[string, *BrokerStats]
  65. followerCount int
  66. assignments []*mq_pb.BrokerPartitionAssignment
  67. }
  68. activeBrokers := cmap.New[*BrokerStats]()
  69. activeBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
  70. activeBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
  71. activeBrokers.SetIfAbsent("localhost:3", &BrokerStats{})
  72. activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{})
  73. activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{})
  74. activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{})
  75. lowActiveBrokers := cmap.New[*BrokerStats]()
  76. lowActiveBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
  77. lowActiveBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
  78. singleActiveBroker := cmap.New[*BrokerStats]()
  79. singleActiveBroker.SetIfAbsent("localhost:1", &BrokerStats{})
  80. tests := []struct {
  81. name string
  82. args args
  83. hasChanges bool
  84. }{
  85. {
  86. name: "test empty leader",
  87. args: args{
  88. activeBrokers: activeBrokers,
  89. followerCount: 1,
  90. assignments: []*mq_pb.BrokerPartitionAssignment{
  91. {
  92. LeaderBroker: "",
  93. Partition: &mq_pb.Partition{},
  94. FollowerBrokers: []string{
  95. "localhost:2",
  96. },
  97. },
  98. },
  99. },
  100. hasChanges: true,
  101. },
  102. {
  103. name: "test empty follower",
  104. args: args{
  105. activeBrokers: activeBrokers,
  106. followerCount: 1,
  107. assignments: []*mq_pb.BrokerPartitionAssignment{
  108. {
  109. LeaderBroker: "localhost:1",
  110. Partition: &mq_pb.Partition{},
  111. FollowerBrokers: []string{
  112. "",
  113. },
  114. },
  115. },
  116. },
  117. hasChanges: true,
  118. },
  119. {
  120. name: "test dead follower",
  121. args: args{
  122. activeBrokers: activeBrokers,
  123. followerCount: 1,
  124. assignments: []*mq_pb.BrokerPartitionAssignment{
  125. {
  126. LeaderBroker: "localhost:1",
  127. Partition: &mq_pb.Partition{},
  128. FollowerBrokers: []string{
  129. "localhost:200",
  130. },
  131. },
  132. },
  133. },
  134. hasChanges: true,
  135. },
  136. {
  137. name: "test dead leader and follower",
  138. args: args{
  139. activeBrokers: activeBrokers,
  140. followerCount: 1,
  141. assignments: []*mq_pb.BrokerPartitionAssignment{
  142. {
  143. LeaderBroker: "localhost:100",
  144. Partition: &mq_pb.Partition{},
  145. FollowerBrokers: []string{
  146. "localhost:200",
  147. },
  148. },
  149. },
  150. },
  151. hasChanges: true,
  152. },
  153. {
  154. name: "test missing two followers",
  155. args: args{
  156. activeBrokers: activeBrokers,
  157. followerCount: 3,
  158. assignments: []*mq_pb.BrokerPartitionAssignment{
  159. {
  160. LeaderBroker: "localhost:1",
  161. Partition: &mq_pb.Partition{},
  162. FollowerBrokers: []string{
  163. "localhost:2",
  164. },
  165. },
  166. },
  167. },
  168. hasChanges: true,
  169. },
  170. {
  171. name: "test missing some followers",
  172. args: args{
  173. activeBrokers: activeBrokers,
  174. followerCount: 10,
  175. assignments: []*mq_pb.BrokerPartitionAssignment{
  176. {
  177. LeaderBroker: "localhost:1",
  178. Partition: &mq_pb.Partition{},
  179. FollowerBrokers: []string{
  180. "localhost:2",
  181. },
  182. },
  183. },
  184. },
  185. hasChanges: true,
  186. },
  187. {
  188. name: "test low active brokers",
  189. args: args{
  190. activeBrokers: lowActiveBrokers,
  191. followerCount: 3,
  192. assignments: []*mq_pb.BrokerPartitionAssignment{
  193. {
  194. LeaderBroker: "localhost:1",
  195. Partition: &mq_pb.Partition{},
  196. FollowerBrokers: []string{
  197. "localhost:2",
  198. },
  199. },
  200. },
  201. },
  202. hasChanges: false,
  203. },
  204. {
  205. name: "test low active brokers with one follower",
  206. args: args{
  207. activeBrokers: lowActiveBrokers,
  208. followerCount: 1,
  209. assignments: []*mq_pb.BrokerPartitionAssignment{
  210. {
  211. LeaderBroker: "localhost:1",
  212. Partition: &mq_pb.Partition{},
  213. },
  214. },
  215. },
  216. hasChanges: true,
  217. },
  218. {
  219. name: "test single active broker",
  220. args: args{
  221. activeBrokers: singleActiveBroker,
  222. followerCount: 3,
  223. assignments: []*mq_pb.BrokerPartitionAssignment{
  224. {
  225. LeaderBroker: "localhost:1",
  226. Partition: &mq_pb.Partition{},
  227. FollowerBrokers: []string{
  228. "localhost:2",
  229. },
  230. },
  231. },
  232. },
  233. hasChanges: true,
  234. },
  235. }
  236. for _, tt := range tests {
  237. t.Run(tt.name, func(t *testing.T) {
  238. fmt.Printf("%v before %v\n", tt.name, tt.args.assignments)
  239. hasChanges := EnsureAssignmentsToActiveBrokers(tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments)
  240. assert.Equalf(t, tt.hasChanges, hasChanges, "EnsureAssignmentsToActiveBrokers(%v, %v, %v)", tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments)
  241. fmt.Printf("%v after %v\n", tt.name, tt.args.assignments)
  242. })
  243. }
  244. }