From 1b40075d2af286601f11e410524ed3f574b26713 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 27 Jul 2025 00:43:02 -0700 Subject: [PATCH] use new framework --- .gitignore | 2 + weed/worker/tasks/balance/balance.go | 241 +++++++++++++++++ weed/worker/tasks/balance/balance_v2.go | 274 -------------------- weed/worker/tasks/base/task_definition.go | 179 ++++++++++++- weed/worker/tasks/erasure_coding/ec.go | 264 +++++++++++++++++++ weed/worker/tasks/erasure_coding/ec_v2.go | 301 ---------------------- weed/worker/tasks/vacuum/vacuum.go | 232 +++++++++++++++++ weed/worker/tasks/vacuum/vacuum_v2.go | 269 ------------------- weed/worker/tasks/vacuum_v2/vacuum.go | 260 ------------------- 9 files changed, 904 insertions(+), 1118 deletions(-) delete mode 100644 weed/worker/tasks/balance/balance_v2.go delete mode 100644 weed/worker/tasks/erasure_coding/ec_v2.go delete mode 100644 weed/worker/tasks/vacuum/vacuum_v2.go delete mode 100644 weed/worker/tasks/vacuum_v2/vacuum.go diff --git a/.gitignore b/.gitignore index 8e7041175..b330bbd96 100644 --- a/.gitignore +++ b/.gitignore @@ -113,3 +113,5 @@ test/s3/retention/weed-test.log /test/s3/versioning/test-volume-data test/s3/versioning/weed-test.log /docker/admin_integration/data +docker/agent_pub_record +docker/admin_integration/weed-local diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go index ea867d950..e6950b053 100644 --- a/weed/worker/tasks/balance/balance.go +++ b/weed/worker/tasks/balance/balance.go @@ -4,8 +4,10 @@ import ( "fmt" "time" + "github.com/seaweedfs/seaweedfs/weed/admin/config" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -80,3 +82,242 @@ func (t *Task) EstimateTime(params types.TaskParams) time.Duration { // Could adjust based on volume size or cluster state return baseTime } + +// BalanceConfigV2 extends BaseConfig with balance-specific settings +type BalanceConfigV2 struct { + base.BaseConfig + ImbalanceThreshold float64 `json:"imbalance_threshold"` + MinServerCount int `json:"min_server_count"` +} + +// balanceDetection implements the detection logic for balance tasks +func balanceDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + balanceConfig := config.(*BalanceConfigV2) + + // Skip if cluster is too small + minVolumeCount := 10 + if len(metrics) < minVolumeCount { + return nil, nil + } + + // Analyze volume distribution across servers + serverVolumeCounts := make(map[string]int) + for _, metric := range metrics { + serverVolumeCounts[metric.Server]++ + } + + if len(serverVolumeCounts) < balanceConfig.MinServerCount { + return nil, nil + } + + // Calculate balance metrics + totalVolumes := len(metrics) + avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) + + maxVolumes := 0 + minVolumes := totalVolumes + maxServer := "" + minServer := "" + + for server, count := range serverVolumeCounts { + if count > maxVolumes { + maxVolumes = count + maxServer = server + } + if count < minVolumes { + minVolumes = count + minServer = server + } + } + + // Check if imbalance exceeds threshold + imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer + if imbalanceRatio <= balanceConfig.ImbalanceThreshold { + return nil, nil + } + + // Create balance task + 
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", + imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + + task := &types.TaskDetectionResult{ + TaskType: types.TaskTypeBalance, + Priority: types.TaskPriorityNormal, + Reason: reason, + ScheduleAt: time.Now(), + Parameters: map[string]interface{}{ + "imbalance_ratio": imbalanceRatio, + "threshold": balanceConfig.ImbalanceThreshold, + "max_volumes": maxVolumes, + "min_volumes": minVolumes, + "avg_volumes_per_server": avgVolumesPerServer, + "max_server": maxServer, + "min_server": minServer, + "total_servers": len(serverVolumeCounts), + }, + } + + return []*types.TaskDetectionResult{task}, nil +} + +// balanceScheduling implements the scheduling logic for balance tasks +func balanceScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + balanceConfig := config.(*BalanceConfigV2) + + // Count running balance tasks + runningBalanceCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeBalance { + runningBalanceCount++ + } + } + + // Check concurrency limit + if runningBalanceCount >= balanceConfig.MaxConcurrent { + return false + } + + // Check if we have available workers + availableWorkerCount := 0 + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeBalance { + availableWorkerCount++ + break + } + } + } + + return availableWorkerCount > 0 +} + +// createBalanceTask creates a balance task instance +func createBalanceTask(params types.TaskParams) (types.TaskInterface, error) { + // Validate parameters + if params.VolumeID == 0 { + return nil, fmt.Errorf("volume_id is required") + } + if params.Server == "" { + return nil, fmt.Errorf("server is required") + } + + task := NewTask(params.Server, params.VolumeID, params.Collection) + task.SetEstimatedDuration(task.EstimateTime(params)) + return task, nil +} + +// getBalanceConfigSpec returns the configuration schema for balance tasks +func getBalanceConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Balance Tasks", + Description: "Whether balance tasks should be automatically created", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "imbalance_threshold", + JSONName: "imbalance_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.1, // 10% + MinValue: 0.01, + MaxValue: 0.5, + Required: true, + DisplayName: "Imbalance Threshold", + Description: "Trigger balance when storage imbalance exceeds this ratio", + Placeholder: "0.10 (10%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 6 * 60 * 60, // 6 hours + MinValue: 1 * 60 * 60, // 1 hour + MaxValue: 24 * 60 * 60, // 24 hours + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for imbalanced volumes", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 1, + MaxValue: 5, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: 
"Maximum number of balance tasks that can run simultaneously", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_server_count", + JSONName: "min_server_count", + Type: config.FieldTypeInt, + DefaultValue: 3, + MinValue: 2, + MaxValue: 20, + Required: true, + DisplayName: "Minimum Server Count", + Description: "Only balance when at least this many servers are available", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + }, + } +} + +// initBalanceV2 registers the refactored balance task +func initBalanceV2() { + // Create configuration instance + config := &BalanceConfigV2{ + BaseConfig: base.BaseConfig{ + Enabled: false, // Conservative default + ScanIntervalSeconds: 6 * 60 * 60, // 6 hours + MaxConcurrent: 2, + }, + ImbalanceThreshold: 0.1, // 10% + MinServerCount: 3, + } + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeBalance, + Name: "balance", + DisplayName: "Volume Balance", + Description: "Redistributes volumes across volume servers to optimize storage utilization", + Icon: "fas fa-balance-scale text-secondary", + Capabilities: []string{"balance", "storage", "optimization"}, + + Config: config, + ConfigSpec: getBalanceConfigSpec(), + CreateTask: createBalanceTask, + DetectionFunc: balanceDetection, + ScanInterval: 6 * time.Hour, + SchedulingFunc: balanceScheduling, + MaxConcurrent: 2, + RepeatInterval: 12 * time.Hour, + } + + // Register everything with a single function call! + base.RegisterTask(taskDef) +} diff --git a/weed/worker/tasks/balance/balance_v2.go b/weed/worker/tasks/balance/balance_v2.go deleted file mode 100644 index cb66d26d8..000000000 --- a/weed/worker/tasks/balance/balance_v2.go +++ /dev/null @@ -1,274 +0,0 @@ -package balance - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// BalanceConfigV2 extends BaseConfig with balance-specific settings -type BalanceConfigV2 struct { - base.BaseConfig - ImbalanceThreshold float64 `json:"imbalance_threshold"` - MinServerCount int `json:"min_server_count"` -} - -// ToMap converts config to map (extend base functionality) -func (c *BalanceConfigV2) ToMap() map[string]interface{} { - result := c.BaseConfig.ToMap() - result["imbalance_threshold"] = c.ImbalanceThreshold - result["min_server_count"] = c.MinServerCount - return result -} - -// FromMap loads config from map (extend base functionality) -func (c *BalanceConfigV2) FromMap(data map[string]interface{}) error { - // Load base config first - if err := c.BaseConfig.FromMap(data); err != nil { - return err - } - - // Load balance-specific config - if threshold, ok := data["imbalance_threshold"].(float64); ok { - c.ImbalanceThreshold = threshold - } - if serverCount, ok := data["min_server_count"].(int); ok { - c.MinServerCount = serverCount - } - return nil -} - -// balanceDetection implements the detection logic for balance tasks -func balanceDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - balanceConfig := config.(*BalanceConfigV2) - - // Skip if cluster is too small - minVolumeCount := 10 - if len(metrics) < minVolumeCount { - return nil, nil - } - - // Analyze volume distribution across servers - serverVolumeCounts := make(map[string]int) - for 
_, metric := range metrics { - serverVolumeCounts[metric.Server]++ - } - - if len(serverVolumeCounts) < balanceConfig.MinServerCount { - return nil, nil - } - - // Calculate balance metrics - totalVolumes := len(metrics) - avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) - - maxVolumes := 0 - minVolumes := totalVolumes - maxServer := "" - minServer := "" - - for server, count := range serverVolumeCounts { - if count > maxVolumes { - maxVolumes = count - maxServer = server - } - if count < minVolumes { - minVolumes = count - minServer = server - } - } - - // Check if imbalance exceeds threshold - imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer - if imbalanceRatio <= balanceConfig.ImbalanceThreshold { - return nil, nil - } - - // Create balance task - reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", - imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - - task := &types.TaskDetectionResult{ - TaskType: types.TaskTypeBalance, - Priority: types.TaskPriorityNormal, - Reason: reason, - ScheduleAt: time.Now(), - Parameters: map[string]interface{}{ - "imbalance_ratio": imbalanceRatio, - "threshold": balanceConfig.ImbalanceThreshold, - "max_volumes": maxVolumes, - "min_volumes": minVolumes, - "avg_volumes_per_server": avgVolumesPerServer, - "max_server": maxServer, - "min_server": minServer, - "total_servers": len(serverVolumeCounts), - }, - } - - return []*types.TaskDetectionResult{task}, nil -} - -// balanceScheduling implements the scheduling logic for balance tasks -func balanceScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - balanceConfig := config.(*BalanceConfigV2) - - // Count running balance tasks - runningBalanceCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeBalance { - runningBalanceCount++ - } - } - - // Check concurrency limit - if runningBalanceCount >= balanceConfig.MaxConcurrent { - return false - } - - // Check if we have available workers - availableWorkerCount := 0 - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeBalance { - availableWorkerCount++ - break - } - } - } - - return availableWorkerCount > 0 -} - -// createBalanceTask creates a balance task instance -func createBalanceTask(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - task := NewTask(params.Server, params.VolumeID, params.Collection) - task.SetEstimatedDuration(task.EstimateTime(params)) - return task, nil -} - -// getBalanceConfigSpec returns the configuration schema for balance tasks -func getBalanceConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Balance Tasks", - Description: "Whether balance tasks should be automatically created", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "imbalance_threshold", - JSONName: "imbalance_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.1, // 10% - MinValue: 0.01, - MaxValue: 0.5, - Required: true, - DisplayName: "Imbalance Threshold", 
- Description: "Trigger balance when storage imbalance exceeds this ratio", - Placeholder: "0.10 (10%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 6 * 60 * 60, // 6 hours - MinValue: 1 * 60 * 60, // 1 hour - MaxValue: 24 * 60 * 60, // 24 hours - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for imbalanced volumes", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 2, - MinValue: 1, - MaxValue: 5, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of balance tasks that can run simultaneously", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_server_count", - JSONName: "min_server_count", - Type: config.FieldTypeInt, - DefaultValue: 3, - MinValue: 2, - MaxValue: 20, - Required: true, - DisplayName: "Minimum Server Count", - Description: "Only balance when at least this many servers are available", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - }, - } -} - -// initBalanceV2 registers the refactored balance task -func initBalanceV2() { - // Create configuration instance - config := &BalanceConfigV2{ - BaseConfig: base.BaseConfig{ - Enabled: false, // Conservative default - ScanIntervalSeconds: 6 * 60 * 60, // 6 hours - MaxConcurrent: 2, - }, - ImbalanceThreshold: 0.1, // 10% - MinServerCount: 3, - } - - // Create complete task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeBalance, - Name: "balance", - DisplayName: "Volume Balance", - Description: "Redistributes volumes across volume servers to optimize storage utilization", - Icon: "fas fa-balance-scale text-secondary", - Capabilities: []string{"balance", "storage", "optimization"}, - - Config: config, - ConfigSpec: getBalanceConfigSpec(), - CreateTask: createBalanceTask, - DetectionFunc: balanceDetection, - ScanInterval: 6 * time.Hour, - SchedulingFunc: balanceScheduling, - MaxConcurrent: 2, - RepeatInterval: 12 * time.Hour, - } - - // Register everything with a single function call! 
- base.RegisterTask(taskDef) -} diff --git a/weed/worker/tasks/base/task_definition.go b/weed/worker/tasks/base/task_definition.go index c32471664..9d8b6124d 100644 --- a/weed/worker/tasks/base/task_definition.go +++ b/weed/worker/tasks/base/task_definition.go @@ -1,6 +1,9 @@ package base import ( + "fmt" + "reflect" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/admin/config" @@ -48,7 +51,7 @@ type ConfigSpec struct { Fields []*config.Field } -// BaseConfig provides common configuration fields +// BaseConfig provides common configuration fields with reflection-based serialization type BaseConfig struct { Enabled bool `json:"enabled"` ScanIntervalSeconds int `json:"scan_interval_seconds"` @@ -71,25 +74,173 @@ func (c *BaseConfig) Validate() error { return nil } -// ToMap converts config to map -func (c *BaseConfig) ToMap() map[string]interface{} { - return map[string]interface{}{ - "enabled": c.Enabled, - "scan_interval_seconds": c.ScanIntervalSeconds, - "max_concurrent": c.MaxConcurrent, +// StructToMap converts any struct to a map using reflection +func StructToMap(obj interface{}) map[string]interface{} { + result := make(map[string]interface{}) + val := reflect.ValueOf(obj) + + // Handle pointer to struct + if val.Kind() == reflect.Ptr { + val = val.Elem() } + + if val.Kind() != reflect.Struct { + return result + } + + typ := val.Type() + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // Skip unexported fields + if !field.CanInterface() { + continue + } + + // Get JSON tag name + jsonTag := fieldType.Tag.Get("json") + if jsonTag == "" || jsonTag == "-" { + continue + } + + // Remove options like ",omitempty" + if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 { + jsonTag = jsonTag[:commaIdx] + } + + // Handle embedded structs recursively + if field.Kind() == reflect.Struct && fieldType.Anonymous { + embeddedMap := StructToMap(field.Interface()) + for k, v := range embeddedMap { + result[k] = v + } + } else { + result[jsonTag] = field.Interface() + } + } + + return result } -// FromMap loads config from map +// MapToStruct loads data from map into struct using reflection +func MapToStruct(data map[string]interface{}, obj interface{}) error { + val := reflect.ValueOf(obj) + + // Must be pointer to struct + if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct { + return fmt.Errorf("obj must be pointer to struct") + } + + val = val.Elem() + typ := val.Type() + + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // Skip unexported fields + if !field.CanSet() { + continue + } + + // Get JSON tag name + jsonTag := fieldType.Tag.Get("json") + if jsonTag == "" || jsonTag == "-" { + continue + } + + // Remove options like ",omitempty" + if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 { + jsonTag = jsonTag[:commaIdx] + } + + // Handle embedded structs recursively + if field.Kind() == reflect.Struct && fieldType.Anonymous { + err := MapToStruct(data, field.Addr().Interface()) + if err != nil { + return err + } + } else if value, exists := data[jsonTag]; exists { + err := setFieldValue(field, value) + if err != nil { + return fmt.Errorf("failed to set field %s: %v", jsonTag, err) + } + } + } + + return nil +} + +// ToMap converts config to map using reflection +func (c *BaseConfig) ToMap() map[string]interface{} { + return StructToMap(c) +} + +// FromMap loads config from map using reflection func (c *BaseConfig) FromMap(data map[string]interface{}) error { - if 
enabled, ok := data["enabled"].(bool); ok { - c.Enabled = enabled + return MapToStruct(data, c) +} + +// setFieldValue sets a field value with type conversion +func setFieldValue(field reflect.Value, value interface{}) error { + if value == nil { + return nil } - if interval, ok := data["scan_interval_seconds"].(int); ok { - c.ScanIntervalSeconds = interval + + valueVal := reflect.ValueOf(value) + fieldType := field.Type() + valueType := valueVal.Type() + + // Direct assignment if types match + if valueType.AssignableTo(fieldType) { + field.Set(valueVal) + return nil } - if concurrent, ok := data["max_concurrent"].(int); ok { - c.MaxConcurrent = concurrent + + // Type conversion for common cases + switch fieldType.Kind() { + case reflect.Bool: + if b, ok := value.(bool); ok { + field.SetBool(b) + } else { + return fmt.Errorf("cannot convert %T to bool", value) + } + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + switch v := value.(type) { + case int: + field.SetInt(int64(v)) + case int32: + field.SetInt(int64(v)) + case int64: + field.SetInt(v) + case float64: + field.SetInt(int64(v)) + default: + return fmt.Errorf("cannot convert %T to int", value) + } + case reflect.Float32, reflect.Float64: + switch v := value.(type) { + case float32: + field.SetFloat(float64(v)) + case float64: + field.SetFloat(v) + case int: + field.SetFloat(float64(v)) + case int64: + field.SetFloat(float64(v)) + default: + return fmt.Errorf("cannot convert %T to float", value) + } + case reflect.String: + if s, ok := value.(string); ok { + field.SetString(s) + } else { + return fmt.Errorf("cannot convert %T to string", value) + } + default: + return fmt.Errorf("unsupported field type %s", fieldType.Kind()) } + return nil } diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go index 61c13188f..2079082c7 100644 --- a/weed/worker/tasks/erasure_coding/ec.go +++ b/weed/worker/tasks/erasure_coding/ec.go @@ -7,11 +7,13 @@ import ( "math" "os" "path/filepath" + "strings" "sync" "time" "google.golang.org/grpc" + "github.com/seaweedfs/seaweedfs/weed/admin/config" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -21,6 +23,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc/credentials/insecure" ) @@ -1016,3 +1019,264 @@ func (t *Task) mountShardOnServer(targetServer pb.ServerAddress, shardId uint32) glog.V(1).Infof("MOUNT SUCCESS: Shard %d successfully mounted on %s", shardId, targetServer) return nil } + +// ErasureCodingConfigV2 extends BaseConfig with erasure coding specific settings +type ErasureCodingConfigV2 struct { + base.BaseConfig + QuietForSeconds int `json:"quiet_for_seconds"` + FullnessRatio float64 `json:"fullness_ratio"` + CollectionFilter string `json:"collection_filter"` +} + +// ecDetection implements the detection logic for erasure coding tasks +func ecDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + ecConfig := config.(*ErasureCodingConfigV2) + var results []*types.TaskDetectionResult + now := time.Now() + quietThreshold := 
time.Duration(ecConfig.QuietForSeconds) * time.Second + minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum + + for _, metric := range metrics { + // Skip if already EC volume + if metric.IsECVolume { + continue + } + + // Check minimum size requirement + if metric.Size < minSizeBytes { + continue + } + + // Check collection filter if specified + if ecConfig.CollectionFilter != "" { + // Parse comma-separated collections + allowedCollections := make(map[string]bool) + for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") { + allowedCollections[strings.TrimSpace(collection)] = true + } + // Skip if volume's collection is not in the allowed list + if !allowedCollections[metric.Collection] { + continue + } + } + + // Check quiet duration and fullness criteria + if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeErasureCoding, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: types.TaskPriorityLow, // EC is not urgent + Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)", + metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100, + float64(metric.Size)/(1024*1024)), + Parameters: map[string]interface{}{ + "age_seconds": int(metric.Age.Seconds()), + "fullness_ratio": metric.FullnessRatio, + "size_mb": int(metric.Size / (1024 * 1024)), + }, + ScheduleAt: now, + } + results = append(results, result) + } + } + + return results, nil +} + +// ecScheduling implements the scheduling logic for erasure coding tasks +func ecScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + ecConfig := config.(*ErasureCodingConfigV2) + + // Check if we have available workers + if len(availableWorkers) == 0 { + return false + } + + // Count running EC tasks + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeErasureCoding { + runningCount++ + } + } + + // Check concurrency limit + if runningCount >= ecConfig.MaxConcurrent { + return false + } + + // Check if any worker can handle EC tasks + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeErasureCoding { + return true + } + } + } + + return false +} + +// createErasureCodingTask creates an erasure coding task instance +func createErasureCodingTask(params types.TaskParams) (types.TaskInterface, error) { + // Validate parameters + if params.VolumeID == 0 { + return nil, fmt.Errorf("volume_id is required") + } + if params.Server == "" { + return nil, fmt.Errorf("server is required") + } + + // Extract additional parameters for comprehensive EC + masterClient := "localhost:9333" // Default master client + workDir := "/tmp/seaweedfs_ec_work" // Default work directory + + if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" { + masterClient = mc + } + if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" { + workDir = wd + } + + // Create EC task with comprehensive capabilities + task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir) + + // Set gRPC dial option if provided + if params.GrpcDialOption != nil { + task.SetDialOption(params.GrpcDialOption) + } + + task.SetEstimatedDuration(task.EstimateTime(params)) + return task, nil +} + +// 
getErasureCodingConfigSpec returns the configuration schema for erasure coding tasks +func getErasureCodingConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Erasure Coding Tasks", + Description: "Whether erasure coding tasks should be automatically created", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "quiet_for_seconds", + JSONName: "quiet_for_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 7 * 24 * 60 * 60, // 7 days + MinValue: 1 * 24 * 60 * 60, // 1 day + MaxValue: 30 * 24 * 60 * 60, // 30 days + Required: true, + DisplayName: "Quiet For Duration", + Description: "Only apply erasure coding to volumes that have not been modified for this duration", + Unit: config.UnitDays, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 12 * 60 * 60, // 12 hours + MinValue: 2 * 60 * 60, // 2 hours + MaxValue: 24 * 60 * 60, // 24 hours + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing erasure coding", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + MinValue: 1, + MaxValue: 3, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of erasure coding tasks that can run simultaneously", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "fullness_ratio", + JSONName: "fullness_ratio", + Type: config.FieldTypeFloat, + DefaultValue: 0.9, // 90% + MinValue: 0.5, + MaxValue: 1.0, + Required: true, + DisplayName: "Fullness Ratio", + Description: "Only apply erasure coding to volumes with fullness ratio above this threshold", + Placeholder: "0.90 (90%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "collection_filter", + JSONName: "collection_filter", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Collection Filter", + Description: "Only apply erasure coding to volumes in these collections (comma-separated, leave empty for all)", + Placeholder: "collection1,collection2", + InputType: "text", + CSSClasses: "form-control", + }, + }, + } +} + +// initErasureCodingV2 registers the refactored erasure coding task +func initErasureCodingV2() { + // Create configuration instance + config := &ErasureCodingConfigV2{ + BaseConfig: base.BaseConfig{ + Enabled: false, // Conservative default - enable via configuration + ScanIntervalSeconds: 12 * 60 * 60, // 12 hours + MaxConcurrent: 1, // Conservative default + }, + QuietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period + FullnessRatio: 0.90, // 90% full threshold + CollectionFilter: "", // No collection filter by default + } + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeErasureCoding, + Name: "erasure_coding", + DisplayName: "Erasure Coding", + Description: "Converts volumes to erasure coded format for improved data durability", + Icon: "fas fa-shield-alt text-info", + Capabilities: []string{"erasure_coding", "storage", "durability"}, + + Config: config, + ConfigSpec: getErasureCodingConfigSpec(), + CreateTask: 
createErasureCodingTask, + DetectionFunc: ecDetection, + ScanInterval: 12 * time.Hour, + SchedulingFunc: ecScheduling, + MaxConcurrent: 1, + RepeatInterval: 24 * time.Hour, + } + + // Register everything with a single function call! + base.RegisterTask(taskDef) +} diff --git a/weed/worker/tasks/erasure_coding/ec_v2.go b/weed/worker/tasks/erasure_coding/ec_v2.go deleted file mode 100644 index ca2b28768..000000000 --- a/weed/worker/tasks/erasure_coding/ec_v2.go +++ /dev/null @@ -1,301 +0,0 @@ -package erasure_coding - -import ( - "fmt" - "strings" - "time" - - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// ErasureCodingConfigV2 extends BaseConfig with erasure coding specific settings -type ErasureCodingConfigV2 struct { - base.BaseConfig - QuietForSeconds int `json:"quiet_for_seconds"` - FullnessRatio float64 `json:"fullness_ratio"` - CollectionFilter string `json:"collection_filter"` -} - -// ToMap converts config to map (extend base functionality) -func (c *ErasureCodingConfigV2) ToMap() map[string]interface{} { - result := c.BaseConfig.ToMap() - result["quiet_for_seconds"] = c.QuietForSeconds - result["fullness_ratio"] = c.FullnessRatio - result["collection_filter"] = c.CollectionFilter - return result -} - -// FromMap loads config from map (extend base functionality) -func (c *ErasureCodingConfigV2) FromMap(data map[string]interface{}) error { - // Load base config first - if err := c.BaseConfig.FromMap(data); err != nil { - return err - } - - // Load erasure coding specific config - if quietFor, ok := data["quiet_for_seconds"].(int); ok { - c.QuietForSeconds = quietFor - } - if fullness, ok := data["fullness_ratio"].(float64); ok { - c.FullnessRatio = fullness - } - if filter, ok := data["collection_filter"].(string); ok { - c.CollectionFilter = filter - } - return nil -} - -// ecDetection implements the detection logic for erasure coding tasks -func ecDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - ecConfig := config.(*ErasureCodingConfigV2) - var results []*types.TaskDetectionResult - now := time.Now() - quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second - minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum - - for _, metric := range metrics { - // Skip if already EC volume - if metric.IsECVolume { - continue - } - - // Check minimum size requirement - if metric.Size < minSizeBytes { - continue - } - - // Check collection filter if specified - if ecConfig.CollectionFilter != "" { - // Parse comma-separated collections - allowedCollections := make(map[string]bool) - for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") { - allowedCollections[strings.TrimSpace(collection)] = true - } - // Skip if volume's collection is not in the allowed list - if !allowedCollections[metric.Collection] { - continue - } - } - - // Check quiet duration and fullness criteria - if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeErasureCoding, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: types.TaskPriorityLow, // EC is not urgent - Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB 
(>100MB)", - metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100, - float64(metric.Size)/(1024*1024)), - Parameters: map[string]interface{}{ - "age_seconds": int(metric.Age.Seconds()), - "fullness_ratio": metric.FullnessRatio, - "size_mb": int(metric.Size / (1024 * 1024)), - }, - ScheduleAt: now, - } - results = append(results, result) - } - } - - return results, nil -} - -// ecScheduling implements the scheduling logic for erasure coding tasks -func ecScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - ecConfig := config.(*ErasureCodingConfigV2) - - // Check if we have available workers - if len(availableWorkers) == 0 { - return false - } - - // Count running EC tasks - runningCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeErasureCoding { - runningCount++ - } - } - - // Check concurrency limit - if runningCount >= ecConfig.MaxConcurrent { - return false - } - - // Check if any worker can handle EC tasks - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeErasureCoding { - return true - } - } - } - - return false -} - -// createErasureCodingTask creates an erasure coding task instance -func createErasureCodingTask(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - // Extract additional parameters for comprehensive EC - masterClient := "localhost:9333" // Default master client - workDir := "/tmp/seaweedfs_ec_work" // Default work directory - - if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" { - masterClient = mc - } - if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" { - workDir = wd - } - - // Create EC task with comprehensive capabilities - task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir) - - // Set gRPC dial option if provided - if params.GrpcDialOption != nil { - task.SetDialOption(params.GrpcDialOption) - } - - task.SetEstimatedDuration(task.EstimateTime(params)) - return task, nil -} - -// getErasureCodingConfigSpec returns the configuration schema for erasure coding tasks -func getErasureCodingConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Erasure Coding Tasks", - Description: "Whether erasure coding tasks should be automatically created", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "quiet_for_seconds", - JSONName: "quiet_for_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 7 * 24 * 60 * 60, // 7 days - MinValue: 1 * 24 * 60 * 60, // 1 day - MaxValue: 30 * 24 * 60 * 60, // 30 days - Required: true, - DisplayName: "Quiet For Duration", - Description: "Only apply erasure coding to volumes that have not been modified for this duration", - Unit: config.UnitDays, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 12 * 60 * 60, // 12 hours - MinValue: 2 * 60 * 60, // 2 hours - MaxValue: 24 * 60 * 60, // 24 hours - Required: true, - 
DisplayName: "Scan Interval", - Description: "How often to scan for volumes needing erasure coding", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 1, - MinValue: 1, - MaxValue: 3, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of erasure coding tasks that can run simultaneously", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "fullness_ratio", - JSONName: "fullness_ratio", - Type: config.FieldTypeFloat, - DefaultValue: 0.9, // 90% - MinValue: 0.5, - MaxValue: 1.0, - Required: true, - DisplayName: "Fullness Ratio", - Description: "Only apply erasure coding to volumes with fullness ratio above this threshold", - Placeholder: "0.90 (90%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "collection_filter", - JSONName: "collection_filter", - Type: config.FieldTypeString, - DefaultValue: "", - Required: false, - DisplayName: "Collection Filter", - Description: "Only apply erasure coding to volumes in these collections (comma-separated, leave empty for all)", - Placeholder: "collection1,collection2", - InputType: "text", - CSSClasses: "form-control", - }, - }, - } -} - -// initErasureCodingV2 registers the refactored erasure coding task -func initErasureCodingV2() { - // Create configuration instance - config := &ErasureCodingConfigV2{ - BaseConfig: base.BaseConfig{ - Enabled: false, // Conservative default - enable via configuration - ScanIntervalSeconds: 12 * 60 * 60, // 12 hours - MaxConcurrent: 1, // Conservative default - }, - QuietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period - FullnessRatio: 0.90, // 90% full threshold - CollectionFilter: "", // No collection filter by default - } - - // Create complete task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeErasureCoding, - Name: "erasure_coding", - DisplayName: "Erasure Coding", - Description: "Converts volumes to erasure coded format for improved data durability", - Icon: "fas fa-shield-alt text-info", - Capabilities: []string{"erasure_coding", "storage", "durability"}, - - Config: config, - ConfigSpec: getErasureCodingConfigSpec(), - CreateTask: createErasureCodingTask, - DetectionFunc: ecDetection, - ScanInterval: 12 * time.Hour, - SchedulingFunc: ecScheduling, - MaxConcurrent: 1, - RepeatInterval: 24 * time.Hour, - } - - // Register everything with a single function call! 
- base.RegisterTask(taskDef) -} diff --git a/weed/worker/tasks/vacuum/vacuum.go b/weed/worker/tasks/vacuum/vacuum.go index 24233f59b..2377406b1 100644 --- a/weed/worker/tasks/vacuum/vacuum.go +++ b/weed/worker/tasks/vacuum/vacuum.go @@ -7,10 +7,12 @@ import ( "strconv" "time" + "github.com/seaweedfs/seaweedfs/weed/admin/config" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -197,3 +199,233 @@ func (t *Task) GetProgress() float64 { func (t *Task) Cancel() error { return t.BaseTask.Cancel() } + +// VacuumConfigV2 extends BaseConfig with vacuum-specific settings +type VacuumConfigV2 struct { + base.BaseConfig + GarbageThreshold float64 `json:"garbage_threshold"` + MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` + MinIntervalSeconds int `json:"min_interval_seconds"` +} + +// vacuumDetection implements the detection logic for vacuum tasks +func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + vacuumConfig := config.(*VacuumConfigV2) + var results []*types.TaskDetectionResult + minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second + + for _, metric := range metrics { + // Check if volume needs vacuum + if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { + priority := types.TaskPriorityNormal + if metric.GarbageRatio > 0.6 { + priority = types.TaskPriorityHigh + } + + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeVacuum, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: priority, + Reason: "Volume has excessive garbage requiring vacuum", + Parameters: map[string]interface{}{ + "garbage_ratio": metric.GarbageRatio, + "volume_age": metric.Age.String(), + }, + ScheduleAt: time.Now(), + } + results = append(results, result) + } + } + + return results, nil +} + +// vacuumScheduling implements the scheduling logic for vacuum tasks +func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + vacuumConfig := config.(*VacuumConfigV2) + + // Count running vacuum tasks + runningVacuumCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeVacuum { + runningVacuumCount++ + } + } + + // Check concurrency limit + if runningVacuumCount >= vacuumConfig.MaxConcurrent { + return false + } + + // Check for available workers with vacuum capability + for _, worker := range availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeVacuum { + return true + } + } + } + } + + return false +} + +// createVacuumTask creates a vacuum task instance +func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) { + // Validate parameters + if params.VolumeID == 0 { + return nil, fmt.Errorf("volume_id is required") + } + if params.Server == "" { + return nil, fmt.Errorf("server is required") + } + + // Use existing vacuum task implementation + task := NewTask(params.Server, params.VolumeID) + 
task.SetEstimatedDuration(task.EstimateTime(params)) + return task, nil +} + +// getVacuumConfigSpec returns the configuration schema for vacuum tasks +func getVacuumConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Vacuum Tasks", + Description: "Whether vacuum tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic vacuum task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "garbage_threshold", + JSONName: "garbage_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.3, + MinValue: 0.0, + MaxValue: 1.0, + Required: true, + DisplayName: "Garbage Percentage Threshold", + Description: "Trigger vacuum when garbage ratio exceeds this percentage", + HelpText: "Volumes with more deleted content than this threshold will be vacuumed", + Placeholder: "0.30 (30%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 2 * 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing vacuum", + HelpText: "The system will check for volumes that need vacuuming at this interval", + Placeholder: "2", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 1, + MaxValue: 10, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of vacuum tasks that can run simultaneously", + HelpText: "Limits the number of vacuum operations running at the same time to control system load", + Placeholder: "2 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_volume_age_seconds", + JSONName: "min_volume_age_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 24 * 60 * 60, + MinValue: 1 * 60 * 60, + MaxValue: 7 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Volume Age", + Description: "Only vacuum volumes older than this duration", + HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to", + Placeholder: "24", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "min_interval_seconds", + JSONName: "min_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 7 * 24 * 60 * 60, + MinValue: 1 * 24 * 60 * 60, + MaxValue: 30 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Interval", + Description: "Minimum time between vacuum operations on the same volume", + HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time", + Placeholder: "7", + Unit: config.UnitDays, + InputType: "interval", + CSSClasses: "form-control", + }, + }, + } +} + +// initVacuumV2 registers the refactored vacuum task (replaces the old registration) +func initVacuumV2() { + // Create configuration instance + config := &VacuumConfigV2{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 2 * 60 * 60, // 2 hours + MaxConcurrent: 2, + }, + GarbageThreshold: 0.3, // 30% + MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours + MinIntervalSeconds: 7 * 
24 * 60 * 60, // 7 days + } + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeVacuum, + Name: "vacuum", + DisplayName: "Volume Vacuum", + Description: "Reclaims disk space by removing deleted files from volumes", + Icon: "fas fa-broom text-primary", + Capabilities: []string{"vacuum", "storage"}, + + Config: config, + ConfigSpec: getVacuumConfigSpec(), + CreateTask: createVacuumTask, + DetectionFunc: vacuumDetection, + ScanInterval: 2 * time.Hour, + SchedulingFunc: vacuumScheduling, + MaxConcurrent: 2, + RepeatInterval: 7 * 24 * time.Hour, + } + + // Register everything with a single function call! + base.RegisterTask(taskDef) +} diff --git a/weed/worker/tasks/vacuum/vacuum_v2.go b/weed/worker/tasks/vacuum/vacuum_v2.go deleted file mode 100644 index 29129cb0f..000000000 --- a/weed/worker/tasks/vacuum/vacuum_v2.go +++ /dev/null @@ -1,269 +0,0 @@ -package vacuum - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// VacuumConfigV2 extends BaseConfig with vacuum-specific settings -type VacuumConfigV2 struct { - base.BaseConfig - GarbageThreshold float64 `json:"garbage_threshold"` - MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` - MinIntervalSeconds int `json:"min_interval_seconds"` -} - -// ToMap converts config to map (extend base functionality) -func (c *VacuumConfigV2) ToMap() map[string]interface{} { - result := c.BaseConfig.ToMap() - result["garbage_threshold"] = c.GarbageThreshold - result["min_volume_age_seconds"] = c.MinVolumeAgeSeconds - result["min_interval_seconds"] = c.MinIntervalSeconds - return result -} - -// FromMap loads config from map (extend base functionality) -func (c *VacuumConfigV2) FromMap(data map[string]interface{}) error { - // Load base config first - if err := c.BaseConfig.FromMap(data); err != nil { - return err - } - - // Load vacuum-specific config - if threshold, ok := data["garbage_threshold"].(float64); ok { - c.GarbageThreshold = threshold - } - if ageSeconds, ok := data["min_volume_age_seconds"].(int); ok { - c.MinVolumeAgeSeconds = ageSeconds - } - if intervalSeconds, ok := data["min_interval_seconds"].(int); ok { - c.MinIntervalSeconds = intervalSeconds - } - return nil -} - -// vacuumDetection implements the detection logic for vacuum tasks -func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - vacuumConfig := config.(*VacuumConfigV2) - var results []*types.TaskDetectionResult - minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second - - for _, metric := range metrics { - // Check if volume needs vacuum - if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { - priority := types.TaskPriorityNormal - if metric.GarbageRatio > 0.6 { - priority = types.TaskPriorityHigh - } - - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeVacuum, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: priority, - Reason: "Volume has excessive garbage requiring vacuum", - Parameters: map[string]interface{}{ - "garbage_ratio": metric.GarbageRatio, - "volume_age": metric.Age.String(), - }, - ScheduleAt: time.Now(), - } - results = append(results, result) - } - } - - return results, nil -} - -// vacuumScheduling 
implements the scheduling logic for vacuum tasks -func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - vacuumConfig := config.(*VacuumConfigV2) - - // Count running vacuum tasks - runningVacuumCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeVacuum { - runningVacuumCount++ - } - } - - // Check concurrency limit - if runningVacuumCount >= vacuumConfig.MaxConcurrent { - return false - } - - // Check for available workers with vacuum capability - for _, worker := range availableWorkers { - if worker.CurrentLoad < worker.MaxConcurrent { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeVacuum { - return true - } - } - } - } - - return false -} - -// createVacuumTask creates a vacuum task instance -func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - // Use existing vacuum task implementation - task := NewTask(params.Server, params.VolumeID) - task.SetEstimatedDuration(task.EstimateTime(params)) - return task, nil -} - -// getVacuumConfigSpec returns the configuration schema for vacuum tasks -func getVacuumConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Vacuum Tasks", - Description: "Whether vacuum tasks should be automatically created", - HelpText: "Toggle this to enable or disable automatic vacuum task generation", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "garbage_threshold", - JSONName: "garbage_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.3, - MinValue: 0.0, - MaxValue: 1.0, - Required: true, - DisplayName: "Garbage Percentage Threshold", - Description: "Trigger vacuum when garbage ratio exceeds this percentage", - HelpText: "Volumes with more deleted content than this threshold will be vacuumed", - Placeholder: "0.30 (30%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 2 * 60 * 60, - MinValue: 10 * 60, - MaxValue: 24 * 60 * 60, - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volumes needing vacuum", - HelpText: "The system will check for volumes that need vacuuming at this interval", - Placeholder: "2", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 2, - MinValue: 1, - MaxValue: 10, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of vacuum tasks that can run simultaneously", - HelpText: "Limits the number of vacuum operations running at the same time to control system load", - Placeholder: "2 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_volume_age_seconds", - JSONName: "min_volume_age_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 24 * 60 * 60, - MinValue: 1 * 60 * 60, - MaxValue: 7 * 24 * 60 * 60, - Required: true, - DisplayName: "Minimum 
Volume Age", - Description: "Only vacuum volumes older than this duration", - HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to", - Placeholder: "24", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "min_interval_seconds", - JSONName: "min_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 7 * 24 * 60 * 60, - MinValue: 1 * 24 * 60 * 60, - MaxValue: 30 * 24 * 60 * 60, - Required: true, - DisplayName: "Minimum Interval", - Description: "Minimum time between vacuum operations on the same volume", - HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time", - Placeholder: "7", - Unit: config.UnitDays, - InputType: "interval", - CSSClasses: "form-control", - }, - }, - } -} - -// initVacuumV2 registers the refactored vacuum task (replaces the old registration) -func initVacuumV2() { - // Create configuration instance - config := &VacuumConfigV2{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 2 * 60 * 60, // 2 hours - MaxConcurrent: 2, - }, - GarbageThreshold: 0.3, // 30% - MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - } - - // Create complete task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeVacuum, - Name: "vacuum", - DisplayName: "Volume Vacuum", - Description: "Reclaims disk space by removing deleted files from volumes", - Icon: "fas fa-broom text-primary", - Capabilities: []string{"vacuum", "storage"}, - - Config: config, - ConfigSpec: getVacuumConfigSpec(), - CreateTask: createVacuumTask, - DetectionFunc: vacuumDetection, - ScanInterval: 2 * time.Hour, - SchedulingFunc: vacuumScheduling, - MaxConcurrent: 2, - RepeatInterval: 7 * 24 * time.Hour, - } - - // Register everything with a single function call! 
- base.RegisterTask(taskDef) -} diff --git a/weed/worker/tasks/vacuum_v2/vacuum.go b/weed/worker/tasks/vacuum_v2/vacuum.go deleted file mode 100644 index 3034d07d0..000000000 --- a/weed/worker/tasks/vacuum_v2/vacuum.go +++ /dev/null @@ -1,260 +0,0 @@ -package vacuum_v2 - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// VacuumConfig extends BaseConfig with vacuum-specific settings -type VacuumConfig struct { - base.BaseConfig - GarbageThreshold float64 `json:"garbage_threshold"` - MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` - MinIntervalSeconds int `json:"min_interval_seconds"` -} - -// ToMap converts config to map (extend base functionality) -func (c *VacuumConfig) ToMap() map[string]interface{} { - result := c.BaseConfig.ToMap() - result["garbage_threshold"] = c.GarbageThreshold - result["min_volume_age_seconds"] = c.MinVolumeAgeSeconds - result["min_interval_seconds"] = c.MinIntervalSeconds - return result -} - -// FromMap loads config from map (extend base functionality) -func (c *VacuumConfig) FromMap(data map[string]interface{}) error { - // Load base config first - if err := c.BaseConfig.FromMap(data); err != nil { - return err - } - - // Load vacuum-specific config - if threshold, ok := data["garbage_threshold"].(float64); ok { - c.GarbageThreshold = threshold - } - if ageSeconds, ok := data["min_volume_age_seconds"].(int); ok { - c.MinVolumeAgeSeconds = ageSeconds - } - if intervalSeconds, ok := data["min_interval_seconds"].(int); ok { - c.MinIntervalSeconds = intervalSeconds - } - return nil -} - -// vacuumDetection implements the detection logic for vacuum tasks -func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - vacuumConfig := config.(*VacuumConfig) - var results []*types.TaskDetectionResult - minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second - - for _, metric := range metrics { - // Check if volume needs vacuum - if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { - priority := types.TaskPriorityNormal - if metric.GarbageRatio > 0.6 { - priority = types.TaskPriorityHigh - } - - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeVacuum, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: priority, - Reason: "Volume has excessive garbage requiring vacuum", - Parameters: map[string]interface{}{ - "garbage_ratio": metric.GarbageRatio, - "volume_age": metric.Age.String(), - }, - ScheduleAt: time.Now(), - } - results = append(results, result) - } - } - - return results, nil -} - -// vacuumScheduling implements the scheduling logic for vacuum tasks -func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - vacuumConfig := config.(*VacuumConfig) - - // Count running vacuum tasks - runningVacuumCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeVacuum { - runningVacuumCount++ - } - } - - // Check concurrency limit - if runningVacuumCount >= vacuumConfig.MaxConcurrent { - return false - } - - // Check for available workers with vacuum capability - for _, worker := range 
availableWorkers { - if worker.CurrentLoad < worker.MaxConcurrent { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeVacuum { - return true - } - } - } - } - - return false -} - -// createVacuumTask creates a vacuum task instance -func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - // Reuse existing vacuum task implementation - task := vacuum.NewTask(params.Server, params.VolumeID) - task.SetEstimatedDuration(task.EstimateTime(params)) - return task, nil -} - -// getConfigSpec returns the configuration schema for vacuum tasks -func getConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Vacuum Tasks", - Description: "Whether vacuum tasks should be automatically created", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "garbage_threshold", - JSONName: "garbage_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.3, - MinValue: 0.0, - MaxValue: 1.0, - Required: true, - DisplayName: "Garbage Percentage Threshold", - Description: "Trigger vacuum when garbage ratio exceeds this percentage", - Placeholder: "0.30 (30%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 2 * 60 * 60, - MinValue: 10 * 60, - MaxValue: 24 * 60 * 60, - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volumes needing vacuum", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 2, - MinValue: 1, - MaxValue: 10, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of vacuum tasks that can run simultaneously", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_volume_age_seconds", - JSONName: "min_volume_age_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 24 * 60 * 60, - MinValue: 1 * 60 * 60, - MaxValue: 7 * 24 * 60 * 60, - Required: true, - DisplayName: "Minimum Volume Age", - Description: "Only vacuum volumes older than this duration", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "min_interval_seconds", - JSONName: "min_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 7 * 24 * 60 * 60, - MinValue: 1 * 24 * 60 * 60, - MaxValue: 30 * 24 * 60 * 60, - Required: true, - DisplayName: "Minimum Interval", - Description: "Minimum time between vacuum operations on the same volume", - Unit: config.UnitDays, - InputType: "interval", - CSSClasses: "form-control", - }, - }, - } -} - -// RegisterVacuumV2 registers the refactored vacuum task -func init() { - // Create configuration instance - config := &VacuumConfig{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 2 * 60 * 60, // 2 hours - MaxConcurrent: 2, - }, - GarbageThreshold: 0.3, // 30% - MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - } - - // Create complete 
task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeVacuum, - Name: "vacuum_v2", - DisplayName: "Volume Vacuum (V2)", - Description: "Reclaims disk space by removing deleted files from volumes", - Icon: "fas fa-broom text-primary", - Capabilities: []string{"vacuum", "storage"}, - - Config: config, - ConfigSpec: getConfigSpec(), - CreateTask: createVacuumTask, - DetectionFunc: vacuumDetection, - ScanInterval: 2 * time.Hour, - SchedulingFunc: vacuumScheduling, - MaxConcurrent: 2, - RepeatInterval: 7 * 24 * time.Hour, - } - - // Register everything with a single function call! - base.RegisterTask(taskDef) -}
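
Reviewer note: the deleted v2 files above and the consolidated registrations that replace them follow one pattern: a config struct embedding base.BaseConfig, a detection function, a scheduling function, a task factory, and a single base.RegisterTask call. The condensed sketch below restates that pattern in one place. It is a sketch only, not code from this patch: the signatures and exported names are copied from the diff (base.TaskDefinition, base.TaskConfig, types.TaskParams, and so on), while the "example" identifiers, the omitted ConfigSpec, and all field values are placeholders that do not exist in the tree.

package example // hypothetical package, for illustration only

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// ExampleConfig embeds base.BaseConfig the same way the vacuum/balance/EC configs in this patch do.
type ExampleConfig struct {
	base.BaseConfig
	SomeThreshold float64 `json:"some_threshold"` // placeholder task-specific setting
}

// exampleDetection uses the DetectionFunc signature shown in the diff.
func exampleDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, cfg base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !cfg.IsEnabled() {
		return nil, nil // detection is skipped while the task type is disabled
	}
	// Inspect metrics here and return zero or more detection results.
	return nil, nil
}

// exampleScheduling uses the SchedulingFunc signature shown in the diff.
func exampleScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, cfg base.TaskConfig) bool {
	// A real implementation would also enforce the config's MaxConcurrent, as vacuumScheduling does.
	return len(availableWorkers) > 0
}

// createExampleTask uses the CreateTask signature shown in the diff.
func createExampleTask(params types.TaskParams) (types.TaskInterface, error) {
	if params.VolumeID == 0 {
		return nil, fmt.Errorf("volume_id is required")
	}
	return nil, fmt.Errorf("construct a concrete types.TaskInterface here")
}

func init() {
	base.RegisterTask(&base.TaskDefinition{
		Type:           types.TaskTypeVacuum, // a real task would register its own task type constant
		Name:           "example",
		DisplayName:    "Example Task",
		Description:    "Placeholder showing the registration pattern",
		Config:         &ExampleConfig{BaseConfig: base.BaseConfig{Enabled: true, MaxConcurrent: 1}},
		CreateTask:     createExampleTask,
		DetectionFunc:  exampleDetection,
		SchedulingFunc: exampleScheduling,
		ScanInterval:   time.Hour,
		MaxConcurrent:  1,
		RepeatInterval: 24 * time.Hour,
	})
}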