diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 1dd1ef9d8..39d91bef3 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -27,13 +27,8 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p assignments = append(assignments, assignment) } - // pick the brokers - pickedBrokers := pickBrokers(brokers, partitionCount) + EnsureAssignmentsToActiveBrokers(brokers, 1, assignments) - // assign the partitions to brokers - for i, assignment := range assignments { - assignment.LeaderBroker = pickedBrokers[i] - } glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments) return } @@ -91,6 +86,8 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, // EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) { + glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments) + candidates := make([]string, 0, activeBrokers.Count()) for brokerStatsItem := range activeBrokers.IterBuffered() { candidates = append(candidates, brokerStatsItem.Key) @@ -122,9 +119,11 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers) i := 0 if assignment.LeaderBroker == "" { - assignment.LeaderBroker = pickedBrokers[i] - i++ - hasChanges = true + if i < len(pickedBrokers) { + assignment.LeaderBroker = pickedBrokers[i] + i++ + hasChanges = true + } } hasEmptyFollowers := false @@ -158,5 +157,6 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * } + glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges) return } diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go index b585219dc..5f6342e99 100644 --- a/weed/mq/pub_balancer/allocate_test.go +++ b/weed/mq/pub_balancer/allocate_test.go @@ -209,6 +209,20 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { }, hasChanges: false, }, + { + name: "test low active brokers with one follower", + args: args{ + activeBrokers: lowActiveBrokers, + followerCount: 1, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, + }, + }, + }, + hasChanges: true, + }, { name: "test single active broker", args: args{