|
|
@ -34,11 +34,12 @@ type ErasureCodingTask struct { |
|
|
grpcDialOption grpc.DialOption |
|
|
grpcDialOption grpc.DialOption |
|
|
|
|
|
|
|
|
// EC parameters
|
|
|
// EC parameters
|
|
|
dataShards int32 |
|
|
|
|
|
parityShards int32 |
|
|
|
|
|
targets []*worker_pb.TaskTarget // Unified targets for EC shards
|
|
|
|
|
|
sources []*worker_pb.TaskSource // Unified sources for cleanup
|
|
|
|
|
|
shardAssignment map[string][]string // destination -> assigned shard types
|
|
|
|
|
|
|
|
|
dataShards int32 |
|
|
|
|
|
parityShards int32 |
|
|
|
|
|
targets []*worker_pb.TaskTarget // Unified targets for EC shards
|
|
|
|
|
|
sources []*worker_pb.TaskSource // Unified sources for cleanup
|
|
|
|
|
|
shardAssignment map[string][]string // destination -> assigned shard types
|
|
|
|
|
|
allowSourceDelete bool |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// NewErasureCodingTask creates a new unified EC task instance
|
|
|
// NewErasureCodingTask creates a new unified EC task instance
|
|
|
@ -179,6 +180,7 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP |
|
|
if err := t.mountEcShards(); err != nil { |
|
|
if err := t.mountEcShards(); err != nil { |
|
|
return fmt.Errorf("failed to mount EC shards: %v", err) |
|
|
return fmt.Errorf("failed to mount EC shards: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
t.allowSourceDelete = true |
|
|
|
|
|
|
|
|
// Step 6: Delete original volume
|
|
|
// Step 6: Delete original volume
|
|
|
t.ReportProgressWithStage(90.0, "Deleting original volume") |
|
|
t.ReportProgressWithStage(90.0, "Deleting original volume") |
|
|
@ -469,6 +471,10 @@ func (t *ErasureCodingTask) mountEcShards() error { |
|
|
|
|
|
|
|
|
// deleteOriginalVolume deletes the original volume and all its replicas from all servers
|
|
|
// deleteOriginalVolume deletes the original volume and all its replicas from all servers
|
|
|
func (t *ErasureCodingTask) deleteOriginalVolume() error { |
|
|
func (t *ErasureCodingTask) deleteOriginalVolume() error { |
|
|
|
|
|
if !t.allowSourceDelete { |
|
|
|
|
|
return fmt.Errorf("source deletion disabled: EC shards not fully distributed and mounted") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Get replicas from task parameters (set during detection)
|
|
|
// Get replicas from task parameters (set during detection)
|
|
|
replicas := t.getReplicas() |
|
|
replicas := t.getReplicas() |
|
|
|
|
|
|
|
|
|