// Package placement provides consolidated EC shard placement logic used by
// both shell commands and worker tasks.
//
// This package encapsulates the algorithms for:
// - Selecting destination nodes/disks for EC shards
// - Ensuring proper spread across racks, servers, and disks
// - Balancing shards across the cluster
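//
// A minimal usage sketch, written from a caller's point of view
// (collectDiskCandidates is a hypothetical caller-side helper, not part of
// this package; the thresholds are illustrative):
//
//	disks := collectDiskCandidates() // build []*placement.DiskCandidate from topology info
//	result, err := placement.SelectDestinations(disks, placement.DefaultConfig())
//	if err != nil {
//		return err
//	}
//	// require at least 4 servers and 2 racks in the resulting spread
//	if err := placement.VerifySpread(result, 4, 2); err != nil {
//		return err
//	}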
package placement

import (
	"fmt"
	"sort"
)

// DiskCandidate represents a disk that can receive EC shards
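// A populated candidate might look like this (illustrative values only, not
// taken from a real topology):
//
//	&DiskCandidate{
//		NodeID:         "10.0.0.11:8080",
//		DiskID:         1,
//		DataCenter:     "dc1",
//		Rack:           "rack-a",
//		VolumeCount:    10,
//		MaxVolumeCount: 100,
//		ShardCount:     3,
//		FreeSlots:      90,
//		LoadCount:      1,
//	}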
type DiskCandidate struct {
	NodeID     string
	DiskID     uint32
	DataCenter string
	Rack       string

	// Capacity information
	VolumeCount    int64
	MaxVolumeCount int64
	ShardCount     int // Current number of EC shards on this disk
	FreeSlots      int // Available slots for new shards

	// Load information
	LoadCount int // Number of active tasks on this disk
}

// NodeCandidate represents a server node that can receive EC shards
type NodeCandidate struct {
	NodeID     string
	DataCenter string
	Rack       string
	FreeSlots  int
	ShardCount int              // Total shards across all disks
	Disks      []*DiskCandidate // All disks on this node
}

// PlacementConfig configures EC shard placement behavior
type PlacementConfig struct {
	// ShardsNeeded is the total number of shards to place
	ShardsNeeded int

	// MaxShardsPerServer limits how many shards can be placed on a single server
	// 0 means no limit (but prefer spreading when possible)
	MaxShardsPerServer int

	// MaxShardsPerRack limits how many shards can be placed in a single rack
	// 0 means no limit
	MaxShardsPerRack int

	// MaxTaskLoad is the maximum task load count for a disk to be considered
	MaxTaskLoad int

	// PreferDifferentServers when true, spreads shards across different servers
	// before using multiple disks on the same server
	PreferDifferentServers bool

	// PreferDifferentRacks when true, spreads shards across different racks
	// before using multiple servers in the same rack
	PreferDifferentRacks bool
}

// DefaultConfig returns the default placement configuration
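// A caller can start from these defaults and tighten them; the values below
// are only an illustration, not recommendations:
//
//	cfg := DefaultConfig()
//	cfg.MaxShardsPerServer = 2 // hypothetical cap per server
//	cfg.MaxShardsPerRack = 4   // hypothetical cap per rack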
func DefaultConfig() PlacementConfig {
	return PlacementConfig{
		ShardsNeeded:           14,
		MaxShardsPerServer:     0,
		MaxShardsPerRack:       0,
		MaxTaskLoad:            5,
		PreferDifferentServers: true,
		PreferDifferentRacks:   true,
	}
}

// PlacementResult contains the selected destinations for EC shards
type PlacementResult struct {
	SelectedDisks []*DiskCandidate

	// Statistics
	ServersUsed int
	RacksUsed   int
	DCsUsed     int

	// Distribution maps
	ShardsPerServer map[string]int
	ShardsPerRack   map[string]int
	ShardsPerDC     map[string]int
}

// SelectDestinations selects the best disks for EC shard placement.
// This is the main entry point for EC placement logic.
//
// The algorithm works in multiple passes:
// 1. First pass: Select one disk from each rack (maximize rack diversity)
// 2. Second pass: Select one disk from each unused server in used racks (maximize server diversity)
// 3. Third pass: Select additional disks from servers already used (maximize disk diversity)
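//
// As an illustration (hypothetical cluster, default config, no per-server or
// per-rack limits): with 14 shards needed and 3 racks of 2 servers each,
// where every server has at least 3 disks with free slots, pass 1 picks one
// disk in each rack (3 shards), pass 2 adds the remaining 3 servers (6 shards
// total), and pass 3 round-robins the last 8 shards across those 6 servers,
// ending with 2-3 shards per server.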
func SelectDestinations(disks []*DiskCandidate, config PlacementConfig) (*PlacementResult, error) {
	if len(disks) == 0 {
		return nil, fmt.Errorf("no disk candidates provided")
	}
	if config.ShardsNeeded <= 0 {
		return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded)
	}

	// Filter suitable disks
	suitable := filterSuitableDisks(disks, config)
	if len(suitable) == 0 {
		return nil, fmt.Errorf("no suitable disks found after filtering")
	}

	// Build indexes for efficient lookup
	rackToDisks := groupDisksByRack(suitable)
	serverToDisks := groupDisksByServer(suitable)
	_ = serverToDisks // not used by the selection passes below; kept for reference

	result := &PlacementResult{
		SelectedDisks:   make([]*DiskCandidate, 0, config.ShardsNeeded),
		ShardsPerServer: make(map[string]int),
		ShardsPerRack:   make(map[string]int),
		ShardsPerDC:     make(map[string]int),
	}

	usedDisks := make(map[string]bool)   // "nodeID:diskID" -> bool
	usedServers := make(map[string]bool) // nodeID -> bool
	usedRacks := make(map[string]bool)   // "dc:rack" -> bool

	// Pass 1: Select one disk from each rack (maximize rack diversity)
	if config.PreferDifferentRacks {
		// Sort racks by number of available servers (descending) to pick from racks with more server options first
		sortedRacks := sortRacksByServerCount(rackToDisks)
		for _, rackKey := range sortedRacks {
			if len(result.SelectedDisks) >= config.ShardsNeeded {
				break
			}
			rackDisks := rackToDisks[rackKey]
			// Select best disk from this rack, preferring a new server
			disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config)
			if disk != nil {
				addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
			}
		}
	}

	// Pass 2: Select disks from unused servers in already-used racks
	if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded {
		for _, rackKey := range getSortedRackKeys(rackToDisks) {
			if len(result.SelectedDisks) >= config.ShardsNeeded {
				break
			}
			rackDisks := rackToDisks[rackKey]
			for _, disk := range sortDisksByScore(rackDisks) {
				if len(result.SelectedDisks) >= config.ShardsNeeded {
					break
				}
				diskKey := getDiskKey(disk)
				if usedDisks[diskKey] {
					continue
				}
				// Skip if server already used (we want different servers in this pass)
				if usedServers[disk.NodeID] {
					continue
				}
				// Check server limit
				if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer {
					continue
				}
				// Check rack limit
				if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
					continue
				}
				addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
			}
		}
	}

	// Pass 3: Fill remaining slots from already-used servers (different disks)
	// Use round-robin across servers to balance shards evenly
	if len(result.SelectedDisks) < config.ShardsNeeded {
		// Group remaining disks by server
		serverToRemainingDisks := make(map[string][]*DiskCandidate)
		for _, disk := range suitable {
			if !usedDisks[getDiskKey(disk)] {
				serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk)
			}
		}

		// Sort each server's disks by score
		for serverID := range serverToRemainingDisks {
			serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID])
		}

		// Round-robin: repeatedly select from the server with the fewest shards
		for len(result.SelectedDisks) < config.ShardsNeeded {
			// Find server with fewest shards that still has available disks
			var bestServer string
			minShards := -1
			for serverID, disks := range serverToRemainingDisks {
				if len(disks) == 0 {
					continue
				}
				// Check server limit
				if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer {
					continue
				}
				shardCount := result.ShardsPerServer[serverID]
				if minShards == -1 || shardCount < minShards {
					minShards = shardCount
					bestServer = serverID
				} else if shardCount == minShards && serverID < bestServer {
					// Tie-break by server name for determinism
					bestServer = serverID
				}
			}

			if bestServer == "" {
				// No more servers with available disks
				break
			}

			// Pop the best disk from this server
			disks := serverToRemainingDisks[bestServer]
			disk := disks[0]
			serverToRemainingDisks[bestServer] = disks[1:]

			// Check rack limit
			if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
				continue
			}

			addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
		}
	}

	// Calculate final statistics
	result.ServersUsed = len(usedServers)
	result.RacksUsed = len(usedRacks)
	dcSet := make(map[string]bool)
	for _, disk := range result.SelectedDisks {
		dcSet[disk.DataCenter] = true
	}
	result.DCsUsed = len(dcSet)

	return result, nil
}

// filterSuitableDisks filters disks that are suitable for EC placement
func filterSuitableDisks(disks []*DiskCandidate, config PlacementConfig) []*DiskCandidate {
	var suitable []*DiskCandidate
	for _, disk := range disks {
		if disk.FreeSlots <= 0 {
			continue
		}
		if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad {
			continue
		}
		suitable = append(suitable, disk)
	}
	return suitable
}

// groupDisksByRack groups disks by their rack (dc:rack key)
func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate {
	result := make(map[string][]*DiskCandidate)
	for _, disk := range disks {
		key := getRackKey(disk)
		result[key] = append(result[key], disk)
	}
	return result
}

// groupDisksByServer groups disks by their server
func groupDisksByServer(disks []*DiskCandidate) map[string][]*DiskCandidate {
	result := make(map[string][]*DiskCandidate)
	for _, disk := range disks {
		result[disk.NodeID] = append(result[disk.NodeID], disk)
	}
	return result
}

// getRackKey returns the unique key for a rack (dc:rack)
func getRackKey(disk *DiskCandidate) string {
	return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
}

// getDiskKey returns the unique key for a disk (nodeID:diskID)
func getDiskKey(disk *DiskCandidate) string {
	return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
}

// sortRacksByServerCount returns rack keys sorted by number of servers (descending)
func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string {
	// Count unique servers per rack
	rackServerCount := make(map[string]int)
	for rackKey, disks := range rackToDisks {
		servers := make(map[string]bool)
		for _, disk := range disks {
			servers[disk.NodeID] = true
		}
		rackServerCount[rackKey] = len(servers)
	}

	keys := getSortedRackKeys(rackToDisks)
	sort.Slice(keys, func(i, j int) bool {
		// Sort by server count (descending) to pick from racks with more options first
		return rackServerCount[keys[i]] > rackServerCount[keys[j]]
	})
	return keys
}

// getSortedRackKeys returns rack keys in a deterministic order
func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string {
	keys := make([]string, 0, len(rackToDisks))
	for k := range rackToDisks {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// selectBestDiskFromRack selects the best disk from a rack for EC placement
// It prefers servers that haven't been used yet
func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementConfig) *DiskCandidate {
	var bestDisk *DiskCandidate
	bestScore := -1.0
	bestIsFromUnusedServer := false

	for _, disk := range disks {
		if usedDisks[getDiskKey(disk)] {
			continue
		}
		isFromUnusedServer := !usedServers[disk.NodeID]
		score := calculateDiskScore(disk)

		// Prefer unused servers
		if isFromUnusedServer && !bestIsFromUnusedServer {
			bestDisk = disk
			bestScore = score
			bestIsFromUnusedServer = true
		} else if isFromUnusedServer == bestIsFromUnusedServer && (bestDisk == nil || score > bestScore) {
			bestDisk = disk
			bestScore = score
		}
	}

	return bestDisk
}

// sortDisksByScore returns disks sorted by score (best first)
func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate {
	sorted := make([]*DiskCandidate, len(disks))
	copy(sorted, disks)
	sort.Slice(sorted, func(i, j int) bool {
		return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j])
	})
	return sorted
}

// calculateDiskScore calculates a score for a disk candidate
// Higher score is better
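// For example, a hypothetical disk at 50% volume utilization with 2 shards
// already placed and 1 active task scores
//
//	(1-0.5)*60 + (10-2)*2 + (10-1) = 30 + 16 + 9 = 55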
func calculateDiskScore(disk *DiskCandidate) float64 {
	score := 0.0

	// Primary factor: available capacity (lower utilization is better)
	if disk.MaxVolumeCount > 0 {
		utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount)
		score += (1.0 - utilization) * 60.0 // Up to 60 points
	} else {
		score += 30.0 // Default if no max count
	}

	// Secondary factor: fewer shards already on this disk is better
	score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points

	// Tertiary factor: lower load is better
	score += float64(10 - disk.LoadCount) // Up to 10 points

	return score
}

// addDiskToResult adds a disk to the result and updates tracking maps
func addDiskToResult(result *PlacementResult, disk *DiskCandidate,
	usedDisks, usedServers, usedRacks map[string]bool) {
	diskKey := getDiskKey(disk)
	rackKey := getRackKey(disk)

	result.SelectedDisks = append(result.SelectedDisks, disk)
	usedDisks[diskKey] = true
	usedServers[disk.NodeID] = true
	usedRacks[rackKey] = true
	result.ShardsPerServer[disk.NodeID]++
	result.ShardsPerRack[rackKey]++
	result.ShardsPerDC[disk.DataCenter]++
}

// VerifySpread checks if the placement result meets diversity requirements
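// A typical call (the thresholds here are illustrative, not defaults):
//
//	if err := VerifySpread(result, 4, 2); err != nil {
//		// placement is too concentrated; handle or report the error
//	}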
func VerifySpread(result *PlacementResult, minServers, minRacks int) error {
	if result.ServersUsed < minServers {
		return fmt.Errorf("only %d servers used, need at least %d", result.ServersUsed, minServers)
	}
	if result.RacksUsed < minRacks {
		return fmt.Errorf("only %d racks used, need at least %d", result.RacksUsed, minRacks)
	}
	return nil
}

// CalculateIdealDistribution returns the ideal number of shards per server
// when we have a certain number of shards and servers
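// For example, CalculateIdealDistribution(14, 4) returns (3, 4): with 14
// shards on 4 servers, a balanced spread puts 3 shards on two servers and 4
// on the other two.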
func CalculateIdealDistribution(totalShards, numServers int) (min, max int) {
	if numServers <= 0 {
		return 0, totalShards
	}
	min = totalShards / numServers
	max = min
	if totalShards%numServers != 0 {
		max = min + 1
	}
	return
}