diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index bd1ccb6dd..12d83d01a 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -89,6 +89,7 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, return pickedBrokers } +// EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) { candidates := make([]string, 0, activeBrokers.Count()) for brokerStatsItem := range activeBrokers.IterBuffered() { @@ -123,20 +124,22 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * if assignment.LeaderBroker == "" { assignment.LeaderBroker = pickedBrokers[i] i++ + hasChanges = true } j := 0 for ; j 0 } return diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go index 2f298d3e5..8b0bc3603 100644 --- a/weed/mq/pub_balancer/allocate_test.go +++ b/weed/mq/pub_balancer/allocate_test.go @@ -80,6 +80,9 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{}) activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{}) activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{}) + lowActiveBrokers := cmap.New[*BrokerStats]() + lowActiveBrokers.SetIfAbsent("localhost:1", &BrokerStats{}) + lowActiveBrokers.SetIfAbsent("localhost:2", &BrokerStats{}) tests := []struct { name string args args @@ -187,6 +190,23 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { }, hasChanges: true, }, + { + name: "test low active brokers", + args: args{ + activeBrokers: lowActiveBrokers, + followerCount: 3, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, + FollowerBrokers: []string{ + "localhost:2", + }, + }, + }, + }, + hasChanges: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {