|
|
|
@ -12,7 +12,6 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/admin/maintenance" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" |
|
|
|
"google.golang.org/protobuf/encoding/protojson" |
|
|
|
"google.golang.org/protobuf/proto" |
|
|
|
) |
|
|
|
@ -23,11 +22,9 @@ const ( |
|
|
|
|
|
|
|
// Configuration file names (protobuf binary)
|
|
|
|
MaintenanceConfigFile = "maintenance.pb" |
|
|
|
ECTaskConfigFile = "task_erasure_coding.pb" |
|
|
|
|
|
|
|
// JSON reference files
|
|
|
|
MaintenanceConfigJSONFile = "maintenance.json" |
|
|
|
ECTaskConfigJSONFile = "task_erasure_coding.json" |
|
|
|
|
|
|
|
// Task persistence subdirectories and settings
|
|
|
|
TasksSubdir = "tasks" |
|
|
|
@ -39,12 +36,6 @@ const ( |
|
|
|
ConfigFilePermissions = 0644 |
|
|
|
) |
|
|
|
|
|
|
|
// Task configuration types
|
|
|
|
type ( |
|
|
|
ErasureCodingTaskConfig = worker_pb.ErasureCodingTaskConfig |
|
|
|
EcVacuumTaskConfig = worker_pb.EcVacuumTaskConfig |
|
|
|
) |
|
|
|
|
|
|
|
// isValidTaskID validates that a task ID is safe for use in file paths
|
|
|
|
// This prevents path traversal attacks by ensuring the task ID doesn't contain
|
|
|
|
// path separators or parent directory references
|
|
|
|
@ -139,8 +130,6 @@ func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) |
|
|
|
if configData, err := os.ReadFile(configPath); err == nil { |
|
|
|
var config MaintenanceConfig |
|
|
|
if err := proto.Unmarshal(configData, &config); err == nil { |
|
|
|
// Always populate policy from separate task configuration files
|
|
|
|
config.Policy = buildPolicyFromTaskConfigs() |
|
|
|
return &config, nil |
|
|
|
} |
|
|
|
} |
|
|
|
@ -252,95 +241,6 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// SaveErasureCodingTaskConfig saves EC task configuration to protobuf file
|
|
|
|
func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error { |
|
|
|
return cp.saveTaskConfig(ECTaskConfigFile, config) |
|
|
|
} |
|
|
|
|
|
|
|
// SaveErasureCodingTaskPolicy saves complete EC task policy to protobuf file
|
|
|
|
func (cp *ConfigPersistence) SaveErasureCodingTaskPolicy(policy *worker_pb.TaskPolicy) error { |
|
|
|
return cp.saveTaskConfig(ECTaskConfigFile, policy) |
|
|
|
} |
|
|
|
|
|
|
|
// LoadErasureCodingTaskConfig loads EC task configuration from protobuf file
|
|
|
|
func (cp *ConfigPersistence) LoadErasureCodingTaskConfig() (*ErasureCodingTaskConfig, error) { |
|
|
|
// Load as TaskPolicy and extract EC config
|
|
|
|
if taskPolicy, err := cp.LoadErasureCodingTaskPolicy(); err == nil && taskPolicy != nil { |
|
|
|
if ecConfig := taskPolicy.GetErasureCodingConfig(); ecConfig != nil { |
|
|
|
return ecConfig, nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Return default config if no valid config found
|
|
|
|
return &ErasureCodingTaskConfig{ |
|
|
|
FullnessRatio: 0.9, |
|
|
|
QuietForSeconds: 3600, |
|
|
|
MinVolumeSizeMb: 1024, |
|
|
|
CollectionFilter: "", |
|
|
|
}, nil |
|
|
|
} |
|
|
|
|
|
|
|
// LoadErasureCodingTaskPolicy loads complete EC task policy from protobuf file
|
|
|
|
func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) { |
|
|
|
if cp.dataDir == "" { |
|
|
|
// Return default policy if no data directory
|
|
|
|
return &worker_pb.TaskPolicy{ |
|
|
|
Enabled: true, |
|
|
|
MaxConcurrent: 1, |
|
|
|
RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds
|
|
|
|
CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds
|
|
|
|
TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ |
|
|
|
ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ |
|
|
|
FullnessRatio: 0.9, |
|
|
|
QuietForSeconds: 3600, |
|
|
|
MinVolumeSizeMb: 1024, |
|
|
|
CollectionFilter: "", |
|
|
|
}, |
|
|
|
}, |
|
|
|
}, nil |
|
|
|
} |
|
|
|
|
|
|
|
confDir := filepath.Join(cp.dataDir, ConfigSubdir) |
|
|
|
configPath := filepath.Join(confDir, ECTaskConfigFile) |
|
|
|
|
|
|
|
// Check if file exists
|
|
|
|
if _, err := os.Stat(configPath); os.IsNotExist(err) { |
|
|
|
// Return default policy if file doesn't exist
|
|
|
|
return &worker_pb.TaskPolicy{ |
|
|
|
Enabled: true, |
|
|
|
MaxConcurrent: 1, |
|
|
|
RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds
|
|
|
|
CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds
|
|
|
|
TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ |
|
|
|
ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ |
|
|
|
FullnessRatio: 0.9, |
|
|
|
QuietForSeconds: 3600, |
|
|
|
MinVolumeSizeMb: 1024, |
|
|
|
CollectionFilter: "", |
|
|
|
}, |
|
|
|
}, |
|
|
|
}, nil |
|
|
|
} |
|
|
|
|
|
|
|
// Read file
|
|
|
|
configData, err := os.ReadFile(configPath) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to read EC task config file: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Try to unmarshal as TaskPolicy
|
|
|
|
var policy worker_pb.TaskPolicy |
|
|
|
if err := proto.Unmarshal(configData, &policy); err == nil { |
|
|
|
// Validate that it's actually a TaskPolicy with EC config
|
|
|
|
if policy.GetErasureCodingConfig() != nil { |
|
|
|
glog.V(1).Infof("Loaded EC task policy from %s", configPath) |
|
|
|
return &policy, nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil, fmt.Errorf("failed to unmarshal EC task configuration") |
|
|
|
} |
|
|
|
|
|
|
|
// saveTaskConfig is a generic helper for saving task configurations with both protobuf and JSON reference
|
|
|
|
func (cp *ConfigPersistence) saveTaskConfig(filename string, config proto.Message) error { |
|
|
|
if cp.dataDir == "" { |
|
|
|
@ -502,37 +402,6 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { |
|
|
|
return info |
|
|
|
} |
|
|
|
|
|
|
|
// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
|
|
|
|
func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { |
|
|
|
policy := &worker_pb.MaintenancePolicy{ |
|
|
|
GlobalMaxConcurrent: 4, |
|
|
|
DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
|
|
|
|
DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
|
|
|
|
TaskPolicies: make(map[string]*worker_pb.TaskPolicy), |
|
|
|
} |
|
|
|
|
|
|
|
// Load erasure coding task configuration
|
|
|
|
if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil { |
|
|
|
policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{ |
|
|
|
Enabled: ecConfig.Enabled, |
|
|
|
MaxConcurrent: int32(ecConfig.MaxConcurrent), |
|
|
|
RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds), |
|
|
|
CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds), |
|
|
|
TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ |
|
|
|
ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ |
|
|
|
FullnessRatio: float64(ecConfig.FullnessRatio), |
|
|
|
QuietForSeconds: int32(ecConfig.QuietForSeconds), |
|
|
|
MinVolumeSizeMb: int32(ecConfig.MinSizeMB), |
|
|
|
CollectionFilter: ecConfig.CollectionFilter, |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies)) |
|
|
|
return policy |
|
|
|
} |
|
|
|
|
|
|
|
// SaveTaskDetail saves detailed task information to disk
|
|
|
|
func (cp *ConfigPersistence) SaveTaskDetail(taskID string, detail *maintenance.TaskDetailData) error { |
|
|
|
if cp.dataDir == "" { |
|
|
|
|