Browse Source

EC vacuum task updated for generation-aware operation

add-ec-vacuum
chrislu 4 months ago
parent
commit
de9399761b
  1. 38
      weed/worker/tasks/ec_vacuum/ec_vacuum_task.go
  2. 1
      weed/worker/tasks/ec_vacuum/register.go

38
weed/worker/tasks/ec_vacuum/ec_vacuum_task.go

@ -20,29 +20,35 @@ import (
// EcVacuumTask represents an EC vacuum task that collects, decodes, and re-encodes EC volumes
type EcVacuumTask struct {
*base.BaseTask
volumeID uint32
collection string
sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits
tempDir string
grpcDialOption grpc.DialOption
volumeID uint32
collection string
sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits
sourceGeneration uint32 // generation to vacuum from (G)
targetGeneration uint32 // generation to create (G+1)
tempDir string
grpcDialOption grpc.DialOption
}
// NewEcVacuumTask creates a new EC vacuum task instance
func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits) *EcVacuumTask {
func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits, sourceGeneration uint32) *EcVacuumTask {
return &EcVacuumTask{
BaseTask: base.NewBaseTask(id, types.TaskType("ec_vacuum")),
volumeID: volumeID,
collection: collection,
sourceNodes: sourceNodes,
BaseTask: base.NewBaseTask(id, types.TaskType("ec_vacuum")),
volumeID: volumeID,
collection: collection,
sourceNodes: sourceNodes,
sourceGeneration: sourceGeneration, // generation to vacuum from (G)
targetGeneration: sourceGeneration + 1, // generation to create (G+1)
}
}
// Execute performs the EC vacuum operation
func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
t.LogInfo("Starting EC vacuum task", map[string]interface{}{
"volume_id": t.volumeID,
"collection": t.collection,
"shard_nodes": len(t.sourceNodes),
"volume_id": t.volumeID,
"collection": t.collection,
"source_generation": t.sourceGeneration,
"target_generation": t.targetGeneration,
"shard_nodes": len(t.sourceNodes),
})
// Step 1: Create temporary working directory
@ -150,7 +156,7 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) {
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(sourceNode),
Generation: 1, // TODO: implement proper generation tracking in vacuum task
Generation: t.targetGeneration, // copy new EC shards as G+1
})
if copyErr != nil {
return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr)
@ -206,7 +212,7 @@ func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error
_, err := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: t.volumeID,
Collection: t.collection,
Generation: 1, // TODO: implement proper generation tracking in vacuum task
Generation: t.targetGeneration, // generate new EC shards as G+1
})
return err
})
@ -255,7 +261,7 @@ func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(sourceNode),
Generation: 1, // TODO: implement proper generation tracking in vacuum task
Generation: t.targetGeneration, // copy new EC shards as G+1
})
if copyErr != nil {
return fmt.Errorf("failed to copy new shards %v from %s to %s: %w", needToDistributeBits.ShardIds(), sourceNode, targetNode, copyErr)

1
weed/worker/tasks/ec_vacuum/register.go

@ -60,6 +60,7 @@ func RegisterEcVacuumTask() {
params.VolumeId,
params.Collection,
sourceNodes,
0, // default to generation 0 (current generation to vacuum)
), nil
},
DetectionFunc: Detection,

Loading…
Cancel
Save