You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

251 lines
6.3 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
11 months ago
1 year ago
1 year ago
1 year ago
1 year ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
  1. package pub_balancer
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "github.com/stretchr/testify/assert"
  7. "testing"
  8. )
  9. func Test_allocateOneBroker(t *testing.T) {
  10. brokers := cmap.New[*BrokerStats]()
  11. brokers.SetIfAbsent("localhost:17777", &BrokerStats{
  12. TopicPartitionCount: 0,
  13. ConsumerCount: 0,
  14. CpuUsagePercent: 0,
  15. })
  16. tests := []struct {
  17. name string
  18. args args
  19. wantAssignments []*mq_pb.BrokerPartitionAssignment
  20. }{
  21. {
  22. name: "test only one broker",
  23. args: args{
  24. brokers: brokers,
  25. partitionCount: 1,
  26. },
  27. wantAssignments: []*mq_pb.BrokerPartitionAssignment{
  28. {
  29. LeaderBroker: "localhost:17777",
  30. Partition: &mq_pb.Partition{
  31. RingSize: MaxPartitionCount,
  32. RangeStart: 0,
  33. RangeStop: MaxPartitionCount,
  34. },
  35. },
  36. },
  37. },
  38. }
  39. testThem(t, tests)
  40. }
  41. type args struct {
  42. brokers cmap.ConcurrentMap[string, *BrokerStats]
  43. partitionCount int32
  44. }
  45. func testThem(t *testing.T, tests []struct {
  46. name string
  47. args args
  48. wantAssignments []*mq_pb.BrokerPartitionAssignment
  49. }) {
  50. for _, tt := range tests {
  51. t.Run(tt.name, func(t *testing.T) {
  52. gotAssignments := AllocateTopicPartitions(tt.args.brokers, tt.args.partitionCount)
  53. assert.Equal(t, len(tt.wantAssignments), len(gotAssignments))
  54. for i, gotAssignment := range gotAssignments {
  55. assert.Equal(t, tt.wantAssignments[i].LeaderBroker, gotAssignment.LeaderBroker)
  56. assert.Equal(t, tt.wantAssignments[i].Partition.RangeStart, gotAssignment.Partition.RangeStart)
  57. assert.Equal(t, tt.wantAssignments[i].Partition.RangeStop, gotAssignment.Partition.RangeStop)
  58. assert.Equal(t, tt.wantAssignments[i].Partition.RingSize, gotAssignment.Partition.RingSize)
  59. }
  60. })
  61. }
  62. }
  63. func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
  64. type args struct {
  65. activeBrokers cmap.ConcurrentMap[string, *BrokerStats]
  66. followerCount int
  67. assignments []*mq_pb.BrokerPartitionAssignment
  68. }
  69. activeBrokers := cmap.New[*BrokerStats]()
  70. activeBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
  71. activeBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
  72. activeBrokers.SetIfAbsent("localhost:3", &BrokerStats{})
  73. activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{})
  74. activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{})
  75. activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{})
  76. lowActiveBrokers := cmap.New[*BrokerStats]()
  77. lowActiveBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
  78. lowActiveBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
  79. singleActiveBroker := cmap.New[*BrokerStats]()
  80. singleActiveBroker.SetIfAbsent("localhost:1", &BrokerStats{})
  81. tests := []struct {
  82. name string
  83. args args
  84. hasChanges bool
  85. }{
  86. {
  87. name: "test empty leader",
  88. args: args{
  89. activeBrokers: activeBrokers,
  90. followerCount: 1,
  91. assignments: []*mq_pb.BrokerPartitionAssignment{
  92. {
  93. LeaderBroker: "",
  94. Partition: &mq_pb.Partition{},
  95. FollowerBrokers: []string{
  96. "localhost:2",
  97. },
  98. },
  99. },
  100. },
  101. hasChanges: true,
  102. },
  103. {
  104. name: "test empty follower",
  105. args: args{
  106. activeBrokers: activeBrokers,
  107. followerCount: 1,
  108. assignments: []*mq_pb.BrokerPartitionAssignment{
  109. {
  110. LeaderBroker: "localhost:1",
  111. Partition: &mq_pb.Partition{},
  112. FollowerBrokers: []string{
  113. "",
  114. },
  115. },
  116. },
  117. },
  118. hasChanges: true,
  119. },
  120. {
  121. name: "test dead follower",
  122. args: args{
  123. activeBrokers: activeBrokers,
  124. followerCount: 1,
  125. assignments: []*mq_pb.BrokerPartitionAssignment{
  126. {
  127. LeaderBroker: "localhost:1",
  128. Partition: &mq_pb.Partition{},
  129. FollowerBrokers: []string{
  130. "localhost:200",
  131. },
  132. },
  133. },
  134. },
  135. hasChanges: true,
  136. },
  137. {
  138. name: "test dead leader and follower",
  139. args: args{
  140. activeBrokers: activeBrokers,
  141. followerCount: 1,
  142. assignments: []*mq_pb.BrokerPartitionAssignment{
  143. {
  144. LeaderBroker: "localhost:100",
  145. Partition: &mq_pb.Partition{},
  146. FollowerBrokers: []string{
  147. "localhost:200",
  148. },
  149. },
  150. },
  151. },
  152. hasChanges: true,
  153. },
  154. {
  155. name: "test missing two followers",
  156. args: args{
  157. activeBrokers: activeBrokers,
  158. followerCount: 3,
  159. assignments: []*mq_pb.BrokerPartitionAssignment{
  160. {
  161. LeaderBroker: "localhost:1",
  162. Partition: &mq_pb.Partition{},
  163. FollowerBrokers: []string{
  164. "localhost:2",
  165. },
  166. },
  167. },
  168. },
  169. hasChanges: true,
  170. },
  171. {
  172. name: "test missing some followers",
  173. args: args{
  174. activeBrokers: activeBrokers,
  175. followerCount: 10,
  176. assignments: []*mq_pb.BrokerPartitionAssignment{
  177. {
  178. LeaderBroker: "localhost:1",
  179. Partition: &mq_pb.Partition{},
  180. FollowerBrokers: []string{
  181. "localhost:2",
  182. },
  183. },
  184. },
  185. },
  186. hasChanges: true,
  187. },
  188. {
  189. name: "test low active brokers",
  190. args: args{
  191. activeBrokers: lowActiveBrokers,
  192. followerCount: 3,
  193. assignments: []*mq_pb.BrokerPartitionAssignment{
  194. {
  195. LeaderBroker: "localhost:1",
  196. Partition: &mq_pb.Partition{},
  197. FollowerBrokers: []string{
  198. "localhost:2",
  199. },
  200. },
  201. },
  202. },
  203. hasChanges: false,
  204. },
  205. {
  206. name: "test low active brokers with one follower",
  207. args: args{
  208. activeBrokers: lowActiveBrokers,
  209. followerCount: 1,
  210. assignments: []*mq_pb.BrokerPartitionAssignment{
  211. {
  212. LeaderBroker: "localhost:1",
  213. Partition: &mq_pb.Partition{},
  214. },
  215. },
  216. },
  217. hasChanges: true,
  218. },
  219. {
  220. name: "test single active broker",
  221. args: args{
  222. activeBrokers: singleActiveBroker,
  223. followerCount: 3,
  224. assignments: []*mq_pb.BrokerPartitionAssignment{
  225. {
  226. LeaderBroker: "localhost:1",
  227. Partition: &mq_pb.Partition{},
  228. FollowerBrokers: []string{
  229. "localhost:2",
  230. },
  231. },
  232. },
  233. },
  234. hasChanges: true,
  235. },
  236. }
  237. for _, tt := range tests {
  238. t.Run(tt.name, func(t *testing.T) {
  239. fmt.Printf("%v before %v\n", tt.name, tt.args.assignments)
  240. hasChanges := EnsureAssignmentsToActiveBrokers(tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments)
  241. assert.Equalf(t, tt.hasChanges, hasChanges, "EnsureAssignmentsToActiveBrokers(%v, %v, %v)", tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments)
  242. fmt.Printf("%v after %v\n", tt.name, tt.args.assignments)
  243. })
  244. }
  245. }