Browse Source

refactor

worker-execute-ec-tasks
chrislu 4 months ago
parent
commit
7c4e44875f
  1. 6
      weed/admin/handlers/maintenance_handlers.go
  2. 263
      weed/worker/tasks/balance/balance.go
  3. 38
      weed/worker/tasks/balance/balance_register.go
  4. 110
      weed/worker/tasks/balance/config.go
  5. 135
      weed/worker/tasks/balance/detection.go
  6. 125
      weed/worker/tasks/erasure_coding/config.go
  7. 126
      weed/worker/tasks/erasure_coding/detection.go
  8. 270
      weed/worker/tasks/erasure_coding/ec.go
  9. 38
      weed/worker/tasks/erasure_coding/ec_register.go
  10. 128
      weed/worker/tasks/vacuum/config.go
  11. 99
      weed/worker/tasks/vacuum/detection.go
  12. 246
      weed/worker/tasks/vacuum/vacuum.go
  13. 38
      weed/worker/tasks/vacuum/vacuum_register.go

6
weed/admin/handlers/maintenance_handlers.go

@ -192,11 +192,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
var config interface{}
switch taskType {
case types.TaskTypeVacuum:
config = &vacuum.VacuumConfig{}
config = &vacuum.Config{}
case types.TaskTypeBalance:
config = &balance.BalanceConfig{}
config = &balance.Config{}
case types.TaskTypeErasureCoding:
config = &erasure_coding.ErasureCodingConfig{}
config = &erasure_coding.Config{}
default:
c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName})
return

263
weed/worker/tasks/balance/balance.go

@ -4,10 +4,8 @@ import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@ -82,264 +80,3 @@ func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
// Could adjust based on volume size or cluster state
return baseTime
}
// BalanceConfig extends BaseConfig with balance-specific settings
type BalanceConfig struct {
base.BaseConfig
ImbalanceThreshold float64 `json:"imbalance_threshold"`
MinServerCount int `json:"min_server_count"`
}
// balanceDetection implements the detection logic for balance tasks
func balanceDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
balanceConfig := config.(*BalanceConfig)
// Skip if cluster is too small
minVolumeCount := 10
if len(metrics) < minVolumeCount {
return nil, nil
}
// Analyze volume distribution across servers
serverVolumeCounts := make(map[string]int)
for _, metric := range metrics {
serverVolumeCounts[metric.Server]++
}
if len(serverVolumeCounts) < balanceConfig.MinServerCount {
return nil, nil
}
// Calculate balance metrics
totalVolumes := len(metrics)
avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
maxVolumes := 0
minVolumes := totalVolumes
maxServer := ""
minServer := ""
for server, count := range serverVolumeCounts {
if count > maxVolumes {
maxVolumes = count
maxServer = server
}
if count < minVolumes {
minVolumes = count
minServer = server
}
}
// Check if imbalance exceeds threshold
imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
return nil, nil
}
// Create balance task
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
task := &types.TaskDetectionResult{
TaskType: types.TaskTypeBalance,
Priority: types.TaskPriorityNormal,
Reason: reason,
ScheduleAt: time.Now(),
Parameters: map[string]interface{}{
"imbalance_ratio": imbalanceRatio,
"threshold": balanceConfig.ImbalanceThreshold,
"max_volumes": maxVolumes,
"min_volumes": minVolumes,
"avg_volumes_per_server": avgVolumesPerServer,
"max_server": maxServer,
"min_server": minServer,
"total_servers": len(serverVolumeCounts),
},
}
return []*types.TaskDetectionResult{task}, nil
}
// balanceScheduling implements the scheduling logic for balance tasks
func balanceScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
balanceConfig := config.(*BalanceConfig)
// Count running balance tasks
runningBalanceCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeBalance {
runningBalanceCount++
}
}
// Check concurrency limit
if runningBalanceCount >= balanceConfig.MaxConcurrent {
return false
}
// Check if we have available workers
availableWorkerCount := 0
for _, worker := range availableWorkers {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeBalance {
availableWorkerCount++
break
}
}
}
return availableWorkerCount > 0
}
// createBalanceTask creates a new balance task instance
func createBalanceTask(params types.TaskParams) (types.TaskInterface, error) {
// Extract configuration from params
var config *BalanceConfig
if configData, ok := params.Parameters["config"]; ok {
if configMap, ok := configData.(map[string]interface{}); ok {
config = &BalanceConfig{}
if err := config.FromMap(configMap); err != nil {
return nil, fmt.Errorf("failed to parse balance config: %v", err)
}
}
}
if config == nil {
config = &BalanceConfig{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 30 * 60, // 30 minutes
MaxConcurrent: 1,
},
ImbalanceThreshold: 0.2, // 20%
MinServerCount: 2,
}
}
// Create and return the balance task using existing Task type
return NewTask(params.Server, params.VolumeID, params.Collection), nil
}
// getBalanceConfigSpec returns the configuration schema for balance tasks
func getBalanceConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Balance Tasks",
Description: "Whether balance tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic balance task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 30 * 60,
MinValue: 5 * 60,
MaxValue: 2 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volume distribution imbalances",
HelpText: "The system will check for volume distribution imbalances at this interval",
Placeholder: "30",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 1,
MinValue: 1,
MaxValue: 3,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of balance tasks that can run simultaneously",
HelpText: "Limits the number of balance operations running at the same time",
Placeholder: "1 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "imbalance_threshold",
JSONName: "imbalance_threshold",
Type: config.FieldTypeFloat,
DefaultValue: 0.2,
MinValue: 0.05,
MaxValue: 0.5,
Required: true,
DisplayName: "Imbalance Threshold",
Description: "Minimum imbalance ratio to trigger balancing",
HelpText: "Volume distribution imbalances above this threshold will trigger balancing",
Placeholder: "0.20 (20%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "min_server_count",
JSONName: "min_server_count",
Type: config.FieldTypeInt,
DefaultValue: 2,
MinValue: 2,
MaxValue: 10,
Required: true,
DisplayName: "Minimum Server Count",
Description: "Minimum number of servers required for balancing",
HelpText: "Balancing will only occur if there are at least this many servers",
Placeholder: "2 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
},
}
}
// initBalance registers the refactored balance task
func initBalance() {
// Create configuration instance
config := &BalanceConfig{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 30 * 60, // 30 minutes
MaxConcurrent: 1,
},
ImbalanceThreshold: 0.2, // 20%
MinServerCount: 2,
}
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeBalance,
Name: "balance",
DisplayName: "Volume Balance",
Description: "Balances volume distribution across servers",
Icon: "fas fa-balance-scale text-warning",
Capabilities: []string{"balance", "distribution"},
Config: config,
ConfigSpec: getBalanceConfigSpec(),
CreateTask: createBalanceTask,
DetectionFunc: balanceDetection,
ScanInterval: 30 * time.Minute,
SchedulingFunc: balanceScheduling,
MaxConcurrent: 1,
RepeatInterval: 2 * time.Hour,
}
// Register everything with a single function call!
base.RegisterTask(taskDef)
}

38
weed/worker/tasks/balance/balance_register.go

@ -1,7 +1,41 @@
package balance
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Auto-register this task when the package is imported
func init() {
// Use new architecture instead of old registration
initBalance()
RegisterBalanceTask()
}
// RegisterBalanceTask registers the balance task with the new architecture
func RegisterBalanceTask() {
// Create configuration instance
config := NewDefaultConfig()
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeBalance,
Name: "balance",
DisplayName: "Volume Balance",
Description: "Balances volume distribution across servers",
Icon: "fas fa-balance-scale text-warning",
Capabilities: []string{"balance", "distribution"},
Config: config,
ConfigSpec: GetConfigSpec(),
CreateTask: CreateTask,
DetectionFunc: Detection,
ScanInterval: 30 * time.Minute,
SchedulingFunc: Scheduling,
MaxConcurrent: 1,
RepeatInterval: 2 * time.Hour,
}
// Register everything with a single function call!
base.RegisterTask(taskDef)
}

110
weed/worker/tasks/balance/config.go

@ -0,0 +1,110 @@
package balance
import (
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with balance-specific settings
type Config struct {
base.BaseConfig
ImbalanceThreshold float64 `json:"imbalance_threshold"`
MinServerCount int `json:"min_server_count"`
}
// NewDefaultConfig creates a new default balance configuration
func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 30 * 60, // 30 minutes
MaxConcurrent: 1,
},
ImbalanceThreshold: 0.2, // 20%
MinServerCount: 2,
}
}
// GetConfigSpec returns the configuration schema for balance tasks
func GetConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Balance Tasks",
Description: "Whether balance tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic balance task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 30 * 60,
MinValue: 5 * 60,
MaxValue: 2 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volume distribution imbalances",
HelpText: "The system will check for volume distribution imbalances at this interval",
Placeholder: "30",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 1,
MinValue: 1,
MaxValue: 3,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of balance tasks that can run simultaneously",
HelpText: "Limits the number of balance operations running at the same time",
Placeholder: "1 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "imbalance_threshold",
JSONName: "imbalance_threshold",
Type: config.FieldTypeFloat,
DefaultValue: 0.2,
MinValue: 0.05,
MaxValue: 0.5,
Required: true,
DisplayName: "Imbalance Threshold",
Description: "Minimum imbalance ratio to trigger balancing",
HelpText: "Volume distribution imbalances above this threshold will trigger balancing",
Placeholder: "0.20 (20%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "min_server_count",
JSONName: "min_server_count",
Type: config.FieldTypeInt,
DefaultValue: 2,
MinValue: 2,
MaxValue: 10,
Required: true,
DisplayName: "Minimum Server Count",
Description: "Minimum number of servers required for balancing",
HelpText: "Balancing will only occur if there are at least this many servers",
Placeholder: "2 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
},
}
}

135
weed/worker/tasks/balance/detection.go

@ -0,0 +1,135 @@
package balance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Detection implements the detection logic for balance tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
balanceConfig := config.(*Config)
// Skip if cluster is too small
minVolumeCount := 10
if len(metrics) < minVolumeCount {
return nil, nil
}
// Analyze volume distribution across servers
serverVolumeCounts := make(map[string]int)
for _, metric := range metrics {
serverVolumeCounts[metric.Server]++
}
if len(serverVolumeCounts) < balanceConfig.MinServerCount {
return nil, nil
}
// Calculate balance metrics
totalVolumes := len(metrics)
avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
maxVolumes := 0
minVolumes := totalVolumes
maxServer := ""
minServer := ""
for server, count := range serverVolumeCounts {
if count > maxVolumes {
maxVolumes = count
maxServer = server
}
if count < minVolumes {
minVolumes = count
minServer = server
}
}
// Check if imbalance exceeds threshold
imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
return nil, nil
}
// Create balance task
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
task := &types.TaskDetectionResult{
TaskType: types.TaskTypeBalance,
Priority: types.TaskPriorityNormal,
Reason: reason,
ScheduleAt: time.Now(),
Parameters: map[string]interface{}{
"imbalance_ratio": imbalanceRatio,
"threshold": balanceConfig.ImbalanceThreshold,
"max_volumes": maxVolumes,
"min_volumes": minVolumes,
"avg_volumes_per_server": avgVolumesPerServer,
"max_server": maxServer,
"min_server": minServer,
"total_servers": len(serverVolumeCounts),
},
}
return []*types.TaskDetectionResult{task}, nil
}
// Scheduling implements the scheduling logic for balance tasks
func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
balanceConfig := config.(*Config)
// Count running balance tasks
runningBalanceCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeBalance {
runningBalanceCount++
}
}
// Check concurrency limit
if runningBalanceCount >= balanceConfig.MaxConcurrent {
return false
}
// Check if we have available workers
availableWorkerCount := 0
for _, worker := range availableWorkers {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeBalance {
availableWorkerCount++
break
}
}
}
return availableWorkerCount > 0
}
// CreateTask creates a new balance task instance
func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
// Extract configuration from params
var config *Config
if configData, ok := params.Parameters["config"]; ok {
if configMap, ok := configData.(map[string]interface{}); ok {
config = &Config{}
if err := config.FromMap(configMap); err != nil {
return nil, fmt.Errorf("failed to parse balance config: %v", err)
}
}
}
if config == nil {
config = NewDefaultConfig()
}
// Create and return the balance task using existing Task type
return NewTask(params.Server, params.VolumeID, params.Collection), nil
}

125
weed/worker/tasks/erasure_coding/config.go

@ -0,0 +1,125 @@
package erasure_coding
import (
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with erasure coding specific settings
type Config struct {
base.BaseConfig
QuietForSeconds int `json:"quiet_for_seconds"`
FullnessRatio float64 `json:"fullness_ratio"`
CollectionFilter string `json:"collection_filter"`
}
// NewDefaultConfig creates a new default erasure coding configuration
func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 60 * 60, // 1 hour
MaxConcurrent: 1,
},
QuietForSeconds: 300, // 5 minutes
FullnessRatio: 0.8, // 80%
CollectionFilter: "",
}
}
// GetConfigSpec returns the configuration schema for erasure coding tasks
func GetConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Erasure Coding Tasks",
Description: "Whether erasure coding tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 60 * 60,
MinValue: 10 * 60,
MaxValue: 24 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volumes needing erasure coding",
HelpText: "The system will check for volumes that need erasure coding at this interval",
Placeholder: "1",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 1,
MinValue: 1,
MaxValue: 5,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of erasure coding tasks that can run simultaneously",
HelpText: "Limits the number of erasure coding operations running at the same time",
Placeholder: "1 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "quiet_for_seconds",
JSONName: "quiet_for_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 300,
MinValue: 60,
MaxValue: 3600,
Required: true,
DisplayName: "Quiet Period",
Description: "Minimum time volume must be quiet before erasure coding",
HelpText: "Volume must not be modified for this duration before erasure coding",
Placeholder: "5",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "fullness_ratio",
JSONName: "fullness_ratio",
Type: config.FieldTypeFloat,
DefaultValue: 0.8,
MinValue: 0.1,
MaxValue: 1.0,
Required: true,
DisplayName: "Fullness Ratio",
Description: "Minimum fullness ratio to trigger erasure coding",
HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
Placeholder: "0.80 (80%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "collection_filter",
JSONName: "collection_filter",
Type: config.FieldTypeString,
DefaultValue: "",
Required: false,
DisplayName: "Collection Filter",
Description: "Only process volumes from specific collections",
HelpText: "Leave empty to process all collections, or specify collection name",
Placeholder: "my_collection",
InputType: "text",
CSSClasses: "form-control",
},
},
}
}

126
weed/worker/tasks/erasure_coding/detection.go

@ -0,0 +1,126 @@
package erasure_coding
import (
"fmt"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Detection implements the detection logic for erasure coding tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
ecConfig := config.(*Config)
var results []*types.TaskDetectionResult
now := time.Now()
quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum
for _, metric := range metrics {
// Skip if already EC volume
if metric.IsECVolume {
continue
}
// Check minimum size requirement
if metric.Size < minSizeBytes {
continue
}
// Check collection filter if specified
if ecConfig.CollectionFilter != "" {
// Parse comma-separated collections
allowedCollections := make(map[string]bool)
for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
allowedCollections[strings.TrimSpace(collection)] = true
}
// Skip if volume's collection is not in the allowed list
if !allowedCollections[metric.Collection] {
continue
}
}
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
result := &types.TaskDetectionResult{
TaskType: types.TaskTypeErasureCoding,
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Priority: types.TaskPriorityLow, // EC is not urgent
Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)",
metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
float64(metric.Size)/(1024*1024)),
Parameters: map[string]interface{}{
"age_seconds": int(metric.Age.Seconds()),
"fullness_ratio": metric.FullnessRatio,
"size_mb": int(metric.Size / (1024 * 1024)),
},
ScheduleAt: now,
}
results = append(results, result)
}
}
return results, nil
}
// Scheduling implements the scheduling logic for erasure coding tasks
func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
ecConfig := config.(*Config)
// Check if we have available workers
if len(availableWorkers) == 0 {
return false
}
// Count running EC tasks
runningCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeErasureCoding {
runningCount++
}
}
// Check concurrency limit
if runningCount >= ecConfig.MaxConcurrent {
return false
}
// Check if any worker can handle EC tasks
for _, worker := range availableWorkers {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeErasureCoding {
return true
}
}
}
return false
}
// CreateTask creates a new erasure coding task instance
func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
// Extract configuration from params
var config *Config
if configData, ok := params.Parameters["config"]; ok {
if configMap, ok := configData.(map[string]interface{}); ok {
config = &Config{}
if err := config.FromMap(configMap); err != nil {
return nil, fmt.Errorf("failed to parse erasure coding config: %v", err)
}
}
}
if config == nil {
config = NewDefaultConfig()
}
// Create and return the erasure coding task using existing Task type
return NewTask(params.Server, params.VolumeID), nil
}

270
weed/worker/tasks/erasure_coding/ec.go

@ -7,13 +7,11 @@ import (
"math"
"os"
"path/filepath"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@ -23,7 +21,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc/credentials/insecure"
)
@ -1019,270 +1016,3 @@ func (t *Task) mountShardOnServer(targetServer pb.ServerAddress, shardId uint32)
glog.V(1).Infof("MOUNT SUCCESS: Shard %d successfully mounted on %s", shardId, targetServer)
return nil
}
// ErasureCodingConfig extends BaseConfig with erasure coding specific settings
type ErasureCodingConfig struct {
base.BaseConfig
QuietForSeconds int `json:"quiet_for_seconds"`
FullnessRatio float64 `json:"fullness_ratio"`
CollectionFilter string `json:"collection_filter"`
}
// ecDetection implements the detection logic for erasure coding tasks
func ecDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
ecConfig := config.(*ErasureCodingConfig)
var results []*types.TaskDetectionResult
now := time.Now()
quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum
for _, metric := range metrics {
// Skip if already EC volume
if metric.IsECVolume {
continue
}
// Check minimum size requirement
if metric.Size < minSizeBytes {
continue
}
// Check collection filter if specified
if ecConfig.CollectionFilter != "" {
// Parse comma-separated collections
allowedCollections := make(map[string]bool)
for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
allowedCollections[strings.TrimSpace(collection)] = true
}
// Skip if volume's collection is not in the allowed list
if !allowedCollections[metric.Collection] {
continue
}
}
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
result := &types.TaskDetectionResult{
TaskType: types.TaskTypeErasureCoding,
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Priority: types.TaskPriorityLow, // EC is not urgent
Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)",
metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
float64(metric.Size)/(1024*1024)),
Parameters: map[string]interface{}{
"age_seconds": int(metric.Age.Seconds()),
"fullness_ratio": metric.FullnessRatio,
"size_mb": int(metric.Size / (1024 * 1024)),
},
ScheduleAt: now,
}
results = append(results, result)
}
}
return results, nil
}
// ecScheduling implements the scheduling logic for erasure coding tasks
func ecScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
ecConfig := config.(*ErasureCodingConfig)
// Check if we have available workers
if len(availableWorkers) == 0 {
return false
}
// Count running EC tasks
runningCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeErasureCoding {
runningCount++
}
}
// Check concurrency limit
if runningCount >= ecConfig.MaxConcurrent {
return false
}
// Check if any worker can handle EC tasks
for _, worker := range availableWorkers {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeErasureCoding {
return true
}
}
}
return false
}
// createErasureCodingTask creates a new erasure coding task instance
func createErasureCodingTask(params types.TaskParams) (types.TaskInterface, error) {
// Extract configuration from params
var config *ErasureCodingConfig
if configData, ok := params.Parameters["config"]; ok {
if configMap, ok := configData.(map[string]interface{}); ok {
config = &ErasureCodingConfig{}
if err := config.FromMap(configMap); err != nil {
return nil, fmt.Errorf("failed to parse erasure coding config: %v", err)
}
}
}
if config == nil {
config = &ErasureCodingConfig{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 60 * 60, // 1 hour
MaxConcurrent: 1,
},
QuietForSeconds: 300, // 5 minutes
FullnessRatio: 0.8, // 80%
CollectionFilter: "",
}
}
// Create and return the erasure coding task using existing Task type
return NewTask(params.Server, params.VolumeID), nil
}
// getErasureCodingConfigSpec returns the configuration schema for erasure coding tasks
func getErasureCodingConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Erasure Coding Tasks",
Description: "Whether erasure coding tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 60 * 60,
MinValue: 10 * 60,
MaxValue: 24 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volumes needing erasure coding",
HelpText: "The system will check for volumes that need erasure coding at this interval",
Placeholder: "1",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 1,
MinValue: 1,
MaxValue: 5,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of erasure coding tasks that can run simultaneously",
HelpText: "Limits the number of erasure coding operations running at the same time",
Placeholder: "1 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "quiet_for_seconds",
JSONName: "quiet_for_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 300,
MinValue: 60,
MaxValue: 3600,
Required: true,
DisplayName: "Quiet Period",
Description: "Minimum time volume must be quiet before erasure coding",
HelpText: "Volume must not be modified for this duration before erasure coding",
Placeholder: "5",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "fullness_ratio",
JSONName: "fullness_ratio",
Type: config.FieldTypeFloat,
DefaultValue: 0.8,
MinValue: 0.1,
MaxValue: 1.0,
Required: true,
DisplayName: "Fullness Ratio",
Description: "Minimum fullness ratio to trigger erasure coding",
HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
Placeholder: "0.80 (80%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "collection_filter",
JSONName: "collection_filter",
Type: config.FieldTypeString,
DefaultValue: "",
Required: false,
DisplayName: "Collection Filter",
Description: "Only process volumes from specific collections",
HelpText: "Leave empty to process all collections, or specify collection name",
Placeholder: "my_collection",
InputType: "text",
CSSClasses: "form-control",
},
},
}
}
// initErasureCoding registers the refactored erasure coding task
func initErasureCoding() {
// Create configuration instance
config := &ErasureCodingConfig{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 60 * 60, // 1 hour
MaxConcurrent: 1,
},
QuietForSeconds: 300, // 5 minutes
FullnessRatio: 0.8, // 80%
CollectionFilter: "",
}
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeErasureCoding,
Name: "erasure_coding",
DisplayName: "Erasure Coding",
Description: "Applies erasure coding to volumes for data protection",
Icon: "fas fa-shield-alt text-success",
Capabilities: []string{"erasure_coding", "data_protection"},
Config: config,
ConfigSpec: getErasureCodingConfigSpec(),
CreateTask: createErasureCodingTask,
DetectionFunc: ecDetection,
ScanInterval: 1 * time.Hour,
SchedulingFunc: ecScheduling,
MaxConcurrent: 1,
RepeatInterval: 24 * time.Hour,
}
// Register everything with a single function call!
base.RegisterTask(taskDef)
}

38
weed/worker/tasks/erasure_coding/ec_register.go

@ -1,7 +1,41 @@
package erasure_coding
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Auto-register this task when the package is imported
func init() {
// Use new architecture instead of old registration
initErasureCoding()
RegisterErasureCodingTask()
}
// RegisterErasureCodingTask registers the erasure coding task with the new architecture
func RegisterErasureCodingTask() {
// Create configuration instance
config := NewDefaultConfig()
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeErasureCoding,
Name: "erasure_coding",
DisplayName: "Erasure Coding",
Description: "Applies erasure coding to volumes for data protection",
Icon: "fas fa-shield-alt text-success",
Capabilities: []string{"erasure_coding", "data_protection"},
Config: config,
ConfigSpec: GetConfigSpec(),
CreateTask: CreateTask,
DetectionFunc: Detection,
ScanInterval: 1 * time.Hour,
SchedulingFunc: Scheduling,
MaxConcurrent: 1,
RepeatInterval: 24 * time.Hour,
}
// Register everything with a single function call!
base.RegisterTask(taskDef)
}

128
weed/worker/tasks/vacuum/config.go

@ -0,0 +1,128 @@
package vacuum
import (
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with vacuum-specific settings
type Config struct {
base.BaseConfig
GarbageThreshold float64 `json:"garbage_threshold"`
MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
MinIntervalSeconds int `json:"min_interval_seconds"`
}
// NewDefaultConfig creates a new default vacuum configuration
func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
MaxConcurrent: 2,
},
GarbageThreshold: 0.3, // 30%
MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
}
}
// GetConfigSpec returns the configuration schema for vacuum tasks
func GetConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Vacuum Tasks",
Description: "Whether vacuum tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic vacuum task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 2 * 60 * 60,
MinValue: 10 * 60,
MaxValue: 24 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volumes needing vacuum",
HelpText: "The system will check for volumes that need vacuuming at this interval",
Placeholder: "2",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 2,
MinValue: 1,
MaxValue: 10,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of vacuum tasks that can run simultaneously",
HelpText: "Limits the number of vacuum operations running at the same time to control system load",
Placeholder: "2 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "garbage_threshold",
JSONName: "garbage_threshold",
Type: config.FieldTypeFloat,
DefaultValue: 0.3,
MinValue: 0.0,
MaxValue: 1.0,
Required: true,
DisplayName: "Garbage Percentage Threshold",
Description: "Trigger vacuum when garbage ratio exceeds this percentage",
HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
Placeholder: "0.30 (30%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "min_volume_age_seconds",
JSONName: "min_volume_age_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 24 * 60 * 60,
MinValue: 1 * 60 * 60,
MaxValue: 7 * 24 * 60 * 60,
Required: true,
DisplayName: "Minimum Volume Age",
Description: "Only vacuum volumes older than this duration",
HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
Placeholder: "24",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "min_interval_seconds",
JSONName: "min_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 7 * 24 * 60 * 60,
MinValue: 1 * 24 * 60 * 60,
MaxValue: 30 * 24 * 60 * 60,
Required: true,
DisplayName: "Minimum Interval",
Description: "Minimum time between vacuum operations on the same volume",
HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
Placeholder: "7",
Unit: config.UnitDays,
InputType: "interval",
CSSClasses: "form-control",
},
},
}
}

99
weed/worker/tasks/vacuum/detection.go

@ -0,0 +1,99 @@
package vacuum
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Detection implements the detection logic for vacuum tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
vacuumConfig := config.(*Config)
var results []*types.TaskDetectionResult
minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
for _, metric := range metrics {
// Check if volume needs vacuum
if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
priority := types.TaskPriorityNormal
if metric.GarbageRatio > 0.6 {
priority = types.TaskPriorityHigh
}
result := &types.TaskDetectionResult{
TaskType: types.TaskTypeVacuum,
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Priority: priority,
Reason: "Volume has excessive garbage requiring vacuum",
Parameters: map[string]interface{}{
"garbage_ratio": metric.GarbageRatio,
"volume_age": metric.Age.String(),
},
ScheduleAt: time.Now(),
}
results = append(results, result)
}
}
return results, nil
}
// Scheduling implements the scheduling logic for vacuum tasks
func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
vacuumConfig := config.(*Config)
// Count running vacuum tasks
runningVacuumCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeVacuum {
runningVacuumCount++
}
}
// Check concurrency limit
if runningVacuumCount >= vacuumConfig.MaxConcurrent {
return false
}
// Check for available workers with vacuum capability
for _, worker := range availableWorkers {
if worker.CurrentLoad < worker.MaxConcurrent {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeVacuum {
return true
}
}
}
}
return false
}
// CreateTask creates a new vacuum task instance
func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
// Extract configuration from params
var config *Config
if configData, ok := params.Parameters["config"]; ok {
if configMap, ok := configData.(map[string]interface{}); ok {
config = &Config{}
if err := config.FromMap(configMap); err != nil {
return nil, fmt.Errorf("failed to parse vacuum config: %v", err)
}
}
}
if config == nil {
config = NewDefaultConfig()
}
// Create and return the vacuum task using existing Task type
return NewTask(params.Server, params.VolumeID), nil
}

246
weed/worker/tasks/vacuum/vacuum.go

@ -7,12 +7,10 @@ import (
"strconv"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@ -199,247 +197,3 @@ func (t *Task) GetProgress() float64 {
func (t *Task) Cancel() error {
return t.BaseTask.Cancel()
}
// VacuumConfig extends BaseConfig with vacuum-specific settings
type VacuumConfig struct {
base.BaseConfig
GarbageThreshold float64 `json:"garbage_threshold"`
MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
MinIntervalSeconds int `json:"min_interval_seconds"`
}
// vacuumDetection implements the detection logic for vacuum tasks
func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
vacuumConfig := config.(*VacuumConfig)
var results []*types.TaskDetectionResult
minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
for _, metric := range metrics {
// Check if volume needs vacuum
if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
priority := types.TaskPriorityNormal
if metric.GarbageRatio > 0.6 {
priority = types.TaskPriorityHigh
}
result := &types.TaskDetectionResult{
TaskType: types.TaskTypeVacuum,
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Priority: priority,
Reason: "Volume has excessive garbage requiring vacuum",
Parameters: map[string]interface{}{
"garbage_ratio": metric.GarbageRatio,
"volume_age": metric.Age.String(),
},
ScheduleAt: time.Now(),
}
results = append(results, result)
}
}
return results, nil
}
// vacuumScheduling implements the scheduling logic for vacuum tasks
func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
vacuumConfig := config.(*VacuumConfig)
// Count running vacuum tasks
runningVacuumCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeVacuum {
runningVacuumCount++
}
}
// Check concurrency limit
if runningVacuumCount >= vacuumConfig.MaxConcurrent {
return false
}
// Check for available workers with vacuum capability
for _, worker := range availableWorkers {
if worker.CurrentLoad < worker.MaxConcurrent {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeVacuum {
return true
}
}
}
}
return false
}
// createVacuumTask creates a new vacuum task instance
func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) {
// Extract configuration from params
var config *VacuumConfig
if configData, ok := params.Parameters["config"]; ok {
if configMap, ok := configData.(map[string]interface{}); ok {
config = &VacuumConfig{}
if err := config.FromMap(configMap); err != nil {
return nil, fmt.Errorf("failed to parse vacuum config: %v", err)
}
}
}
if config == nil {
config = &VacuumConfig{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
MaxConcurrent: 2,
},
GarbageThreshold: 0.3, // 30%
MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
}
}
// Create and return the vacuum task using existing Task type
return NewTask(params.Server, params.VolumeID), nil
}
// getVacuumConfigSpec returns the configuration schema for vacuum tasks
func getVacuumConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Vacuum Tasks",
Description: "Whether vacuum tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic vacuum task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 2 * 60 * 60,
MinValue: 10 * 60,
MaxValue: 24 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volumes needing vacuum",
HelpText: "The system will check for volumes that need vacuuming at this interval",
Placeholder: "2",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 2,
MinValue: 1,
MaxValue: 10,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of vacuum tasks that can run simultaneously",
HelpText: "Limits the number of vacuum operations running at the same time to control system load",
Placeholder: "2 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "garbage_threshold",
JSONName: "garbage_threshold",
Type: config.FieldTypeFloat,
DefaultValue: 0.3,
MinValue: 0.0,
MaxValue: 1.0,
Required: true,
DisplayName: "Garbage Percentage Threshold",
Description: "Trigger vacuum when garbage ratio exceeds this percentage",
HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
Placeholder: "0.30 (30%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "min_volume_age_seconds",
JSONName: "min_volume_age_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 24 * 60 * 60,
MinValue: 1 * 60 * 60,
MaxValue: 7 * 24 * 60 * 60,
Required: true,
DisplayName: "Minimum Volume Age",
Description: "Only vacuum volumes older than this duration",
HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
Placeholder: "24",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "min_interval_seconds",
JSONName: "min_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 7 * 24 * 60 * 60,
MinValue: 1 * 24 * 60 * 60,
MaxValue: 30 * 24 * 60 * 60,
Required: true,
DisplayName: "Minimum Interval",
Description: "Minimum time between vacuum operations on the same volume",
HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
Placeholder: "7",
Unit: config.UnitDays,
InputType: "interval",
CSSClasses: "form-control",
},
},
}
}
// initVacuum registers the refactored vacuum task (replaces the old registration)
func initVacuum() {
// Create configuration instance
config := &VacuumConfig{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
MaxConcurrent: 2,
},
GarbageThreshold: 0.3, // 30%
MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
}
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeVacuum,
Name: "vacuum",
DisplayName: "Volume Vacuum",
Description: "Reclaims disk space by removing deleted files from volumes",
Icon: "fas fa-broom text-primary",
Capabilities: []string{"vacuum", "storage"},
Config: config,
ConfigSpec: getVacuumConfigSpec(),
CreateTask: createVacuumTask,
DetectionFunc: vacuumDetection,
ScanInterval: 2 * time.Hour,
SchedulingFunc: vacuumScheduling,
MaxConcurrent: 2,
RepeatInterval: 7 * 24 * time.Hour,
}
// Register everything with a single function call!
base.RegisterTask(taskDef)
}

38
weed/worker/tasks/vacuum/vacuum_register.go

@ -1,7 +1,41 @@
package vacuum
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Auto-register this task when the package is imported
func init() {
// Use new architecture instead of old registration
initVacuum()
RegisterVacuumTask()
}
// RegisterVacuumTask registers the vacuum task with the new architecture
func RegisterVacuumTask() {
// Create configuration instance
config := NewDefaultConfig()
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeVacuum,
Name: "vacuum",
DisplayName: "Volume Vacuum",
Description: "Reclaims disk space by removing deleted files from volumes",
Icon: "fas fa-broom text-primary",
Capabilities: []string{"vacuum", "storage"},
Config: config,
ConfigSpec: GetConfigSpec(),
CreateTask: CreateTask,
DetectionFunc: Detection,
ScanInterval: 2 * time.Hour,
SchedulingFunc: Scheduling,
MaxConcurrent: 2,
RepeatInterval: 7 * 24 * time.Hour,
}
// Register everything with a single function call!
base.RegisterTask(taskDef)
}
Loading…
Cancel
Save