package erasure_coding

import (
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Detection implements the detection logic for erasure coding tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}

	ecConfig := config.(*Config)

	var results []*types.TaskDetectionResult
	now := time.Now()
	quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum

	debugCount := 0
	skippedAlreadyEC := 0
	skippedTooSmall := 0
	skippedCollectionFilter := 0
	skippedQuietTime := 0
	skippedFullness := 0

	for _, metric := range metrics {
		// Skip if already EC volume
		if metric.IsECVolume {
			skippedAlreadyEC++
			continue
		}

		// Check minimum size requirement
		if metric.Size < minSizeBytes {
			skippedTooSmall++
			continue
		}

		// Check collection filter if specified
		if ecConfig.CollectionFilter != "" {
			// Parse comma-separated collections
			allowedCollections := make(map[string]bool)
			for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
				allowedCollections[strings.TrimSpace(collection)] = true
			}
			// Skip if volume's collection is not in the allowed list
			if !allowedCollections[metric.Collection] {
				skippedCollectionFilter++
				continue
			}
		}

		// Check quiet duration and fullness criteria
		if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
			// Generate task ID for ActiveTopology integration
			taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())

			result := &types.TaskDetectionResult{
				TaskID:     taskID, // Link to ActiveTopology pending task
				TaskType:   types.TaskTypeErasureCoding,
				VolumeID:   metric.VolumeID,
				Server:     metric.Server,
				Collection: metric.Collection,
				Priority:   types.TaskPriorityLow, // EC is not urgent
				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
					metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
				ScheduleAt: now,
			}

			// Plan EC destinations if ActiveTopology is available
			if clusterInfo.ActiveTopology != nil {
				multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
				if err != nil {
					glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if destination planning fails
				}

				// Calculate expected shard size for EC operation
				// Each data shard will be approximately volumeSize / dataShards
				expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)

				// Add pending EC shard task to ActiveTopology for capacity management
				// Extract shard destinations from multiPlan
				var shardDestinations []string
				var shardDiskIDs []uint32
				for _, plan := range multiPlan.Plans {
					shardDestinations = append(shardDestinations, plan.TargetNode)
					shardDiskIDs = append(shardDiskIDs, plan.TargetDisk)
				}
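				// NOTE: the pending task registered below reserves one shard slot on each planned
				// destination disk and marks the source replicas (plus any shards left over from
				// earlier EC attempts) for cleanup, so effective-capacity queries such as
				// GetDisksWithEffectiveCapacity take this volume's in-flight EC work into account.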
				// Find all volume replica locations (server + disk) from topology
				replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
				if len(replicaLocations) == 0 {
					glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
					continue
				}

				// Find existing EC shards from previous failed attempts
				existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)

				// Combine volume replicas and existing EC shards for cleanup
				var allSourceLocations []topology.TaskSourceLocation

				// Add volume replicas (will free volume slots)
				for _, replica := range replicaLocations {
					allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
						ServerID:    replica.ServerID,
						DiskID:      replica.DiskID,
						CleanupType: topology.CleanupVolumeReplica,
					})
				}

				// Add existing EC shards (will free shard slots)
				duplicateCheck := make(map[string]bool)
				for _, replica := range replicaLocations {
					key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID)
					duplicateCheck[key] = true
				}
				for _, shard := range existingECShards {
					key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
					if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
						allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
							ServerID:    shard.ServerID,
							DiskID:      shard.DiskID,
							CleanupType: topology.CleanupECShards,
						})
						duplicateCheck[key] = true
					}
				}

				glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
					len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))

				// Convert TaskSourceLocation to TaskSourceSpec
				sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
				for i, srcLoc := range allSourceLocations {
					sources[i] = topology.TaskSourceSpec{
						ServerID:    srcLoc.ServerID,
						DiskID:      srcLoc.DiskID,
						CleanupType: srcLoc.CleanupType,
					}
				}

				// Convert shard destinations to TaskDestinationSpec
				destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
				shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
				shardSize := int64(expectedShardSize)
				for i, dest := range shardDestinations {
					destinations[i] = topology.TaskDestinationSpec{
						ServerID:      dest,
						DiskID:        shardDiskIDs[i],
						StorageImpact: &shardImpact,
						EstimatedSize: &shardSize,
					}
				}

				err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
					TaskID:       taskID,
					TaskType:     topology.TaskTypeErasureCoding,
					VolumeID:     metric.VolumeID,
					VolumeSize:   int64(metric.Size),
					Sources:      sources,
					Destinations: destinations,
				})
				if err != nil {
					glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if topology task addition fails
				}

				glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
					taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))

				// Find all volume replicas from topology (for legacy worker compatibility)
				var replicas []string
				serverSet := make(map[string]struct{})
				for _, loc := range replicaLocations {
					if _, found := serverSet[loc.ServerID]; !found {
						replicas = append(replicas, loc.ServerID)
						serverSet[loc.ServerID] = struct{}{}
					}
				}

				glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)

				// Create typed parameters with EC destination information and replicas
				result.TypedParams = &worker_pb.TaskParams{
					TaskId:     taskID, // Link to ActiveTopology pending task
					VolumeId:   metric.VolumeID,
					Server:     metric.Server,
					Collection: metric.Collection,
					VolumeSize: metric.Size, // Store original volume size for tracking changes
					Replicas:   replicas, // Include all volume replicas for deletion
					TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
						ErasureCodingParams: createECTaskParams(multiPlan),
					},
				}

				glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
					metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
			} else {
				glog.Warningf("No ActiveTopology available for destination planning in EC detection")
				continue // Skip this volume if no topology available
			}

			results = append(results, result)
		} else {
			// Count debug reasons
			if debugCount < 5 { // Limit to avoid spam
				if metric.Age < quietThreshold {
					skippedQuietTime++
				}
				if metric.FullnessRatio < ecConfig.FullnessRatio {
					skippedFullness++
				}
			}
			debugCount++
		}
	}

	// Log debug summary if no tasks were created
	if len(results) == 0 && len(metrics) > 0 {
		totalVolumes := len(metrics)
		glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)

		// Show details for first few volumes
		for i, metric := range metrics {
			if i >= 3 || metric.IsECVolume { // Limit to first 3 non-EC volumes
				continue
			}
			sizeMB := float64(metric.Size) / (1024 * 1024)
			glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
				metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute),
				metric.FullnessRatio*100, ecConfig.FullnessRatio*100)
		}
	}

	return results, nil
}

// planECDestinations plans the destinations for an erasure coding operation.
// This function implements EC destination planning logic directly in the detection phase.
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
	// Calculate expected shard size for EC operation
	expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)

	// Get source node information from topology
	var sourceRack, sourceDC string

	// Extract rack and DC from topology info
	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo != nil {
		for _, dc := range topologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, dataNodeInfo := range rack.DataNodeInfos {
					if dataNodeInfo.Id == metric.Server {
						sourceDC = dc.Id
						sourceRack = rack.Id
						break
					}
				}
				if sourceRack != "" {
					break
				}
			}
			if sourceDC != "" {
				break
			}
		}
	}

	// Get available disks for EC placement with effective capacity consideration (includes pending tasks).
	// EC needs one volume slot per shard, so a disk qualifies if it has at least 1 available slot.
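	// Placement sketch: selectBestECDestinations below first tries to put each of the
	// erasure_coding.TotalShardsCount shards on a distinct rack, and only reuses racks
	// (its second pass) when there are fewer suitable racks than shards.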
	availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
	if len(availableDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
	}

	// Select best disks for EC placement with rack/DC diversity
	selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
	if len(selectedDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks)
	}

	var plans []*topology.DestinationPlan
	rackCount := make(map[string]int)
	dcCount := make(map[string]int)

	for _, disk := range selectedDisks {
		plan := &topology.DestinationPlan{
			TargetNode:     disk.NodeID,
			TargetDisk:     disk.DiskID,
			TargetRack:     disk.Rack,
			TargetDC:       disk.DataCenter,
			ExpectedSize:   expectedShardSize, // Set calculated EC shard size
			PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
			Conflicts:      checkECPlacementConflicts(disk, sourceRack, sourceDC),
		}
		plans = append(plans, plan)

		// Count rack and DC diversity
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackCount[rackKey]++
		dcCount[disk.DataCenter]++
	}

	// Log capacity utilization information using ActiveTopology's encapsulated logic
	totalEffectiveCapacity := int64(0)
	for _, plan := range plans {
		effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk)
		totalEffectiveCapacity += effectiveCapacity
	}

	glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
		metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)

	// Log storage impact for EC task (source only - EC has multiple targets handled individually)
	sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
	glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d",
		sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
	glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")

	return &topology.MultiDestinationPlan{
		Plans:          plans,
		TotalShards:    len(plans),
		SuccessfulRack: len(rackCount),
		SuccessfulDCs:  len(dcCount),
	}, nil
}

// createECTaskParams creates EC task parameters from the multi-destination plan
func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
	var destinations []*worker_pb.ECDestination
	for _, plan := range multiPlan.Plans {
		destination := &worker_pb.ECDestination{
			Node:           plan.TargetNode,
			DiskId:         plan.TargetDisk,
			Rack:           plan.TargetRack,
			DataCenter:     plan.TargetDC,
			PlacementScore: plan.PlacementScore,
		}
		destinations = append(destinations, destination)
	}

	// Collect placement conflicts from all destinations
	var placementConflicts []string
	for _, plan := range multiPlan.Plans {
		placementConflicts = append(placementConflicts, plan.Conflicts...)
	}

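	// Assuming the standard SeaweedFS layout of 10 data + 4 parity shards, any 10 of the
	// 14 shards are sufficient to reconstruct the volume, i.e. up to 4 shard losses are tolerated.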
	return &worker_pb.ErasureCodingTaskParams{
		Destinations:       destinations,
		DataShards:         erasure_coding.DataShardsCount,   // Standard data shards
		ParityShards:       erasure_coding.ParityShardsCount, // Standard parity shards
		PlacementConflicts: placementConflicts,
	}
}

// selectBestECDestinations selects multiple disks for EC shard placement with diversity
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	// Group disks by rack and DC for diversity
	rackGroups := make(map[string][]*topology.DiskInfo)
	for _, disk := range disks {
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackGroups[rackKey] = append(rackGroups[rackKey], disk)
	}

	var selected []*topology.DiskInfo
	usedRacks := make(map[string]bool)

	// First pass: select one disk from each rack for maximum diversity
	for rackKey, rackDisks := range rackGroups {
		if len(selected) >= shardsNeeded {
			break
		}
		// Select best disk from this rack
		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
		if bestDisk != nil {
			selected = append(selected, bestDisk)
			usedRacks[rackKey] = true
		}
	}

	// Second pass: if we still need more disks, fill the remaining slots from any suitable
	// disks, including racks that already hold a shard
	if len(selected) < shardsNeeded {
		for _, disk := range disks {
			if len(selected) >= shardsNeeded {
				break
			}
			// Skip if already selected
			alreadySelected := false
			for _, sel := range selected {
				if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
					alreadySelected = true
					break
				}
			}
			if !alreadySelected && isDiskSuitableForEC(disk) {
				selected = append(selected, disk)
			}
		}
	}

	return selected
}

// selectBestFromRack selects the best disk from a rack for EC placement
func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	var bestDisk *topology.DiskInfo
	bestScore := -1.0

	for _, disk := range disks {
		if !isDiskSuitableForEC(disk) {
			continue
		}
		score := calculateECScore(disk, sourceRack, sourceDC)
		if score > bestScore {
			bestScore = score
			bestDisk = disk
		}
	}

	return bestDisk
}

// calculateECScore calculates placement score for EC operations
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
	if disk.DiskInfo == nil {
		return 0.0
	}

	score := 0.0

	// Prefer disks with available capacity
	if disk.DiskInfo.MaxVolumeCount > 0 {
		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
		score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
	}

	// Prefer different racks for better distribution
	if disk.Rack != sourceRack {
		score += 30.0
	}

	// Prefer different data centers for better distribution
	if disk.DataCenter != sourceDC {
		score += 20.0
	}

	// Consider current load
	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load

	return score
}

// isDiskSuitableForEC checks if a disk is suitable for EC placement
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
	if disk.DiskInfo == nil {
		return false
	}

	// Check if disk is not overloaded with tasks
	if disk.LoadCount > topology.MaxTaskLoadForECPlacement {
		return false
	}

	return true
}

// checkECPlacementConflicts checks for placement rule conflicts in EC operations
func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
	var conflicts []string

	// For EC, being on the same rack as the source is often acceptable,
	// but we note it as a potential conflict for monitoring
	if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
append(conflicts, "same_rack_as_source") } return conflicts } // findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume // Uses O(1) indexed lookup for optimal performance on large clusters. func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica { if activeTopology == nil { return nil } return activeTopology.GetVolumeLocations(volumeID, collection) } // findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts) // Uses O(1) indexed lookup for optimal performance on large clusters. func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica { if activeTopology == nil { return nil } return activeTopology.GetECShardLocations(volumeID, collection) } // findVolumeReplicas finds all servers that have replicas of the specified volume func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string { if activeTopology == nil { return []string{} } topologyInfo := activeTopology.GetTopologyInfo() if topologyInfo == nil { return []string{} } var replicaServers []string // Iterate through all nodes to find volume replicas for _, dc := range topologyInfo.DataCenterInfos { for _, rack := range dc.RackInfos { for _, nodeInfo := range rack.DataNodeInfos { for _, diskInfo := range nodeInfo.DiskInfos { for _, volumeInfo := range diskInfo.VolumeInfos { if volumeInfo.Id == volumeID && volumeInfo.Collection == collection { replicaServers = append(replicaServers, nodeInfo.Id) break // Found volume on this node, move to next node } } } } } } return replicaServers }