From 96d6d27607df70b14f41fbe57a0a9cba1d1a5380 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Sun, 10 Aug 2025 00:00:46 -0700
Subject: [PATCH] =?UTF-8?q?remove=20=E2=9D=8C=20Vacuum=20-=20Completely=20?=
 =?UTF-8?q?removed=20=E2=9D=8C=20Balance=20-=20Completely=20removed?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 weed/admin/dash/config_persistence.go         | 208 --------------
 weed/admin/handlers/maintenance_handlers.go   |  20 --
 .../handlers/maintenance_handlers_test.go     | 164 -----------
 weed/admin/maintenance/maintenance_manager.go |  35 ---
 weed/admin/maintenance/maintenance_worker.go  |   2 -
 weed/admin/view/layout/menu_helper.go         |   2 -
 weed/command/worker.go                        |   2 -
 weed/worker/tasks/balance/balance_task.go     | 267 -----------------
 weed/worker/tasks/balance/config.go           | 170 -----------
 weed/worker/tasks/balance/detection.go        | 272 ------------------
 weed/worker/tasks/balance/execution.go        | 158 ----------
 weed/worker/tasks/balance/monitoring.go       | 138 ---------
 weed/worker/tasks/balance/register.go         |  86 ------
 weed/worker/tasks/balance/scheduling.go       |  37 ---
 weed/worker/tasks/base/volume_utils.go        |   7 +-
 weed/worker/tasks/vacuum/config.go            | 190 ------------
 weed/worker/tasks/vacuum/detection.go         | 133 ---------
 weed/worker/tasks/vacuum/monitoring.go        | 151 ----------
 weed/worker/tasks/vacuum/register.go          |  86 ------
 weed/worker/tasks/vacuum/scheduling.go        |  37 ---
 weed/worker/tasks/vacuum/vacuum_task.go       | 244 ----------------
 weed/worker/types/task_types.go               |   2 -
 weed/worker/worker.go                         |   2 -
 23 files changed, 2 insertions(+), 2411 deletions(-)
 delete mode 100644 weed/worker/tasks/balance/balance_task.go
 delete mode 100644 weed/worker/tasks/balance/config.go
 delete mode 100644 weed/worker/tasks/balance/detection.go
 delete mode 100644 weed/worker/tasks/balance/execution.go
 delete mode 100644 weed/worker/tasks/balance/monitoring.go
 delete mode 100644 weed/worker/tasks/balance/register.go
 delete mode 100644 weed/worker/tasks/balance/scheduling.go
 delete mode 100644 weed/worker/tasks/vacuum/config.go
 delete mode 100644 weed/worker/tasks/vacuum/detection.go
 delete mode 100644 weed/worker/tasks/vacuum/monitoring.go
 delete mode 100644 weed/worker/tasks/vacuum/register.go
 delete mode 100644 weed/worker/tasks/vacuum/scheduling.go
 delete mode 100644 weed/worker/tasks/vacuum/vacuum_task.go

diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go
index 1fe1a9b42..bdc9794f4 100644
--- a/weed/admin/dash/config_persistence.go
+++ b/weed/admin/dash/config_persistence.go
@@ -12,9 +12,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
-	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
-	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
 	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/proto"
 )
@@ -25,16 +23,12 @@ const (
 	// Configuration file names (protobuf binary)
 	MaintenanceConfigFile     = "maintenance.pb"
-	VacuumTaskConfigFile      = "task_vacuum.pb"
 	ECTaskConfigFile          = "task_erasure_coding.pb"
-	BalanceTaskConfigFile     = "task_balance.pb"
 	ReplicationTaskConfigFile = "task_replication.pb"

 	// JSON reference files
 	MaintenanceConfigJSONFile     = "maintenance.json"
-	VacuumTaskConfigJSONFile      = "task_vacuum.json"
 	ECTaskConfigJSONFile          = "task_erasure_coding.json"
-	BalanceTaskConfigJSONFile     = "task_balance.json"
 	ReplicationTaskConfigJSONFile =
"task_replication.json" // Task persistence subdirectories and settings @@ -262,92 +256,6 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { return nil } -// SaveVacuumTaskConfig saves vacuum task configuration to protobuf file -func (cp *ConfigPersistence) SaveVacuumTaskConfig(config *VacuumTaskConfig) error { - return cp.saveTaskConfig(VacuumTaskConfigFile, config) -} - -// SaveVacuumTaskPolicy saves complete vacuum task policy to protobuf file -func (cp *ConfigPersistence) SaveVacuumTaskPolicy(policy *worker_pb.TaskPolicy) error { - return cp.saveTaskConfig(VacuumTaskConfigFile, policy) -} - -// LoadVacuumTaskConfig loads vacuum task configuration from protobuf file -func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) { - // Load as TaskPolicy and extract vacuum config - if taskPolicy, err := cp.LoadVacuumTaskPolicy(); err == nil && taskPolicy != nil { - if vacuumConfig := taskPolicy.GetVacuumConfig(); vacuumConfig != nil { - return vacuumConfig, nil - } - } - - // Return default config if no valid config found - return &VacuumTaskConfig{ - GarbageThreshold: 0.3, - MinVolumeAgeHours: 24, - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - }, nil -} - -// LoadVacuumTaskPolicy loads complete vacuum task policy from protobuf file -func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) { - if cp.dataDir == "" { - // Return default policy if no data directory - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 2, - RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds - CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ - VacuumConfig: &worker_pb.VacuumTaskConfig{ - GarbageThreshold: 0.3, - MinVolumeAgeHours: 24, - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - }, - }, - }, nil - } - - confDir := filepath.Join(cp.dataDir, ConfigSubdir) - configPath := filepath.Join(confDir, VacuumTaskConfigFile) - - // Check if file exists - if _, err := os.Stat(configPath); os.IsNotExist(err) { - // Return default policy if file doesn't exist - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 2, - RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds - CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ - VacuumConfig: &worker_pb.VacuumTaskConfig{ - GarbageThreshold: 0.3, - MinVolumeAgeHours: 24, - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - }, - }, - }, nil - } - - // Read file - configData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read vacuum task config file: %w", err) - } - - // Try to unmarshal as TaskPolicy - var policy worker_pb.TaskPolicy - if err := proto.Unmarshal(configData, &policy); err == nil { - // Validate that it's actually a TaskPolicy with vacuum config - if policy.GetVacuumConfig() != nil { - glog.V(1).Infof("Loaded vacuum task policy from %s", configPath) - return &policy, nil - } - } - - return nil, fmt.Errorf("failed to unmarshal vacuum task configuration") -} - // SaveErasureCodingTaskConfig saves EC task configuration to protobuf file func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error { return cp.saveTaskConfig(ECTaskConfigFile, config) @@ -437,89 +345,6 @@ func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolic return nil, fmt.Errorf("failed to unmarshal EC task configuration") } -// SaveBalanceTaskConfig saves balance task configuration 
to protobuf file -func (cp *ConfigPersistence) SaveBalanceTaskConfig(config *BalanceTaskConfig) error { - return cp.saveTaskConfig(BalanceTaskConfigFile, config) -} - -// SaveBalanceTaskPolicy saves complete balance task policy to protobuf file -func (cp *ConfigPersistence) SaveBalanceTaskPolicy(policy *worker_pb.TaskPolicy) error { - return cp.saveTaskConfig(BalanceTaskConfigFile, policy) -} - -// LoadBalanceTaskConfig loads balance task configuration from protobuf file -func (cp *ConfigPersistence) LoadBalanceTaskConfig() (*BalanceTaskConfig, error) { - // Load as TaskPolicy and extract balance config - if taskPolicy, err := cp.LoadBalanceTaskPolicy(); err == nil && taskPolicy != nil { - if balanceConfig := taskPolicy.GetBalanceConfig(); balanceConfig != nil { - return balanceConfig, nil - } - } - - // Return default config if no valid config found - return &BalanceTaskConfig{ - ImbalanceThreshold: 0.1, - MinServerCount: 2, - }, nil -} - -// LoadBalanceTaskPolicy loads complete balance task policy from protobuf file -func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) { - if cp.dataDir == "" { - // Return default policy if no data directory - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 1, - RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds - CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ - BalanceConfig: &worker_pb.BalanceTaskConfig{ - ImbalanceThreshold: 0.1, - MinServerCount: 2, - }, - }, - }, nil - } - - confDir := filepath.Join(cp.dataDir, ConfigSubdir) - configPath := filepath.Join(confDir, BalanceTaskConfigFile) - - // Check if file exists - if _, err := os.Stat(configPath); os.IsNotExist(err) { - // Return default policy if file doesn't exist - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 1, - RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds - CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ - BalanceConfig: &worker_pb.BalanceTaskConfig{ - ImbalanceThreshold: 0.1, - MinServerCount: 2, - }, - }, - }, nil - } - - // Read file - configData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read balance task config file: %w", err) - } - - // Try to unmarshal as TaskPolicy - var policy worker_pb.TaskPolicy - if err := proto.Unmarshal(configData, &policy); err == nil { - // Validate that it's actually a TaskPolicy with balance config - if policy.GetBalanceConfig() != nil { - glog.V(1).Infof("Loaded balance task policy from %s", configPath) - return &policy, nil - } - } - - return nil, fmt.Errorf("failed to unmarshal balance task configuration") -} - // SaveReplicationTaskConfig saves replication task configuration to protobuf file func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error { return cp.saveTaskConfig(ReplicationTaskConfigFile, config) @@ -673,23 +498,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { TaskPolicies: make(map[string]*worker_pb.TaskPolicy), } - // Load vacuum task configuration - if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil { - policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{ - Enabled: vacuumConfig.Enabled, - MaxConcurrent: int32(vacuumConfig.MaxConcurrent), - RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), - CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ - 
VacuumConfig: &worker_pb.VacuumTaskConfig{ - GarbageThreshold: float64(vacuumConfig.GarbageThreshold), - MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours - MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds), - }, - }, - } - } - // Load erasure coding task configuration if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil { policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{ @@ -708,22 +516,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { } } - // Load balance task configuration - if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil { - policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{ - Enabled: balanceConfig.Enabled, - MaxConcurrent: int32(balanceConfig.MaxConcurrent), - RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), - CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ - BalanceConfig: &worker_pb.BalanceTaskConfig{ - ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold), - MinServerCount: int32(balanceConfig.MinServerCount), - }, - }, - } - } - glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies)) return policy } diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index e92a50c9d..7404cfc1e 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -17,9 +17,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/view/layout" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -235,10 +233,6 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) { // Create a new config instance based on task type and apply schema defaults var config TaskConfig switch taskType { - case types.TaskTypeVacuum: - config = &vacuum.Config{} - case types.TaskTypeBalance: - config = &balance.Config{} case types.TaskTypeErasureCoding: config = &erasure_coding.Config{} default: @@ -285,21 +279,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) { // Debug logging - show parsed config values switch taskType { - case types.TaskTypeVacuum: - if vacuumConfig, ok := config.(*vacuum.Config); ok { - glog.V(1).Infof("Parsed vacuum config - GarbageThreshold: %f, MinVolumeAgeSeconds: %d, MinIntervalSeconds: %d", - vacuumConfig.GarbageThreshold, vacuumConfig.MinVolumeAgeSeconds, vacuumConfig.MinIntervalSeconds) - } case types.TaskTypeErasureCoding: if ecConfig, ok := config.(*erasure_coding.Config); ok { glog.V(1).Infof("Parsed EC config - FullnessRatio: %f, QuietForSeconds: %d, MinSizeMB: %d, CollectionFilter: '%s'", ecConfig.FullnessRatio, ecConfig.QuietForSeconds, ecConfig.MinSizeMB, ecConfig.CollectionFilter) } - case types.TaskTypeBalance: - if balanceConfig, ok := config.(*balance.Config); ok { - glog.V(1).Infof("Parsed balance config - Enabled: %v, MaxConcurrent: %d, ScanIntervalSeconds: %d, ImbalanceThreshold: %f, MinServerCount: %d", - balanceConfig.Enabled, balanceConfig.MaxConcurrent, balanceConfig.ScanIntervalSeconds, balanceConfig.ImbalanceThreshold, balanceConfig.MinServerCount) - } } // Validate the 
configuration @@ -580,12 +564,8 @@ func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType, // Save using task-specific methods switch taskType { - case types.TaskTypeVacuum: - return configPersistence.SaveVacuumTaskPolicy(taskPolicy) case types.TaskTypeErasureCoding: return configPersistence.SaveErasureCodingTaskPolicy(taskPolicy) - case types.TaskTypeBalance: - return configPersistence.SaveBalanceTaskPolicy(taskPolicy) default: return fmt.Errorf("unsupported task type for protobuf persistence: %s", taskType) } diff --git a/weed/admin/handlers/maintenance_handlers_test.go b/weed/admin/handlers/maintenance_handlers_test.go index fa5a365f1..5309094b3 100644 --- a/weed/admin/handlers/maintenance_handlers_test.go +++ b/weed/admin/handlers/maintenance_handlers_test.go @@ -6,123 +6,14 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/config" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" ) func TestParseTaskConfigFromForm_WithEmbeddedStruct(t *testing.T) { // Create a maintenance handlers instance for testing h := &MaintenanceHandlers{} - // Test with balance config - t.Run("Balance Config", func(t *testing.T) { - // Simulate form data - formData := url.Values{ - "enabled": {"on"}, // checkbox field - "scan_interval_seconds_value": {"30"}, // interval field - "scan_interval_seconds_unit": {"minutes"}, // interval unit - "max_concurrent": {"2"}, // number field - "imbalance_threshold": {"0.15"}, // float field - "min_server_count": {"3"}, // number field - } - - // Get schema - schema := tasks.GetTaskConfigSchema("balance") - if schema == nil { - t.Fatal("Failed to get balance schema") - } - - // Create config instance - config := &balance.Config{} - - // Parse form data - err := h.parseTaskConfigFromForm(formData, schema, config) - if err != nil { - t.Fatalf("Failed to parse form data: %v", err) - } - - // Verify embedded struct fields were set correctly - if !config.Enabled { - t.Errorf("Expected Enabled=true, got %v", config.Enabled) - } - - if config.ScanIntervalSeconds != 1800 { // 30 minutes * 60 - t.Errorf("Expected ScanIntervalSeconds=1800, got %v", config.ScanIntervalSeconds) - } - - if config.MaxConcurrent != 2 { - t.Errorf("Expected MaxConcurrent=2, got %v", config.MaxConcurrent) - } - - // Verify balance-specific fields were set correctly - if config.ImbalanceThreshold != 0.15 { - t.Errorf("Expected ImbalanceThreshold=0.15, got %v", config.ImbalanceThreshold) - } - - if config.MinServerCount != 3 { - t.Errorf("Expected MinServerCount=3, got %v", config.MinServerCount) - } - }) - - // Test with vacuum config - t.Run("Vacuum Config", func(t *testing.T) { - // Simulate form data - formData := url.Values{ - // "enabled" field omitted to simulate unchecked checkbox - "scan_interval_seconds_value": {"4"}, // interval field - "scan_interval_seconds_unit": {"hours"}, // interval unit - "max_concurrent": {"3"}, // number field - "garbage_threshold": {"0.4"}, // float field - "min_volume_age_seconds_value": {"2"}, // interval field - "min_volume_age_seconds_unit": {"days"}, // interval unit - "min_interval_seconds_value": {"1"}, // interval field - "min_interval_seconds_unit": {"days"}, // interval unit - } - - // Get schema - schema := tasks.GetTaskConfigSchema("vacuum") - if schema == nil { - t.Fatal("Failed to get vacuum 
schema") - } - - // Create config instance - config := &vacuum.Config{} - - // Parse form data - err := h.parseTaskConfigFromForm(formData, schema, config) - if err != nil { - t.Fatalf("Failed to parse form data: %v", err) - } - - // Verify embedded struct fields were set correctly - if config.Enabled { - t.Errorf("Expected Enabled=false, got %v", config.Enabled) - } - - if config.ScanIntervalSeconds != 14400 { // 4 hours * 3600 - t.Errorf("Expected ScanIntervalSeconds=14400, got %v", config.ScanIntervalSeconds) - } - - if config.MaxConcurrent != 3 { - t.Errorf("Expected MaxConcurrent=3, got %v", config.MaxConcurrent) - } - - // Verify vacuum-specific fields were set correctly - if config.GarbageThreshold != 0.4 { - t.Errorf("Expected GarbageThreshold=0.4, got %v", config.GarbageThreshold) - } - - if config.MinVolumeAgeSeconds != 172800 { // 2 days * 86400 - t.Errorf("Expected MinVolumeAgeSeconds=172800, got %v", config.MinVolumeAgeSeconds) - } - - if config.MinIntervalSeconds != 86400 { // 1 day * 86400 - t.Errorf("Expected MinIntervalSeconds=86400, got %v", config.MinIntervalSeconds) - } - }) - // Test with erasure coding config t.Run("Erasure Coding Config", func(t *testing.T) { // Simulate form data @@ -191,31 +82,6 @@ func TestConfigurationValidation(t *testing.T) { name string config interface{} }{ - { - "balance", - &balance.Config{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 2400, - MaxConcurrent: 3, - }, - ImbalanceThreshold: 0.18, - MinServerCount: 4, - }, - }, - { - "vacuum", - &vacuum.Config{ - BaseConfig: base.BaseConfig{ - Enabled: false, - ScanIntervalSeconds: 7200, - MaxConcurrent: 2, - }, - GarbageThreshold: 0.35, - MinVolumeAgeSeconds: 86400, - MinIntervalSeconds: 604800, - }, - }, { "erasure_coding", &erasure_coding.Config{ @@ -236,28 +102,6 @@ func TestConfigurationValidation(t *testing.T) { t.Run(test.name, func(t *testing.T) { // Test that configs can be converted to protobuf TaskPolicy switch cfg := test.config.(type) { - case *balance.Config: - policy := cfg.ToTaskPolicy() - if policy == nil { - t.Fatal("ToTaskPolicy returned nil") - } - if policy.Enabled != cfg.Enabled { - t.Errorf("Expected Enabled=%v, got %v", cfg.Enabled, policy.Enabled) - } - if policy.MaxConcurrent != int32(cfg.MaxConcurrent) { - t.Errorf("Expected MaxConcurrent=%v, got %v", cfg.MaxConcurrent, policy.MaxConcurrent) - } - case *vacuum.Config: - policy := cfg.ToTaskPolicy() - if policy == nil { - t.Fatal("ToTaskPolicy returned nil") - } - if policy.Enabled != cfg.Enabled { - t.Errorf("Expected Enabled=%v, got %v", cfg.Enabled, policy.Enabled) - } - if policy.MaxConcurrent != int32(cfg.MaxConcurrent) { - t.Errorf("Expected MaxConcurrent=%v, got %v", cfg.MaxConcurrent, policy.MaxConcurrent) - } case *erasure_coding.Config: policy := cfg.ToTaskPolicy() if policy == nil { @@ -275,14 +119,6 @@ func TestConfigurationValidation(t *testing.T) { // Test that configs can be validated switch cfg := test.config.(type) { - case *balance.Config: - if err := cfg.Validate(); err != nil { - t.Errorf("Validation failed: %v", err) - } - case *vacuum.Config: - if err := cfg.Validate(); err != nil { - t.Errorf("Validation failed: %v", err) - } case *erasure_coding.Config: if err := cfg.Validate(); err != nil { t.Errorf("Validation failed: %v", err) diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go index 4aab137e0..62da87e5a 100644 --- a/weed/admin/maintenance/maintenance_manager.go +++ b/weed/admin/maintenance/maintenance_manager.go 
@@ -8,9 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" ) // buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy @@ -22,23 +20,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { TaskPolicies: make(map[string]*worker_pb.TaskPolicy), } - // Load vacuum task configuration - if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil { - policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{ - Enabled: vacuumConfig.Enabled, - MaxConcurrent: int32(vacuumConfig.MaxConcurrent), - RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), - CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ - VacuumConfig: &worker_pb.VacuumTaskConfig{ - GarbageThreshold: float64(vacuumConfig.GarbageThreshold), - MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours - MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds), - }, - }, - } - } - // Load erasure coding task configuration if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil { policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{ @@ -57,22 +38,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { } } - // Load balance task configuration - if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil { - policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{ - Enabled: balanceConfig.Enabled, - MaxConcurrent: int32(balanceConfig.MaxConcurrent), - RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), - CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ - BalanceConfig: &worker_pb.BalanceTaskConfig{ - ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold), - MinServerCount: int32(balanceConfig.MinServerCount), - }, - }, - } - } - glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies)) return policy } diff --git a/weed/admin/maintenance/maintenance_worker.go b/weed/admin/maintenance/maintenance_worker.go index e4a6b4cf6..ea8428479 100644 --- a/weed/admin/maintenance/maintenance_worker.go +++ b/weed/admin/maintenance/maintenance_worker.go @@ -13,9 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" // Import task packages to trigger their auto-registration - _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" - _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" ) // MaintenanceWorkerService manages maintenance task execution diff --git a/weed/admin/view/layout/menu_helper.go b/weed/admin/view/layout/menu_helper.go index fc8954423..d3540cf29 100644 --- a/weed/admin/view/layout/menu_helper.go +++ b/weed/admin/view/layout/menu_helper.go @@ -4,9 +4,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/maintenance" // Import task packages to trigger their auto-registration - _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" - _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" ) // MenuItemData represents a menu item 
diff --git a/weed/command/worker.go b/weed/command/worker.go index 6e592f73f..ec51f6825 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -16,9 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" // Import task packages to trigger their auto-registration - _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" - _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" ) var cmdWorker = &Command{ diff --git a/weed/worker/tasks/balance/balance_task.go b/weed/worker/tasks/balance/balance_task.go deleted file mode 100644 index 8daafde97..000000000 --- a/weed/worker/tasks/balance/balance_task.go +++ /dev/null @@ -1,267 +0,0 @@ -package balance - -import ( - "context" - "fmt" - "io" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/operation" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/worker/types" - "github.com/seaweedfs/seaweedfs/weed/worker/types/base" - "google.golang.org/grpc" -) - -// BalanceTask implements the Task interface -type BalanceTask struct { - *base.BaseTask - server string - volumeID uint32 - collection string - progress float64 -} - -// NewBalanceTask creates a new balance task instance -func NewBalanceTask(id string, server string, volumeID uint32, collection string) *BalanceTask { - return &BalanceTask{ - BaseTask: base.NewBaseTask(id, types.TaskTypeBalance), - server: server, - volumeID: volumeID, - collection: collection, - } -} - -// Execute implements the Task interface -func (t *BalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error { - if params == nil { - return fmt.Errorf("task parameters are required") - } - - balanceParams := params.GetBalanceParams() - if balanceParams == nil { - return fmt.Errorf("balance parameters are required") - } - - // Get source and destination from unified arrays - if len(params.Sources) == 0 { - return fmt.Errorf("source is required for balance task") - } - if len(params.Targets) == 0 { - return fmt.Errorf("target is required for balance task") - } - - sourceNode := params.Sources[0].Node - destNode := params.Targets[0].Node - - if sourceNode == "" { - return fmt.Errorf("source node is required for balance task") - } - if destNode == "" { - return fmt.Errorf("destination node is required for balance task") - } - - t.GetLogger().WithFields(map[string]interface{}{ - "volume_id": t.volumeID, - "source": sourceNode, - "destination": destNode, - "collection": t.collection, - }).Info("Starting balance task - moving volume") - - sourceServer := pb.ServerAddress(sourceNode) - targetServer := pb.ServerAddress(destNode) - volumeId := needle.VolumeId(t.volumeID) - - // Step 1: Mark volume readonly - t.ReportProgress(10.0) - t.GetLogger().Info("Marking volume readonly for move") - if err := t.markVolumeReadonly(sourceServer, volumeId); err != nil { - return fmt.Errorf("failed to mark volume readonly: %v", err) - } - - // Step 2: Copy volume to destination - t.ReportProgress(20.0) - t.GetLogger().Info("Copying volume to destination") - lastAppendAtNs, err := t.copyVolume(sourceServer, targetServer, volumeId) - if err != nil { - return fmt.Errorf("failed to copy volume: %v", err) - } - - // Step 3: Mount volume on target and 
mark it readonly - t.ReportProgress(60.0) - t.GetLogger().Info("Mounting volume on target server") - if err := t.mountVolume(targetServer, volumeId); err != nil { - return fmt.Errorf("failed to mount volume on target: %v", err) - } - - // Step 4: Tail for updates - t.ReportProgress(70.0) - t.GetLogger().Info("Syncing final updates") - if err := t.tailVolume(sourceServer, targetServer, volumeId, lastAppendAtNs); err != nil { - glog.Warningf("Tail operation failed (may be normal): %v", err) - } - - // Step 5: Unmount from source - t.ReportProgress(85.0) - t.GetLogger().Info("Unmounting volume from source server") - if err := t.unmountVolume(sourceServer, volumeId); err != nil { - return fmt.Errorf("failed to unmount volume from source: %v", err) - } - - // Step 6: Delete from source - t.ReportProgress(95.0) - t.GetLogger().Info("Deleting volume from source server") - if err := t.deleteVolume(sourceServer, volumeId); err != nil { - return fmt.Errorf("failed to delete volume from source: %v", err) - } - - t.ReportProgress(100.0) - glog.Infof("Balance task completed successfully: volume %d moved from %s to %s", - t.volumeID, t.server, destNode) - return nil -} - -// Validate implements the UnifiedTask interface -func (t *BalanceTask) Validate(params *worker_pb.TaskParams) error { - if params == nil { - return fmt.Errorf("task parameters are required") - } - - balanceParams := params.GetBalanceParams() - if balanceParams == nil { - return fmt.Errorf("balance parameters are required") - } - - if params.VolumeId != t.volumeID { - return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId) - } - - // Validate that at least one source matches our server - found := false - for _, source := range params.Sources { - if source.Node == t.server { - found = true - break - } - } - if !found { - return fmt.Errorf("no source matches expected server %s", t.server) - } - - return nil -} - -// EstimateTime implements the UnifiedTask interface -func (t *BalanceTask) EstimateTime(params *worker_pb.TaskParams) time.Duration { - // Basic estimate based on simulated steps - return 14 * time.Second // Sum of all step durations -} - -// GetProgress returns current progress -func (t *BalanceTask) GetProgress() float64 { - return t.progress -} - -// Helper methods for real balance operations - -// markVolumeReadonly marks the volume readonly -func (t *BalanceTask) markVolumeReadonly(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ - VolumeId: uint32(volumeId), - }) - return err - }) -} - -// copyVolume copies volume from source to target server -func (t *BalanceTask) copyVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId) (uint64, error) { - var lastAppendAtNs uint64 - - err := operation.WithVolumeServerClient(true, targetServer, grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: uint32(volumeId), - SourceDataNode: string(sourceServer), - }) - if err != nil { - return err - } - - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } - return recvErr - } - - if resp.LastAppendAtNs != 0 { - lastAppendAtNs = resp.LastAppendAtNs - } else { - // 
Report copy progress - glog.V(1).Infof("Volume %d copy progress: %s", volumeId, - util.BytesToHumanReadable(uint64(resp.ProcessedBytes))) - } - } - - return nil - }) - - return lastAppendAtNs, err -} - -// mountVolume mounts the volume on the target server -func (t *BalanceTask) mountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ - VolumeId: uint32(volumeId), - }) - return err - }) -} - -// tailVolume syncs remaining updates from source to target -func (t *BalanceTask) tailVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId, sinceNs uint64) error { - return operation.WithVolumeServerClient(true, targetServer, grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ - VolumeId: uint32(volumeId), - SinceNs: sinceNs, - IdleTimeoutSeconds: 60, // 1 minute timeout - SourceVolumeServer: string(sourceServer), - }) - return err - }) -} - -// unmountVolume unmounts the volume from the server -func (t *BalanceTask) unmountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ - VolumeId: uint32(volumeId), - }) - return err - }) -} - -// deleteVolume deletes the volume from the server -func (t *BalanceTask) deleteVolume(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ - VolumeId: uint32(volumeId), - OnlyEmpty: false, - }) - return err - }) -} diff --git a/weed/worker/tasks/balance/config.go b/weed/worker/tasks/balance/config.go deleted file mode 100644 index 9303b4b2a..000000000 --- a/weed/worker/tasks/balance/config.go +++ /dev/null @@ -1,170 +0,0 @@ -package balance - -import ( - "fmt" - - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" -) - -// Config extends BaseConfig with balance-specific settings -type Config struct { - base.BaseConfig - ImbalanceThreshold float64 `json:"imbalance_threshold"` - MinServerCount int `json:"min_server_count"` -} - -// NewDefaultConfig creates a new default balance configuration -func NewDefaultConfig() *Config { - return &Config{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 30 * 60, // 30 minutes - MaxConcurrent: 1, - }, - ImbalanceThreshold: 0.2, // 20% - MinServerCount: 2, - } -} - -// GetConfigSpec returns the configuration schema for balance tasks -func GetConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Balance Tasks", - Description: "Whether balance tasks should be automatically created", - HelpText: "Toggle this to enable or disable 
automatic balance task generation", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 30 * 60, - MinValue: 5 * 60, - MaxValue: 2 * 60 * 60, - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volume distribution imbalances", - HelpText: "The system will check for volume distribution imbalances at this interval", - Placeholder: "30", - Unit: config.UnitMinutes, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 1, - MinValue: 1, - MaxValue: 3, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of balance tasks that can run simultaneously", - HelpText: "Limits the number of balance operations running at the same time", - Placeholder: "1 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "imbalance_threshold", - JSONName: "imbalance_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.2, - MinValue: 0.05, - MaxValue: 0.5, - Required: true, - DisplayName: "Imbalance Threshold", - Description: "Minimum imbalance ratio to trigger balancing", - HelpText: "Volume distribution imbalances above this threshold will trigger balancing", - Placeholder: "0.20 (20%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_server_count", - JSONName: "min_server_count", - Type: config.FieldTypeInt, - DefaultValue: 2, - MinValue: 2, - MaxValue: 10, - Required: true, - DisplayName: "Minimum Server Count", - Description: "Minimum number of servers required for balancing", - HelpText: "Balancing will only occur if there are at least this many servers", - Placeholder: "2 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - }, - } -} - -// ToTaskPolicy converts configuration to a TaskPolicy protobuf message -func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { - return &worker_pb.TaskPolicy{ - Enabled: c.Enabled, - MaxConcurrent: int32(c.MaxConcurrent), - RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), - CheckIntervalSeconds: int32(c.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ - BalanceConfig: &worker_pb.BalanceTaskConfig{ - ImbalanceThreshold: float64(c.ImbalanceThreshold), - MinServerCount: int32(c.MinServerCount), - }, - }, - } -} - -// FromTaskPolicy loads configuration from a TaskPolicy protobuf message -func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { - if policy == nil { - return fmt.Errorf("policy is nil") - } - - // Set general TaskPolicy fields - c.Enabled = policy.Enabled - c.MaxConcurrent = int(policy.MaxConcurrent) - c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping - - // Set balance-specific fields from the task config - if balanceConfig := policy.GetBalanceConfig(); balanceConfig != nil { - c.ImbalanceThreshold = float64(balanceConfig.ImbalanceThreshold) - c.MinServerCount = int(balanceConfig.MinServerCount) - } - - return nil -} - -// LoadConfigFromPersistence loads configuration from the persistence layer if available -func LoadConfigFromPersistence(configPersistence interface{}) *Config { - config := NewDefaultConfig() - - // Try to load from persistence if available - if persistence, ok := configPersistence.(interface { - 
LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) - }); ok { - if policy, err := persistence.LoadBalanceTaskPolicy(); err == nil && policy != nil { - if err := config.FromTaskPolicy(policy); err == nil { - glog.V(1).Infof("Loaded balance configuration from persistence") - return config - } - } - } - - glog.V(1).Infof("Using default balance configuration") - return config -} diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go deleted file mode 100644 index 6d433c719..000000000 --- a/weed/worker/tasks/balance/detection.go +++ /dev/null @@ -1,272 +0,0 @@ -package balance - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/admin/topology" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Detection implements the detection logic for balance tasks -func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - balanceConfig := config.(*Config) - - // Skip if cluster is too small - minVolumeCount := 2 // More reasonable for small clusters - if len(metrics) < minVolumeCount { - glog.Infof("BALANCE: No tasks created - cluster too small (%d volumes, need ≥%d)", len(metrics), minVolumeCount) - return nil, nil - } - - // Analyze volume distribution across servers - serverVolumeCounts := make(map[string]int) - for _, metric := range metrics { - serverVolumeCounts[metric.Server]++ - } - - if len(serverVolumeCounts) < balanceConfig.MinServerCount { - glog.Infof("BALANCE: No tasks created - too few servers (%d servers, need ≥%d)", len(serverVolumeCounts), balanceConfig.MinServerCount) - return nil, nil - } - - // Calculate balance metrics - totalVolumes := len(metrics) - avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) - - maxVolumes := 0 - minVolumes := totalVolumes - maxServer := "" - minServer := "" - - for server, count := range serverVolumeCounts { - if count > maxVolumes { - maxVolumes = count - maxServer = server - } - if count < minVolumes { - minVolumes = count - minServer = server - } - } - - // Check if imbalance exceeds threshold - imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer - if imbalanceRatio <= balanceConfig.ImbalanceThreshold { - glog.Infof("BALANCE: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). 
Max=%d volumes on %s, Min=%d on %s, Avg=%.1f", - imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - return nil, nil - } - - // Select a volume from the overloaded server for balance - var selectedVolume *types.VolumeHealthMetrics - for _, metric := range metrics { - if metric.Server == maxServer { - selectedVolume = metric - break - } - } - - if selectedVolume == nil { - glog.Warningf("BALANCE: Could not find volume on overloaded server %s", maxServer) - return nil, nil - } - - // Create balance task with volume and destination planning info - reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", - imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - - // Generate task ID for ActiveTopology integration - taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix()) - - task := &types.TaskDetectionResult{ - TaskID: taskID, // Link to ActiveTopology pending task - TaskType: types.TaskTypeBalance, - VolumeID: selectedVolume.VolumeID, - Server: selectedVolume.Server, - Collection: selectedVolume.Collection, - Priority: types.TaskPriorityNormal, - Reason: reason, - ScheduleAt: time.Now(), - } - - // Plan destination if ActiveTopology is available - if clusterInfo.ActiveTopology != nil { - destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) - if err != nil { - glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) - return nil, nil // Skip this task if destination planning fails - } - - // Find the actual disk containing the volume on the source server - sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - if !found { - return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", - selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - } - - // Create typed parameters with unified source and target information - task.TypedParams = &worker_pb.TaskParams{ - TaskId: taskID, // Link to ActiveTopology pending task - VolumeId: selectedVolume.VolumeID, - Collection: selectedVolume.Collection, - VolumeSize: selectedVolume.Size, // Store original volume size for tracking changes - - // Unified sources and targets - the only way to specify locations - Sources: []*worker_pb.TaskSource{ - { - Node: selectedVolume.Server, - DiskId: sourceDisk, - VolumeId: selectedVolume.VolumeID, - EstimatedSize: selectedVolume.Size, - DataCenter: selectedVolume.DataCenter, - Rack: selectedVolume.Rack, - }, - }, - Targets: []*worker_pb.TaskTarget{ - { - Node: destinationPlan.TargetNode, - DiskId: destinationPlan.TargetDisk, - VolumeId: selectedVolume.VolumeID, - EstimatedSize: destinationPlan.ExpectedSize, - DataCenter: destinationPlan.TargetDC, - Rack: destinationPlan.TargetRack, - }, - }, - - TaskParams: &worker_pb.TaskParams_BalanceParams{ - BalanceParams: &worker_pb.BalanceTaskParams{ - ForceMove: false, - TimeoutSeconds: 600, // 10 minutes default - }, - }, - } - - glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s", - selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) - - // Add pending balance task to ActiveTopology for capacity management - targetDisk := destinationPlan.TargetDisk - - err = 
clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ - TaskID: taskID, - TaskType: topology.TaskTypeBalance, - VolumeID: selectedVolume.VolumeID, - VolumeSize: int64(selectedVolume.Size), - Sources: []topology.TaskSourceSpec{ - {ServerID: selectedVolume.Server, DiskID: sourceDisk}, - }, - Destinations: []topology.TaskDestinationSpec{ - {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, - }, - }) - if err != nil { - return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err) - } - - glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", - taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) - } else { - glog.Warningf("No ActiveTopology available for destination planning in balance detection") - return nil, nil - } - - return []*types.TaskDetectionResult{task}, nil -} - -// planBalanceDestination plans the destination for a balance operation -// This function implements destination planning logic directly in the detection phase -func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics) (*topology.DestinationPlan, error) { - // Get source node information from topology - var sourceRack, sourceDC string - - // Extract rack and DC from topology info - topologyInfo := activeTopology.GetTopologyInfo() - if topologyInfo != nil { - for _, dc := range topologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, dataNodeInfo := range rack.DataNodeInfos { - if dataNodeInfo.Id == selectedVolume.Server { - sourceDC = dc.Id - sourceRack = rack.Id - break - } - } - if sourceRack != "" { - break - } - } - if sourceDC != "" { - break - } - } - } - - // Get available disks, excluding the source node - availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeBalance, selectedVolume.Server) - if len(availableDisks) == 0 { - return nil, fmt.Errorf("no available disks for balance operation") - } - - // Find the best destination disk based on balance criteria - var bestDisk *topology.DiskInfo - bestScore := -1.0 - - for _, disk := range availableDisks { - score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size) - if score > bestScore { - bestScore = score - bestDisk = disk - } - } - - if bestDisk == nil { - return nil, fmt.Errorf("no suitable destination found for balance operation") - } - - return &topology.DestinationPlan{ - TargetNode: bestDisk.NodeID, - TargetDisk: bestDisk.DiskID, - TargetRack: bestDisk.Rack, - TargetDC: bestDisk.DataCenter, - ExpectedSize: selectedVolume.Size, - PlacementScore: bestScore, - }, nil -} - -// calculateBalanceScore calculates placement score for balance operations -func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64) float64 { - if disk.DiskInfo == nil { - return 0.0 - } - - score := 0.0 - - // Prefer disks with lower current volume count (better for balance) - if disk.DiskInfo.MaxVolumeCount > 0 { - utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount) - score += (1.0 - utilization) * 40.0 // Up to 40 points for low utilization - } - - // Prefer different racks for better distribution - if disk.Rack != sourceRack { - score += 30.0 - } - - // Prefer different data centers for better distribution - if disk.DataCenter != sourceDC { - score += 20.0 - } - - // Prefer disks with lower current load - score += (10.0 - 
float64(disk.LoadCount)) // Up to 10 points for low load - - return score -} diff --git a/weed/worker/tasks/balance/execution.go b/weed/worker/tasks/balance/execution.go deleted file mode 100644 index 0acd2b662..000000000 --- a/weed/worker/tasks/balance/execution.go +++ /dev/null @@ -1,158 +0,0 @@ -package balance - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// TypedTask implements balance operation with typed protobuf parameters -type TypedTask struct { - *base.BaseTypedTask - - // Task state from protobuf - sourceServer string - destNode string - volumeID uint32 - collection string - estimatedSize uint64 - forceMove bool - timeoutSeconds int32 -} - -// NewTypedTask creates a new typed balance task -func NewTypedTask() types.TypedTaskInterface { - task := &TypedTask{ - BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeBalance), - } - return task -} - -// ValidateTyped validates the typed parameters for balance task -func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error { - // Basic validation from base class - if err := t.BaseTypedTask.ValidateTyped(params); err != nil { - return err - } - - // Check that we have balance-specific parameters - balanceParams := params.GetBalanceParams() - if balanceParams == nil { - return fmt.Errorf("balance_params is required for balance task") - } - - // Validate sources and targets - if len(params.Sources) == 0 { - return fmt.Errorf("at least one source is required for balance task") - } - if len(params.Targets) == 0 { - return fmt.Errorf("at least one target is required for balance task") - } - - // Validate that source and target have volume IDs - if params.Sources[0].VolumeId == 0 { - return fmt.Errorf("source volume_id is required for balance task") - } - if params.Targets[0].VolumeId == 0 { - return fmt.Errorf("target volume_id is required for balance task") - } - - // Validate timeout - if balanceParams.TimeoutSeconds <= 0 { - return fmt.Errorf("timeout_seconds must be greater than 0") - } - - return nil -} - -// EstimateTimeTyped estimates the time needed for balance operation based on protobuf parameters -func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration { - balanceParams := params.GetBalanceParams() - if balanceParams != nil { - // Use the timeout from parameters if specified - if balanceParams.TimeoutSeconds > 0 { - return time.Duration(balanceParams.TimeoutSeconds) * time.Second - } - } - - // Estimate based on volume size from sources (1 minute per GB) - if len(params.Sources) > 0 { - source := params.Sources[0] - if source.EstimatedSize > 0 { - gbSize := source.EstimatedSize / (1024 * 1024 * 1024) - return time.Duration(gbSize) * time.Minute - } - } - - // Default estimation - return 10 * time.Minute -} - -// ExecuteTyped implements the balance operation with typed parameters -func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error { - // Extract basic parameters - t.volumeID = params.VolumeId - t.collection = params.Collection - - // Ensure sources and targets are present (should be guaranteed by validation) - if len(params.Sources) == 0 { - return fmt.Errorf("at least one source is required for balance task (ExecuteTyped)") - } - if len(params.Targets) == 0 { - return fmt.Errorf("at least one target is required for balance task (ExecuteTyped)") - } - - // Extract source and target 
information - t.sourceServer = params.Sources[0].Node - t.estimatedSize = params.Sources[0].EstimatedSize - t.destNode = params.Targets[0].Node - // Extract balance-specific parameters - balanceParams := params.GetBalanceParams() - if balanceParams != nil { - t.forceMove = balanceParams.ForceMove - t.timeoutSeconds = balanceParams.TimeoutSeconds - } - - glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)", - t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize) - - // Simulate balance operation with progress updates - steps := []struct { - name string - duration time.Duration - progress float64 - }{ - {"Analyzing cluster state", 2 * time.Second, 15}, - {"Verifying destination capacity", 1 * time.Second, 25}, - {"Starting volume migration", 1 * time.Second, 35}, - {"Moving volume data", 6 * time.Second, 75}, - {"Updating cluster metadata", 2 * time.Second, 95}, - {"Verifying balance completion", 1 * time.Second, 100}, - } - - for _, step := range steps { - if t.IsCancelled() { - return fmt.Errorf("balance task cancelled during: %s", step.name) - } - - glog.V(1).Infof("Balance task step: %s", step.name) - t.SetProgress(step.progress) - - // Simulate work - time.Sleep(step.duration) - } - - glog.Infof("Typed balance task completed successfully for volume %d: %s -> %s", - t.volumeID, t.sourceServer, t.destNode) - return nil -} - -// Register the typed task in the global registry -func init() { - types.RegisterGlobalTypedTask(types.TaskTypeBalance, NewTypedTask) - glog.V(1).Infof("Registered typed balance task") -} diff --git a/weed/worker/tasks/balance/monitoring.go b/weed/worker/tasks/balance/monitoring.go deleted file mode 100644 index 517de2484..000000000 --- a/weed/worker/tasks/balance/monitoring.go +++ /dev/null @@ -1,138 +0,0 @@ -package balance - -import ( - "sync" - "time" -) - -// BalanceMetrics contains balance-specific monitoring data -type BalanceMetrics struct { - // Execution metrics - VolumesBalanced int64 `json:"volumes_balanced"` - TotalDataTransferred int64 `json:"total_data_transferred"` - AverageImbalance float64 `json:"average_imbalance"` - LastBalanceTime time.Time `json:"last_balance_time"` - - // Performance metrics - AverageTransferSpeed float64 `json:"average_transfer_speed_mbps"` - TotalExecutionTime int64 `json:"total_execution_time_seconds"` - SuccessfulOperations int64 `json:"successful_operations"` - FailedOperations int64 `json:"failed_operations"` - - // Current task metrics - CurrentImbalanceScore float64 `json:"current_imbalance_score"` - PlannedDestinations int `json:"planned_destinations"` - - mutex sync.RWMutex -} - -// NewBalanceMetrics creates a new balance metrics instance -func NewBalanceMetrics() *BalanceMetrics { - return &BalanceMetrics{ - LastBalanceTime: time.Now(), - } -} - -// RecordVolumeBalanced records a successful volume balance operation -func (m *BalanceMetrics) RecordVolumeBalanced(volumeSize int64, transferTime time.Duration) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.VolumesBalanced++ - m.TotalDataTransferred += volumeSize - m.SuccessfulOperations++ - m.LastBalanceTime = time.Now() - m.TotalExecutionTime += int64(transferTime.Seconds()) - - // Calculate average transfer speed (MB/s) - if transferTime > 0 { - speedMBps := float64(volumeSize) / (1024 * 1024) / transferTime.Seconds() - if m.AverageTransferSpeed == 0 { - m.AverageTransferSpeed = speedMBps - } else { - // Exponential moving average - m.AverageTransferSpeed = 0.8*m.AverageTransferSpeed + 0.2*speedMBps - } - 
} -} - -// RecordFailure records a failed balance operation -func (m *BalanceMetrics) RecordFailure() { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.FailedOperations++ -} - -// UpdateImbalanceScore updates the current cluster imbalance score -func (m *BalanceMetrics) UpdateImbalanceScore(score float64) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.CurrentImbalanceScore = score - - // Update average imbalance with exponential moving average - if m.AverageImbalance == 0 { - m.AverageImbalance = score - } else { - m.AverageImbalance = 0.9*m.AverageImbalance + 0.1*score - } -} - -// SetPlannedDestinations sets the number of planned destinations -func (m *BalanceMetrics) SetPlannedDestinations(count int) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.PlannedDestinations = count -} - -// GetMetrics returns a copy of the current metrics (without the mutex) -func (m *BalanceMetrics) GetMetrics() BalanceMetrics { - m.mutex.RLock() - defer m.mutex.RUnlock() - - // Create a copy without the mutex to avoid copying lock value - return BalanceMetrics{ - VolumesBalanced: m.VolumesBalanced, - TotalDataTransferred: m.TotalDataTransferred, - AverageImbalance: m.AverageImbalance, - LastBalanceTime: m.LastBalanceTime, - AverageTransferSpeed: m.AverageTransferSpeed, - TotalExecutionTime: m.TotalExecutionTime, - SuccessfulOperations: m.SuccessfulOperations, - FailedOperations: m.FailedOperations, - CurrentImbalanceScore: m.CurrentImbalanceScore, - PlannedDestinations: m.PlannedDestinations, - } -} - -// GetSuccessRate returns the success rate as a percentage -func (m *BalanceMetrics) GetSuccessRate() float64 { - m.mutex.RLock() - defer m.mutex.RUnlock() - - total := m.SuccessfulOperations + m.FailedOperations - if total == 0 { - return 100.0 - } - return float64(m.SuccessfulOperations) / float64(total) * 100.0 -} - -// Reset resets all metrics to zero -func (m *BalanceMetrics) Reset() { - m.mutex.Lock() - defer m.mutex.Unlock() - - *m = BalanceMetrics{ - LastBalanceTime: time.Now(), - } -} - -// Global metrics instance for balance tasks -var globalBalanceMetrics = NewBalanceMetrics() - -// GetGlobalBalanceMetrics returns the global balance metrics instance -func GetGlobalBalanceMetrics() *BalanceMetrics { - return globalBalanceMetrics -} diff --git a/weed/worker/tasks/balance/register.go b/weed/worker/tasks/balance/register.go deleted file mode 100644 index 76d56c7c5..000000000 --- a/weed/worker/tasks/balance/register.go +++ /dev/null @@ -1,86 +0,0 @@ -package balance - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Global variable to hold the task definition for configuration updates -var globalTaskDef *base.TaskDefinition - -// Auto-register this task when the package is imported -func init() { - RegisterBalanceTask() - - // Register config updater - tasks.AutoRegisterConfigUpdater(types.TaskTypeBalance, UpdateConfigFromPersistence) -} - -// RegisterBalanceTask registers the balance task with the new architecture -func RegisterBalanceTask() { - // Create configuration instance - config := NewDefaultConfig() - - // Create complete task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeBalance, - Name: "balance", - DisplayName: "Volume Balance", - Description: "Balances volume distribution across servers", - Icon: "fas fa-balance-scale 
text-warning", - Capabilities: []string{"balance", "distribution"}, - - Config: config, - ConfigSpec: GetConfigSpec(), - CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) { - if params == nil { - return nil, fmt.Errorf("task parameters are required") - } - if len(params.Sources) == 0 { - return nil, fmt.Errorf("at least one source is required for balance task") - } - return NewBalanceTask( - fmt.Sprintf("balance-%d", params.VolumeId), - params.Sources[0].Node, // Use first source node - params.VolumeId, - params.Collection, - ), nil - }, - DetectionFunc: Detection, - ScanInterval: 30 * time.Minute, - SchedulingFunc: Scheduling, - MaxConcurrent: 1, - RepeatInterval: 2 * time.Hour, - } - - // Store task definition globally for configuration updates - globalTaskDef = taskDef - - // Register everything with a single function call! - base.RegisterTask(taskDef) -} - -// UpdateConfigFromPersistence updates the balance configuration from persistence -func UpdateConfigFromPersistence(configPersistence interface{}) error { - if globalTaskDef == nil { - return fmt.Errorf("balance task not registered") - } - - // Load configuration from persistence - newConfig := LoadConfigFromPersistence(configPersistence) - if newConfig == nil { - return fmt.Errorf("failed to load configuration from persistence") - } - - // Update the task definition's config - globalTaskDef.Config = newConfig - - glog.V(1).Infof("Updated balance task configuration from persistence") - return nil -} diff --git a/weed/worker/tasks/balance/scheduling.go b/weed/worker/tasks/balance/scheduling.go deleted file mode 100644 index 878686309..000000000 --- a/weed/worker/tasks/balance/scheduling.go +++ /dev/null @@ -1,37 +0,0 @@ -package balance - -import ( - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Scheduling implements the scheduling logic for balance tasks -func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool { - balanceConfig := config.(*Config) - - // Count running balance tasks - runningBalanceCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeBalance { - runningBalanceCount++ - } - } - - // Check concurrency limit - if runningBalanceCount >= balanceConfig.MaxConcurrent { - return false - } - - // Check if we have available workers - availableWorkerCount := 0 - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeBalance { - availableWorkerCount++ - break - } - } - } - - return availableWorkerCount > 0 -} diff --git a/weed/worker/tasks/base/volume_utils.go b/weed/worker/tasks/base/volume_utils.go index 2aaf795b2..a696cc5b9 100644 --- a/weed/worker/tasks/base/volume_utils.go +++ b/weed/worker/tasks/base/volume_utils.go @@ -9,15 +9,12 @@ import ( // Uses O(1) indexed lookup for optimal performance on large clusters. // // This is a shared utility function used by multiple task detection algorithms -// (balance, vacuum, etc.) to locate volumes efficiently. +// to locate volumes efficiently. 
// // Example usage: // -// // In balance task: find source disk for a volume that needs to be moved +// // Find source disk for a volume that needs to be processed // sourceDisk, found := base.FindVolumeDisk(topology, volumeID, collection, sourceServer) -// -// // In vacuum task: find disk containing volume that needs cleanup -// diskID, exists := base.FindVolumeDisk(topology, volumeID, collection, serverID) func FindVolumeDisk(activeTopology *topology.ActiveTopology, volumeID uint32, collection string, serverID string) (uint32, bool) { if activeTopology == nil { return 0, false diff --git a/weed/worker/tasks/vacuum/config.go b/weed/worker/tasks/vacuum/config.go deleted file mode 100644 index fe8c0e8c5..000000000 --- a/weed/worker/tasks/vacuum/config.go +++ /dev/null @@ -1,190 +0,0 @@ -package vacuum - -import ( - "fmt" - - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" -) - -// Config extends BaseConfig with vacuum-specific settings -type Config struct { - base.BaseConfig - GarbageThreshold float64 `json:"garbage_threshold"` - MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` - MinIntervalSeconds int `json:"min_interval_seconds"` -} - -// NewDefaultConfig creates a new default vacuum configuration -func NewDefaultConfig() *Config { - return &Config{ - BaseConfig: base.BaseConfig{ - Enabled: true, - ScanIntervalSeconds: 2 * 60 * 60, // 2 hours - MaxConcurrent: 2, - }, - GarbageThreshold: 0.3, // 30% - MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - } -} - -// ToTaskPolicy converts configuration to a TaskPolicy protobuf message -func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { - return &worker_pb.TaskPolicy{ - Enabled: c.Enabled, - MaxConcurrent: int32(c.MaxConcurrent), - RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), - CheckIntervalSeconds: int32(c.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ - VacuumConfig: &worker_pb.VacuumTaskConfig{ - GarbageThreshold: float64(c.GarbageThreshold), - MinVolumeAgeHours: int32(c.MinVolumeAgeSeconds / 3600), // Convert seconds to hours - MinIntervalSeconds: int32(c.MinIntervalSeconds), - }, - }, - } -} - -// FromTaskPolicy loads configuration from a TaskPolicy protobuf message -func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { - if policy == nil { - return fmt.Errorf("policy is nil") - } - - // Set general TaskPolicy fields - c.Enabled = policy.Enabled - c.MaxConcurrent = int(policy.MaxConcurrent) - c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping - - // Set vacuum-specific fields from the task config - if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil { - c.GarbageThreshold = float64(vacuumConfig.GarbageThreshold) - c.MinVolumeAgeSeconds = int(vacuumConfig.MinVolumeAgeHours * 3600) // Convert hours to seconds - c.MinIntervalSeconds = int(vacuumConfig.MinIntervalSeconds) - } - - return nil -} - -// LoadConfigFromPersistence loads configuration from the persistence layer if available -func LoadConfigFromPersistence(configPersistence interface{}) *Config { - config := NewDefaultConfig() - - // Try to load from persistence if available - if persistence, ok := configPersistence.(interface { - LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) - }); ok { - if policy, err := persistence.LoadVacuumTaskPolicy(); err == nil && 
policy != nil { - if err := config.FromTaskPolicy(policy); err == nil { - glog.V(1).Infof("Loaded vacuum configuration from persistence") - return config - } - } - } - - glog.V(1).Infof("Using default vacuum configuration") - return config -} - -// GetConfigSpec returns the configuration schema for vacuum tasks -func GetConfigSpec() base.ConfigSpec { - return base.ConfigSpec{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Vacuum Tasks", - Description: "Whether vacuum tasks should be automatically created", - HelpText: "Toggle this to enable or disable automatic vacuum task generation", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 2 * 60 * 60, - MinValue: 10 * 60, - MaxValue: 24 * 60 * 60, - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volumes needing vacuum", - HelpText: "The system will check for volumes that need vacuuming at this interval", - Placeholder: "2", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 2, - MinValue: 1, - MaxValue: 10, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of vacuum tasks that can run simultaneously", - HelpText: "Limits the number of vacuum operations running at the same time to control system load", - Placeholder: "2 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "garbage_threshold", - JSONName: "garbage_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.3, - MinValue: 0.0, - MaxValue: 1.0, - Required: true, - DisplayName: "Garbage Percentage Threshold", - Description: "Trigger vacuum when garbage ratio exceeds this percentage", - HelpText: "Volumes with more deleted content than this threshold will be vacuumed", - Placeholder: "0.30 (30%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_volume_age_seconds", - JSONName: "min_volume_age_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 24 * 60 * 60, - MinValue: 1 * 60 * 60, - MaxValue: 7 * 24 * 60 * 60, - Required: true, - DisplayName: "Minimum Volume Age", - Description: "Only vacuum volumes older than this duration", - HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to", - Placeholder: "24", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "min_interval_seconds", - JSONName: "min_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 7 * 24 * 60 * 60, - MinValue: 1 * 24 * 60 * 60, - MaxValue: 30 * 24 * 60 * 60, - Required: true, - DisplayName: "Minimum Interval", - Description: "Minimum time between vacuum operations on the same volume", - HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time", - Placeholder: "7", - Unit: config.UnitDays, - InputType: "interval", - CSSClasses: "form-control", - }, - }, - } -} diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go deleted file mode 100644 index bd86a2742..000000000 --- a/weed/worker/tasks/vacuum/detection.go +++ /dev/null @@ -1,133 +0,0 @@ -package vacuum - -import 
( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Detection implements the detection logic for vacuum tasks -func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - if !config.IsEnabled() { - return nil, nil - } - - vacuumConfig := config.(*Config) - var results []*types.TaskDetectionResult - minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second - - debugCount := 0 - skippedDueToGarbage := 0 - skippedDueToAge := 0 - - for _, metric := range metrics { - // Check if volume needs vacuum - if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { - priority := types.TaskPriorityNormal - if metric.GarbageRatio > 0.6 { - priority = types.TaskPriorityHigh - } - - // Generate task ID for future ActiveTopology integration - taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix()) - - result := &types.TaskDetectionResult{ - TaskID: taskID, // For future ActiveTopology integration - TaskType: types.TaskTypeVacuum, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: priority, - Reason: "Volume has excessive garbage requiring vacuum", - ScheduleAt: time.Now(), - } - - // Create typed parameters for vacuum task - result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo) - results = append(results, result) - } else { - // Debug why volume was not selected - if debugCount < 5 { // Limit debug output to first 5 volumes - if metric.GarbageRatio < vacuumConfig.GarbageThreshold { - skippedDueToGarbage++ - } - if metric.Age < minVolumeAge { - skippedDueToAge++ - } - } - debugCount++ - } - } - - // Log debug summary if no tasks were created - if len(results) == 0 && len(metrics) > 0 { - totalVolumes := len(metrics) - glog.Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. 
Skipped: %d (garbage<threshold), %d (age<min)", - totalVolumes, vacuumConfig.GarbageThreshold*100, minVolumeAge.Truncate(time.Minute), skippedDueToGarbage, skippedDueToAge) - - // Show details for the first few volumes to aid debugging - for i, metric := range metrics { - if i >= 3 { // Limit to first 3 volumes - break - } - glog.Infof("VACUUM: Volume %d: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)", - metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100, - metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute)) - } - } - - return results, nil -} - -// createVacuumTaskParams creates typed parameters for vacuum tasks -// This function is moved from MaintenanceIntegration.createVacuumTaskParams to the detection logic -func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams { - // Use configured values or defaults - garbageThreshold := 0.3 // Default 30% - verifyChecksum := true // Default to verify - batchSize := int32(1000) // Default batch size - workingDir := "/tmp/seaweedfs_vacuum_work" // Default working directory - - if vacuumConfig != nil { - garbageThreshold = vacuumConfig.GarbageThreshold - // Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours, MinIntervalSeconds - // Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added - // to the protobuf definition if they should be configurable - } - - // Use DC and rack information directly from VolumeHealthMetrics - sourceDC, sourceRack := metric.DataCenter, metric.Rack - - // Create typed protobuf parameters with unified sources - return &worker_pb.TaskParams{ - TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated) - VolumeId: task.VolumeID, - Collection: task.Collection, - VolumeSize: metric.Size, // Store original volume size for tracking changes - - // Unified sources array - Sources: []*worker_pb.TaskSource{ - { - Node: task.Server, - VolumeId: task.VolumeID, - EstimatedSize: metric.Size, - DataCenter: sourceDC, - Rack: sourceRack, - }, - }, - - TaskParams: &worker_pb.TaskParams_VacuumParams{ - VacuumParams: &worker_pb.VacuumTaskParams{ - GarbageThreshold: garbageThreshold, - ForceVacuum: false, - BatchSize: batchSize, - WorkingDir: workingDir, - VerifyChecksum: verifyChecksum, - }, - }, - } -} diff --git a/weed/worker/tasks/vacuum/monitoring.go b/weed/worker/tasks/vacuum/monitoring.go deleted file mode 100644 index c7dfd673e..000000000 --- a/weed/worker/tasks/vacuum/monitoring.go +++ /dev/null @@ -1,151 +0,0 @@ -package vacuum - -import ( - "sync" - "time" -) - -// VacuumMetrics contains vacuum-specific monitoring data -type VacuumMetrics struct { - // Execution metrics - VolumesVacuumed int64 `json:"volumes_vacuumed"` - TotalSpaceReclaimed int64 `json:"total_space_reclaimed"` - TotalFilesProcessed int64 `json:"total_files_processed"` - TotalGarbageCollected int64 `json:"total_garbage_collected"` - LastVacuumTime time.Time `json:"last_vacuum_time"` - - // Performance metrics - AverageVacuumTime int64 `json:"average_vacuum_time_seconds"` - AverageGarbageRatio float64 `json:"average_garbage_ratio"` - SuccessfulOperations int64 `json:"successful_operations"` - FailedOperations int64 `json:"failed_operations"` - - // Current task metrics - CurrentGarbageRatio float64 `json:"current_garbage_ratio"` - VolumesPendingVacuum int `json:"volumes_pending_vacuum"` - - mutex sync.RWMutex -} - -// NewVacuumMetrics creates a new vacuum metrics instance -func NewVacuumMetrics() *VacuumMetrics { - return &VacuumMetrics{ - LastVacuumTime: time.Now(), - } -} - -// RecordVolumeVacuumed records a successful volume vacuum operation -func (m *VacuumMetrics) RecordVolumeVacuumed(spaceReclaimed int64,
filesProcessed int64, garbageCollected int64, vacuumTime time.Duration, garbageRatio float64) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.VolumesVacuumed++ - m.TotalSpaceReclaimed += spaceReclaimed - m.TotalFilesProcessed += filesProcessed - m.TotalGarbageCollected += garbageCollected - m.SuccessfulOperations++ - m.LastVacuumTime = time.Now() - - // Update average vacuum time - if m.AverageVacuumTime == 0 { - m.AverageVacuumTime = int64(vacuumTime.Seconds()) - } else { - // Exponential moving average - newTime := int64(vacuumTime.Seconds()) - m.AverageVacuumTime = (m.AverageVacuumTime*4 + newTime) / 5 - } - - // Update average garbage ratio - if m.AverageGarbageRatio == 0 { - m.AverageGarbageRatio = garbageRatio - } else { - // Exponential moving average - m.AverageGarbageRatio = 0.8*m.AverageGarbageRatio + 0.2*garbageRatio - } -} - -// RecordFailure records a failed vacuum operation -func (m *VacuumMetrics) RecordFailure() { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.FailedOperations++ -} - -// UpdateCurrentGarbageRatio updates the current volume's garbage ratio -func (m *VacuumMetrics) UpdateCurrentGarbageRatio(ratio float64) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.CurrentGarbageRatio = ratio -} - -// SetVolumesPendingVacuum sets the number of volumes pending vacuum -func (m *VacuumMetrics) SetVolumesPendingVacuum(count int) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.VolumesPendingVacuum = count -} - -// GetMetrics returns a copy of the current metrics (without the mutex) -func (m *VacuumMetrics) GetMetrics() VacuumMetrics { - m.mutex.RLock() - defer m.mutex.RUnlock() - - // Create a copy without the mutex to avoid copying lock value - return VacuumMetrics{ - VolumesVacuumed: m.VolumesVacuumed, - TotalSpaceReclaimed: m.TotalSpaceReclaimed, - TotalFilesProcessed: m.TotalFilesProcessed, - TotalGarbageCollected: m.TotalGarbageCollected, - LastVacuumTime: m.LastVacuumTime, - AverageVacuumTime: m.AverageVacuumTime, - AverageGarbageRatio: m.AverageGarbageRatio, - SuccessfulOperations: m.SuccessfulOperations, - FailedOperations: m.FailedOperations, - CurrentGarbageRatio: m.CurrentGarbageRatio, - VolumesPendingVacuum: m.VolumesPendingVacuum, - } -} - -// GetSuccessRate returns the success rate as a percentage -func (m *VacuumMetrics) GetSuccessRate() float64 { - m.mutex.RLock() - defer m.mutex.RUnlock() - - total := m.SuccessfulOperations + m.FailedOperations - if total == 0 { - return 100.0 - } - return float64(m.SuccessfulOperations) / float64(total) * 100.0 -} - -// GetAverageSpaceReclaimed returns the average space reclaimed per volume -func (m *VacuumMetrics) GetAverageSpaceReclaimed() float64 { - m.mutex.RLock() - defer m.mutex.RUnlock() - - if m.VolumesVacuumed == 0 { - return 0 - } - return float64(m.TotalSpaceReclaimed) / float64(m.VolumesVacuumed) -} - -// Reset resets all metrics to zero -func (m *VacuumMetrics) Reset() { - m.mutex.Lock() - defer m.mutex.Unlock() - - *m = VacuumMetrics{ - LastVacuumTime: time.Now(), - } -} - -// Global metrics instance for vacuum tasks -var globalVacuumMetrics = NewVacuumMetrics() - -// GetGlobalVacuumMetrics returns the global vacuum metrics instance -func GetGlobalVacuumMetrics() *VacuumMetrics { - return globalVacuumMetrics -} diff --git a/weed/worker/tasks/vacuum/register.go b/weed/worker/tasks/vacuum/register.go deleted file mode 100644 index 2c1360b5b..000000000 --- a/weed/worker/tasks/vacuum/register.go +++ /dev/null @@ -1,86 +0,0 @@ -package vacuum - -import ( - "fmt" - "time" - - 
"github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Global variable to hold the task definition for configuration updates -var globalTaskDef *base.TaskDefinition - -// Auto-register this task when the package is imported -func init() { - RegisterVacuumTask() - - // Register config updater - tasks.AutoRegisterConfigUpdater(types.TaskTypeVacuum, UpdateConfigFromPersistence) -} - -// RegisterVacuumTask registers the vacuum task with the new architecture -func RegisterVacuumTask() { - // Create configuration instance - config := NewDefaultConfig() - - // Create complete task definition - taskDef := &base.TaskDefinition{ - Type: types.TaskTypeVacuum, - Name: "vacuum", - DisplayName: "Volume Vacuum", - Description: "Reclaims disk space by removing deleted files from volumes", - Icon: "fas fa-broom text-primary", - Capabilities: []string{"vacuum", "storage"}, - - Config: config, - ConfigSpec: GetConfigSpec(), - CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) { - if params == nil { - return nil, fmt.Errorf("task parameters are required") - } - if len(params.Sources) == 0 { - return nil, fmt.Errorf("at least one source is required for vacuum task") - } - return NewVacuumTask( - fmt.Sprintf("vacuum-%d", params.VolumeId), - params.Sources[0].Node, // Use first source node - params.VolumeId, - params.Collection, - ), nil - }, - DetectionFunc: Detection, - ScanInterval: 2 * time.Hour, - SchedulingFunc: Scheduling, - MaxConcurrent: 2, - RepeatInterval: 7 * 24 * time.Hour, - } - - // Store task definition globally for configuration updates - globalTaskDef = taskDef - - // Register everything with a single function call! 
- base.RegisterTask(taskDef) -} - -// UpdateConfigFromPersistence updates the vacuum configuration from persistence -func UpdateConfigFromPersistence(configPersistence interface{}) error { - if globalTaskDef == nil { - return fmt.Errorf("vacuum task not registered") - } - - // Load configuration from persistence - newConfig := LoadConfigFromPersistence(configPersistence) - if newConfig == nil { - return fmt.Errorf("failed to load configuration from persistence") - } - - // Update the task definition's config - globalTaskDef.Config = newConfig - - glog.V(1).Infof("Updated vacuum task configuration from persistence") - return nil -} diff --git a/weed/worker/tasks/vacuum/scheduling.go b/weed/worker/tasks/vacuum/scheduling.go deleted file mode 100644 index c44724eb9..000000000 --- a/weed/worker/tasks/vacuum/scheduling.go +++ /dev/null @@ -1,37 +0,0 @@ -package vacuum - -import ( - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Scheduling implements the scheduling logic for vacuum tasks -func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool { - vacuumConfig := config.(*Config) - - // Count running vacuum tasks - runningVacuumCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeVacuum { - runningVacuumCount++ - } - } - - // Check concurrency limit - if runningVacuumCount >= vacuumConfig.MaxConcurrent { - return false - } - - // Check for available workers with vacuum capability - for _, worker := range availableWorkers { - if worker.CurrentLoad < worker.MaxConcurrent { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeVacuum { - return true - } - } - } - } - - return false -} diff --git a/weed/worker/tasks/vacuum/vacuum_task.go b/weed/worker/tasks/vacuum/vacuum_task.go deleted file mode 100644 index ebb41564f..000000000 --- a/weed/worker/tasks/vacuum/vacuum_task.go +++ /dev/null @@ -1,244 +0,0 @@ -package vacuum - -import ( - "context" - "fmt" - "io" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/operation" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/types" - "github.com/seaweedfs/seaweedfs/weed/worker/types/base" - "google.golang.org/grpc" -) - -// VacuumTask implements the Task interface -type VacuumTask struct { - *base.BaseTask - server string - volumeID uint32 - collection string - garbageThreshold float64 - progress float64 -} - -// NewVacuumTask creates a new unified vacuum task instance -func NewVacuumTask(id string, server string, volumeID uint32, collection string) *VacuumTask { - return &VacuumTask{ - BaseTask: base.NewBaseTask(id, types.TaskTypeVacuum), - server: server, - volumeID: volumeID, - collection: collection, - garbageThreshold: 0.3, // Default 30% threshold - } -} - -// Execute implements the UnifiedTask interface -func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error { - if params == nil { - return fmt.Errorf("task parameters are required") - } - - vacuumParams := params.GetVacuumParams() - if vacuumParams == nil { - return fmt.Errorf("vacuum parameters are required") - } - - t.garbageThreshold = vacuumParams.GarbageThreshold - - t.GetLogger().WithFields(map[string]interface{}{ - "volume_id": t.volumeID, - 
"server": t.server, - "collection": t.collection, - "garbage_threshold": t.garbageThreshold, - }).Info("Starting vacuum task") - - // Step 1: Check volume status and garbage ratio - t.ReportProgress(10.0) - t.GetLogger().Info("Checking volume status") - eligible, currentGarbageRatio, err := t.checkVacuumEligibility() - if err != nil { - return fmt.Errorf("failed to check vacuum eligibility: %v", err) - } - - if !eligible { - t.GetLogger().WithFields(map[string]interface{}{ - "current_garbage_ratio": currentGarbageRatio, - "required_threshold": t.garbageThreshold, - }).Info("Volume does not meet vacuum criteria, skipping") - t.ReportProgress(100.0) - return nil - } - - // Step 2: Perform vacuum operation - t.ReportProgress(50.0) - t.GetLogger().WithFields(map[string]interface{}{ - "garbage_ratio": currentGarbageRatio, - "threshold": t.garbageThreshold, - }).Info("Performing vacuum operation") - - if err := t.performVacuum(); err != nil { - return fmt.Errorf("failed to perform vacuum: %v", err) - } - - // Step 3: Verify vacuum results - t.ReportProgress(90.0) - t.GetLogger().Info("Verifying vacuum results") - if err := t.verifyVacuumResults(); err != nil { - glog.Warningf("Vacuum verification failed: %v", err) - // Don't fail the task - vacuum operation itself succeeded - } - - t.ReportProgress(100.0) - glog.Infof("Vacuum task completed successfully: volume %d from %s (garbage ratio was %.2f%%)", - t.volumeID, t.server, currentGarbageRatio*100) - return nil -} - -// Validate implements the UnifiedTask interface -func (t *VacuumTask) Validate(params *worker_pb.TaskParams) error { - if params == nil { - return fmt.Errorf("task parameters are required") - } - - vacuumParams := params.GetVacuumParams() - if vacuumParams == nil { - return fmt.Errorf("vacuum parameters are required") - } - - if params.VolumeId != t.volumeID { - return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId) - } - - // Validate that at least one source matches our server - found := false - for _, source := range params.Sources { - if source.Node == t.server { - found = true - break - } - } - if !found { - return fmt.Errorf("no source matches expected server %s", t.server) - } - - if vacuumParams.GarbageThreshold < 0 || vacuumParams.GarbageThreshold > 1.0 { - return fmt.Errorf("invalid garbage threshold: %f (must be between 0.0 and 1.0)", vacuumParams.GarbageThreshold) - } - - return nil -} - -// EstimateTime implements the UnifiedTask interface -func (t *VacuumTask) EstimateTime(params *worker_pb.TaskParams) time.Duration { - // Basic estimate based on simulated steps - return 14 * time.Second // Sum of all step durations -} - -// GetProgress returns current progress -func (t *VacuumTask) GetProgress() float64 { - return t.progress -} - -// Helper methods for real vacuum operations - -// checkVacuumEligibility checks if the volume meets vacuum criteria -func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { - var garbageRatio float64 - - err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("failed to check volume vacuum status: %v", err) - } - - garbageRatio = resp.GarbageRatio - - return nil - }) - - if err != nil { - return false, 0, err - } - - eligible := garbageRatio >= t.garbageThreshold - 
glog.V(1).Infof("Volume %d garbage ratio: %.2f%%, threshold: %.2f%%, eligible: %v", - t.volumeID, garbageRatio*100, t.garbageThreshold*100, eligible) - - return eligible, garbageRatio, nil -} - -// performVacuum executes the actual vacuum operation -func (t *VacuumTask) performVacuum() error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - // Step 1: Compact the volume - t.GetLogger().Info("Compacting volume") - stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("vacuum compact failed: %v", err) - } - - // Read compact progress - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } - return fmt.Errorf("vacuum compact stream error: %v", recvErr) - } - glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes) - } - - // Step 2: Commit the vacuum - t.GetLogger().Info("Committing vacuum operation") - _, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("vacuum commit failed: %v", err) - } - - // Step 3: Cleanup old files - t.GetLogger().Info("Cleaning up vacuum files") - _, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("vacuum cleanup failed: %v", err) - } - - glog.V(1).Infof("Volume %d vacuum operation completed successfully", t.volumeID) - return nil - }) -} - -// verifyVacuumResults checks the volume status after vacuum -func (t *VacuumTask) verifyVacuumResults() error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("failed to verify vacuum results: %v", err) - } - - postVacuumGarbageRatio := resp.GarbageRatio - - glog.V(1).Infof("Volume %d post-vacuum garbage ratio: %.2f%%", - t.volumeID, postVacuumGarbageRatio*100) - - return nil - }) -} diff --git a/weed/worker/types/task_types.go b/weed/worker/types/task_types.go index c4cafd07f..761b5a1fd 100644 --- a/weed/worker/types/task_types.go +++ b/weed/worker/types/task_types.go @@ -11,9 +11,7 @@ import ( type TaskType string const ( - TaskTypeVacuum TaskType = "vacuum" TaskTypeErasureCoding TaskType = "erasure_coding" - TaskTypeBalance TaskType = "balance" TaskTypeReplication TaskType = "replication" ) diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 49d1ea57f..eb0c28e13 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -16,9 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" // Import task packages to trigger their auto-registration - _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" - _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" ) // Worker represents a maintenance worker instance