diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index cd74bed33..2e0811521 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -9,6 +9,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
 )
@@ -429,85 +430,95 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
 }
 
 // selectBestECDestinations selects multiple disks for EC shard placement with diversity
+// Uses the consolidated placement package for proper rack/server/disk spreading
 func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
 	if len(disks) == 0 {
 		return nil
 	}
 
-	// Group disks by rack and DC for diversity
-	rackGroups := make(map[string][]*topology.DiskInfo)
-	for _, disk := range disks {
-		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
-		rackGroups[rackKey] = append(rackGroups[rackKey], disk)
+	// Convert topology.DiskInfo to placement.DiskCandidate
+	candidates := diskInfosToCandidates(disks)
+	if len(candidates) == 0 {
+		return nil
 	}
 
-	var selected []*topology.DiskInfo
-	usedRacks := make(map[string]bool)
-
-	// First pass: select one disk from each rack for maximum diversity
-	for rackKey, rackDisks := range rackGroups {
-		if len(selected) >= shardsNeeded {
-			break
-		}
-
-		// Select best disk from this rack
-		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
-		if bestDisk != nil {
-			selected = append(selected, bestDisk)
-			usedRacks[rackKey] = true
-		}
+	// Configure placement for EC shards
+	config := placement.PlacementConfig{
+		ShardsNeeded:           shardsNeeded,
+		MaxShardsPerServer:     0, // No hard limit, but prefer spreading
+		MaxShardsPerRack:       0, // No hard limit, but prefer spreading
+		MaxTaskLoad:            topology.MaxTaskLoadForECPlacement,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
 	}
 
-	// Second pass: if we need more disks, select from racks we've already used
-	if len(selected) < shardsNeeded {
-		for _, disk := range disks {
-			if len(selected) >= shardsNeeded {
-				break
-			}
-
-			// Skip if already selected
-			alreadySelected := false
-			for _, sel := range selected {
-				if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
-					alreadySelected = true
-					break
-				}
-			}
-
-			if !alreadySelected && isDiskSuitableForEC(disk) {
-				selected = append(selected, disk)
-			}
-		}
-	}
-
-	return selected
-}
-
-// selectBestFromRack selects the best disk from a rack for EC placement
-func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
-	if len(disks) == 0 {
+	// Use the shared placement algorithm
+	result, err := placement.SelectDestinations(candidates, config)
+	if err != nil {
+		glog.V(2).Infof("EC placement failed: %v", err)
 		return nil
 	}
 
-	var bestDisk *topology.DiskInfo
-	bestScore := -1.0
+	// Convert back to topology.DiskInfo
+	return candidatesToDiskInfos(result.SelectedDisks, disks)
+}
 
+// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice
+func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate {
+	var candidates []*placement.DiskCandidate
 	for _, disk := range disks {
-		if !isDiskSuitableForEC(disk) {
+		if disk.DiskInfo == nil {
 			continue
 		}
 
-		score := calculateECScore(disk, sourceRack, sourceDC)
-		if score > bestScore {
-			bestScore = score
-			bestDisk = disk
+		// Calculate free slots (using default max if not set)
+		freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount)
+		if freeSlots < 0 {
+			freeSlots = 0
+		}
+
+		// Calculate EC shard count from EcShardInfos slice
+		ecShardCount := 0
+		if disk.DiskInfo.EcShardInfos != nil {
+			ecShardCount = len(disk.DiskInfo.EcShardInfos)
 		}
+
+		candidates = append(candidates, &placement.DiskCandidate{
+			NodeID:         disk.NodeID,
+			DiskID:         disk.DiskID,
+			DataCenter:     disk.DataCenter,
+			Rack:           disk.Rack,
+			VolumeCount:    disk.DiskInfo.VolumeCount,
+			MaxVolumeCount: disk.DiskInfo.MaxVolumeCount,
+			ShardCount:     ecShardCount,
+			FreeSlots:      freeSlots,
+			LoadCount:      disk.LoadCount,
+		})
+	}
+	return candidates
+}
+
+// candidatesToDiskInfos converts placement results back to topology.DiskInfo
+func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo {
+	// Create a map for quick lookup
+	diskMap := make(map[string]*topology.DiskInfo)
+	for _, disk := range originalDisks {
+		key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+		diskMap[key] = disk
 	}
 
-	return bestDisk
+	var result []*topology.DiskInfo
+	for _, candidate := range candidates {
+		key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID)
+		if disk, ok := diskMap[key]; ok {
+			result = append(result, disk)
+		}
+	}
+	return result
 }
 
 // calculateECScore calculates placement score for EC operations
+// Used for logging and plan metadata
 func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
 	if disk.DiskInfo == nil {
 		return 0.0
@@ -524,14 +535,12 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
 	// Consider current load (secondary factor)
 	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
 
-	// Note: We don't penalize placing shards on the same rack/DC as source
-	// since the original volume will be deleted after EC conversion.
-	// This allows for better network efficiency and storage utilization.
-
 	return score
 }
 
 // isDiskSuitableForEC checks if a disk is suitable for EC placement
+// Note: This is kept for backward compatibility but the placement package
+// handles filtering internally
 func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
 	if disk.DiskInfo == nil {
 		return false
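
For reviewers unfamiliar with the new package: the standalone sketch below (not part of this diff) shows roughly how the `placement` API wired in above can be driven directly. Only the type, field, and function names come from the diff; the candidate values, the `MaxTaskLoad` literal (standing in for `topology.MaxTaskLoadForECPlacement`), and the exact field types are assumptions for illustration.

```go
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
)

func main() {
	// Hypothetical candidates; in the diff these come from diskInfosToCandidates().
	candidates := []*placement.DiskCandidate{
		{NodeID: "volume-server-1:8080", DiskID: 0, DataCenter: "dc1", Rack: "rack1",
			VolumeCount: 10, MaxVolumeCount: 100, FreeSlots: 90, LoadCount: 1},
		{NodeID: "volume-server-2:8080", DiskID: 0, DataCenter: "dc1", Rack: "rack2",
			VolumeCount: 20, MaxVolumeCount: 100, FreeSlots: 80, LoadCount: 2},
	}

	// Mirrors the config built in selectBestECDestinations; 5 is a stand-in
	// for topology.MaxTaskLoadForECPlacement.
	config := placement.PlacementConfig{
		ShardsNeeded:           14,
		MaxShardsPerServer:     0,
		MaxShardsPerRack:       0,
		MaxTaskLoad:            5,
		PreferDifferentServers: true,
		PreferDifferentRacks:   true,
	}

	result, err := placement.SelectDestinations(candidates, config)
	if err != nil {
		fmt.Println("placement failed:", err)
		return
	}
	for _, disk := range result.SelectedDisks {
		fmt.Printf("selected %s disk %d (rack %s)\n", disk.NodeID, disk.DiskID, disk.Rack)
	}
}
```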