Browse Source

Use placement package in EC detection for consistent disk-level placement

- Replace custom EC disk selection logic with shared placement package
- Convert topology DiskInfo to placement.DiskCandidate format
- Use SelectDestinations() for multi-rack/server/disk spreading
- Convert placement results back to topology DiskInfo for task creation
- Ensures EC detection uses same placement logic as shell commands
pull/7597/head
Chris Lu 3 days ago
parent
commit
01a440bc36
  1. 129
      weed/worker/tasks/erasure_coding/detection.go

129
weed/worker/tasks/erasure_coding/detection.go

@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@ -429,85 +430,95 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
}
// selectBestECDestinations selects multiple disks for EC shard placement with diversity
// Uses the consolidated placement package for proper rack/server/disk spreading
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
if len(disks) == 0 {
return nil
}
// Group disks by rack and DC for diversity
rackGroups := make(map[string][]*topology.DiskInfo)
for _, disk := range disks {
rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
rackGroups[rackKey] = append(rackGroups[rackKey], disk)
// Convert topology.DiskInfo to placement.DiskCandidate
candidates := diskInfosToCandidates(disks)
if len(candidates) == 0 {
return nil
}
var selected []*topology.DiskInfo
usedRacks := make(map[string]bool)
// First pass: select one disk from each rack for maximum diversity
for rackKey, rackDisks := range rackGroups {
if len(selected) >= shardsNeeded {
break
}
// Select best disk from this rack
bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
if bestDisk != nil {
selected = append(selected, bestDisk)
usedRacks[rackKey] = true
}
// Configure placement for EC shards
config := placement.PlacementConfig{
ShardsNeeded: shardsNeeded,
MaxShardsPerServer: 0, // No hard limit, but prefer spreading
MaxShardsPerRack: 0, // No hard limit, but prefer spreading
MaxTaskLoad: topology.MaxTaskLoadForECPlacement,
PreferDifferentServers: true,
PreferDifferentRacks: true,
}
// Second pass: if we need more disks, select from racks we've already used
if len(selected) < shardsNeeded {
for _, disk := range disks {
if len(selected) >= shardsNeeded {
break
}
// Skip if already selected
alreadySelected := false
for _, sel := range selected {
if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
alreadySelected = true
break
}
}
if !alreadySelected && isDiskSuitableForEC(disk) {
selected = append(selected, disk)
}
}
}
return selected
}
// selectBestFromRack selects the best disk from a rack for EC placement
func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
if len(disks) == 0 {
// Use the shared placement algorithm
result, err := placement.SelectDestinations(candidates, config)
if err != nil {
glog.V(2).Infof("EC placement failed: %v", err)
return nil
}
var bestDisk *topology.DiskInfo
bestScore := -1.0
// Convert back to topology.DiskInfo
return candidatesToDiskInfos(result.SelectedDisks, disks)
}
// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice
func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate {
var candidates []*placement.DiskCandidate
for _, disk := range disks {
if !isDiskSuitableForEC(disk) {
if disk.DiskInfo == nil {
continue
}
score := calculateECScore(disk, sourceRack, sourceDC)
if score > bestScore {
bestScore = score
bestDisk = disk
// Calculate free slots (using default max if not set)
freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount)
if freeSlots < 0 {
freeSlots = 0
}
// Calculate EC shard count from EcShardInfos slice
ecShardCount := 0
if disk.DiskInfo.EcShardInfos != nil {
ecShardCount = len(disk.DiskInfo.EcShardInfos)
}
candidates = append(candidates, &placement.DiskCandidate{
NodeID: disk.NodeID,
DiskID: disk.DiskID,
DataCenter: disk.DataCenter,
Rack: disk.Rack,
VolumeCount: disk.DiskInfo.VolumeCount,
MaxVolumeCount: disk.DiskInfo.MaxVolumeCount,
ShardCount: ecShardCount,
FreeSlots: freeSlots,
LoadCount: disk.LoadCount,
})
}
return candidates
}
// candidatesToDiskInfos converts placement results back to topology.DiskInfo
func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo {
// Create a map for quick lookup
diskMap := make(map[string]*topology.DiskInfo)
for _, disk := range originalDisks {
key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
diskMap[key] = disk
}
return bestDisk
var result []*topology.DiskInfo
for _, candidate := range candidates {
key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID)
if disk, ok := diskMap[key]; ok {
result = append(result, disk)
}
}
return result
}
// calculateECScore calculates placement score for EC operations
// Used for logging and plan metadata
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
if disk.DiskInfo == nil {
return 0.0
@ -524,14 +535,12 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
// Consider current load (secondary factor)
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
// Note: We don't penalize placing shards on the same rack/DC as source
// since the original volume will be deleted after EC conversion.
// This allows for better network efficiency and storage utilization.
return score
}
// isDiskSuitableForEC checks if a disk is suitable for EC placement
// Note: This is kept for backward compatibility but the placement package
// handles filtering internally
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
if disk.DiskInfo == nil {
return false

Loading…
Cancel
Save