Browse Source

test low active brokers

pull/5637/head
chrislu 10 months ago
parent
commit
50c5dd7313
  1. 5
      weed/mq/pub_balancer/allocate.go
  2. 20
      weed/mq/pub_balancer/allocate_test.go

5
weed/mq/pub_balancer/allocate.go

@ -89,6 +89,7 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string,
return pickedBrokers return pickedBrokers
} }
// EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) { func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
candidates := make([]string, 0, activeBrokers.Count()) candidates := make([]string, 0, activeBrokers.Count())
for brokerStatsItem := range activeBrokers.IterBuffered() { for brokerStatsItem := range activeBrokers.IterBuffered() {
@ -123,20 +124,22 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
if assignment.LeaderBroker == "" { if assignment.LeaderBroker == "" {
assignment.LeaderBroker = pickedBrokers[i] assignment.LeaderBroker = pickedBrokers[i]
i++ i++
hasChanges = true
} }
j := 0 j := 0
for ; j<len(assignment.FollowerBrokers); j++ { for ; j<len(assignment.FollowerBrokers); j++ {
if assignment.FollowerBrokers[j] == "" { if assignment.FollowerBrokers[j] == "" {
assignment.FollowerBrokers[j] = pickedBrokers[i] assignment.FollowerBrokers[j] = pickedBrokers[i]
i++ i++
hasChanges = true
} }
} }
if i < len(pickedBrokers) { if i < len(pickedBrokers) {
assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...) assignment.FollowerBrokers = append(assignment.FollowerBrokers, pickedBrokers[i:]...)
hasChanges = true
} }
} }
hasChanges = hasChanges || count > 0
} }
return return

20
weed/mq/pub_balancer/allocate_test.go

@ -80,6 +80,9 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{}) activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{})
activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{}) activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{})
activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{}) activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{})
lowActiveBrokers := cmap.New[*BrokerStats]()
lowActiveBrokers.SetIfAbsent("localhost:1", &BrokerStats{})
lowActiveBrokers.SetIfAbsent("localhost:2", &BrokerStats{})
tests := []struct { tests := []struct {
name string name string
args args args args
@ -187,6 +190,23 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
}, },
hasChanges: true, hasChanges: true,
}, },
{
name: "test low active brokers",
args: args{
activeBrokers: lowActiveBrokers,
followerCount: 3,
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"localhost:2",
},
},
},
},
hasChanges: false,
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {

Loading…
Cancel
Save