diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index 2342689a8..3ed7654c9 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -40,6 +40,10 @@ type EcVacuumTask struct { // Runtime-determined during execution sourceGeneration uint32 // generation to vacuum from (determined at runtime) targetGeneration uint32 // generation to create (determined at runtime) + + // Core business logic + logic *EcVacuumLogic + plan *VacuumPlan // Generated plan for this vacuum operation } // NewEcVacuumTask creates a new EC vacuum task instance @@ -50,6 +54,7 @@ func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes collection: collection, sourceNodes: sourceNodes, cleanupGracePeriod: 1 * time.Minute, // 1 minute grace period for faster cleanup + logic: NewEcVacuumLogic(), // Initialize business logic // sourceGeneration and targetGeneration will be determined during execution } } @@ -66,21 +71,39 @@ func (t *EcVacuumTask) GetTopologyTaskID() string { // Execute performs the EC vacuum operation func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error { - // Initialize generations from TaskSource (determined during detection phase) - if len(params.Sources) > 0 && params.Sources[0].Generation > 0 { - t.sourceGeneration = params.Sources[0].Generation - t.targetGeneration = t.sourceGeneration + 1 - } else { - // Fallback to safe defaults for backward compatibility - t.sourceGeneration = 0 - t.targetGeneration = 1 + // Step 0: Create comprehensive vacuum plan using the logic layer + plan, err := t.logic.CreateVacuumPlan(t.volumeID, t.collection, params) + if err != nil { + return fmt.Errorf("failed to create vacuum plan: %w", err) + } + t.plan = plan + + // Extract generations from the plan + t.sourceGeneration = plan.CurrentGeneration + t.targetGeneration = plan.TargetGeneration + + t.LogInfo("Vacuum plan created successfully", map[string]interface{}{ + "volume_id": plan.VolumeID, + "collection": plan.Collection, + "source_generation": plan.CurrentGeneration, + "target_generation": plan.TargetGeneration, + "cleanup_generations": plan.GenerationsToCleanup, + "nodes_involved": len(plan.SourceDistribution.Nodes), + "safety_checks": len(plan.SafetyChecks), + }) + + // Validate the plan is safe to execute + if err := t.logic.ValidateShardDistribution(plan.SourceDistribution); err != nil { + return fmt.Errorf("vacuum plan validation failed: %w", err) } - t.LogInfo("Generations determined from TaskSource", map[string]interface{}{ - "source_generation": t.sourceGeneration, - "target_generation": t.targetGeneration, + t.LogInfo("Plan validation successful", map[string]interface{}{ + "safety_checks": plan.SafetyChecks, }) + // Ensure sourceNodes is consistent with the plan + t.sourceNodes = plan.SourceDistribution.Nodes + // Log task information logFields := map[string]interface{}{ "volume_id": t.volumeID, @@ -950,20 +973,20 @@ func (t *EcVacuumTask) cleanupOldEcShards() error { return fmt.Errorf("safety checks failed: %w", err) } - // Step 3: Clean up old generations (simplified - clean up source generation only) - // Generation discovery is now handled during detection phase in EcVolumeInfo - generationsToCleanup := []uint32{t.sourceGeneration} + // Step 3: Use cleanup generations from the vacuum plan + generationsToCleanup := t.plan.GenerationsToCleanup - t.LogInfo("Identified generations for cleanup", map[string]interface{}{ + t.LogInfo("Using cleanup generations from vacuum plan", map[string]interface{}{ "volume_id": t.volumeID, "target_generation": t.targetGeneration, "source_generation": t.sourceGeneration, "generations_to_cleanup": generationsToCleanup, + "plan_validated": true, }) // Step 4: Unmount and delete old generation shards from each node var cleanupErrors []string - for node := range t.sourceNodes { + for node := range t.plan.SourceDistribution.Nodes { for _, generation := range generationsToCleanup { if err := t.cleanupGenerationFromNode(node, generation); err != nil { cleanupErrors = append(cleanupErrors, fmt.Sprintf("node %s generation %d: %v", node, generation, err))