You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

902 lines
28 KiB

package pluginworker
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
erasurecodingtask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
type erasureCodingWorkerConfig struct {
TaskConfig *erasurecodingtask.Config
MinIntervalSeconds int
}
// ErasureCodingHandler is the plugin job handler for erasure coding.
type ErasureCodingHandler struct {
grpcDialOption grpc.DialOption
}
func NewErasureCodingHandler(grpcDialOption grpc.DialOption) *ErasureCodingHandler {
return &ErasureCodingHandler{grpcDialOption: grpcDialOption}
}
func (h *ErasureCodingHandler) Capability() *plugin_pb.JobTypeCapability {
return &plugin_pb.JobTypeCapability{
JobType: "erasure_coding",
CanDetect: true,
CanExecute: true,
MaxDetectionConcurrency: 1,
MaxExecutionConcurrency: 1,
DisplayName: "Erasure Coding",
Description: "Converts full and quiet volumes into EC shards",
}
}
func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
return &plugin_pb.JobTypeDescriptor{
JobType: "erasure_coding",
DisplayName: "Erasure Coding",
Description: "Detect and execute erasure coding for suitable volumes",
Icon: "fas fa-shield-alt",
DescriptorVersion: 1,
AdminConfigForm: &plugin_pb.ConfigForm{
FormId: "erasure-coding-admin",
Title: "Erasure Coding Admin Config",
Description: "Admin-side controls for erasure coding detection scope.",
Sections: []*plugin_pb.ConfigSection{
{
SectionId: "scope",
Title: "Scope",
Description: "Optional filters applied before erasure coding detection.",
Fields: []*plugin_pb.ConfigField{
{
Name: "collection_filter",
Label: "Collection Filter",
Description: "Only detect erasure coding opportunities in this collection when set.",
Placeholder: "all collections",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"collection_filter": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
},
},
WorkerConfigForm: &plugin_pb.ConfigForm{
FormId: "erasure-coding-worker",
Title: "Erasure Coding Worker Config",
Description: "Worker-side detection thresholds.",
Sections: []*plugin_pb.ConfigSection{
{
SectionId: "thresholds",
Title: "Detection Thresholds",
Description: "Controls for when erasure coding jobs should be proposed.",
Fields: []*plugin_pb.ConfigField{
{
Name: "quiet_for_seconds",
Label: "Quiet Period (s)",
Description: "Volume must remain unmodified for at least this duration.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
{
Name: "fullness_ratio",
Label: "Fullness Ratio",
Description: "Minimum volume fullness ratio to trigger erasure coding.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0}},
MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 1}},
},
{
Name: "min_size_mb",
Label: "Minimum Volume Size (MB)",
Description: "Only volumes larger than this size are considered.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
},
{
Name: "min_interval_seconds",
Label: "Minimum Detection Interval (s)",
Description: "Skip detection if the last successful run is more recent than this interval.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300},
},
"fullness_ratio": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8},
},
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60},
},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
Enabled: true,
DetectionIntervalSeconds: 60 * 5,
DetectionTimeoutSeconds: 300,
MaxJobsPerDetection: 500,
GlobalExecutionConcurrency: 16,
PerWorkerExecutionConcurrency: 4,
RetryLimit: 1,
RetryBackoffSeconds: 30,
},
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
"quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300},
},
"fullness_ratio": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8},
},
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60},
},
},
}
}
func (h *ErasureCodingHandler) Detect(
ctx context.Context,
request *plugin_pb.RunDetectionRequest,
sender DetectionSender,
) error {
if request == nil {
return fmt.Errorf("run detection request is nil")
}
if sender == nil {
return fmt.Errorf("detection sender is nil")
}
if request.JobType != "" && request.JobType != "erasure_coding" {
return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.JobType)
}
workerConfig := deriveErasureCodingWorkerConfig(request.GetWorkerConfigValues())
if shouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) {
minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second
_ = sender.SendActivity(buildDetectorActivity(
"skipped_by_interval",
fmt.Sprintf("ERASURE CODING: Detection skipped due to min interval (%s)", minInterval),
map[string]*plugin_pb.ConfigValue{
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)},
},
},
))
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: "erasure_coding",
Proposals: []*plugin_pb.JobProposal{},
HasMore: false,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: "erasure_coding",
Success: true,
TotalProposals: 0,
})
}
collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
if collectionFilter != "" {
workerConfig.TaskConfig.CollectionFilter = collectionFilter
}
masters := make([]string, 0)
if request.ClusterContext != nil {
masters = append(masters, request.ClusterContext.MasterGrpcAddresses...)
}
metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter)
if err != nil {
return err
}
clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
results, err := erasurecodingtask.Detection(metrics, clusterInfo, workerConfig.TaskConfig)
if err != nil {
return err
}
if traceErr := emitErasureCodingDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil {
glog.Warningf("Plugin worker failed to emit erasure_coding detection trace: %v", traceErr)
}
maxResults := int(request.MaxResults)
hasMore := false
if maxResults > 0 && len(results) > maxResults {
hasMore = true
results = results[:maxResults]
}
proposals := make([]*plugin_pb.JobProposal, 0, len(results))
for _, result := range results {
proposal, proposalErr := buildErasureCodingProposal(result)
if proposalErr != nil {
glog.Warningf("Plugin worker skip invalid erasure_coding proposal: %v", proposalErr)
continue
}
proposals = append(proposals, proposal)
}
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: "erasure_coding",
Proposals: proposals,
HasMore: hasMore,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: "erasure_coding",
Success: true,
TotalProposals: int32(len(proposals)),
})
}
func emitErasureCodingDetectionDecisionTrace(
sender DetectionSender,
metrics []*workertypes.VolumeHealthMetrics,
taskConfig *erasurecodingtask.Config,
results []*workertypes.TaskDetectionResult,
) error {
if sender == nil || taskConfig == nil {
return nil
}
quietThreshold := time.Duration(taskConfig.QuietForSeconds) * time.Second
minSizeBytes := uint64(taskConfig.MinSizeMB) * 1024 * 1024
allowedCollections := make(map[string]bool)
if strings.TrimSpace(taskConfig.CollectionFilter) != "" {
for _, collection := range strings.Split(taskConfig.CollectionFilter, ",") {
trimmed := strings.TrimSpace(collection)
if trimmed != "" {
allowedCollections[trimmed] = true
}
}
}
volumeGroups := make(map[uint32][]*workertypes.VolumeHealthMetrics)
for _, metric := range metrics {
if metric == nil {
continue
}
volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
}
skippedAlreadyEC := 0
skippedTooSmall := 0
skippedCollectionFilter := 0
skippedQuietTime := 0
skippedFullness := 0
for _, groupMetrics := range volumeGroups {
if len(groupMetrics) == 0 {
continue
}
metric := groupMetrics[0]
for _, candidate := range groupMetrics {
if candidate != nil && candidate.Server < metric.Server {
metric = candidate
}
}
if metric == nil {
continue
}
if metric.IsECVolume {
skippedAlreadyEC++
continue
}
if metric.Size < minSizeBytes {
skippedTooSmall++
continue
}
if len(allowedCollections) > 0 && !allowedCollections[metric.Collection] {
skippedCollectionFilter++
continue
}
if metric.Age < quietThreshold {
skippedQuietTime++
continue
}
if metric.FullnessRatio < taskConfig.FullnessRatio {
skippedFullness++
continue
}
}
totalVolumes := len(metrics)
summaryMessage := ""
if len(results) == 0 {
summaryMessage = fmt.Sprintf(
"EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
totalVolumes,
skippedAlreadyEC,
skippedTooSmall,
skippedCollectionFilter,
skippedQuietTime,
skippedFullness,
)
} else {
summaryMessage = fmt.Sprintf(
"EC detection: Created %d task(s) from %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
len(results),
totalVolumes,
skippedAlreadyEC,
skippedTooSmall,
skippedCollectionFilter,
skippedQuietTime,
skippedFullness,
)
}
if err := sender.SendActivity(buildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{
"total_volumes": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)},
},
"selected_tasks": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))},
},
"skipped_already_ec": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedAlreadyEC)},
},
"skipped_too_small": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedTooSmall)},
},
"skipped_filtered": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedCollectionFilter)},
},
"skipped_not_quiet": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedQuietTime)},
},
"skipped_not_full": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedFullness)},
},
"quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.QuietForSeconds)},
},
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinSizeMB)},
},
"fullness_threshold_percent": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.FullnessRatio * 100},
},
})); err != nil {
return err
}
detailsEmitted := 0
for _, metric := range metrics {
if metric == nil || metric.IsECVolume {
continue
}
sizeMB := float64(metric.Size) / (1024 * 1024)
message := fmt.Sprintf(
"ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
metric.VolumeID,
sizeMB,
taskConfig.MinSizeMB,
metric.Age.Truncate(time.Minute),
quietThreshold.Truncate(time.Minute),
metric.FullnessRatio*100,
taskConfig.FullnessRatio*100,
)
if err := sender.SendActivity(buildDetectorActivity("decision_volume", message, map[string]*plugin_pb.ConfigValue{
"volume_id": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.VolumeID)},
},
"size_mb": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: sizeMB},
},
"required_min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinSizeMB)},
},
"age_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.Age.Seconds())},
},
"required_quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.QuietForSeconds)},
},
"fullness_percent": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: metric.FullnessRatio * 100},
},
"required_fullness_percent": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.FullnessRatio * 100},
},
})); err != nil {
return err
}
detailsEmitted++
if detailsEmitted >= 3 {
break
}
}
return nil
}
func (h *ErasureCodingHandler) Execute(
ctx context.Context,
request *plugin_pb.ExecuteJobRequest,
sender ExecutionSender,
) error {
if request == nil || request.Job == nil {
return fmt.Errorf("execute request/job is nil")
}
if sender == nil {
return fmt.Errorf("execution sender is nil")
}
if request.Job.JobType != "" && request.Job.JobType != "erasure_coding" {
return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.Job.JobType)
}
params, err := decodeErasureCodingTaskParams(request.Job)
if err != nil {
return err
}
applyErasureCodingExecutionDefaults(params, request.GetClusterContext())
if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
return fmt.Errorf("erasure coding source node is required")
}
if len(params.Targets) == 0 {
return fmt.Errorf("erasure coding targets are required")
}
task := erasurecodingtask.NewErasureCodingTask(
request.Job.JobId,
params.Sources[0].Node,
params.VolumeId,
params.Collection,
h.grpcDialOption,
)
task.SetProgressCallback(func(progress float64, stage string) {
message := fmt.Sprintf("erasure coding progress %.0f%%", progress)
if strings.TrimSpace(stage) != "" {
message = stage
}
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
State: plugin_pb.JobState_JOB_STATE_RUNNING,
ProgressPercent: progress,
Stage: stage,
Message: message,
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity(stage, message),
},
})
})
if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
State: plugin_pb.JobState_JOB_STATE_ASSIGNED,
ProgressPercent: 0,
Stage: "assigned",
Message: "erasure coding job accepted",
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity("assigned", "erasure coding job accepted"),
},
}); err != nil {
return err
}
if err := task.Execute(ctx, params); err != nil {
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
State: plugin_pb.JobState_JOB_STATE_FAILED,
ProgressPercent: 100,
Stage: "failed",
Message: err.Error(),
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity("failed", err.Error()),
},
})
return err
}
sourceNode := params.Sources[0].Node
resultSummary := fmt.Sprintf("erasure coding completed for volume %d across %d targets", params.VolumeId, len(params.Targets))
return sender.SendCompleted(&plugin_pb.JobCompleted{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
Success: true,
Result: &plugin_pb.JobResult{
Summary: resultSummary,
OutputValues: map[string]*plugin_pb.ConfigValue{
"volume_id": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)},
},
"source_server": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
},
"target_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
},
},
},
Activities: []*plugin_pb.ActivityEvent{
buildExecutorActivity("completed", resultSummary),
},
})
}
func (h *ErasureCodingHandler) collectVolumeMetrics(
ctx context.Context,
masterAddresses []string,
collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) {
// Reuse the same master topology fetch/build flow used by the vacuum handler.
helper := &VacuumHandler{grpcDialOption: h.grpcDialOption}
return helper.collectVolumeMetrics(ctx, masterAddresses, collectionFilter)
}
func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *erasureCodingWorkerConfig {
taskConfig := erasurecodingtask.NewDefaultConfig()
quietForSeconds := int(readInt64Config(values, "quiet_for_seconds", int64(taskConfig.QuietForSeconds)))
if quietForSeconds < 0 {
quietForSeconds = 0
}
taskConfig.QuietForSeconds = quietForSeconds
fullnessRatio := readDoubleConfig(values, "fullness_ratio", taskConfig.FullnessRatio)
if fullnessRatio < 0 {
fullnessRatio = 0
}
if fullnessRatio > 1 {
fullnessRatio = 1
}
taskConfig.FullnessRatio = fullnessRatio
minSizeMB := int(readInt64Config(values, "min_size_mb", int64(taskConfig.MinSizeMB)))
if minSizeMB < 1 {
minSizeMB = 1
}
taskConfig.MinSizeMB = minSizeMB
minIntervalSeconds := int(readInt64Config(values, "min_interval_seconds", 60*60))
if minIntervalSeconds < 0 {
minIntervalSeconds = 0
}
return &erasureCodingWorkerConfig{
TaskConfig: taskConfig,
MinIntervalSeconds: minIntervalSeconds,
}
}
func buildErasureCodingProposal(
result *workertypes.TaskDetectionResult,
) (*plugin_pb.JobProposal, error) {
if result == nil {
return nil, fmt.Errorf("task detection result is nil")
}
if result.TypedParams == nil {
return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID)
}
params := proto.Clone(result.TypedParams).(*worker_pb.TaskParams)
applyErasureCodingExecutionDefaults(params, nil)
paramsPayload, err := proto.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal task params: %w", err)
}
proposalID := strings.TrimSpace(result.TaskID)
if proposalID == "" {
proposalID = fmt.Sprintf("erasure-coding-%d-%d", result.VolumeID, time.Now().UnixNano())
}
dedupeKey := fmt.Sprintf("erasure_coding:%d", result.VolumeID)
if result.Collection != "" {
dedupeKey += ":" + result.Collection
}
sourceNode := ""
if len(params.Sources) > 0 {
sourceNode = strings.TrimSpace(params.Sources[0].Node)
}
summary := fmt.Sprintf("Erasure code volume %d", result.VolumeID)
if sourceNode != "" {
summary = fmt.Sprintf("Erasure code volume %d from %s", result.VolumeID, sourceNode)
}
return &plugin_pb.JobProposal{
ProposalId: proposalID,
DedupeKey: dedupeKey,
JobType: "erasure_coding",
Priority: mapTaskPriority(result.Priority),
Summary: summary,
Detail: strings.TrimSpace(result.Reason),
Parameters: map[string]*plugin_pb.ConfigValue{
"task_params_pb": {
Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramsPayload},
},
"volume_id": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)},
},
"source_server": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
},
"collection": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
},
"target_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
},
},
Labels: map[string]string{
"task_type": "erasure_coding",
"volume_id": fmt.Sprintf("%d", result.VolumeID),
"collection": result.Collection,
"source_node": sourceNode,
"target_count": fmt.Sprintf("%d", len(params.Targets)),
},
}, nil
}
func decodeErasureCodingTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
if job == nil {
return nil, fmt.Errorf("job spec is nil")
}
if payload := readBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
params := &worker_pb.TaskParams{}
if err := proto.Unmarshal(payload, params); err != nil {
return nil, fmt.Errorf("unmarshal task_params_pb: %w", err)
}
if params.TaskId == "" {
params.TaskId = job.JobId
}
return params, nil
}
volumeID := readInt64Config(job.Parameters, "volume_id", 0)
sourceNode := strings.TrimSpace(readStringConfig(job.Parameters, "source_server", ""))
if sourceNode == "" {
sourceNode = strings.TrimSpace(readStringConfig(job.Parameters, "server", ""))
}
targetServers := readStringListConfig(job.Parameters, "target_servers")
if len(targetServers) == 0 {
targetServers = readStringListConfig(job.Parameters, "targets")
}
collection := readStringConfig(job.Parameters, "collection", "")
dataShards := int32(readInt64Config(job.Parameters, "data_shards", int64(ecstorage.DataShardsCount)))
if dataShards <= 0 {
dataShards = ecstorage.DataShardsCount
}
parityShards := int32(readInt64Config(job.Parameters, "parity_shards", int64(ecstorage.ParityShardsCount)))
if parityShards <= 0 {
parityShards = ecstorage.ParityShardsCount
}
totalShards := int(dataShards + parityShards)
if volumeID <= 0 {
return nil, fmt.Errorf("missing volume_id in job parameters")
}
if sourceNode == "" {
return nil, fmt.Errorf("missing source_server in job parameters")
}
if len(targetServers) == 0 {
return nil, fmt.Errorf("missing target_servers in job parameters")
}
if len(targetServers) < totalShards {
return nil, fmt.Errorf("insufficient target_servers: got %d, need at least %d", len(targetServers), totalShards)
}
shardAssignments := assignECShardIDs(totalShards, len(targetServers))
targets := make([]*worker_pb.TaskTarget, 0, len(targetServers))
for i := 0; i < len(targetServers); i++ {
targetNode := strings.TrimSpace(targetServers[i])
if targetNode == "" {
continue
}
targets = append(targets, &worker_pb.TaskTarget{
Node: targetNode,
VolumeId: uint32(volumeID),
ShardIds: shardAssignments[i],
})
}
if len(targets) < totalShards {
return nil, fmt.Errorf("insufficient non-empty target_servers after normalization: got %d, need at least %d", len(targets), totalShards)
}
return &worker_pb.TaskParams{
TaskId: job.JobId,
VolumeId: uint32(volumeID),
Collection: collection,
Sources: []*worker_pb.TaskSource{
{
Node: sourceNode,
VolumeId: uint32(volumeID),
},
},
Targets: targets,
TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
ErasureCodingParams: &worker_pb.ErasureCodingTaskParams{
DataShards: dataShards,
ParityShards: parityShards,
},
},
}, nil
}
func applyErasureCodingExecutionDefaults(
params *worker_pb.TaskParams,
clusterContext *plugin_pb.ClusterContext,
) {
if params == nil {
return
}
ecParams := params.GetErasureCodingParams()
if ecParams == nil {
ecParams = &worker_pb.ErasureCodingTaskParams{
DataShards: ecstorage.DataShardsCount,
ParityShards: ecstorage.ParityShardsCount,
}
params.TaskParams = &worker_pb.TaskParams_ErasureCodingParams{ErasureCodingParams: ecParams}
}
if ecParams.DataShards <= 0 {
ecParams.DataShards = ecstorage.DataShardsCount
}
if ecParams.ParityShards <= 0 {
ecParams.ParityShards = ecstorage.ParityShardsCount
}
ecParams.WorkingDir = defaultErasureCodingWorkingDir()
ecParams.CleanupSource = true
if strings.TrimSpace(ecParams.MasterClient) == "" && clusterContext != nil && len(clusterContext.MasterGrpcAddresses) > 0 {
ecParams.MasterClient = clusterContext.MasterGrpcAddresses[0]
}
totalShards := int(ecParams.DataShards + ecParams.ParityShards)
if totalShards <= 0 {
totalShards = ecstorage.TotalShardsCount
}
needsShardAssignment := false
for _, target := range params.Targets {
if target == nil || len(target.ShardIds) == 0 {
needsShardAssignment = true
break
}
}
if needsShardAssignment && len(params.Targets) > 0 {
assignments := assignECShardIDs(totalShards, len(params.Targets))
for i := 0; i < len(params.Targets); i++ {
if params.Targets[i] == nil {
continue
}
if len(params.Targets[i].ShardIds) == 0 {
params.Targets[i].ShardIds = assignments[i]
}
}
}
}
func readStringListConfig(values map[string]*plugin_pb.ConfigValue, field string) []string {
if values == nil {
return nil
}
value := values[field]
if value == nil {
return nil
}
switch kind := value.Kind.(type) {
case *plugin_pb.ConfigValue_StringList:
return normalizeStringList(kind.StringList.GetValues())
case *plugin_pb.ConfigValue_ListValue:
out := make([]string, 0, len(kind.ListValue.GetValues()))
for _, item := range kind.ListValue.GetValues() {
itemText := readStringFromConfigValue(item)
if itemText != "" {
out = append(out, itemText)
}
}
return normalizeStringList(out)
case *plugin_pb.ConfigValue_StringValue:
return normalizeStringList(strings.Split(kind.StringValue, ","))
}
return nil
}
func readStringFromConfigValue(value *plugin_pb.ConfigValue) string {
if value == nil {
return ""
}
switch kind := value.Kind.(type) {
case *plugin_pb.ConfigValue_StringValue:
return strings.TrimSpace(kind.StringValue)
case *plugin_pb.ConfigValue_Int64Value:
return fmt.Sprintf("%d", kind.Int64Value)
case *plugin_pb.ConfigValue_DoubleValue:
return fmt.Sprintf("%g", kind.DoubleValue)
case *plugin_pb.ConfigValue_BoolValue:
if kind.BoolValue {
return "true"
}
return "false"
}
return ""
}
func normalizeStringList(values []string) []string {
normalized := make([]string, 0, len(values))
seen := make(map[string]struct{}, len(values))
for _, value := range values {
item := strings.TrimSpace(value)
if item == "" {
continue
}
if _, found := seen[item]; found {
continue
}
seen[item] = struct{}{}
normalized = append(normalized, item)
}
return normalized
}
func assignECShardIDs(totalShards int, targetCount int) [][]uint32 {
if targetCount <= 0 {
return nil
}
if totalShards <= 0 {
totalShards = ecstorage.TotalShardsCount
}
assignments := make([][]uint32, targetCount)
for i := 0; i < totalShards; i++ {
targetIndex := i % targetCount
assignments[targetIndex] = append(assignments[targetIndex], uint32(i))
}
return assignments
}
func defaultErasureCodingWorkingDir() string {
return filepath.Join(os.TempDir(), "seaweedfs-ec")
}