From c57ccd554a91ec84d647cfb90ce9bc83b59cda6b Mon Sep 17 00:00:00 2001
From: chrislu
Date: Mon, 11 Aug 2025 02:11:39 -0700
Subject: [PATCH] worker_pb.TaskParams_VacuumParams

---
Review notes (ignored by git am):
- populateShardInfo takes shard ids from the EcIndexBits bitmap instead of
  treating ShardSizes slice indexes as shard ids.
- collectEcVolumeInfo tolerates a nil ClusterInfo.

 weed/worker/tasks/ec_vacuum/detection.go | 124 +++++++++++++++++++++--
 1 file changed, 114 insertions(+), 10 deletions(-)

diff --git a/weed/worker/tasks/ec_vacuum/detection.go b/weed/worker/tasks/ec_vacuum/detection.go
index 7c7938392..60427bab2 100644
--- a/weed/worker/tasks/ec_vacuum/detection.go
+++ b/weed/worker/tasks/ec_vacuum/detection.go
@@ -5,8 +5,10 @@ import (
 	"strings"
 	"time"

+	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
 	wtypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
@@ -35,7 +37,7 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo,
 	}

 	// Collect EC volume information from metrics
-	ecVolumeInfo := collectEcVolumeInfo(metrics)
+	ecVolumeInfo := collectEcVolumeInfo(metrics, info)
 	glog.V(2).Infof("EC vacuum detection: found %d EC volumes in metrics", len(ecVolumeInfo))

 	for volumeID, ecInfo := range ecVolumeInfo {
@@ -56,18 +58,56 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo,
 		// Generate task ID for ActiveTopology integration
 		taskID := fmt.Sprintf("ec_vacuum_vol_%d_%d", volumeID, now.Unix())

-		result := &wtypes.TaskDetectionResult{
-			TaskID:     taskID,
-			TaskType:   wtypes.TaskType("ec_vacuum"),
-			VolumeID:   volumeID,
-			Server:     ecInfo.PrimaryNode,
+		// Create task sources from shard information
+		var sources []*worker_pb.TaskSource
+		for serverAddr, shardBits := range ecInfo.ShardNodes {
+			shardIds := make([]uint32, 0, shardBits.ShardIdCount())
+			for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+				if shardBits.HasShardId(erasure_coding.ShardId(i)) {
+					shardIds = append(shardIds, uint32(i))
+				}
+			}
+			if len(shardIds) > 0 {
+				sources = append(sources, &worker_pb.TaskSource{
+					Node:          string(serverAddr),
+					VolumeId:      volumeID,
+					ShardIds:      shardIds,
+					EstimatedSize: ecInfo.Size / uint64(len(ecInfo.ShardNodes)), // Rough estimate per server
+				})
+			}
+		}
+
+		// Create TypedParams for EC vacuum task
+		typedParams := &worker_pb.TaskParams{
+			TaskId:     taskID,
+			VolumeId:   volumeID,
 			Collection: ecInfo.Collection,
-			Priority:   wtypes.TaskPriorityLow, // EC vacuum is not urgent
+			VolumeSize: ecInfo.Size,
+			Sources:    sources,
+			TaskParams: &worker_pb.TaskParams_VacuumParams{
+				VacuumParams: &worker_pb.VacuumTaskParams{
+					GarbageThreshold: deletionRatio,
+					ForceVacuum:      false,
+					BatchSize:        1000,              // Default batch size
+					WorkingDir:       "/data/ec_vacuum", // Default base directory - worker may use BaseWorkingDir/ec_vacuum instead
+					VerifyChecksum:   true,              // Enable checksum verification for safety
+				},
+			},
+		}
+
+		result := &wtypes.TaskDetectionResult{
+			TaskID:     taskID,
+			TaskType:   wtypes.TaskType("ec_vacuum"),
+			VolumeID:   volumeID,
+			Server:     ecInfo.PrimaryNode,
+			Collection: ecInfo.Collection,
+			Priority:   wtypes.TaskPriorityLow, // EC vacuum is not urgent
 			Reason: fmt.Sprintf("EC volume needs vacuum: deletion_ratio=%.1f%% (>%.1f%%), age=%.1fh (>%.1fh), size=%.1fMB (>%dMB)",
 				deletionRatio*100, ecVacuumConfig.DeletionThreshold*100,
 				ecInfo.Age.Hours(), (time.Duration(ecVacuumConfig.MinVolumeAgeSeconds) * time.Second).Hours(),
 				float64(ecInfo.Size)/(1024*1024), ecVacuumConfig.MinSizeMB),
-			ScheduleAt: now,
+			TypedParams: typedParams,
+			ScheduleAt:  now,
 		}

 		// Add to topology's pending tasks for capacity management (simplified for now)
@@ -136,8 +176,8 @@ type DeletionInfo struct {
 	DeletionRatio float64
 }

-// collectEcVolumeInfo extracts EC volume information from volume health metrics
-func collectEcVolumeInfo(metrics []*wtypes.VolumeHealthMetrics) map[uint32]*EcVolumeInfo {
+// collectEcVolumeInfo extracts EC volume information from volume health metrics and topology
+func collectEcVolumeInfo(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo) map[uint32]*EcVolumeInfo {
 	ecVolumes := make(map[uint32]*EcVolumeInfo)

 	for _, metric := range metrics {
@@ -172,10 +212,74 @@ func collectEcVolumeInfo(metrics []*wtypes.VolumeHealthMetrics) map[uint32]*EcVo
 			metric.VolumeID, metric.Size/(1024*1024), metric.DeletedBytes/(1024*1024), deletionRatio*100)
 	}

+	// Populate shard information from cluster topology
+	if info != nil && info.ActiveTopology != nil {
+		populateShardInfo(ecVolumes, info.ActiveTopology)
+	}
+
 	glog.V(1).Infof("EC vacuum detection: found %d EC volumes from %d metrics", len(ecVolumes), len(metrics))
 	return ecVolumes
 }

+// populateShardInfo populates the ShardNodes information from cluster topology
+func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topology.ActiveTopology) {
+	if activeTopology == nil {
+		return
+	}
+
+	// Get topology information
+	topologyInfo := activeTopology.GetTopologyInfo()
+	if topologyInfo == nil {
+		return
+	}
+
+	// Iterate through topology to find EC shard information
+	for _, dc := range topologyInfo.DataCenterInfos {
+		for _, rack := range dc.RackInfos {
+			for _, node := range rack.DataNodeInfos {
+				for _, diskInfo := range node.DiskInfos {
+					// Check each EC shard on this disk
+					for _, ecShardInfo := range diskInfo.EcShardInfos {
+						volumeID := ecShardInfo.Id
+
+						// Only process volumes we're tracking
+						if ecVolumeInfo, exists := ecVolumes[volumeID]; exists {
+							// Initialize ShardNodes map if needed
+							if ecVolumeInfo.ShardNodes == nil {
+								ecVolumeInfo.ShardNodes = make(map[pb.ServerAddress]erasure_coding.ShardBits)
+							}
+
+							// Add shards from this node
+							serverAddr := pb.ServerAddress(node.Id)
+							if _, exists := ecVolumeInfo.ShardNodes[serverAddr]; !exists {
+								ecVolumeInfo.ShardNodes[serverAddr] = erasure_coding.ShardBits(0)
+							}
+
+							// Merge the shards this disk really holds: EcIndexBits is the shard-id
+							// bitmap, whereas ShardSizes is only a dense list of sizes (index != shard id)
+							diskShards := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+							ecVolumeInfo.ShardNodes[serverAddr] |= diskShards
+
+							glog.V(3).Infof("EC volume %d: found %d shards on server %s",
+								volumeID, diskShards.ShardIdCount(), node.Id)
+						}
+					}
+				}
+			}
+		}
+	}
+
+	// Log shard distribution summary
+	for volumeID, ecInfo := range ecVolumes {
+		totalShards := 0
+		for _, shardBits := range ecInfo.ShardNodes {
+			totalShards += shardBits.ShardIdCount()
+		}
+		glog.V(2).Infof("EC volume %d: total shards=%d across %d servers",
+			volumeID, totalShards, len(ecInfo.ShardNodes))
+	}
+}
+
 // shouldVacuumEcVolume determines if an EC volume should be considered for vacuum
 func shouldVacuumEcVolume(ecInfo *EcVolumeInfo, config *Config, now time.Time) bool {
 	// Check minimum age