diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index 2451dc0e5..6266e2b46 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -20,29 +20,35 @@ import ( // EcVacuumTask represents an EC vacuum task that collects, decodes, and re-encodes EC volumes type EcVacuumTask struct { *base.BaseTask - volumeID uint32 - collection string - sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits - tempDir string - grpcDialOption grpc.DialOption + volumeID uint32 + collection string + sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits + sourceGeneration uint32 // generation to vacuum from (G) + targetGeneration uint32 // generation to create (G+1) + tempDir string + grpcDialOption grpc.DialOption } // NewEcVacuumTask creates a new EC vacuum task instance -func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits) *EcVacuumTask { +func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits, sourceGeneration uint32) *EcVacuumTask { return &EcVacuumTask{ - BaseTask: base.NewBaseTask(id, types.TaskType("ec_vacuum")), - volumeID: volumeID, - collection: collection, - sourceNodes: sourceNodes, + BaseTask: base.NewBaseTask(id, types.TaskType("ec_vacuum")), + volumeID: volumeID, + collection: collection, + sourceNodes: sourceNodes, + sourceGeneration: sourceGeneration, // generation to vacuum from (G) + targetGeneration: sourceGeneration + 1, // generation to create (G+1) } } // Execute performs the EC vacuum operation func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error { t.LogInfo("Starting EC vacuum task", map[string]interface{}{ - "volume_id": t.volumeID, - "collection": t.collection, - "shard_nodes": len(t.sourceNodes), + "volume_id": t.volumeID, + "collection": t.collection, + "source_generation": t.sourceGeneration, + "target_generation": t.targetGeneration, + "shard_nodes": len(t.sourceNodes), }) // Step 1: Create temporary working directory @@ -150,7 +156,7 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) { CopyEcjFile: true, CopyVifFile: true, SourceDataNode: string(sourceNode), - Generation: 1, // TODO: implement proper generation tracking in vacuum task + Generation: t.targetGeneration, // copy new EC shards as G+1 }) if copyErr != nil { return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr) @@ -206,7 +212,7 @@ func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error _, err := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: t.volumeID, Collection: t.collection, - Generation: 1, // TODO: implement proper generation tracking in vacuum task + Generation: t.targetGeneration, // generate new EC shards as G+1 }) return err }) @@ -255,7 +261,7 @@ func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error CopyEcjFile: true, CopyVifFile: true, SourceDataNode: string(sourceNode), - Generation: 1, // TODO: implement proper generation tracking in vacuum task + Generation: t.targetGeneration, // copy new EC shards as G+1 }) if copyErr != nil { return fmt.Errorf("failed to copy new shards %v from %s to %s: %w", needToDistributeBits.ShardIds(), sourceNode, targetNode, copyErr) diff --git a/weed/worker/tasks/ec_vacuum/register.go b/weed/worker/tasks/ec_vacuum/register.go index 859f92737..ef6aaefe0 100644 --- a/weed/worker/tasks/ec_vacuum/register.go +++ b/weed/worker/tasks/ec_vacuum/register.go @@ -60,6 +60,7 @@ func RegisterEcVacuumTask() { params.VolumeId, params.Collection, sourceNodes, + 0, // default to generation 0 (current generation to vacuum) ), nil }, DetectionFunc: Detection,