Browse Source

edge cases for broker assignment

pull/5637/head
chrislu 10 months ago
parent
commit
49869eec83
  1. 12
      weed/mq/pub_balancer/allocate.go
  2. 14
      weed/mq/pub_balancer/allocate_test.go

12
weed/mq/pub_balancer/allocate.go

@ -27,13 +27,8 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p
assignments = append(assignments, assignment) assignments = append(assignments, assignment)
} }
// pick the brokers
pickedBrokers := pickBrokers(brokers, partitionCount)
EnsureAssignmentsToActiveBrokers(brokers, 1, assignments)
// assign the partitions to brokers
for i, assignment := range assignments {
assignment.LeaderBroker = pickedBrokers[i]
}
glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments) glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
return return
} }
@ -91,6 +86,8 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string,
// EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers // EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) { func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments)
candidates := make([]string, 0, activeBrokers.Count()) candidates := make([]string, 0, activeBrokers.Count())
for brokerStatsItem := range activeBrokers.IterBuffered() { for brokerStatsItem := range activeBrokers.IterBuffered() {
candidates = append(candidates, brokerStatsItem.Key) candidates = append(candidates, brokerStatsItem.Key)
@ -122,10 +119,12 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers) pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers)
i := 0 i := 0
if assignment.LeaderBroker == "" { if assignment.LeaderBroker == "" {
if i < len(pickedBrokers) {
assignment.LeaderBroker = pickedBrokers[i] assignment.LeaderBroker = pickedBrokers[i]
i++ i++
hasChanges = true hasChanges = true
} }
}
hasEmptyFollowers := false hasEmptyFollowers := false
j := 0 j := 0
@ -158,5 +157,6 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
} }
glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges)
return return
} }

14
weed/mq/pub_balancer/allocate_test.go

@ -209,6 +209,20 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
}, },
hasChanges: false, hasChanges: false,
}, },
{
name: "test low active brokers with one follower",
args: args{
activeBrokers: lowActiveBrokers,
followerCount: 1,
assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
Partition: &mq_pb.Partition{},
},
},
},
hasChanges: true,
},
{ {
name: "test single active broker", name: "test single active broker",
args: args{ args: args{

Loading…
Cancel
Save