From 62d89fa60bc39d69693cf66b50b262be0036221d Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 10 Aug 2025 15:04:55 -0700 Subject: [PATCH] EC vacuum distribution updated for generation-aware mount/copy RPCs --- weed/worker/tasks/ec_vacuum/ec_vacuum_task.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index 6266e2b46..fe4442393 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -156,7 +156,7 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) { CopyEcjFile: true, CopyVifFile: true, SourceDataNode: string(sourceNode), - Generation: t.targetGeneration, // copy new EC shards as G+1 + Generation: t.sourceGeneration, // collect existing shards from source generation G }) if copyErr != nil { return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr) @@ -167,6 +167,7 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) { VolumeId: t.volumeID, Collection: t.collection, ShardIds: needToCopyBits.ToUint32Slice(), + Generation: t.sourceGeneration, // mount collected shards from source generation G }) if mountErr != nil { return fmt.Errorf("failed to mount shards %v on %s: %w", needToCopyBits.ShardIds(), targetNode, mountErr) @@ -221,8 +222,9 @@ func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error // distributeNewEcShards distributes the new EC shards across the cluster func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error { t.LogInfo("Distributing new EC shards", map[string]interface{}{ - "volume_id": t.volumeID, - "source": sourceNode, + "volume_id": t.volumeID, + "source": sourceNode, + "target_generation": t.targetGeneration, }) // For simplicity, we'll distribute to the same nodes as before @@ -272,6 +274,7 @@ func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error VolumeId: t.volumeID, Collection: t.collection, ShardIds: needToDistributeBits.ToUint32Slice(), + Generation: t.targetGeneration, // mount new EC shards as G+1 }) if mountErr != nil { return fmt.Errorf("failed to mount new shards %v on %s: %w", needToDistributeBits.ShardIds(), targetNode, mountErr)