package ec_vacuum

import (
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	wtypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Detection identifies EC volumes that need vacuum operations
func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo, config base.TaskConfig) ([]*wtypes.TaskDetectionResult, error) {
	ecVacuumConfig, ok := config.(*Config)
	if !ok {
		return nil, fmt.Errorf("invalid config type for EC vacuum detection")
	}

	if !ecVacuumConfig.Enabled {
		return nil, nil
	}

	glog.V(2).Infof("EC vacuum detection: checking %d volume metrics", len(metrics))

	var results []*wtypes.TaskDetectionResult
	now := time.Now()

	// Get topology info for EC shard analysis
	if info.ActiveTopology == nil {
		glog.V(1).Infof("EC vacuum detection: no topology info available")
		return results, nil
	}

	// Collect EC volume information from metrics
	ecVolumeInfo := collectEcVolumeInfo(metrics, info)
	glog.V(2).Infof("EC vacuum detection: found %d EC volumes in metrics", len(ecVolumeInfo))

	for volumeID, ecInfo := range ecVolumeInfo {
		// Calculate deletion ratio first for logging
		deletionRatio := calculateDeletionRatio(ecInfo)

		// Apply filters and track why volumes don't qualify
		if !shouldVacuumEcVolume(ecInfo, ecVacuumConfig, now) {
			continue
		}

		if deletionRatio < ecVacuumConfig.DeletionThreshold {
			glog.V(3).Infof("EC volume %d deletion ratio %.3f below threshold %.3f",
				volumeID, deletionRatio, ecVacuumConfig.DeletionThreshold)
			continue
		}

		// Generate task ID for ActiveTopology integration
		taskID := fmt.Sprintf("ec_vacuum_vol_%d_%d", volumeID, now.Unix())

		// Register storage impact with ActiveTopology if available
		if info.ActiveTopology != nil {
			regErr := registerEcVacuumWithTopology(info.ActiveTopology, taskID, volumeID, ecInfo)
			if regErr != nil {
				glog.Warningf("Failed to register EC vacuum task with topology for volume %d: %v", volumeID, regErr)
				continue // Skip this volume if topology registration fails
			}
			glog.V(2).Infof("Successfully registered EC vacuum task %s with ActiveTopology for volume %d", taskID, volumeID)
		}

		// Create task sources from shard information with generation info
		var sources []*worker_pb.TaskSource
		for serverAddr, shardBits := range ecInfo.ShardNodes {
			shardIds := make([]uint32, 0, shardBits.ShardIdCount())
			for i := 0; i < erasure_coding.TotalShardsCount; i++ {
				if shardBits.HasShardId(erasure_coding.ShardId(i)) {
					shardIds = append(shardIds, uint32(i))
				}
			}
			if len(shardIds) > 0 {
				sources = append(sources, &worker_pb.TaskSource{
					Node:          string(serverAddr),
					VolumeId:      volumeID,
					ShardIds:      shardIds,
					EstimatedSize: ecInfo.Size / uint64(len(ecInfo.ShardNodes)), // Rough estimate per server
					Generation:    ecInfo.CurrentGeneration,                     // Use the current generation from EcVolumeInfo
				})
			}
		}

		// Create TypedParams for EC vacuum task
		typedParams := &worker_pb.TaskParams{
			TaskId:     taskID, // Link to ActiveTopology pending task
			VolumeId:   volumeID,
			Collection: ecInfo.Collection,
			VolumeSize: ecInfo.Size,
			Sources:    sources,
			TaskParams: &worker_pb.TaskParams_VacuumParams{
				VacuumParams: &worker_pb.VacuumTaskParams{
					GarbageThreshold: deletionRatio,
					ForceVacuum:      false,
					BatchSize:        1000,              // Default batch size
					WorkingDir:       "/data/ec_vacuum", // Default base directory - worker may use BaseWorkingDir/ec_vacuum instead
					VerifyChecksum:   true,              // Enable checksum verification for safety
				},
			},
		}

		// Cleanup planning is now simplified - done during execution via master query

		result := &wtypes.TaskDetectionResult{
			TaskID:     taskID,
			TaskType:   wtypes.TaskType("ec_vacuum"),
			VolumeID:   volumeID,
			Server:     ecInfo.PrimaryNode,
			Collection: ecInfo.Collection,
			Priority:   wtypes.TaskPriorityLow, // EC vacuum is not urgent
			Reason: fmt.Sprintf("EC volume needs vacuum: deletion_ratio=%.1f%% (>%.1f%%), age=%.1fh (>%.1fh), size=%.1fMB (>%dMB)",
				deletionRatio*100, ecVacuumConfig.DeletionThreshold*100,
				ecInfo.Age.Hours(), (time.Duration(ecVacuumConfig.MinVolumeAgeSeconds)*time.Second).Hours(),
				float64(ecInfo.Size)/(1024*1024), ecVacuumConfig.MinSizeMB),
			TypedParams: typedParams,
			ScheduleAt:  now,
		}

		// Add to topology's pending tasks for capacity management (simplified for now)
		if info.ActiveTopology != nil {
			glog.V(3).Infof("EC vacuum detection: would add pending task %s to topology for volume %d", taskID, volumeID)
			// Note: Simplified for now - in production would properly integrate with ActiveTopology
		}

		results = append(results, result)
		glog.V(1).Infof("EC vacuum detection: queued volume %d for vacuum (deletion_ratio=%.1f%%, size=%.1fMB)",
			volumeID, deletionRatio*100, float64(ecInfo.Size)/(1024*1024))
	}

	glog.V(1).Infof("EC vacuum detection: found %d EC volumes needing vacuum", len(results))

	// Show detailed criteria for volumes that didn't qualify (similar to erasure coding detection)
	if len(results) == 0 && len(ecVolumeInfo) > 0 {
		glog.V(1).Infof("EC vacuum detection: No tasks created for %d volumes", len(ecVolumeInfo))

		// Show details for first few EC volumes
		count := 0
		for volumeID, ecInfo := range ecVolumeInfo {
			if count >= 3 { // Limit to first 3 volumes to avoid spam
				break
			}

			deletionRatio := calculateDeletionRatio(ecInfo)
			sizeMB := float64(ecInfo.Size) / (1024 * 1024)
			deletedMB := deletionRatio * sizeMB
			ageRequired := time.Duration(ecVacuumConfig.MinVolumeAgeSeconds) * time.Second

			// Check shard availability
			totalShards := 0
			for _, shardBits := range ecInfo.ShardNodes {
				totalShards += shardBits.ShardIdCount()
			}

			glog.Infof("EC VACUUM: Volume %d: deleted=%.1fMB, ratio=%.1f%% (need ≥%.1f%%), age=%s (need ≥%s), size=%.1fMB (need ≥%dMB), shards=%d (need ≥%d)",
				volumeID, deletedMB, deletionRatio*100, ecVacuumConfig.DeletionThreshold*100,
				ecInfo.Age.Truncate(time.Minute), ageRequired.Truncate(time.Minute),
				sizeMB, ecVacuumConfig.MinSizeMB, totalShards, erasure_coding.DataShardsCount)
			count++
		}
	}

	return results, nil
}

// EcVolumeInfo contains information about an EC volume
type EcVolumeInfo struct {
	VolumeID             uint32
	Collection           string
	Size                 uint64
	CreatedAt            time.Time
	Age                  time.Duration
	PrimaryNode          string
	ShardNodes           map[pb.ServerAddress]erasure_coding.ShardBits
	DeletionInfo         DeletionInfo
	CurrentGeneration    uint32   // Current generation of EC shards
	AvailableGenerations []uint32 // All discovered generations for this volume
}

// DeletionInfo contains deletion statistics for an EC volume
type DeletionInfo struct {
	TotalEntries   int64
	DeletedEntries int64
	DeletionRatio  float64
}

// collectEcVolumeInfo extracts EC volume information from volume health metrics and topology
func collectEcVolumeInfo(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo) map[uint32]*EcVolumeInfo {
	ecVolumes := make(map[uint32]*EcVolumeInfo)

	for _, metric := range metrics {
		// Only process EC volumes
		if !metric.IsECVolume {
			continue
		}

		// Calculate deletion ratio from health metrics
		deletionRatio := 0.0
		if metric.Size > 0 {
			deletionRatio = float64(metric.DeletedBytes) / float64(metric.Size)
		}

		// Create EC volume info from metrics
		ecVolumes[metric.VolumeID] = &EcVolumeInfo{
			VolumeID:             metric.VolumeID,
			Collection:           metric.Collection,
			Size:                 metric.Size,
			CreatedAt:            time.Now().Add(-metric.Age),
			Age:                  metric.Age,
			PrimaryNode:          metric.Server,
			ShardNodes:           make(map[pb.ServerAddress]erasure_coding.ShardBits), // Will be populated if needed
			CurrentGeneration:    0,          // Will be determined from topology
			AvailableGenerations: []uint32{}, // Will be populated from topology
			DeletionInfo: DeletionInfo{
				TotalEntries:   int64(metric.Size / 1024), // Rough estimate
				DeletedEntries: int64(metric.DeletedBytes / 1024),
				DeletionRatio:  deletionRatio,
			},
		}

		glog.V(2).Infof("EC vacuum detection: found EC volume %d, size=%dMB, deleted=%dMB, ratio=%.1f%%",
			metric.VolumeID, metric.Size/(1024*1024), metric.DeletedBytes/(1024*1024), deletionRatio*100)
	}

	// Populate shard information from cluster topology
	if info.ActiveTopology != nil {
		populateShardInfo(ecVolumes, info.ActiveTopology)
	}

	glog.V(1).Infof("EC vacuum detection: found %d EC volumes from %d metrics", len(ecVolumes), len(metrics))
	return ecVolumes
}

// populateShardInfo populates the ShardNodes information from cluster topology
func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topology.ActiveTopology) {
	if activeTopology == nil {
		return
	}

	// Get topology information
	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo == nil {
		return
	}

	// Iterate through topology to find EC shard information
	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, node := range rack.DataNodeInfos {
				for _, diskInfo := range node.DiskInfos {
					// Check each EC shard on this disk
					for _, ecShardInfo := range diskInfo.EcShardInfos {
						volumeID := ecShardInfo.Id

						// Only process volumes we're tracking
						if ecVolumeInfo, exists := ecVolumes[volumeID]; exists {
							// Initialize ShardNodes map if needed
							if ecVolumeInfo.ShardNodes == nil {
								ecVolumeInfo.ShardNodes = make(map[pb.ServerAddress]erasure_coding.ShardBits)
							}

							// Track generation information
							generation := ecShardInfo.Generation

							// Update current generation (use the highest found)
							if generation > ecVolumeInfo.CurrentGeneration {
								ecVolumeInfo.CurrentGeneration = generation
							}

							// Add to available generations if not already present
							found := false
							for _, existingGen := range ecVolumeInfo.AvailableGenerations {
								if existingGen == generation {
									found = true
									break
								}
							}
							if !found {
								ecVolumeInfo.AvailableGenerations = append(ecVolumeInfo.AvailableGenerations, generation)
							}

							// Add shards from this node
							serverAddr := pb.ServerAddress(node.Id)
							if _, exists := ecVolumeInfo.ShardNodes[serverAddr]; !exists {
								ecVolumeInfo.ShardNodes[serverAddr] = erasure_coding.ShardBits(0)
							}

							// Add shards based on actual EcIndexBits, not ShardSizes length
							ecIndexBits := ecShardInfo.EcIndexBits
							actualShards := make([]int, 0)
							for i := 0; i < erasure_coding.TotalShardsCount; i++ {
								if (ecIndexBits & (1 << uint(i))) != 0 {
									ecVolumeInfo.ShardNodes[serverAddr] = ecVolumeInfo.ShardNodes[serverAddr].AddShardId(erasure_coding.ShardId(i))
									actualShards = append(actualShards, i)
								}
							}

							glog.V(2).Infof("EC volume %d generation %d: found shards %v on server %s (EcIndexBits=0x%x)",
								volumeID, generation, actualShards, node.Id, ecIndexBits)
						}
					}
				}
			}
		}
	}

	// Log shard distribution summary
	for volumeID, ecInfo := range ecVolumes {
		shardDistribution := make(map[string][]int)
		for serverAddr, shardBits := range ecInfo.ShardNodes {
			shards := make([]int, 0)
			for i := 0; i < erasure_coding.TotalShardsCount; i++ {
				if shardBits.HasShardId(erasure_coding.ShardId(i)) {
					shards = append(shards, i)
				}
			}
			if len(shards) > 0 {
				shardDistribution[string(serverAddr)] = shards
			}
		}
		glog.V(1).Infof("EC volume %d: current_generation=%d, available_generations=%v, shard_distribution=%+v",
			volumeID, ecInfo.CurrentGeneration, ecInfo.AvailableGenerations, shardDistribution)
	}
}

// shouldVacuumEcVolume determines if an EC volume should be considered for vacuum
func shouldVacuumEcVolume(ecInfo *EcVolumeInfo, config *Config, now time.Time) bool {
	// Check minimum age
	minAge := time.Duration(config.MinVolumeAgeSeconds) * time.Second
	if ecInfo.Age < minAge {
		glog.V(3).Infof("EC volume %d too young: age=%.1fh < %.1fh",
			ecInfo.VolumeID, ecInfo.Age.Hours(), minAge.Hours())
		return false
	}

	// Check minimum size
	sizeMB := float64(ecInfo.Size) / (1024 * 1024)
	if sizeMB < float64(config.MinSizeMB) {
		glog.V(3).Infof("EC volume %d too small: size=%.1fMB < %dMB",
			ecInfo.VolumeID, sizeMB, config.MinSizeMB)
		return false
	}

	// Check collection filter
	if config.CollectionFilter != "" && !strings.Contains(ecInfo.Collection, config.CollectionFilter) {
		glog.V(3).Infof("EC volume %d collection %s doesn't match filter %s",
			ecInfo.VolumeID, ecInfo.Collection, config.CollectionFilter)
		return false
	}

	// Check if we have all required data shards (0-9) for vacuum operation
	availableDataShards := make(map[int]bool)
	for _, shardBits := range ecInfo.ShardNodes {
		for i := 0; i < erasure_coding.DataShardsCount; i++ {
			if shardBits.HasShardId(erasure_coding.ShardId(i)) {
				availableDataShards[i] = true
			}
		}
	}

	missingDataShards := make([]int, 0)
	for i := 0; i < erasure_coding.DataShardsCount; i++ {
		if !availableDataShards[i] {
			missingDataShards = append(missingDataShards, i)
		}
	}

	if len(missingDataShards) > 0 {
		glog.V(1).Infof("EC volume %d incomplete for vacuum: missing data shards %v (need shards 0-%d)",
			ecInfo.VolumeID, missingDataShards, erasure_coding.DataShardsCount-1)
		return false
	}

	return true
}

// calculateDeletionRatio calculates the deletion ratio for an EC volume
func calculateDeletionRatio(ecInfo *EcVolumeInfo) float64 {
	if ecInfo.DeletionInfo.TotalEntries == 0 {
		// If no deletion info available, estimate based on shard distribution
		// Volumes with uneven shard distribution might indicate deletion
		return estimateDeletionFromShardDistribution(ecInfo)
	}
	return ecInfo.DeletionInfo.DeletionRatio
}

// estimateDeletionInfo provides a simplified estimation of deletion info
func estimateDeletionInfo(volumeSize uint64) DeletionInfo {
	// Simplified estimation - in reality would parse ecj files
	// For demonstration, assume some deletion exists if the volume is old enough
	estimatedTotal := int64(volumeSize / 1024) // Rough estimate of entries
	estimatedDeleted := estimatedTotal / 10    // Assume 10% deletions as baseline

	deletionRatio := 0.0
	if estimatedTotal > 0 {
		deletionRatio = float64(estimatedDeleted) / float64(estimatedTotal)
	}

	return DeletionInfo{
		TotalEntries:   estimatedTotal,
		DeletedEntries: estimatedDeleted,
		DeletionRatio:  deletionRatio,
	}
}

// estimateDeletionFromShardDistribution estimates deletion ratio from shard distribution patterns
func estimateDeletionFromShardDistribution(ecInfo *EcVolumeInfo) float64 {
	// Simplified heuristic: if shards are not evenly distributed,
	// it might indicate the volume has been through some operations
	// In a real implementation, would analyze ecj files directly
	nodeCount := len(ecInfo.ShardNodes)
	if nodeCount == 0 {
		return 0.0
	}

	// If all shards are on one node, it might indicate consolidation due to deletions
	for _, shardBits := range ecInfo.ShardNodes {
		if shardBits.ShardIdCount() >= erasure_coding.TotalShardsCount {
			return 0.4 // Higher deletion ratio for consolidated volumes
		}
	}

	// Default conservative estimate
	return 0.1
}

// registerEcVacuumWithTopology registers the EC vacuum task with ActiveTopology for capacity tracking
func registerEcVacuumWithTopology(activeTopology *topology.ActiveTopology, taskID string, volumeID uint32, ecInfo *EcVolumeInfo) error {
	// Convert shard information to TaskSourceSpec for topology tracking
	var sources []topology.TaskSourceSpec

	// Add all existing EC shard locations as sources (these will be cleaned up)
	for serverAddr := range ecInfo.ShardNodes {
		// Use the existing EC shard cleanup impact calculation
		cleanupImpact := topology.CalculateECShardCleanupImpact(int64(ecInfo.Size))
		sources = append(sources, topology.TaskSourceSpec{
			ServerID:      string(serverAddr),
			DiskID:        0, // Default disk (topology system will resolve)
			CleanupType:   topology.CleanupECShards,
			StorageImpact: &cleanupImpact,
		})
	}

	// EC vacuum creates new generation on same nodes (destinations same as sources but for new generation)
	// Create destinations for the new generation (positive storage impact)
	var destinations []topology.TaskDestinationSpec
	newGenerationImpact := topology.CalculateECShardStorageImpact(int32(erasure_coding.TotalShardsCount), int64(ecInfo.Size))
	for serverAddr := range ecInfo.ShardNodes {
		destinations = append(destinations, topology.TaskDestinationSpec{
			ServerID:      string(serverAddr),
			DiskID:        0, // Default disk (topology system will resolve)
			StorageImpact: &newGenerationImpact,
		})
	}

	// Register the task with topology for capacity tracking
	err := activeTopology.AddPendingTask(topology.TaskSpec{
		TaskID:       taskID,
		TaskType:     topology.TaskType("ec_vacuum"),
		VolumeID:     volumeID,
		VolumeSize:   int64(ecInfo.Size),
		Sources:      sources,
		Destinations: destinations,
	})
	if err != nil {
		return fmt.Errorf("failed to add pending EC vacuum task to topology: %w", err)
	}

	glog.V(2).Infof("Registered EC vacuum task %s with topology: %d sources, %d destinations",
		taskID, len(sources), len(destinations))
	return nil
}