You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

97 lines
2.6 KiB

Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
  1. package pub_balancer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  4. "reflect"
  5. "testing"
  6. )
  7. func Test_findMissingPartitions(t *testing.T) {
  8. type args struct {
  9. partitions []topic.Partition
  10. }
  11. tests := []struct {
  12. name string
  13. args args
  14. wantMissingPartitions []topic.Partition
  15. }{
  16. {
  17. name: "one partition",
  18. args: args{
  19. partitions: []topic.Partition{
  20. {RingSize: 1024, RangeStart: 0, RangeStop: 1024},
  21. },
  22. },
  23. wantMissingPartitions: nil,
  24. },
  25. {
  26. name: "two partitions",
  27. args: args{
  28. partitions: []topic.Partition{
  29. {RingSize: 1024, RangeStart: 0, RangeStop: 512},
  30. {RingSize: 1024, RangeStart: 512, RangeStop: 1024},
  31. },
  32. },
  33. wantMissingPartitions: nil,
  34. },
  35. {
  36. name: "four partitions, missing last two",
  37. args: args{
  38. partitions: []topic.Partition{
  39. {RingSize: 1024, RangeStart: 0, RangeStop: 256},
  40. {RingSize: 1024, RangeStart: 256, RangeStop: 512},
  41. },
  42. },
  43. wantMissingPartitions: []topic.Partition{
  44. {RingSize: 1024, RangeStart: 512, RangeStop: 768},
  45. {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
  46. },
  47. },
  48. {
  49. name: "four partitions, missing first two",
  50. args: args{
  51. partitions: []topic.Partition{
  52. {RingSize: 1024, RangeStart: 512, RangeStop: 768},
  53. {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
  54. },
  55. },
  56. wantMissingPartitions: []topic.Partition{
  57. {RingSize: 1024, RangeStart: 0, RangeStop: 256},
  58. {RingSize: 1024, RangeStart: 256, RangeStop: 512},
  59. },
  60. },
  61. {
  62. name: "four partitions, missing middle two",
  63. args: args{
  64. partitions: []topic.Partition{
  65. {RingSize: 1024, RangeStart: 0, RangeStop: 256},
  66. {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
  67. },
  68. },
  69. wantMissingPartitions: []topic.Partition{
  70. {RingSize: 1024, RangeStart: 256, RangeStop: 512},
  71. {RingSize: 1024, RangeStart: 512, RangeStop: 768},
  72. },
  73. },
  74. {
  75. name: "four partitions, missing three",
  76. args: args{
  77. partitions: []topic.Partition{
  78. {RingSize: 1024, RangeStart: 512, RangeStop: 768},
  79. },
  80. },
  81. wantMissingPartitions: []topic.Partition{
  82. {RingSize: 1024, RangeStart: 0, RangeStop: 256},
  83. {RingSize: 1024, RangeStart: 256, RangeStop: 512},
  84. {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
  85. },
  86. },
  87. }
  88. for _, tt := range tests {
  89. t.Run(tt.name, func(t *testing.T) {
  90. if gotMissingPartitions := findMissingPartitions(tt.args.partitions, 1024); !reflect.DeepEqual(gotMissingPartitions, tt.wantMissingPartitions) {
  91. t.Errorf("findMissingPartitions() = %v, want %v", gotMissingPartitions, tt.wantMissingPartitions)
  92. }
  93. })
  94. }
  95. }