|
|
|
@ -786,40 +786,6 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// selectBestECDestinations selects multiple disks for EC shard placement with diversity
|
|
|
|
// Uses the consolidated placement package for proper rack/server/disk spreading
|
|
|
|
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo { |
|
|
|
if len(disks) == 0 { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Convert topology.DiskInfo to placement.DiskCandidate
|
|
|
|
candidates := diskInfosToCandidates(disks) |
|
|
|
if len(candidates) == 0 { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Configure placement for EC shards
|
|
|
|
config := placement.PlacementRequest{ |
|
|
|
ShardsNeeded: shardsNeeded, |
|
|
|
MaxShardsPerServer: 0, // No hard limit, but prefer spreading
|
|
|
|
MaxShardsPerRack: 0, // No hard limit, but prefer spreading
|
|
|
|
MaxTaskLoad: topology.MaxTaskLoadForECPlacement, |
|
|
|
PreferDifferentServers: true, |
|
|
|
PreferDifferentRacks: true, |
|
|
|
} |
|
|
|
|
|
|
|
// Use the shared placement algorithm
|
|
|
|
result, err := placement.SelectDestinations(candidates, config) |
|
|
|
if err != nil { |
|
|
|
glog.V(2).Infof("EC placement failed: %v", err) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Convert back to topology.DiskInfo
|
|
|
|
return candidatesToDiskInfos(result.SelectedDisks, disks) |
|
|
|
} |
|
|
|
|
|
|
|
// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice
|
|
|
|
func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate { |
|
|
|
var candidates []*placement.DiskCandidate |
|
|
|
@ -860,25 +826,6 @@ func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidat |
|
|
|
return candidates |
|
|
|
} |
|
|
|
|
|
|
|
// candidatesToDiskInfos converts placement results back to topology.DiskInfo
|
|
|
|
func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo { |
|
|
|
// Create a map for quick lookup
|
|
|
|
diskMap := make(map[string]*topology.DiskInfo) |
|
|
|
for _, disk := range originalDisks { |
|
|
|
key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID) |
|
|
|
diskMap[key] = disk |
|
|
|
} |
|
|
|
|
|
|
|
var result []*topology.DiskInfo |
|
|
|
for _, candidate := range candidates { |
|
|
|
key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID) |
|
|
|
if disk, ok := diskMap[key]; ok { |
|
|
|
result = append(result, disk) |
|
|
|
} |
|
|
|
} |
|
|
|
return result |
|
|
|
} |
|
|
|
|
|
|
|
// calculateECScoreCandidate calculates placement score for EC operations.
|
|
|
|
// Used for logging and plan metadata.
|
|
|
|
func calculateECScoreCandidate(disk *placement.DiskCandidate, sourceRack, sourceDC string) float64 { |
|
|
|
@ -900,22 +847,6 @@ func calculateECScoreCandidate(disk *placement.DiskCandidate, sourceRack, source |
|
|
|
return score |
|
|
|
} |
|
|
|
|
|
|
|
// isDiskSuitableForEC checks if a disk is suitable for EC placement
|
|
|
|
// Note: This is kept for backward compatibility but the placement package
|
|
|
|
// handles filtering internally
|
|
|
|
func isDiskSuitableForEC(disk *topology.DiskInfo) bool { |
|
|
|
if disk.DiskInfo == nil { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// Check if disk is not overloaded with tasks
|
|
|
|
if disk.LoadCount > topology.MaxTaskLoadForECPlacement { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
|
|
|
|
// Uses O(1) indexed lookup for optimal performance on large clusters.
|
|
|
|
func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica { |
|
|
|
@ -933,35 +864,3 @@ func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint |
|
|
|
} |
|
|
|
return activeTopology.GetECShardLocations(volumeID, collection) |
|
|
|
} |
|
|
|
|
|
|
|
// findVolumeReplicas finds all servers that have replicas of the specified volume
|
|
|
|
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string { |
|
|
|
if activeTopology == nil { |
|
|
|
return []string{} |
|
|
|
} |
|
|
|
|
|
|
|
topologyInfo := activeTopology.GetTopologyInfo() |
|
|
|
if topologyInfo == nil { |
|
|
|
return []string{} |
|
|
|
} |
|
|
|
|
|
|
|
var replicaServers []string |
|
|
|
|
|
|
|
// Iterate through all nodes to find volume replicas
|
|
|
|
for _, dc := range topologyInfo.DataCenterInfos { |
|
|
|
for _, rack := range dc.RackInfos { |
|
|
|
for _, nodeInfo := range rack.DataNodeInfos { |
|
|
|
for _, diskInfo := range nodeInfo.DiskInfos { |
|
|
|
for _, volumeInfo := range diskInfo.VolumeInfos { |
|
|
|
if volumeInfo.Id == volumeID && volumeInfo.Collection == collection { |
|
|
|
replicaServers = append(replicaServers, nodeInfo.Id) |
|
|
|
break // Found volume on this node, move to next node
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return replicaServers |
|
|
|
} |