|
@ -1,12 +1,13 @@ |
|
|
package pub_balancer |
|
|
package pub_balancer |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
|
|
|
"math/rand/v2" |
|
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
cmap "github.com/orcaman/concurrent-map/v2" |
|
|
cmap "github.com/orcaman/concurrent-map/v2" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
"math/rand" |
|
|
|
|
|
"time" |
|
|
|
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) { |
|
|
func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) { |
|
@ -43,7 +44,7 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) |
|
|
} |
|
|
} |
|
|
pickedBrokers := make([]string, 0, count) |
|
|
pickedBrokers := make([]string, 0, count) |
|
|
for i := int32(0); i < count; i++ { |
|
|
for i := int32(0); i < count; i++ { |
|
|
p := rand.Intn(len(candidates)) |
|
|
|
|
|
|
|
|
p := rand.IntN(len(candidates)) |
|
|
pickedBrokers = append(pickedBrokers, candidates[p]) |
|
|
pickedBrokers = append(pickedBrokers, candidates[p]) |
|
|
} |
|
|
} |
|
|
return pickedBrokers |
|
|
return pickedBrokers |
|
@ -59,7 +60,7 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, |
|
|
if len(pickedBrokers) < count { |
|
|
if len(pickedBrokers) < count { |
|
|
pickedBrokers = append(pickedBrokers, broker) |
|
|
pickedBrokers = append(pickedBrokers, broker) |
|
|
} else { |
|
|
} else { |
|
|
j := rand.Intn(i + 1) |
|
|
|
|
|
|
|
|
j := rand.IntN(i + 1) |
|
|
if j < count { |
|
|
if j < count { |
|
|
pickedBrokers[j] = broker |
|
|
pickedBrokers[j] = broker |
|
|
} |
|
|
} |
|
@ -69,7 +70,7 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, |
|
|
// shuffle the picked brokers
|
|
|
// shuffle the picked brokers
|
|
|
count = len(pickedBrokers) |
|
|
count = len(pickedBrokers) |
|
|
for i := 0; i < count; i++ { |
|
|
for i := 0; i < count; i++ { |
|
|
j := rand.Intn(count) |
|
|
|
|
|
|
|
|
j := rand.IntN(count) |
|
|
pickedBrokers[i], pickedBrokers[j] = pickedBrokers[j], pickedBrokers[i] |
|
|
pickedBrokers[i], pickedBrokers[j] = pickedBrokers[j], pickedBrokers[i] |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|