From 05a0cc156b9cea47e56ae5f96f9634a35525860d Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 10 Aug 2025 00:15:26 -0700 Subject: [PATCH] Self-Contained Design To prove the system is truly self-contained: To add a new task: Create a task package (e.g., worker/tasks/compression/) Import it: _ "github.com/.../worker/tasks/compression" That's it! No other changes needed. To remove a task: Delete the task package directory Remove the import line That's it! No other changes needed. --- weed/admin/dash/config_persistence.go | 38 ++++++++++ weed/admin/handlers/maintenance_handlers.go | 75 ++++++++----------- weed/admin/topology/internal.go | 7 +- weed/admin/topology/storage_impact.go | 19 ++--- weed/admin/topology/task_management.go | 8 +- weed/admin/topology/types.go | 9 +-- weed/command/admin.go | 4 + weed/command/worker.go | 3 + weed/worker/tasks/erasure_coding/detection.go | 8 +- weed/worker/tasks/erasure_coding/ec_task.go | 2 +- weed/worker/tasks/erasure_coding/register.go | 4 +- .../worker/tasks/erasure_coding/scheduling.go | 4 +- weed/worker/tasks/registry.go | 59 +++++++++++++++ weed/worker/types/task_types.go | 45 +++++++++-- 14 files changed, 197 insertions(+), 88 deletions(-) diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index bdc9794f4..b71d2aba3 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -455,6 +455,44 @@ func (cp *ConfigPersistence) IsConfigured() bool { return cp.dataDir != "" } +// SaveTaskPolicyGeneric saves a task policy for any task type dynamically +func (cp *ConfigPersistence) SaveTaskPolicyGeneric(taskType string, policy *worker_pb.TaskPolicy) error { + filename := fmt.Sprintf("task_%s.pb", taskType) + return cp.saveTaskConfig(filename, policy) +} + +// LoadTaskPolicyGeneric loads a task policy for any task type dynamically +func (cp *ConfigPersistence) LoadTaskPolicyGeneric(taskType string) (*worker_pb.TaskPolicy, error) { + filename := fmt.Sprintf("task_%s.pb", taskType) + + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory configured") + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, filename) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + return nil, fmt.Errorf("no configuration found for task type: %s", taskType) + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read task config file: %w", err) + } + + // Unmarshal as TaskPolicy + var policy worker_pb.TaskPolicy + if err := proto.Unmarshal(configData, &policy); err != nil { + return nil, fmt.Errorf("failed to unmarshal task configuration: %w", err) + } + + glog.V(1).Infof("Loaded task policy for %s from %s", taskType, configPath) + return &policy, nil +} + // GetConfigInfo returns information about the configuration storage func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { info := map[string]interface{}{ diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index 7404cfc1e..87fc23d49 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -17,7 +17,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/view/layout" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -230,27 +230,15 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) { return } - // Create a new config instance based on task type and apply schema defaults - var config TaskConfig - switch taskType { - case types.TaskTypeErasureCoding: - config = &erasure_coding.Config{} - default: - c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName}) - return - } - - // Apply schema defaults first using type-safe method - if err := schema.ApplyDefaultsToConfig(config); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to apply defaults: " + err.Error()}) - return - } - - // First, get the current configuration to preserve existing values + // Get the config instance from the UI provider - this is a dynamic approach + // that doesn't require hardcoding task types currentUIRegistry := tasks.GetGlobalUIRegistry() currentTypesRegistry := tasks.GetGlobalTypesRegistry() + var config types.TaskConfig var currentProvider types.TaskUIProvider + + // Find the UI provider for this task type for workerTaskType := range currentTypesRegistry.GetAllDetectors() { if string(workerTaskType) == string(taskType) { currentProvider = currentUIRegistry.GetProvider(workerTaskType) @@ -258,16 +246,26 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) { } } - if currentProvider != nil { - // Copy current config values to the new config - currentConfig := currentProvider.GetCurrentConfig() - if currentConfigProtobuf, ok := currentConfig.(TaskConfig); ok { - // Apply current values using protobuf directly - no map conversion needed! - currentPolicy := currentConfigProtobuf.ToTaskPolicy() - if err := config.FromTaskPolicy(currentPolicy); err != nil { - glog.Warningf("Failed to load current config for %s: %v", taskTypeName, err) - } - } + if currentProvider == nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName}) + return + } + + // Get a config instance from the UI provider + config = currentProvider.GetCurrentConfig() + if config == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get config for task type: " + taskTypeName}) + return + } + + // Apply schema defaults - config instances should already have defaults applied during creation + glog.V(2).Infof("Using config defaults for task type: %s", taskTypeName) + + // Copy current config values (currentProvider is already set above) + // Apply current values using protobuf directly - no map conversion needed! + currentPolicy := config.ToTaskPolicy() + if err := config.FromTaskPolicy(currentPolicy); err != nil { + glog.Warningf("Failed to load current config for %s: %v", taskTypeName, err) } // Parse form data using schema-based approach (this will override with new values) @@ -277,14 +275,8 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) { return } - // Debug logging - show parsed config values - switch taskType { - case types.TaskTypeErasureCoding: - if ecConfig, ok := config.(*erasure_coding.Config); ok { - glog.V(1).Infof("Parsed EC config - FullnessRatio: %f, QuietForSeconds: %d, MinSizeMB: %d, CollectionFilter: '%s'", - ecConfig.FullnessRatio, ecConfig.QuietForSeconds, ecConfig.MinSizeMB, ecConfig.CollectionFilter) - } - } + // Debug logging - config parsed for task type + glog.V(1).Infof("Parsed configuration for task type: %s", taskTypeName) // Validate the configuration if validationErrors := schema.ValidateConfig(config); len(validationErrors) > 0 { @@ -553,7 +545,7 @@ func (h *MaintenanceHandlers) updateMaintenanceConfig(config *maintenance.Mainte } // saveTaskConfigToProtobuf saves task configuration to protobuf file -func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType, config TaskConfig) error { +func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType, config types.TaskConfig) error { configPersistence := h.adminServer.GetConfigPersistence() if configPersistence == nil { return fmt.Errorf("config persistence not available") @@ -562,11 +554,6 @@ func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType, // Use the new ToTaskPolicy method - much simpler and more maintainable! taskPolicy := config.ToTaskPolicy() - // Save using task-specific methods - switch taskType { - case types.TaskTypeErasureCoding: - return configPersistence.SaveErasureCodingTaskPolicy(taskPolicy) - default: - return fmt.Errorf("unsupported task type for protobuf persistence: %s", taskType) - } + // Save using generic method - no more hardcoded task types! + return configPersistence.SaveTaskPolicyGeneric(string(taskType), taskPolicy) } diff --git a/weed/admin/topology/internal.go b/weed/admin/topology/internal.go index 72e37f6c1..8533ac39e 100644 --- a/weed/admin/topology/internal.go +++ b/weed/admin/topology/internal.go @@ -84,11 +84,10 @@ func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) b // areTaskTypesConflicting checks if two task types conflict func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool { - // Examples of conflicting task types + // Define conflicting task types dynamically + // For now, assume no task types conflict (can be made configurable later) conflictMap := map[TaskType][]TaskType{ - TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding}, - TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding}, - TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance}, + // No conflicts defined currently - this can be made configurable per task } if conflicts, exists := conflictMap[existing]; exists { diff --git a/weed/admin/topology/storage_impact.go b/weed/admin/topology/storage_impact.go index e325fc9cf..fcbc5aa9e 100644 --- a/weed/admin/topology/storage_impact.go +++ b/weed/admin/topology/storage_impact.go @@ -7,30 +7,21 @@ import ( // CalculateTaskStorageImpact calculates storage impact for different task types func CalculateTaskStorageImpact(taskType TaskType, volumeSize int64) (sourceChange, targetChange StorageSlotChange) { - switch taskType { - case TaskTypeErasureCoding: + switch string(taskType) { + case "erasure_coding": // EC task: distributes shards to MULTIPLE targets, source reserves with zero impact // Source reserves capacity but with zero StorageSlotChange (no actual capacity consumption during planning) - // WARNING: EC has multiple targets! Use AddPendingTask with multiple destinations for proper multi-target handling + // WARNING: EC has multiple targets! Use AddPendingTask with multiple destinations for proper multi-destination calculation // This simplified function returns zero impact; real EC requires specialized multi-destination calculation return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0} - case TaskTypeBalance: - // Balance task: moves volume from source to target - // Source loses 1 volume, target gains 1 volume - return StorageSlotChange{VolumeSlots: -1, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} - - case TaskTypeVacuum: - // Vacuum task: frees space by removing deleted entries, no slot change - return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0} - - case TaskTypeReplication: + case "replication": // Replication task: creates new replica on target return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} default: // Unknown task type, assume minimal impact - glog.Warningf("unhandled task type %s in CalculateTaskStorageImpact, assuming default impact", taskType) + glog.V(2).Infof("Task type %s not specifically handled in CalculateTaskStorageImpact, using default impact", taskType) return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} } } diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go index ada60248b..19c731f19 100644 --- a/weed/admin/topology/task_management.go +++ b/weed/admin/topology/task_management.go @@ -203,16 +203,16 @@ func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error { // calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange { - switch taskType { - case TaskTypeErasureCoding: + switch string(taskType) { + case "erasure_coding": switch cleanupType { case CleanupVolumeReplica: - impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize) + impact, _ := CalculateTaskStorageImpact(taskType, volumeSize) return impact case CleanupECShards: return CalculateECShardCleanupImpact(volumeSize) default: - impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize) + impact, _ := CalculateTaskStorageImpact(taskType, volumeSize) return impact } default: diff --git a/weed/admin/topology/types.go b/weed/admin/topology/types.go index df0103529..747b04da9 100644 --- a/weed/admin/topology/types.go +++ b/weed/admin/topology/types.go @@ -3,19 +3,12 @@ package topology import "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" // TaskType represents different types of maintenance operations +// Task types are now dynamically registered - use the worker/types package for task type operations type TaskType string // TaskStatus represents the current status of a task type TaskStatus string -// Common task type constants -const ( - TaskTypeVacuum TaskType = "vacuum" - TaskTypeBalance TaskType = "balance" - TaskTypeErasureCoding TaskType = "erasure_coding" - TaskTypeReplication TaskType = "replication" -) - // Common task status constants const ( TaskStatusPending TaskStatus = "pending" diff --git a/weed/command/admin.go b/weed/command/admin.go index c1b55f105..0f85b6478 100644 --- a/weed/command/admin.go +++ b/weed/command/admin.go @@ -25,6 +25,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" ) var ( @@ -229,6 +230,9 @@ func startAdminServer(ctx context.Context, options AdminOptions) error { fmt.Printf("Data directory created/verified: %s\n", dataDir) } + // Initialize dynamic task type functions now that all tasks are registered + tasks.InitializeDynamicTaskTypes() + // Create admin server adminServer := dash.NewAdminServer(*options.masters, nil, dataDir) diff --git a/weed/command/worker.go b/weed/command/worker.go index ec51f6825..90440c3b8 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -107,6 +107,9 @@ func runWorker(cmd *Command, args []string) bool { // Create gRPC dial option using TLS configuration grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") + // Initialize dynamic task type functions now that all tasks are registered + tasks.InitializeDynamicTaskTypes() + // Create worker configuration config := &types.WorkerConfig{ AdminServer: *workerAdminServer, diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index cd74bed33..bb7c2c70f 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -68,7 +68,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI result := &types.TaskDetectionResult{ TaskID: taskID, // Link to ActiveTopology pending task - TaskType: types.TaskTypeErasureCoding, + TaskType: types.TaskType("erasure_coding"), VolumeID: metric.VolumeID, Server: metric.Server, Collection: metric.Collection, @@ -168,7 +168,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ TaskID: taskID, - TaskType: topology.TaskTypeErasureCoding, + TaskType: topology.TaskType("erasure_coding"), VolumeID: metric.VolumeID, VolumeSize: int64(metric.Size), Sources: sources, @@ -279,7 +279,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V // For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1 // For EC, we need at least 1 available volume slot on a disk to consider it for placement. // Note: We don't exclude the source server since the original volume will be deleted after EC conversion - availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1) + availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskType("erasure_coding"), "", 1) if len(availableDisks) < erasure_coding.MinTotalDisks { return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks)) } @@ -322,7 +322,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity) // Log storage impact for EC task (source only - EC has multiple targets handled individually) - sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size)) + sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskType("erasure_coding"), int64(metric.Size)) glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d", sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size) glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact") diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index 4678ab534..7e6b04954 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -42,7 +42,7 @@ type ErasureCodingTask struct { // NewErasureCodingTask creates a new unified EC task instance func NewErasureCodingTask(id string, server string, volumeID uint32, collection string) *ErasureCodingTask { return &ErasureCodingTask{ - BaseTask: base.NewBaseTask(id, types.TaskTypeErasureCoding), + BaseTask: base.NewBaseTask(id, types.TaskType("erasure_coding")), server: server, volumeID: volumeID, collection: collection, diff --git a/weed/worker/tasks/erasure_coding/register.go b/weed/worker/tasks/erasure_coding/register.go index e574e0033..a2e6406a0 100644 --- a/weed/worker/tasks/erasure_coding/register.go +++ b/weed/worker/tasks/erasure_coding/register.go @@ -19,7 +19,7 @@ func init() { RegisterErasureCodingTask() // Register config updater - tasks.AutoRegisterConfigUpdater(types.TaskTypeErasureCoding, UpdateConfigFromPersistence) + tasks.AutoRegisterConfigUpdater(types.TaskType("erasure_coding"), UpdateConfigFromPersistence) } // RegisterErasureCodingTask registers the erasure coding task with the new architecture @@ -29,7 +29,7 @@ func RegisterErasureCodingTask() { // Create complete task definition taskDef := &base.TaskDefinition{ - Type: types.TaskTypeErasureCoding, + Type: types.TaskType("erasure_coding"), Name: "erasure_coding", DisplayName: "Erasure Coding", Description: "Applies erasure coding to volumes for data protection", diff --git a/weed/worker/tasks/erasure_coding/scheduling.go b/weed/worker/tasks/erasure_coding/scheduling.go index d9d891e04..77c075f38 100644 --- a/weed/worker/tasks/erasure_coding/scheduling.go +++ b/weed/worker/tasks/erasure_coding/scheduling.go @@ -17,7 +17,7 @@ func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availabl // Count running EC tasks runningCount := 0 for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeErasureCoding { + if runningTask.Type == types.TaskType("erasure_coding") { runningCount++ } } @@ -30,7 +30,7 @@ func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availabl // Check if any worker can handle EC tasks for _, worker := range availableWorkers { for _, capability := range worker.Capabilities { - if capability == types.TaskTypeErasureCoding { + if capability == types.TaskType("erasure_coding") { return true } } diff --git a/weed/worker/tasks/registry.go b/weed/worker/tasks/registry.go index 626a54a14..77d9a8d0b 100644 --- a/weed/worker/tasks/registry.go +++ b/weed/worker/tasks/registry.go @@ -146,3 +146,62 @@ func (r *TaskRegistry) GetAll() map[types.TaskType]types.TaskFactory { } return result } + +// InitializeDynamicTaskTypes sets up the dynamic task type functions +// This should be called after all tasks have been registered +func InitializeDynamicTaskTypes() { + // Set up the function variables in the types package + types.GetAvailableTaskTypes = func() []types.TaskType { + typesRegistry := GetGlobalTypesRegistry() + var taskTypes []types.TaskType + for taskType := range typesRegistry.GetAllDetectors() { + taskTypes = append(taskTypes, taskType) + } + return taskTypes + } + + types.IsTaskTypeAvailable = func(taskType types.TaskType) bool { + typesRegistry := GetGlobalTypesRegistry() + detectors := typesRegistry.GetAllDetectors() + _, exists := detectors[taskType] + return exists + } + + types.GetTaskType = func(name string) (types.TaskType, bool) { + taskType := types.TaskType(name) + if types.IsTaskTypeAvailable(taskType) { + return taskType, true + } + return "", false + } + + glog.V(1).Infof("Initialized dynamic task type functions") +} + +// GetAllRegisteredTaskTypes returns all currently registered task types +func GetAllRegisteredTaskTypes() []types.TaskType { + if types.GetAvailableTaskTypes != nil { + return types.GetAvailableTaskTypes() + } + + // Fallback: get directly from registry + typesRegistry := GetGlobalTypesRegistry() + var taskTypes []types.TaskType + for taskType := range typesRegistry.GetAllDetectors() { + taskTypes = append(taskTypes, taskType) + } + return taskTypes +} + +// IsTaskTypeRegistered checks if a task type is currently registered +func IsTaskTypeRegistered(taskType types.TaskType) bool { + if types.IsTaskTypeAvailable != nil { + return types.IsTaskTypeAvailable(taskType) + } + + // Fallback: check directly in registry + typesRegistry := GetGlobalTypesRegistry() + detectors := typesRegistry.GetAllDetectors() + _, exists := detectors[taskType] + return exists +} diff --git a/weed/worker/types/task_types.go b/weed/worker/types/task_types.go index 761b5a1fd..738b21fc2 100644 --- a/weed/worker/types/task_types.go +++ b/weed/worker/types/task_types.go @@ -8,13 +8,10 @@ import ( ) // TaskType represents the type of maintenance task +// Task types are now dynamically registered by individual task packages +// No hardcoded constants - use registry functions to discover available tasks type TaskType string -const ( - TaskTypeErasureCoding TaskType = "erasure_coding" - TaskTypeReplication TaskType = "replication" -) - // TaskStatus represents the status of a maintenance task type TaskStatus string @@ -93,3 +90,41 @@ type ClusterReplicationTask struct { CreatedAt time.Time `json:"created_at"` Metadata map[string]string `json:"metadata,omitempty"` } + +// TaskTypeRegistry provides dynamic access to registered task types +// This avoids hardcoded constants and allows tasks to be self-contained +type TaskTypeRegistry interface { + GetAllTaskTypes() []TaskType + IsTaskTypeRegistered(taskType TaskType) bool + GetTaskTypeByName(name string) (TaskType, bool) +} + +// GetAvailableTaskTypes returns all dynamically registered task types +// This function will be implemented by importing a registry package that +// collects task types from all registered task packages +var GetAvailableTaskTypes func() []TaskType + +// IsTaskTypeAvailable checks if a task type is registered and available +var IsTaskTypeAvailable func(TaskType) bool + +// GetTaskType converts a string to TaskType if it's registered +var GetTaskType func(string) (TaskType, bool) + +// Common task type accessor functions that will be set by the registry +// These allow other packages to get task types without hardcoded constants + +// GetErasureCodingTaskType returns the erasure coding task type if registered +func GetErasureCodingTaskType() (TaskType, bool) { + if GetTaskType != nil { + return GetTaskType("erasure_coding") + } + return "", false +} + +// GetReplicationTaskType returns the replication task type if registered +func GetReplicationTaskType() (TaskType, bool) { + if GetTaskType != nil { + return GetTaskType("replication") + } + return "", false +}