package erasure_coding

import (
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Detection implements the detection logic for erasure coding tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}

	ecConfig := config.(*Config)
	var results []*types.TaskDetectionResult
	now := time.Now()
	quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum

	debugCount := 0
	skippedAlreadyEC := 0
	skippedTooSmall := 0
	skippedCollectionFilter := 0
	skippedQuietTime := 0
	skippedFullness := 0

	for _, metric := range metrics {
		// Skip if already EC volume
		if metric.IsECVolume {
			skippedAlreadyEC++
			continue
		}

		// Check minimum size requirement
		if metric.Size < minSizeBytes {
			skippedTooSmall++
			continue
		}

		// Check collection filter if specified
		if ecConfig.CollectionFilter != "" {
			// Parse comma-separated collections
			allowedCollections := make(map[string]bool)
			for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
				allowedCollections[strings.TrimSpace(collection)] = true
			}
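			// Illustrative example (hypothetical config value): CollectionFilter = "photos, logs"
			// parses to allowedCollections = {"photos": true, "logs": true}.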
			// Skip if volume's collection is not in the allowed list
			if !allowedCollections[metric.Collection] {
				skippedCollectionFilter++
				continue
			}
		}

		// Check quiet duration and fullness criteria
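		// Illustrative example (hypothetical config values): with QuietForSeconds=3600 and
		// FullnessRatio=0.9, a volume qualifies only after being unmodified for at least an
		// hour and being at least 90% full.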
		if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
			glog.Infof("EC Detection: Volume %d meets all criteria, attempting to create task", metric.VolumeID)

			// Generate task ID for ActiveTopology integration
			taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())

			result := &types.TaskDetectionResult{
				TaskID:     taskID, // Link to ActiveTopology pending task
				TaskType:   types.TaskTypeErasureCoding,
				VolumeID:   metric.VolumeID,
				Server:     metric.Server,
				Collection: metric.Collection,
				Priority:   types.TaskPriorityLow, // EC is not urgent
				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
					metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
				ScheduleAt: now,
			}

			// Plan EC destinations if ActiveTopology is available
			if clusterInfo.ActiveTopology != nil {
				glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID)
				multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
				if err != nil {
					glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if destination planning fails
				}
				glog.Infof("EC Detection: Successfully planned %d destinations for volume %d", len(multiPlan.Plans), metric.VolumeID)

				// Calculate expected shard size for EC operation
				// Each data shard will be approximately volumeSize / dataShards
				expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
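				// Illustrative example (assuming the default 10+4 EC layout, i.e. 10 data shards):
				// a 30 GiB volume yields an expected shard size of roughly 3 GiB per data shard.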

				// Add pending EC shard task to ActiveTopology for capacity management

				// Extract shard destinations from multiPlan
				var shardDestinations []string
				var shardDiskIDs []uint32
				for _, plan := range multiPlan.Plans {
					shardDestinations = append(shardDestinations, plan.TargetNode)
					shardDiskIDs = append(shardDiskIDs, plan.TargetDisk)
				}

				// Find all volume replica locations (server + disk) from topology
				glog.Infof("EC Detection: Looking for replica locations for volume %d", metric.VolumeID)
				replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
				if len(replicaLocations) == 0 {
					glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
					continue
				}
				glog.Infof("EC Detection: Found %d replica locations for volume %d", len(replicaLocations), metric.VolumeID)

				// Find existing EC shards from previous failed attempts
				existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)

				// Combine volume replicas and existing EC shards for cleanup
				var sources []topology.TaskSourceSpec

				// Add volume replicas (will free volume slots)
				for _, replica := range replicaLocations {
					sources = append(sources, topology.TaskSourceSpec{
						ServerID:    replica.ServerID,
						DiskID:      replica.DiskID,
						DataCenter:  replica.DataCenter,
						Rack:        replica.Rack,
						CleanupType: topology.CleanupVolumeReplica,
					})
				}

				// Add existing EC shards (will free shard slots)
				duplicateCheck := make(map[string]bool)
				for _, replica := range replicaLocations {
					key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID)
					duplicateCheck[key] = true
				}

				for _, shard := range existingECShards {
					key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
					if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
						sources = append(sources, topology.TaskSourceSpec{
							ServerID:    shard.ServerID,
							DiskID:      shard.DiskID,
							DataCenter:  shard.DataCenter,
							Rack:        shard.Rack,
							CleanupType: topology.CleanupECShards,
						})
						duplicateCheck[key] = true
					}
				}

				glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
					len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources))

				// Convert shard destinations to TaskDestinationSpec
				destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
				shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
				shardSize := int64(expectedShardSize)
				for i, dest := range shardDestinations {
					destinations[i] = topology.TaskDestinationSpec{
						ServerID:      dest,
						DiskID:        shardDiskIDs[i],
						StorageImpact: &shardImpact,
						EstimatedSize: &shardSize,
					}
				}

				err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
					TaskID:       taskID,
					TaskType:     topology.TaskTypeErasureCoding,
					VolumeID:     metric.VolumeID,
					VolumeSize:   int64(metric.Size),
					Sources:      sources,
					Destinations: destinations,
				})
				if err != nil {
					glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if topology task addition fails
				}

				glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
					taskID, metric.VolumeID, len(sources), len(multiPlan.Plans))

				// Create unified sources and targets for EC task
				result.TypedParams = &worker_pb.TaskParams{
					TaskId:     taskID, // Link to ActiveTopology pending task
					VolumeId:   metric.VolumeID,
					Collection: metric.Collection,
					VolumeSize: metric.Size, // Store original volume size for tracking changes

					// Unified sources - all sources that will be processed/cleaned up
					Sources: convertTaskSourcesToProtobuf(sources, metric.VolumeID),

					// Unified targets - all EC shard destinations
					Targets: createECTargets(multiPlan),

					TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
						ErasureCodingParams: createECTaskParams(multiPlan),
					},
				}

				glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
					metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
			} else {
				glog.Warningf("No ActiveTopology available for destination planning in EC detection")
				continue // Skip this volume if no topology available
			}

			glog.Infof("EC Detection: Successfully created EC task for volume %d, adding to results", metric.VolumeID)
			results = append(results, result)
		} else {
			// Count debug reasons
			if debugCount < 5 { // Limit to avoid spam
				if metric.Age < quietThreshold {
					skippedQuietTime++
				}
				if metric.FullnessRatio < ecConfig.FullnessRatio {
					skippedFullness++
				}
			}
			debugCount++
		}
	}

	// Log debug summary if no tasks were created
	if len(results) == 0 && len(metrics) > 0 {
		totalVolumes := len(metrics)
		glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)

		// Show details for first few volumes
		for i, metric := range metrics {
			if i >= 3 || metric.IsECVolume { // Limit to first 3 non-EC volumes
				continue
			}
			sizeMB := float64(metric.Size) / (1024 * 1024)
			glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
				metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute),
				metric.FullnessRatio*100, ecConfig.FullnessRatio*100)
		}
	}

	return results, nil
}

// planECDestinations plans the destinations for erasure coding operation
// This function implements EC destination planning logic directly in the detection phase
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
	// Calculate expected shard size for EC operation
	expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)

	// Get source node information from topology
	var sourceRack, sourceDC string

	// Extract rack and DC from topology info
	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo != nil {
		for _, dc := range topologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, dataNodeInfo := range rack.DataNodeInfos {
					if dataNodeInfo.Id == metric.Server {
						sourceDC = dc.Id
						sourceRack = rack.Id
						break
					}
				}
				if sourceRack != "" {
					break
				}
			}
			if sourceDC != "" {
				break
			}
		}
	}

	// Get available disks for EC placement with effective capacity consideration (includes pending tasks)
	// For EC we need at least 1 available volume slot on a disk to consider it for shard placement (1 slot per shard).
	// Note: We don't exclude the source server since the original volume will be deleted after EC conversion
	availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1)
	if len(availableDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
	}

	// Select best disks for EC placement with rack/DC diversity
	selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
	if len(selectedDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks)
	}

	var plans []*topology.DestinationPlan
	rackCount := make(map[string]int)
	dcCount := make(map[string]int)

	for _, disk := range selectedDisks {
		plan := &topology.DestinationPlan{
			TargetNode:     disk.NodeID,
			TargetDisk:     disk.DiskID,
			TargetRack:     disk.Rack,
			TargetDC:       disk.DataCenter,
			ExpectedSize:   expectedShardSize, // Set calculated EC shard size
			PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
		}
		plans = append(plans, plan)

		// Count rack and DC diversity
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackCount[rackKey]++
		dcCount[disk.DataCenter]++
	}

	// Log capacity utilization information using ActiveTopology's encapsulated logic
	totalEffectiveCapacity := int64(0)
	for _, plan := range plans {
		effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk)
		totalEffectiveCapacity += effectiveCapacity
	}

	glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
		metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)

	// Log storage impact for EC task (source only - EC has multiple targets handled individually)
	sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
	glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d",
		sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
	glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")

	return &topology.MultiDestinationPlan{
		Plans:          plans,
		TotalShards:    len(plans),
		SuccessfulRack: len(rackCount),
		SuccessfulDCs:  len(dcCount),
	}, nil
}

// createECTargets creates unified TaskTarget structures from the multi-destination plan
// with proper shard ID assignment during planning phase
func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.TaskTarget {
	var targets []*worker_pb.TaskTarget
	numTargets := len(multiPlan.Plans)

	// Create shard assignment arrays for each target (round-robin distribution)
	targetShards := make([][]uint32, numTargets)
	for i := range targetShards {
		targetShards[i] = make([]uint32, 0)
	}

	// Distribute shards in round-robin fashion to spread both data and parity shards
	// This ensures each target gets a mix of data shards (0-9) and parity shards (10-13)
	for shardId := uint32(0); shardId < uint32(erasure_coding.TotalShardsCount); shardId++ {
		targetIndex := int(shardId) % numTargets
		targetShards[targetIndex] = append(targetShards[targetIndex], shardId)
	}
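	// Illustrative example (assuming the default 10+4 layout, 14 shards total) with 4 targets:
	// target 0 gets shards {0, 4, 8, 12}, target 1 gets {1, 5, 9, 13},
	// target 2 gets {2, 6, 10}, and target 3 gets {3, 7, 11}.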

	// Create targets with assigned shard IDs
	for i, plan := range multiPlan.Plans {
		target := &worker_pb.TaskTarget{
			Node:          plan.TargetNode,
			DiskId:        plan.TargetDisk,
			Rack:          plan.TargetRack,
			DataCenter:    plan.TargetDC,
			ShardIds:      targetShards[i], // Round-robin assigned shards
			EstimatedSize: plan.ExpectedSize,
		}
		targets = append(targets, target)

		// Log shard assignment with data/parity classification
		dataShards := make([]uint32, 0)
		parityShards := make([]uint32, 0)
		for _, shardId := range targetShards[i] {
			if shardId < uint32(erasure_coding.DataShardsCount) {
				dataShards = append(dataShards, shardId)
			} else {
				parityShards = append(parityShards, shardId)
			}
		}
		glog.V(2).Infof("EC planning: target %s assigned shards %v (data: %v, parity: %v)",
			plan.TargetNode, targetShards[i], dataShards, parityShards)
	}

	glog.V(1).Infof("EC planning: distributed %d shards across %d targets using round-robin (data shards 0-%d, parity shards %d-%d)",
		erasure_coding.TotalShardsCount, numTargets,
		erasure_coding.DataShardsCount-1, erasure_coding.DataShardsCount, erasure_coding.TotalShardsCount-1)
	return targets
}

// convertTaskSourcesToProtobuf converts topology.TaskSourceSpec to worker_pb.TaskSource
func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32) []*worker_pb.TaskSource {
	var protobufSources []*worker_pb.TaskSource

	for _, source := range sources {
		pbSource := &worker_pb.TaskSource{
			Node:       source.ServerID,
			DiskId:     source.DiskID,
			DataCenter: source.DataCenter,
			Rack:       source.Rack,
		}

		// Convert storage impact to estimated size
		if source.EstimatedSize != nil {
			pbSource.EstimatedSize = uint64(*source.EstimatedSize)
		}

		// Set appropriate volume ID or shard IDs based on cleanup type
		switch source.CleanupType {
		case topology.CleanupVolumeReplica:
			// This is a volume replica, use the actual volume ID
			pbSource.VolumeId = volumeID
		case topology.CleanupECShards:
			// This is EC shards, also use the volume ID for consistency
			pbSource.VolumeId = volumeID
			// Note: ShardIds would need to be passed separately if we need specific shard info
		}

		protobufSources = append(protobufSources, pbSource)
	}

	return protobufSources
}

// createECTaskParams creates clean EC task parameters (destinations now in unified targets)
func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
	return &worker_pb.ErasureCodingTaskParams{
		DataShards:   erasure_coding.DataShardsCount,   // Standard data shards
		ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards
	}
}

// selectBestECDestinations selects multiple disks for EC shard placement with diversity
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	// Group disks by rack and DC for diversity
	rackGroups := make(map[string][]*topology.DiskInfo)
	for _, disk := range disks {
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackGroups[rackKey] = append(rackGroups[rackKey], disk)
	}

	var selected []*topology.DiskInfo
	usedRacks := make(map[string]bool)

	// First pass: select one disk from each rack for maximum diversity
	for rackKey, rackDisks := range rackGroups {
		if len(selected) >= shardsNeeded {
			break
		}

		// Select best disk from this rack
		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
		if bestDisk != nil {
			selected = append(selected, bestDisk)
			usedRacks[rackKey] = true
		}
	}

	// Second pass: if we need more disks, select from racks we've already used
	if len(selected) < shardsNeeded {
		for _, disk := range disks {
			if len(selected) >= shardsNeeded {
				break
			}

			// Skip if already selected
			alreadySelected := false
			for _, sel := range selected {
				if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
					alreadySelected = true
					break
				}
			}

			if !alreadySelected && isDiskSuitableForEC(disk) {
				selected = append(selected, disk)
			}
		}
	}
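	// Illustrative example: if 14 shards are needed (assuming the default 10+4 layout) and only
	// 6 racks have suitable disks, the first pass places one shard per rack and the second pass
	// fills the remaining 8 slots on additional disks within those racks.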

	return selected
}

// selectBestFromRack selects the best disk from a rack for EC placement
func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	var bestDisk *topology.DiskInfo
	bestScore := -1.0

	for _, disk := range disks {
		if !isDiskSuitableForEC(disk) {
			continue
		}

		score := calculateECScore(disk, sourceRack, sourceDC)
		if score > bestScore {
			bestScore = score
			bestDisk = disk
		}
	}

	return bestDisk
}

// calculateECScore calculates placement score for EC operations
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
	if disk.DiskInfo == nil {
		return 0.0
	}

	score := 0.0

	// Prefer disks with available capacity (primary factor)
	if disk.DiskInfo.MaxVolumeCount > 0 {
		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
		score += (1.0 - utilization) * 60.0 // Up to 60 points for available capacity
	}

	// Consider current load (secondary factor)
	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
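	// Illustrative example: a disk at 30% volume utilization with 2 in-flight tasks scores
	// (1.0-0.3)*60 + (10-2) = 50.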

	// Note: We don't penalize placing shards on the same rack/DC as source
	// since the original volume will be deleted after EC conversion.
	// This allows for better network efficiency and storage utilization.

	return score
}

// isDiskSuitableForEC checks if a disk is suitable for EC placement
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
	if disk.DiskInfo == nil {
		return false
	}

	// Check if disk is not overloaded with tasks
	if disk.LoadCount > topology.MaxTaskLoadForECPlacement {
		return false
	}

	return true
}

// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
	if activeTopology == nil {
		return nil
	}
	return activeTopology.GetVolumeLocations(volumeID, collection)
}

// findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts)
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
	if activeTopology == nil {
		return nil
	}
	return activeTopology.GetECShardLocations(volumeID, collection)
}

// findVolumeReplicas finds all servers that have replicas of the specified volume
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
	if activeTopology == nil {
		return []string{}
	}

	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo == nil {
		return []string{}
	}

	var replicaServers []string

	// Iterate through all nodes to find volume replicas
	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				for _, diskInfo := range nodeInfo.DiskInfos {
					for _, volumeInfo := range diskInfo.VolumeInfos {
						if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
							replicaServers = append(replicaServers, nodeInfo.Id)
							break // Found volume on this node, move to next node
						}
					}
				}
			}
		}
	}

	return replicaServers
}