diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index a587d1b96..5995d2eef 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -192,11 +192,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) { var config interface{} switch taskType { case types.TaskTypeVacuum: - config = &vacuum.VacuumConfig{} + config = &vacuum.Config{} case types.TaskTypeBalance: - config = &balance.BalanceConfig{} + config = &balance.Config{} case types.TaskTypeErasureCoding: - config = &erasure_coding.ErasureCodingConfig{} + config = &erasure_coding.Config{} default: c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName}) return diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go index 9e8fc7093..ea867d950 100644 --- a/weed/worker/tasks/balance/balance.go +++ b/weed/worker/tasks/balance/balance.go @@ -4,10 +4,8 @@ import ( "fmt" "time" - "github.com/seaweedfs/seaweedfs/weed/admin/config" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -82,264 +80,3 @@ func (t *Task) EstimateTime(params types.TaskParams) time.Duration { // Could adjust based on volume size or cluster state return baseTime } - -// BalanceConfig extends BaseConfig with balance-specific settings -type BalanceConfig struct { - base.BaseConfig - ImbalanceThreshold float64 `json:"imbalance_threshold"` - MinServerCount int `json:"min_server_count"` -} - -// balanceDetection implements the detection logic for balance tasks -func balanceDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - balanceConfig := config.(*BalanceConfig) - - // Skip if cluster is too small - minVolumeCount := 10 - if len(metrics) < minVolumeCount { - return nil, nil - } - - // Analyze volume distribution across servers - serverVolumeCounts := make(map[string]int) - for _, metric := range metrics { - serverVolumeCounts[metric.Server]++ - } - - if len(serverVolumeCounts) < balanceConfig.MinServerCount { - return nil, nil - } - - // Calculate balance metrics - totalVolumes := len(metrics) - avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) - - maxVolumes := 0 - minVolumes := totalVolumes - maxServer := "" - minServer := "" - - for server, count := range serverVolumeCounts { - if count > maxVolumes { - maxVolumes = count - maxServer = server - } - if count < minVolumes { - minVolumes = count - minServer = server - } - } - - // Check if imbalance exceeds threshold - imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer - if imbalanceRatio <= balanceConfig.ImbalanceThreshold { - return nil, nil - } - - // Create balance task - reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", - imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - - task := &types.TaskDetectionResult{ - TaskType: types.TaskTypeBalance, - Priority: types.TaskPriorityNormal, - Reason: reason, - ScheduleAt: time.Now(), - Parameters: map[string]interface{}{ - "imbalance_ratio": imbalanceRatio, - "threshold": balanceConfig.ImbalanceThreshold, - "max_volumes": maxVolumes, - "min_volumes": minVolumes, - "avg_volumes_per_server": avgVolumesPerServer, - "max_server": maxServer, - "min_server": minServer, - "total_servers": len(serverVolumeCounts), - }, - } - - return []*types.TaskDetectionResult{task}, nil -} - -// balanceScheduling implements the scheduling logic for balance tasks -func balanceScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - balanceConfig := config.(*BalanceConfig) - - // Count running balance tasks - runningBalanceCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeBalance { - runningBalanceCount++ - } - } - - // Check concurrency limit - if runningBalanceCount >= balanceConfig.MaxConcurrent { - return false - } - - // Check if we have available workers - availableWorkerCount := 0 - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeBalance { - availableWorkerCount++ - break - } - } - } - - return availableWorkerCount > 0 -} - -// createBalanceTask creates a new balance task instance -func createBalanceTask(params types.TaskParams) (types.TaskInterface, error) { - // Extract configuration from params - var config *BalanceConfig - if configData, ok := params.Parameters["config"]; ok { - if configMap, ok := configData.(map[string]interface{}); ok { - config = &BalanceConfig{} - if err := config.FromMap(configMap); err != nil { - return nil, fmt.Errorf("failed to parse balance config: %v", err) - } - } - } - - if config == nil { - config = &BalanceConfig{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 30 * 60, // 30 minutes - MaxConcurrent: 1, - }, - ImbalanceThreshold: 0.2, // 20% - MinServerCount: 2, - } - } - - // Create and return the balance task using existing Task type - return NewTask(params.Server, params.VolumeID, params.Collection), nil -} - -// getBalanceConfigSpec returns the configuration schema for balance tasks -func getBalanceConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Balance Tasks", - Description: "Whether balance tasks should be automatically created", - HelpText: "Toggle this to enable or disable automatic balance task generation", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 30 * 60, - MinValue: 5 * 60, - MaxValue: 2 * 60 * 60, - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volume distribution imbalances", - HelpText: "The system will check for volume distribution imbalances at this interval", - Placeholder: "30", - Unit: config.UnitMinutes, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 1, - MinValue: 1, - MaxValue: 3, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of balance tasks that can run simultaneously", - HelpText: "Limits the number of balance operations running at the same time", - Placeholder: "1 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "imbalance_threshold", - JSONName: "imbalance_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.2, - MinValue: 0.05, - MaxValue: 0.5, - Required: true, - DisplayName: "Imbalance Threshold", - Description: "Minimum imbalance ratio to trigger balancing", - HelpText: "Volume distribution imbalances above this threshold will trigger balancing", - Placeholder: "0.20 (20%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_server_count", - JSONName: "min_server_count", - Type: config.FieldTypeInt, - DefaultValue: 2, - MinValue: 2, - MaxValue: 10, - Required: true, - DisplayName: "Minimum Server Count", - Description: "Minimum number of servers required for balancing", - HelpText: "Balancing will only occur if there are at least this many servers", - Placeholder: "2 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - }, - } -} - -// initBalance registers the refactored balance task -func initBalance() { - // Create configuration instance - config := &BalanceConfig{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 30 * 60, // 30 minutes - MaxConcurrent: 1, - }, - ImbalanceThreshold: 0.2, // 20% - MinServerCount: 2, - } - - // Create complete task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeBalance, - Name: "balance", - DisplayName: "Volume Balance", - Description: "Balances volume distribution across servers", - Icon: "fas fa-balance-scale text-warning", - Capabilities: []string{"balance", "distribution"}, - - Config: config, - ConfigSpec: getBalanceConfigSpec(), - CreateTask: createBalanceTask, - DetectionFunc: balanceDetection, - ScanInterval: 30 * time.Minute, - SchedulingFunc: balanceScheduling, - MaxConcurrent: 1, - RepeatInterval: 2 * time.Hour, - } - - // Register everything with a single function call! - base.RegisterTask(taskDef) -} diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go index 5df8e9631..6a23cbbbd 100644 --- a/weed/worker/tasks/balance/balance_register.go +++ b/weed/worker/tasks/balance/balance_register.go @@ -1,7 +1,41 @@ package balance +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + // Auto-register this task when the package is imported func init() { - // Use new architecture instead of old registration - initBalance() + RegisterBalanceTask() +} + +// RegisterBalanceTask registers the balance task with the new architecture +func RegisterBalanceTask() { + // Create configuration instance + config := NewDefaultConfig() + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeBalance, + Name: "balance", + DisplayName: "Volume Balance", + Description: "Balances volume distribution across servers", + Icon: "fas fa-balance-scale text-warning", + Capabilities: []string{"balance", "distribution"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: CreateTask, + DetectionFunc: Detection, + ScanInterval: 30 * time.Minute, + SchedulingFunc: Scheduling, + MaxConcurrent: 1, + RepeatInterval: 2 * time.Hour, + } + + // Register everything with a single function call! + base.RegisterTask(taskDef) } diff --git a/weed/worker/tasks/balance/config.go b/weed/worker/tasks/balance/config.go new file mode 100644 index 000000000..9ee1fa777 --- /dev/null +++ b/weed/worker/tasks/balance/config.go @@ -0,0 +1,110 @@ +package balance + +import ( + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with balance-specific settings +type Config struct { + base.BaseConfig + ImbalanceThreshold float64 `json:"imbalance_threshold"` + MinServerCount int `json:"min_server_count"` +} + +// NewDefaultConfig creates a new default balance configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 30 * 60, // 30 minutes + MaxConcurrent: 1, + }, + ImbalanceThreshold: 0.2, // 20% + MinServerCount: 2, + } +} + +// GetConfigSpec returns the configuration schema for balance tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Balance Tasks", + Description: "Whether balance tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic balance task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 30 * 60, + MinValue: 5 * 60, + MaxValue: 2 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volume distribution imbalances", + HelpText: "The system will check for volume distribution imbalances at this interval", + Placeholder: "30", + Unit: config.UnitMinutes, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + MinValue: 1, + MaxValue: 3, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of balance tasks that can run simultaneously", + HelpText: "Limits the number of balance operations running at the same time", + Placeholder: "1 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "imbalance_threshold", + JSONName: "imbalance_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.2, + MinValue: 0.05, + MaxValue: 0.5, + Required: true, + DisplayName: "Imbalance Threshold", + Description: "Minimum imbalance ratio to trigger balancing", + HelpText: "Volume distribution imbalances above this threshold will trigger balancing", + Placeholder: "0.20 (20%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_server_count", + JSONName: "min_server_count", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 2, + MaxValue: 10, + Required: true, + DisplayName: "Minimum Server Count", + Description: "Minimum number of servers required for balancing", + HelpText: "Balancing will only occur if there are at least this many servers", + Placeholder: "2 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + }, + } +} diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go new file mode 100644 index 000000000..6249d8fd7 --- /dev/null +++ b/weed/worker/tasks/balance/detection.go @@ -0,0 +1,135 @@ +package balance + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Detection implements the detection logic for balance tasks +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + balanceConfig := config.(*Config) + + // Skip if cluster is too small + minVolumeCount := 10 + if len(metrics) < minVolumeCount { + return nil, nil + } + + // Analyze volume distribution across servers + serverVolumeCounts := make(map[string]int) + for _, metric := range metrics { + serverVolumeCounts[metric.Server]++ + } + + if len(serverVolumeCounts) < balanceConfig.MinServerCount { + return nil, nil + } + + // Calculate balance metrics + totalVolumes := len(metrics) + avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) + + maxVolumes := 0 + minVolumes := totalVolumes + maxServer := "" + minServer := "" + + for server, count := range serverVolumeCounts { + if count > maxVolumes { + maxVolumes = count + maxServer = server + } + if count < minVolumes { + minVolumes = count + minServer = server + } + } + + // Check if imbalance exceeds threshold + imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer + if imbalanceRatio <= balanceConfig.ImbalanceThreshold { + return nil, nil + } + + // Create balance task + reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", + imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + + task := &types.TaskDetectionResult{ + TaskType: types.TaskTypeBalance, + Priority: types.TaskPriorityNormal, + Reason: reason, + ScheduleAt: time.Now(), + Parameters: map[string]interface{}{ + "imbalance_ratio": imbalanceRatio, + "threshold": balanceConfig.ImbalanceThreshold, + "max_volumes": maxVolumes, + "min_volumes": minVolumes, + "avg_volumes_per_server": avgVolumesPerServer, + "max_server": maxServer, + "min_server": minServer, + "total_servers": len(serverVolumeCounts), + }, + } + + return []*types.TaskDetectionResult{task}, nil +} + +// Scheduling implements the scheduling logic for balance tasks +func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + balanceConfig := config.(*Config) + + // Count running balance tasks + runningBalanceCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeBalance { + runningBalanceCount++ + } + } + + // Check concurrency limit + if runningBalanceCount >= balanceConfig.MaxConcurrent { + return false + } + + // Check if we have available workers + availableWorkerCount := 0 + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeBalance { + availableWorkerCount++ + break + } + } + } + + return availableWorkerCount > 0 +} + +// CreateTask creates a new balance task instance +func CreateTask(params types.TaskParams) (types.TaskInterface, error) { + // Extract configuration from params + var config *Config + if configData, ok := params.Parameters["config"]; ok { + if configMap, ok := configData.(map[string]interface{}); ok { + config = &Config{} + if err := config.FromMap(configMap); err != nil { + return nil, fmt.Errorf("failed to parse balance config: %v", err) + } + } + } + + if config == nil { + config = NewDefaultConfig() + } + + // Create and return the balance task using existing Task type + return NewTask(params.Server, params.VolumeID, params.Collection), nil +} diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go new file mode 100644 index 000000000..2eaaccdba --- /dev/null +++ b/weed/worker/tasks/erasure_coding/config.go @@ -0,0 +1,125 @@ +package erasure_coding + +import ( + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with erasure coding specific settings +type Config struct { + base.BaseConfig + QuietForSeconds int `json:"quiet_for_seconds"` + FullnessRatio float64 `json:"fullness_ratio"` + CollectionFilter string `json:"collection_filter"` +} + +// NewDefaultConfig creates a new default erasure coding configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 60 * 60, // 1 hour + MaxConcurrent: 1, + }, + QuietForSeconds: 300, // 5 minutes + FullnessRatio: 0.8, // 80% + CollectionFilter: "", + } +} + +// GetConfigSpec returns the configuration schema for erasure coding tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Erasure Coding Tasks", + Description: "Whether erasure coding tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic erasure coding task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing erasure coding", + HelpText: "The system will check for volumes that need erasure coding at this interval", + Placeholder: "1", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + MinValue: 1, + MaxValue: 5, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of erasure coding tasks that can run simultaneously", + HelpText: "Limits the number of erasure coding operations running at the same time", + Placeholder: "1 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "quiet_for_seconds", + JSONName: "quiet_for_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 300, + MinValue: 60, + MaxValue: 3600, + Required: true, + DisplayName: "Quiet Period", + Description: "Minimum time volume must be quiet before erasure coding", + HelpText: "Volume must not be modified for this duration before erasure coding", + Placeholder: "5", + Unit: config.UnitMinutes, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "fullness_ratio", + JSONName: "fullness_ratio", + Type: config.FieldTypeFloat, + DefaultValue: 0.8, + MinValue: 0.1, + MaxValue: 1.0, + Required: true, + DisplayName: "Fullness Ratio", + Description: "Minimum fullness ratio to trigger erasure coding", + HelpText: "Only volumes with this fullness ratio or higher will be erasure coded", + Placeholder: "0.80 (80%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "collection_filter", + JSONName: "collection_filter", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Collection Filter", + Description: "Only process volumes from specific collections", + HelpText: "Leave empty to process all collections, or specify collection name", + Placeholder: "my_collection", + InputType: "text", + CSSClasses: "form-control", + }, + }, + } +} diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go new file mode 100644 index 000000000..756da6fea --- /dev/null +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -0,0 +1,126 @@ +package erasure_coding + +import ( + "fmt" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Detection implements the detection logic for erasure coding tasks +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + ecConfig := config.(*Config) + var results []*types.TaskDetectionResult + now := time.Now() + quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second + minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum + + for _, metric := range metrics { + // Skip if already EC volume + if metric.IsECVolume { + continue + } + + // Check minimum size requirement + if metric.Size < minSizeBytes { + continue + } + + // Check collection filter if specified + if ecConfig.CollectionFilter != "" { + // Parse comma-separated collections + allowedCollections := make(map[string]bool) + for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") { + allowedCollections[strings.TrimSpace(collection)] = true + } + // Skip if volume's collection is not in the allowed list + if !allowedCollections[metric.Collection] { + continue + } + } + + // Check quiet duration and fullness criteria + if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeErasureCoding, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: types.TaskPriorityLow, // EC is not urgent + Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)", + metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100, + float64(metric.Size)/(1024*1024)), + Parameters: map[string]interface{}{ + "age_seconds": int(metric.Age.Seconds()), + "fullness_ratio": metric.FullnessRatio, + "size_mb": int(metric.Size / (1024 * 1024)), + }, + ScheduleAt: now, + } + results = append(results, result) + } + } + + return results, nil +} + +// Scheduling implements the scheduling logic for erasure coding tasks +func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + ecConfig := config.(*Config) + + // Check if we have available workers + if len(availableWorkers) == 0 { + return false + } + + // Count running EC tasks + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeErasureCoding { + runningCount++ + } + } + + // Check concurrency limit + if runningCount >= ecConfig.MaxConcurrent { + return false + } + + // Check if any worker can handle EC tasks + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeErasureCoding { + return true + } + } + } + + return false +} + +// CreateTask creates a new erasure coding task instance +func CreateTask(params types.TaskParams) (types.TaskInterface, error) { + // Extract configuration from params + var config *Config + if configData, ok := params.Parameters["config"]; ok { + if configMap, ok := configData.(map[string]interface{}); ok { + config = &Config{} + if err := config.FromMap(configMap); err != nil { + return nil, fmt.Errorf("failed to parse erasure coding config: %v", err) + } + } + } + + if config == nil { + config = NewDefaultConfig() + } + + // Create and return the erasure coding task using existing Task type + return NewTask(params.Server, params.VolumeID), nil +} diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go index bbf15c079..61c13188f 100644 --- a/weed/worker/tasks/erasure_coding/ec.go +++ b/weed/worker/tasks/erasure_coding/ec.go @@ -7,13 +7,11 @@ import ( "math" "os" "path/filepath" - "strings" "sync" "time" "google.golang.org/grpc" - "github.com/seaweedfs/seaweedfs/weed/admin/config" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -23,7 +21,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc/credentials/insecure" ) @@ -1019,270 +1016,3 @@ func (t *Task) mountShardOnServer(targetServer pb.ServerAddress, shardId uint32) glog.V(1).Infof("MOUNT SUCCESS: Shard %d successfully mounted on %s", shardId, targetServer) return nil } - -// ErasureCodingConfig extends BaseConfig with erasure coding specific settings -type ErasureCodingConfig struct { - base.BaseConfig - QuietForSeconds int `json:"quiet_for_seconds"` - FullnessRatio float64 `json:"fullness_ratio"` - CollectionFilter string `json:"collection_filter"` -} - -// ecDetection implements the detection logic for erasure coding tasks -func ecDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - ecConfig := config.(*ErasureCodingConfig) - var results []*types.TaskDetectionResult - now := time.Now() - quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second - minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum - - for _, metric := range metrics { - // Skip if already EC volume - if metric.IsECVolume { - continue - } - - // Check minimum size requirement - if metric.Size < minSizeBytes { - continue - } - - // Check collection filter if specified - if ecConfig.CollectionFilter != "" { - // Parse comma-separated collections - allowedCollections := make(map[string]bool) - for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") { - allowedCollections[strings.TrimSpace(collection)] = true - } - // Skip if volume's collection is not in the allowed list - if !allowedCollections[metric.Collection] { - continue - } - } - - // Check quiet duration and fullness criteria - if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeErasureCoding, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: types.TaskPriorityLow, // EC is not urgent - Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)", - metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100, - float64(metric.Size)/(1024*1024)), - Parameters: map[string]interface{}{ - "age_seconds": int(metric.Age.Seconds()), - "fullness_ratio": metric.FullnessRatio, - "size_mb": int(metric.Size / (1024 * 1024)), - }, - ScheduleAt: now, - } - results = append(results, result) - } - } - - return results, nil -} - -// ecScheduling implements the scheduling logic for erasure coding tasks -func ecScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - ecConfig := config.(*ErasureCodingConfig) - - // Check if we have available workers - if len(availableWorkers) == 0 { - return false - } - - // Count running EC tasks - runningCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeErasureCoding { - runningCount++ - } - } - - // Check concurrency limit - if runningCount >= ecConfig.MaxConcurrent { - return false - } - - // Check if any worker can handle EC tasks - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeErasureCoding { - return true - } - } - } - - return false -} - -// createErasureCodingTask creates a new erasure coding task instance -func createErasureCodingTask(params types.TaskParams) (types.TaskInterface, error) { - // Extract configuration from params - var config *ErasureCodingConfig - if configData, ok := params.Parameters["config"]; ok { - if configMap, ok := configData.(map[string]interface{}); ok { - config = &ErasureCodingConfig{} - if err := config.FromMap(configMap); err != nil { - return nil, fmt.Errorf("failed to parse erasure coding config: %v", err) - } - } - } - - if config == nil { - config = &ErasureCodingConfig{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 60 * 60, // 1 hour - MaxConcurrent: 1, - }, - QuietForSeconds: 300, // 5 minutes - FullnessRatio: 0.8, // 80% - CollectionFilter: "", - } - } - - // Create and return the erasure coding task using existing Task type - return NewTask(params.Server, params.VolumeID), nil -} - -// getErasureCodingConfigSpec returns the configuration schema for erasure coding tasks -func getErasureCodingConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Erasure Coding Tasks", - Description: "Whether erasure coding tasks should be automatically created", - HelpText: "Toggle this to enable or disable automatic erasure coding task generation", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 60 * 60, - MinValue: 10 * 60, - MaxValue: 24 * 60 * 60, - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volumes needing erasure coding", - HelpText: "The system will check for volumes that need erasure coding at this interval", - Placeholder: "1", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 1, - MinValue: 1, - MaxValue: 5, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of erasure coding tasks that can run simultaneously", - HelpText: "Limits the number of erasure coding operations running at the same time", - Placeholder: "1 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "quiet_for_seconds", - JSONName: "quiet_for_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 300, - MinValue: 60, - MaxValue: 3600, - Required: true, - DisplayName: "Quiet Period", - Description: "Minimum time volume must be quiet before erasure coding", - HelpText: "Volume must not be modified for this duration before erasure coding", - Placeholder: "5", - Unit: config.UnitMinutes, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "fullness_ratio", - JSONName: "fullness_ratio", - Type: config.FieldTypeFloat, - DefaultValue: 0.8, - MinValue: 0.1, - MaxValue: 1.0, - Required: true, - DisplayName: "Fullness Ratio", - Description: "Minimum fullness ratio to trigger erasure coding", - HelpText: "Only volumes with this fullness ratio or higher will be erasure coded", - Placeholder: "0.80 (80%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "collection_filter", - JSONName: "collection_filter", - Type: config.FieldTypeString, - DefaultValue: "", - Required: false, - DisplayName: "Collection Filter", - Description: "Only process volumes from specific collections", - HelpText: "Leave empty to process all collections, or specify collection name", - Placeholder: "my_collection", - InputType: "text", - CSSClasses: "form-control", - }, - }, - } -} - -// initErasureCoding registers the refactored erasure coding task -func initErasureCoding() { - // Create configuration instance - config := &ErasureCodingConfig{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 60 * 60, // 1 hour - MaxConcurrent: 1, - }, - QuietForSeconds: 300, // 5 minutes - FullnessRatio: 0.8, // 80% - CollectionFilter: "", - } - - // Create complete task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeErasureCoding, - Name: "erasure_coding", - DisplayName: "Erasure Coding", - Description: "Applies erasure coding to volumes for data protection", - Icon: "fas fa-shield-alt text-success", - Capabilities: []string{"erasure_coding", "data_protection"}, - - Config: config, - ConfigSpec: getErasureCodingConfigSpec(), - CreateTask: createErasureCodingTask, - DetectionFunc: ecDetection, - ScanInterval: 1 * time.Hour, - SchedulingFunc: ecScheduling, - MaxConcurrent: 1, - RepeatInterval: 24 * time.Hour, - } - - // Register everything with a single function call! - base.RegisterTask(taskDef) -} diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go index 5c68741e3..807e9705d 100644 --- a/weed/worker/tasks/erasure_coding/ec_register.go +++ b/weed/worker/tasks/erasure_coding/ec_register.go @@ -1,7 +1,41 @@ package erasure_coding +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + // Auto-register this task when the package is imported func init() { - // Use new architecture instead of old registration - initErasureCoding() + RegisterErasureCodingTask() +} + +// RegisterErasureCodingTask registers the erasure coding task with the new architecture +func RegisterErasureCodingTask() { + // Create configuration instance + config := NewDefaultConfig() + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeErasureCoding, + Name: "erasure_coding", + DisplayName: "Erasure Coding", + Description: "Applies erasure coding to volumes for data protection", + Icon: "fas fa-shield-alt text-success", + Capabilities: []string{"erasure_coding", "data_protection"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: CreateTask, + DetectionFunc: Detection, + ScanInterval: 1 * time.Hour, + SchedulingFunc: Scheduling, + MaxConcurrent: 1, + RepeatInterval: 24 * time.Hour, + } + + // Register everything with a single function call! + base.RegisterTask(taskDef) } diff --git a/weed/worker/tasks/vacuum/config.go b/weed/worker/tasks/vacuum/config.go new file mode 100644 index 000000000..39ce10c3d --- /dev/null +++ b/weed/worker/tasks/vacuum/config.go @@ -0,0 +1,128 @@ +package vacuum + +import ( + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with vacuum-specific settings +type Config struct { + base.BaseConfig + GarbageThreshold float64 `json:"garbage_threshold"` + MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` + MinIntervalSeconds int `json:"min_interval_seconds"` +} + +// NewDefaultConfig creates a new default vacuum configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 2 * 60 * 60, // 2 hours + MaxConcurrent: 2, + }, + GarbageThreshold: 0.3, // 30% + MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + } +} + +// GetConfigSpec returns the configuration schema for vacuum tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Vacuum Tasks", + Description: "Whether vacuum tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic vacuum task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 2 * 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing vacuum", + HelpText: "The system will check for volumes that need vacuuming at this interval", + Placeholder: "2", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 1, + MaxValue: 10, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of vacuum tasks that can run simultaneously", + HelpText: "Limits the number of vacuum operations running at the same time to control system load", + Placeholder: "2 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "garbage_threshold", + JSONName: "garbage_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.3, + MinValue: 0.0, + MaxValue: 1.0, + Required: true, + DisplayName: "Garbage Percentage Threshold", + Description: "Trigger vacuum when garbage ratio exceeds this percentage", + HelpText: "Volumes with more deleted content than this threshold will be vacuumed", + Placeholder: "0.30 (30%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_volume_age_seconds", + JSONName: "min_volume_age_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 24 * 60 * 60, + MinValue: 1 * 60 * 60, + MaxValue: 7 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Volume Age", + Description: "Only vacuum volumes older than this duration", + HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to", + Placeholder: "24", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "min_interval_seconds", + JSONName: "min_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 7 * 24 * 60 * 60, + MinValue: 1 * 24 * 60 * 60, + MaxValue: 30 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Interval", + Description: "Minimum time between vacuum operations on the same volume", + HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time", + Placeholder: "7", + Unit: config.UnitDays, + InputType: "interval", + CSSClasses: "form-control", + }, + }, + } +} diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go new file mode 100644 index 000000000..3ae23814a --- /dev/null +++ b/weed/worker/tasks/vacuum/detection.go @@ -0,0 +1,99 @@ +package vacuum + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Detection implements the detection logic for vacuum tasks +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + vacuumConfig := config.(*Config) + var results []*types.TaskDetectionResult + minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second + + for _, metric := range metrics { + // Check if volume needs vacuum + if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { + priority := types.TaskPriorityNormal + if metric.GarbageRatio > 0.6 { + priority = types.TaskPriorityHigh + } + + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeVacuum, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: priority, + Reason: "Volume has excessive garbage requiring vacuum", + Parameters: map[string]interface{}{ + "garbage_ratio": metric.GarbageRatio, + "volume_age": metric.Age.String(), + }, + ScheduleAt: time.Now(), + } + results = append(results, result) + } + } + + return results, nil +} + +// Scheduling implements the scheduling logic for vacuum tasks +func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + vacuumConfig := config.(*Config) + + // Count running vacuum tasks + runningVacuumCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeVacuum { + runningVacuumCount++ + } + } + + // Check concurrency limit + if runningVacuumCount >= vacuumConfig.MaxConcurrent { + return false + } + + // Check for available workers with vacuum capability + for _, worker := range availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeVacuum { + return true + } + } + } + } + + return false +} + +// CreateTask creates a new vacuum task instance +func CreateTask(params types.TaskParams) (types.TaskInterface, error) { + // Extract configuration from params + var config *Config + if configData, ok := params.Parameters["config"]; ok { + if configMap, ok := configData.(map[string]interface{}); ok { + config = &Config{} + if err := config.FromMap(configMap); err != nil { + return nil, fmt.Errorf("failed to parse vacuum config: %v", err) + } + } + } + + if config == nil { + config = NewDefaultConfig() + } + + // Create and return the vacuum task using existing Task type + return NewTask(params.Server, params.VolumeID), nil +} diff --git a/weed/worker/tasks/vacuum/vacuum.go b/weed/worker/tasks/vacuum/vacuum.go index fe5ebc96e..24233f59b 100644 --- a/weed/worker/tasks/vacuum/vacuum.go +++ b/weed/worker/tasks/vacuum/vacuum.go @@ -7,12 +7,10 @@ import ( "strconv" "time" - "github.com/seaweedfs/seaweedfs/weed/admin/config" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -199,247 +197,3 @@ func (t *Task) GetProgress() float64 { func (t *Task) Cancel() error { return t.BaseTask.Cancel() } - -// VacuumConfig extends BaseConfig with vacuum-specific settings -type VacuumConfig struct { - base.BaseConfig - GarbageThreshold float64 `json:"garbage_threshold"` - MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` - MinIntervalSeconds int `json:"min_interval_seconds"` -} - -// vacuumDetection implements the detection logic for vacuum tasks -func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - vacuumConfig := config.(*VacuumConfig) - var results []*types.TaskDetectionResult - minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second - - for _, metric := range metrics { - // Check if volume needs vacuum - if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { - priority := types.TaskPriorityNormal - if metric.GarbageRatio > 0.6 { - priority = types.TaskPriorityHigh - } - - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeVacuum, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: priority, - Reason: "Volume has excessive garbage requiring vacuum", - Parameters: map[string]interface{}{ - "garbage_ratio": metric.GarbageRatio, - "volume_age": metric.Age.String(), - }, - ScheduleAt: time.Now(), - } - results = append(results, result) - } - } - - return results, nil -} - -// vacuumScheduling implements the scheduling logic for vacuum tasks -func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - vacuumConfig := config.(*VacuumConfig) - - // Count running vacuum tasks - runningVacuumCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeVacuum { - runningVacuumCount++ - } - } - - // Check concurrency limit - if runningVacuumCount >= vacuumConfig.MaxConcurrent { - return false - } - - // Check for available workers with vacuum capability - for _, worker := range availableWorkers { - if worker.CurrentLoad < worker.MaxConcurrent { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeVacuum { - return true - } - } - } - } - - return false -} - -// createVacuumTask creates a new vacuum task instance -func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) { - // Extract configuration from params - var config *VacuumConfig - if configData, ok := params.Parameters["config"]; ok { - if configMap, ok := configData.(map[string]interface{}); ok { - config = &VacuumConfig{} - if err := config.FromMap(configMap); err != nil { - return nil, fmt.Errorf("failed to parse vacuum config: %v", err) - } - } - } - - if config == nil { - config = &VacuumConfig{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 2 * 60 * 60, // 2 hours - MaxConcurrent: 2, - }, - GarbageThreshold: 0.3, // 30% - MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - } - } - - // Create and return the vacuum task using existing Task type - return NewTask(params.Server, params.VolumeID), nil -} - -// getVacuumConfigSpec returns the configuration schema for vacuum tasks -func getVacuumConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Vacuum Tasks", - Description: "Whether vacuum tasks should be automatically created", - HelpText: "Toggle this to enable or disable automatic vacuum task generation", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 2 * 60 * 60, - MinValue: 10 * 60, - MaxValue: 24 * 60 * 60, - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volumes needing vacuum", - HelpText: "The system will check for volumes that need vacuuming at this interval", - Placeholder: "2", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 2, - MinValue: 1, - MaxValue: 10, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of vacuum tasks that can run simultaneously", - HelpText: "Limits the number of vacuum operations running at the same time to control system load", - Placeholder: "2 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "garbage_threshold", - JSONName: "garbage_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.3, - MinValue: 0.0, - MaxValue: 1.0, - Required: true, - DisplayName: "Garbage Percentage Threshold", - Description: "Trigger vacuum when garbage ratio exceeds this percentage", - HelpText: "Volumes with more deleted content than this threshold will be vacuumed", - Placeholder: "0.30 (30%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_volume_age_seconds", - JSONName: "min_volume_age_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 24 * 60 * 60, - MinValue: 1 * 60 * 60, - MaxValue: 7 * 24 * 60 * 60, - Required: true, - DisplayName: "Minimum Volume Age", - Description: "Only vacuum volumes older than this duration", - HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to", - Placeholder: "24", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "min_interval_seconds", - JSONName: "min_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 7 * 24 * 60 * 60, - MinValue: 1 * 24 * 60 * 60, - MaxValue: 30 * 24 * 60 * 60, - Required: true, - DisplayName: "Minimum Interval", - Description: "Minimum time between vacuum operations on the same volume", - HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time", - Placeholder: "7", - Unit: config.UnitDays, - InputType: "interval", - CSSClasses: "form-control", - }, - }, - } -} - -// initVacuum registers the refactored vacuum task (replaces the old registration) -func initVacuum() { - // Create configuration instance - config := &VacuumConfig{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 2 * 60 * 60, // 2 hours - MaxConcurrent: 2, - }, - GarbageThreshold: 0.3, // 30% - MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - } - - // Create complete task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeVacuum, - Name: "vacuum", - DisplayName: "Volume Vacuum", - Description: "Reclaims disk space by removing deleted files from volumes", - Icon: "fas fa-broom text-primary", - Capabilities: []string{"vacuum", "storage"}, - - Config: config, - ConfigSpec: getVacuumConfigSpec(), - CreateTask: createVacuumTask, - DetectionFunc: vacuumDetection, - ScanInterval: 2 * time.Hour, - SchedulingFunc: vacuumScheduling, - MaxConcurrent: 2, - RepeatInterval: 7 * 24 * time.Hour, - } - - // Register everything with a single function call! - base.RegisterTask(taskDef) -} diff --git a/weed/worker/tasks/vacuum/vacuum_register.go b/weed/worker/tasks/vacuum/vacuum_register.go index 9d247be71..1d00937b0 100644 --- a/weed/worker/tasks/vacuum/vacuum_register.go +++ b/weed/worker/tasks/vacuum/vacuum_register.go @@ -1,7 +1,41 @@ package vacuum +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + // Auto-register this task when the package is imported func init() { - // Use new architecture instead of old registration - initVacuum() + RegisterVacuumTask() +} + +// RegisterVacuumTask registers the vacuum task with the new architecture +func RegisterVacuumTask() { + // Create configuration instance + config := NewDefaultConfig() + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeVacuum, + Name: "vacuum", + DisplayName: "Volume Vacuum", + Description: "Reclaims disk space by removing deleted files from volumes", + Icon: "fas fa-broom text-primary", + Capabilities: []string{"vacuum", "storage"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: CreateTask, + DetectionFunc: Detection, + ScanInterval: 2 * time.Hour, + SchedulingFunc: Scheduling, + MaxConcurrent: 2, + RepeatInterval: 7 * 24 * time.Hour, + } + + // Register everything with a single function call! + base.RegisterTask(taskDef) }