
remove Vacuum and Balance tasks

Vacuum - Completely removed
Balance - Completely removed

add-ec-vacuum
chrislu, 4 months ago
parent commit 96d6d27607
23 changed files (line counts in parentheses):

  1. weed/admin/dash/config_persistence.go (208)
  2. weed/admin/handlers/maintenance_handlers.go (20)
  3. weed/admin/handlers/maintenance_handlers_test.go (164)
  4. weed/admin/maintenance/maintenance_manager.go (35)
  5. weed/admin/maintenance/maintenance_worker.go (2)
  6. weed/admin/view/layout/menu_helper.go (2)
  7. weed/command/worker.go (2)
  8. weed/worker/tasks/balance/balance_task.go (267)
  9. weed/worker/tasks/balance/config.go (170)
 10. weed/worker/tasks/balance/detection.go (272)
 11. weed/worker/tasks/balance/execution.go (158)
 12. weed/worker/tasks/balance/monitoring.go (138)
 13. weed/worker/tasks/balance/register.go (86)
 14. weed/worker/tasks/balance/scheduling.go (37)
 15. weed/worker/tasks/base/volume_utils.go (7)
 16. weed/worker/tasks/vacuum/config.go (190)
 17. weed/worker/tasks/vacuum/detection.go (133)
 18. weed/worker/tasks/vacuum/monitoring.go (151)
 19. weed/worker/tasks/vacuum/register.go (86)
 20. weed/worker/tasks/vacuum/scheduling.go (37)
 21. weed/worker/tasks/vacuum/vacuum_task.go (244)
 22. weed/worker/types/task_types.go (2)
 23. weed/worker/worker.go (2)

weed/admin/dash/config_persistence.go (208)

@@ -12,9 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
@@ -25,16 +23,12 @@ const (
// Configuration file names (protobuf binary)
MaintenanceConfigFile = "maintenance.pb"
VacuumTaskConfigFile = "task_vacuum.pb"
ECTaskConfigFile = "task_erasure_coding.pb"
BalanceTaskConfigFile = "task_balance.pb"
ReplicationTaskConfigFile = "task_replication.pb"
// JSON reference files
MaintenanceConfigJSONFile = "maintenance.json"
VacuumTaskConfigJSONFile = "task_vacuum.json"
ECTaskConfigJSONFile = "task_erasure_coding.json"
BalanceTaskConfigJSONFile = "task_balance.json"
ReplicationTaskConfigJSONFile = "task_replication.json"
// Task persistence subdirectories and settings
@@ -262,92 +256,6 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
return nil
}
// SaveVacuumTaskConfig saves vacuum task configuration to protobuf file
func (cp *ConfigPersistence) SaveVacuumTaskConfig(config *VacuumTaskConfig) error {
return cp.saveTaskConfig(VacuumTaskConfigFile, config)
}
// SaveVacuumTaskPolicy saves complete vacuum task policy to protobuf file
func (cp *ConfigPersistence) SaveVacuumTaskPolicy(policy *worker_pb.TaskPolicy) error {
return cp.saveTaskConfig(VacuumTaskConfigFile, policy)
}
// LoadVacuumTaskConfig loads vacuum task configuration from protobuf file
func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) {
// Load as TaskPolicy and extract vacuum config
if taskPolicy, err := cp.LoadVacuumTaskPolicy(); err == nil && taskPolicy != nil {
if vacuumConfig := taskPolicy.GetVacuumConfig(); vacuumConfig != nil {
return vacuumConfig, nil
}
}
// Return default config if no valid config found
return &VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
}, nil
}
// LoadVacuumTaskPolicy loads complete vacuum task policy from protobuf file
func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) {
if cp.dataDir == "" {
// Return default policy if no data directory
return &worker_pb.TaskPolicy{
Enabled: true,
MaxConcurrent: 2,
RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds
CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
},
},
}, nil
}
confDir := filepath.Join(cp.dataDir, ConfigSubdir)
configPath := filepath.Join(confDir, VacuumTaskConfigFile)
// Check if file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
// Return default policy if file doesn't exist
return &worker_pb.TaskPolicy{
Enabled: true,
MaxConcurrent: 2,
RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds
CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
},
},
}, nil
}
// Read file
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to read vacuum task config file: %w", err)
}
// Try to unmarshal as TaskPolicy
var policy worker_pb.TaskPolicy
if err := proto.Unmarshal(configData, &policy); err == nil {
// Validate that it's actually a TaskPolicy with vacuum config
if policy.GetVacuumConfig() != nil {
glog.V(1).Infof("Loaded vacuum task policy from %s", configPath)
return &policy, nil
}
}
return nil, fmt.Errorf("failed to unmarshal vacuum task configuration")
}
// SaveErasureCodingTaskConfig saves EC task configuration to protobuf file
func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error {
return cp.saveTaskConfig(ECTaskConfigFile, config)
@@ -437,89 +345,6 @@ func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolic
return nil, fmt.Errorf("failed to unmarshal EC task configuration")
}
// SaveBalanceTaskConfig saves balance task configuration to protobuf file
func (cp *ConfigPersistence) SaveBalanceTaskConfig(config *BalanceTaskConfig) error {
return cp.saveTaskConfig(BalanceTaskConfigFile, config)
}
// SaveBalanceTaskPolicy saves complete balance task policy to protobuf file
func (cp *ConfigPersistence) SaveBalanceTaskPolicy(policy *worker_pb.TaskPolicy) error {
return cp.saveTaskConfig(BalanceTaskConfigFile, policy)
}
// LoadBalanceTaskConfig loads balance task configuration from protobuf file
func (cp *ConfigPersistence) LoadBalanceTaskConfig() (*BalanceTaskConfig, error) {
// Load as TaskPolicy and extract balance config
if taskPolicy, err := cp.LoadBalanceTaskPolicy(); err == nil && taskPolicy != nil {
if balanceConfig := taskPolicy.GetBalanceConfig(); balanceConfig != nil {
return balanceConfig, nil
}
}
// Return default config if no valid config found
return &BalanceTaskConfig{
ImbalanceThreshold: 0.1,
MinServerCount: 2,
}, nil
}
// LoadBalanceTaskPolicy loads complete balance task policy from protobuf file
func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) {
if cp.dataDir == "" {
// Return default policy if no data directory
return &worker_pb.TaskPolicy{
Enabled: true,
MaxConcurrent: 1,
RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
BalanceConfig: &worker_pb.BalanceTaskConfig{
ImbalanceThreshold: 0.1,
MinServerCount: 2,
},
},
}, nil
}
confDir := filepath.Join(cp.dataDir, ConfigSubdir)
configPath := filepath.Join(confDir, BalanceTaskConfigFile)
// Check if file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
// Return default policy if file doesn't exist
return &worker_pb.TaskPolicy{
Enabled: true,
MaxConcurrent: 1,
RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
BalanceConfig: &worker_pb.BalanceTaskConfig{
ImbalanceThreshold: 0.1,
MinServerCount: 2,
},
},
}, nil
}
// Read file
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to read balance task config file: %w", err)
}
// Try to unmarshal as TaskPolicy
var policy worker_pb.TaskPolicy
if err := proto.Unmarshal(configData, &policy); err == nil {
// Validate that it's actually a TaskPolicy with balance config
if policy.GetBalanceConfig() != nil {
glog.V(1).Infof("Loaded balance task policy from %s", configPath)
return &policy, nil
}
}
return nil, fmt.Errorf("failed to unmarshal balance task configuration")
}
// SaveReplicationTaskConfig saves replication task configuration to protobuf file
func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error {
return cp.saveTaskConfig(ReplicationTaskConfigFile, config)
@@ -673,23 +498,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
}
// Load vacuum task configuration
if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
Enabled: vacuumConfig.Enabled,
MaxConcurrent: int32(vacuumConfig.MaxConcurrent),
RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
},
},
}
}
// Load erasure coding task configuration
if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
@@ -708,22 +516,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
}
}
// Load balance task configuration
if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
Enabled: balanceConfig.Enabled,
MaxConcurrent: int32(balanceConfig.MaxConcurrent),
RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
BalanceConfig: &worker_pb.BalanceTaskConfig{
ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
MinServerCount: int32(balanceConfig.MinServerCount),
},
},
}
}
glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
return policy
}
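Both removed loaders followed the same load-or-default pattern that the surviving LoadErasureCodingTaskPolicy uses: return built-in defaults when no data directory or config file exists, otherwise read and unmarshal the protobuf policy (each removed loader additionally verified its task-specific oneof was set). A condensed sketch of the shared part, with a hypothetical helper name; the imports are the ones already used in this file (os, path/filepath, fmt, proto, worker_pb):

    // loadTaskPolicy is a hypothetical helper illustrating the pattern; it is not part of the codebase.
    func loadTaskPolicy(dataDir, fileName string, defaults *worker_pb.TaskPolicy) (*worker_pb.TaskPolicy, error) {
        if dataDir == "" {
            // No data directory configured: fall back to built-in defaults.
            return defaults, nil
        }
        configPath := filepath.Join(dataDir, ConfigSubdir, fileName)
        if _, err := os.Stat(configPath); os.IsNotExist(err) {
            // Nothing persisted yet: fall back to built-in defaults.
            return defaults, nil
        }
        configData, err := os.ReadFile(configPath)
        if err != nil {
            return nil, fmt.Errorf("failed to read task config file: %w", err)
        }
        var policy worker_pb.TaskPolicy
        if err := proto.Unmarshal(configData, &policy); err != nil {
            return nil, fmt.Errorf("failed to unmarshal task configuration: %w", err)
        }
        // Callers additionally verified the task-specific oneof (e.g. policy.GetVacuumConfig() != nil).
        return &policy, nil
    }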

weed/admin/handlers/maintenance_handlers.go (20)

@@ -17,9 +17,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/admin/view/layout"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -235,10 +233,6 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
// Create a new config instance based on task type and apply schema defaults
var config TaskConfig
switch taskType {
case types.TaskTypeVacuum:
config = &vacuum.Config{}
case types.TaskTypeBalance:
config = &balance.Config{}
case types.TaskTypeErasureCoding:
config = &erasure_coding.Config{}
default:
@@ -285,21 +279,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
// Debug logging - show parsed config values
switch taskType {
case types.TaskTypeVacuum:
if vacuumConfig, ok := config.(*vacuum.Config); ok {
glog.V(1).Infof("Parsed vacuum config - GarbageThreshold: %f, MinVolumeAgeSeconds: %d, MinIntervalSeconds: %d",
vacuumConfig.GarbageThreshold, vacuumConfig.MinVolumeAgeSeconds, vacuumConfig.MinIntervalSeconds)
}
case types.TaskTypeErasureCoding:
if ecConfig, ok := config.(*erasure_coding.Config); ok {
glog.V(1).Infof("Parsed EC config - FullnessRatio: %f, QuietForSeconds: %d, MinSizeMB: %d, CollectionFilter: '%s'",
ecConfig.FullnessRatio, ecConfig.QuietForSeconds, ecConfig.MinSizeMB, ecConfig.CollectionFilter)
}
case types.TaskTypeBalance:
if balanceConfig, ok := config.(*balance.Config); ok {
glog.V(1).Infof("Parsed balance config - Enabled: %v, MaxConcurrent: %d, ScanIntervalSeconds: %d, ImbalanceThreshold: %f, MinServerCount: %d",
balanceConfig.Enabled, balanceConfig.MaxConcurrent, balanceConfig.ScanIntervalSeconds, balanceConfig.ImbalanceThreshold, balanceConfig.MinServerCount)
}
}
// Validate the configuration
@@ -580,12 +564,8 @@ func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType,
// Save using task-specific methods
switch taskType {
case types.TaskTypeVacuum:
return configPersistence.SaveVacuumTaskPolicy(taskPolicy)
case types.TaskTypeErasureCoding:
return configPersistence.SaveErasureCodingTaskPolicy(taskPolicy)
case types.TaskTypeBalance:
return configPersistence.SaveBalanceTaskPolicy(taskPolicy)
default:
return fmt.Errorf("unsupported task type for protobuf persistence: %s", taskType)
}
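With the vacuum and balance cases removed, the protobuf persistence dispatch in saveTaskConfigToProtobuf is left with a single supported task type, roughly:

    // Remaining dispatch after this commit (erasure coding only).
    switch taskType {
    case types.TaskTypeErasureCoding:
        return configPersistence.SaveErasureCodingTaskPolicy(taskPolicy)
    default:
        return fmt.Errorf("unsupported task type for protobuf persistence: %s", taskType)
    }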

weed/admin/handlers/maintenance_handlers_test.go (164)

@@ -6,123 +6,14 @@ import (
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
func TestParseTaskConfigFromForm_WithEmbeddedStruct(t *testing.T) {
// Create a maintenance handlers instance for testing
h := &MaintenanceHandlers{}
// Test with balance config
t.Run("Balance Config", func(t *testing.T) {
// Simulate form data
formData := url.Values{
"enabled": {"on"}, // checkbox field
"scan_interval_seconds_value": {"30"}, // interval field
"scan_interval_seconds_unit": {"minutes"}, // interval unit
"max_concurrent": {"2"}, // number field
"imbalance_threshold": {"0.15"}, // float field
"min_server_count": {"3"}, // number field
}
// Get schema
schema := tasks.GetTaskConfigSchema("balance")
if schema == nil {
t.Fatal("Failed to get balance schema")
}
// Create config instance
config := &balance.Config{}
// Parse form data
err := h.parseTaskConfigFromForm(formData, schema, config)
if err != nil {
t.Fatalf("Failed to parse form data: %v", err)
}
// Verify embedded struct fields were set correctly
if !config.Enabled {
t.Errorf("Expected Enabled=true, got %v", config.Enabled)
}
if config.ScanIntervalSeconds != 1800 { // 30 minutes * 60
t.Errorf("Expected ScanIntervalSeconds=1800, got %v", config.ScanIntervalSeconds)
}
if config.MaxConcurrent != 2 {
t.Errorf("Expected MaxConcurrent=2, got %v", config.MaxConcurrent)
}
// Verify balance-specific fields were set correctly
if config.ImbalanceThreshold != 0.15 {
t.Errorf("Expected ImbalanceThreshold=0.15, got %v", config.ImbalanceThreshold)
}
if config.MinServerCount != 3 {
t.Errorf("Expected MinServerCount=3, got %v", config.MinServerCount)
}
})
// Test with vacuum config
t.Run("Vacuum Config", func(t *testing.T) {
// Simulate form data
formData := url.Values{
// "enabled" field omitted to simulate unchecked checkbox
"scan_interval_seconds_value": {"4"}, // interval field
"scan_interval_seconds_unit": {"hours"}, // interval unit
"max_concurrent": {"3"}, // number field
"garbage_threshold": {"0.4"}, // float field
"min_volume_age_seconds_value": {"2"}, // interval field
"min_volume_age_seconds_unit": {"days"}, // interval unit
"min_interval_seconds_value": {"1"}, // interval field
"min_interval_seconds_unit": {"days"}, // interval unit
}
// Get schema
schema := tasks.GetTaskConfigSchema("vacuum")
if schema == nil {
t.Fatal("Failed to get vacuum schema")
}
// Create config instance
config := &vacuum.Config{}
// Parse form data
err := h.parseTaskConfigFromForm(formData, schema, config)
if err != nil {
t.Fatalf("Failed to parse form data: %v", err)
}
// Verify embedded struct fields were set correctly
if config.Enabled {
t.Errorf("Expected Enabled=false, got %v", config.Enabled)
}
if config.ScanIntervalSeconds != 14400 { // 4 hours * 3600
t.Errorf("Expected ScanIntervalSeconds=14400, got %v", config.ScanIntervalSeconds)
}
if config.MaxConcurrent != 3 {
t.Errorf("Expected MaxConcurrent=3, got %v", config.MaxConcurrent)
}
// Verify vacuum-specific fields were set correctly
if config.GarbageThreshold != 0.4 {
t.Errorf("Expected GarbageThreshold=0.4, got %v", config.GarbageThreshold)
}
if config.MinVolumeAgeSeconds != 172800 { // 2 days * 86400
t.Errorf("Expected MinVolumeAgeSeconds=172800, got %v", config.MinVolumeAgeSeconds)
}
if config.MinIntervalSeconds != 86400 { // 1 day * 86400
t.Errorf("Expected MinIntervalSeconds=86400, got %v", config.MinIntervalSeconds)
}
})
// Test with erasure coding config
t.Run("Erasure Coding Config", func(t *testing.T) {
// Simulate form data
@@ -191,31 +82,6 @@ func TestConfigurationValidation(t *testing.T) {
name string
config interface{}
}{
{
"balance",
&balance.Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 2400,
MaxConcurrent: 3,
},
ImbalanceThreshold: 0.18,
MinServerCount: 4,
},
},
{
"vacuum",
&vacuum.Config{
BaseConfig: base.BaseConfig{
Enabled: false,
ScanIntervalSeconds: 7200,
MaxConcurrent: 2,
},
GarbageThreshold: 0.35,
MinVolumeAgeSeconds: 86400,
MinIntervalSeconds: 604800,
},
},
{
"erasure_coding",
&erasure_coding.Config{
@@ -236,28 +102,6 @@ func TestConfigurationValidation(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
// Test that configs can be converted to protobuf TaskPolicy
switch cfg := test.config.(type) {
case *balance.Config:
policy := cfg.ToTaskPolicy()
if policy == nil {
t.Fatal("ToTaskPolicy returned nil")
}
if policy.Enabled != cfg.Enabled {
t.Errorf("Expected Enabled=%v, got %v", cfg.Enabled, policy.Enabled)
}
if policy.MaxConcurrent != int32(cfg.MaxConcurrent) {
t.Errorf("Expected MaxConcurrent=%v, got %v", cfg.MaxConcurrent, policy.MaxConcurrent)
}
case *vacuum.Config:
policy := cfg.ToTaskPolicy()
if policy == nil {
t.Fatal("ToTaskPolicy returned nil")
}
if policy.Enabled != cfg.Enabled {
t.Errorf("Expected Enabled=%v, got %v", cfg.Enabled, policy.Enabled)
}
if policy.MaxConcurrent != int32(cfg.MaxConcurrent) {
t.Errorf("Expected MaxConcurrent=%v, got %v", cfg.MaxConcurrent, policy.MaxConcurrent)
}
case *erasure_coding.Config:
policy := cfg.ToTaskPolicy()
if policy == nil {
@@ -275,14 +119,6 @@ func TestConfigurationValidation(t *testing.T) {
// Test that configs can be validated
switch cfg := test.config.(type) {
case *balance.Config:
if err := cfg.Validate(); err != nil {
t.Errorf("Validation failed: %v", err)
}
case *vacuum.Config:
if err := cfg.Validate(); err != nil {
t.Errorf("Validation failed: %v", err)
}
case *erasure_coding.Config:
if err := cfg.Validate(); err != nil {
t.Errorf("Validation failed: %v", err)
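What remains of this test now exercises only the erasure coding config. A sketch of the same round-trip check restricted to that type (the field names come from the EC config referenced elsewhere in this diff; the values are illustrative and may need adjusting to the schema's allowed ranges):

    func TestErasureCodingConfigRoundTrip(t *testing.T) {
        cfg := &erasure_coding.Config{
            BaseConfig: base.BaseConfig{
                Enabled:             true,
                ScanIntervalSeconds: 3600,
                MaxConcurrent:       1,
            },
            FullnessRatio:    0.90,
            QuietForSeconds:  300,
            MinSizeMB:        100,
            CollectionFilter: "",
        }
        policy := cfg.ToTaskPolicy()
        if policy == nil {
            t.Fatal("ToTaskPolicy returned nil")
        }
        if policy.Enabled != cfg.Enabled {
            t.Errorf("Expected Enabled=%v, got %v", cfg.Enabled, policy.Enabled)
        }
        if policy.MaxConcurrent != int32(cfg.MaxConcurrent) {
            t.Errorf("Expected MaxConcurrent=%v, got %v", cfg.MaxConcurrent, policy.MaxConcurrent)
        }
        if err := cfg.Validate(); err != nil {
            t.Errorf("Validation failed: %v", err)
        }
    }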

weed/admin/maintenance/maintenance_manager.go (35)

@@ -8,9 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
@@ -22,23 +20,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
}
// Load vacuum task configuration
if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
Enabled: vacuumConfig.Enabled,
MaxConcurrent: int32(vacuumConfig.MaxConcurrent),
RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
},
},
}
}
// Load erasure coding task configuration
if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
@@ -57,22 +38,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
}
}
// Load balance task configuration
if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
Enabled: balanceConfig.Enabled,
MaxConcurrent: int32(balanceConfig.MaxConcurrent),
RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
BalanceConfig: &worker_pb.BalanceTaskConfig{
ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
MinServerCount: int32(balanceConfig.MinServerCount),
},
},
}
}
glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
return policy
}

weed/admin/maintenance/maintenance_worker.go (2)

@@ -13,9 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/types"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// MaintenanceWorkerService manages maintenance task execution

weed/admin/view/layout/menu_helper.go (2)

@@ -4,9 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// MenuItemData represents a menu item

weed/command/worker.go (2)

@@ -16,9 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/types"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
var cmdWorker = &Command{
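The three import-only changes above (maintenance_worker.go, menu_helper.go, worker.go) are identical: the blank imports that auto-registered the vacuum and balance task packages are dropped. Per the surrounding diff context, the registration portion of each import block reduces to:

    import (
        // Import task packages to trigger their auto-registration
        _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
    )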

weed/worker/tasks/balance/balance_task.go (267)

@@ -1,267 +0,0 @@
package balance
import (
"context"
"fmt"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
"google.golang.org/grpc"
)
// BalanceTask implements the Task interface
type BalanceTask struct {
*base.BaseTask
server string
volumeID uint32
collection string
progress float64
}
// NewBalanceTask creates a new balance task instance
func NewBalanceTask(id string, server string, volumeID uint32, collection string) *BalanceTask {
return &BalanceTask{
BaseTask: base.NewBaseTask(id, types.TaskTypeBalance),
server: server,
volumeID: volumeID,
collection: collection,
}
}
// Execute implements the Task interface
func (t *BalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
if params == nil {
return fmt.Errorf("task parameters are required")
}
balanceParams := params.GetBalanceParams()
if balanceParams == nil {
return fmt.Errorf("balance parameters are required")
}
// Get source and destination from unified arrays
if len(params.Sources) == 0 {
return fmt.Errorf("source is required for balance task")
}
if len(params.Targets) == 0 {
return fmt.Errorf("target is required for balance task")
}
sourceNode := params.Sources[0].Node
destNode := params.Targets[0].Node
if sourceNode == "" {
return fmt.Errorf("source node is required for balance task")
}
if destNode == "" {
return fmt.Errorf("destination node is required for balance task")
}
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
"source": sourceNode,
"destination": destNode,
"collection": t.collection,
}).Info("Starting balance task - moving volume")
sourceServer := pb.ServerAddress(sourceNode)
targetServer := pb.ServerAddress(destNode)
volumeId := needle.VolumeId(t.volumeID)
// Step 1: Mark volume readonly
t.ReportProgress(10.0)
t.GetLogger().Info("Marking volume readonly for move")
if err := t.markVolumeReadonly(sourceServer, volumeId); err != nil {
return fmt.Errorf("failed to mark volume readonly: %v", err)
}
// Step 2: Copy volume to destination
t.ReportProgress(20.0)
t.GetLogger().Info("Copying volume to destination")
lastAppendAtNs, err := t.copyVolume(sourceServer, targetServer, volumeId)
if err != nil {
return fmt.Errorf("failed to copy volume: %v", err)
}
// Step 3: Mount volume on target and mark it readonly
t.ReportProgress(60.0)
t.GetLogger().Info("Mounting volume on target server")
if err := t.mountVolume(targetServer, volumeId); err != nil {
return fmt.Errorf("failed to mount volume on target: %v", err)
}
// Step 4: Tail for updates
t.ReportProgress(70.0)
t.GetLogger().Info("Syncing final updates")
if err := t.tailVolume(sourceServer, targetServer, volumeId, lastAppendAtNs); err != nil {
glog.Warningf("Tail operation failed (may be normal): %v", err)
}
// Step 5: Unmount from source
t.ReportProgress(85.0)
t.GetLogger().Info("Unmounting volume from source server")
if err := t.unmountVolume(sourceServer, volumeId); err != nil {
return fmt.Errorf("failed to unmount volume from source: %v", err)
}
// Step 6: Delete from source
t.ReportProgress(95.0)
t.GetLogger().Info("Deleting volume from source server")
if err := t.deleteVolume(sourceServer, volumeId); err != nil {
return fmt.Errorf("failed to delete volume from source: %v", err)
}
t.ReportProgress(100.0)
glog.Infof("Balance task completed successfully: volume %d moved from %s to %s",
t.volumeID, t.server, destNode)
return nil
}
// Validate implements the UnifiedTask interface
func (t *BalanceTask) Validate(params *worker_pb.TaskParams) error {
if params == nil {
return fmt.Errorf("task parameters are required")
}
balanceParams := params.GetBalanceParams()
if balanceParams == nil {
return fmt.Errorf("balance parameters are required")
}
if params.VolumeId != t.volumeID {
return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
}
// Validate that at least one source matches our server
found := false
for _, source := range params.Sources {
if source.Node == t.server {
found = true
break
}
}
if !found {
return fmt.Errorf("no source matches expected server %s", t.server)
}
return nil
}
// EstimateTime implements the UnifiedTask interface
func (t *BalanceTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
// Basic estimate based on simulated steps
return 14 * time.Second // Sum of all step durations
}
// GetProgress returns current progress
func (t *BalanceTask) GetProgress() float64 {
return t.progress
}
// Helper methods for real balance operations
// markVolumeReadonly marks the volume readonly
func (t *BalanceTask) markVolumeReadonly(server pb.ServerAddress, volumeId needle.VolumeId) error {
return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
})
return err
})
}
// copyVolume copies volume from source to target server
func (t *BalanceTask) copyVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId) (uint64, error) {
var lastAppendAtNs uint64
err := operation.WithVolumeServerClient(true, targetServer, grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceServer),
})
if err != nil {
return err
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
}
return recvErr
}
if resp.LastAppendAtNs != 0 {
lastAppendAtNs = resp.LastAppendAtNs
} else {
// Report copy progress
glog.V(1).Infof("Volume %d copy progress: %s", volumeId,
util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
}
}
return nil
})
return lastAppendAtNs, err
}
// mountVolume mounts the volume on the target server
func (t *BalanceTask) mountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error {
return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
return err
})
}
// tailVolume syncs remaining updates from source to target
func (t *BalanceTask) tailVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId, sinceNs uint64) error {
return operation.WithVolumeServerClient(true, targetServer, grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: sinceNs,
IdleTimeoutSeconds: 60, // 1 minute timeout
SourceVolumeServer: string(sourceServer),
})
return err
})
}
// unmountVolume unmounts the volume from the server
func (t *BalanceTask) unmountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error {
return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
return err
})
}
// deleteVolume deletes the volume from the server
func (t *BalanceTask) deleteVolume(server pb.ServerAddress, volumeId needle.VolumeId) error {
return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
OnlyEmpty: false,
})
return err
})
}

weed/worker/tasks/balance/config.go (170)

@@ -1,170 +0,0 @@
package balance
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with balance-specific settings
type Config struct {
base.BaseConfig
ImbalanceThreshold float64 `json:"imbalance_threshold"`
MinServerCount int `json:"min_server_count"`
}
// NewDefaultConfig creates a new default balance configuration
func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 30 * 60, // 30 minutes
MaxConcurrent: 1,
},
ImbalanceThreshold: 0.2, // 20%
MinServerCount: 2,
}
}
// GetConfigSpec returns the configuration schema for balance tasks
func GetConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Balance Tasks",
Description: "Whether balance tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic balance task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 30 * 60,
MinValue: 5 * 60,
MaxValue: 2 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volume distribution imbalances",
HelpText: "The system will check for volume distribution imbalances at this interval",
Placeholder: "30",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 1,
MinValue: 1,
MaxValue: 3,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of balance tasks that can run simultaneously",
HelpText: "Limits the number of balance operations running at the same time",
Placeholder: "1 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "imbalance_threshold",
JSONName: "imbalance_threshold",
Type: config.FieldTypeFloat,
DefaultValue: 0.2,
MinValue: 0.05,
MaxValue: 0.5,
Required: true,
DisplayName: "Imbalance Threshold",
Description: "Minimum imbalance ratio to trigger balancing",
HelpText: "Volume distribution imbalances above this threshold will trigger balancing",
Placeholder: "0.20 (20%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "min_server_count",
JSONName: "min_server_count",
Type: config.FieldTypeInt,
DefaultValue: 2,
MinValue: 2,
MaxValue: 10,
Required: true,
DisplayName: "Minimum Server Count",
Description: "Minimum number of servers required for balancing",
HelpText: "Balancing will only occur if there are at least this many servers",
Placeholder: "2 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
},
}
}
// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
return &worker_pb.TaskPolicy{
Enabled: c.Enabled,
MaxConcurrent: int32(c.MaxConcurrent),
RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
BalanceConfig: &worker_pb.BalanceTaskConfig{
ImbalanceThreshold: float64(c.ImbalanceThreshold),
MinServerCount: int32(c.MinServerCount),
},
},
}
}
// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
if policy == nil {
return fmt.Errorf("policy is nil")
}
// Set general TaskPolicy fields
c.Enabled = policy.Enabled
c.MaxConcurrent = int(policy.MaxConcurrent)
c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
// Set balance-specific fields from the task config
if balanceConfig := policy.GetBalanceConfig(); balanceConfig != nil {
c.ImbalanceThreshold = float64(balanceConfig.ImbalanceThreshold)
c.MinServerCount = int(balanceConfig.MinServerCount)
}
return nil
}
// LoadConfigFromPersistence loads configuration from the persistence layer if available
func LoadConfigFromPersistence(configPersistence interface{}) *Config {
config := NewDefaultConfig()
// Try to load from persistence if available
if persistence, ok := configPersistence.(interface {
LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error)
}); ok {
if policy, err := persistence.LoadBalanceTaskPolicy(); err == nil && policy != nil {
if err := config.FromTaskPolicy(policy); err == nil {
glog.V(1).Infof("Loaded balance configuration from persistence")
return config
}
}
}
glog.V(1).Infof("Using default balance configuration")
return config
}
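One design detail in the removed loader worth noting: LoadConfigFromPersistence took an interface{} and asserted a narrow, locally declared interface instead of importing the admin persistence type directly, presumably to keep the worker task package free of a dependency on the admin/dash package. In effect it only required something like the following (the interface name here is hypothetical, since the original declared it inline):

    type balancePolicyLoader interface {
        LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error)
    }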

weed/worker/tasks/balance/detection.go (272)

@@ -1,272 +0,0 @@
package balance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Detection implements the detection logic for balance tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
balanceConfig := config.(*Config)
// Skip if cluster is too small
minVolumeCount := 2 // More reasonable for small clusters
if len(metrics) < minVolumeCount {
glog.Infof("BALANCE: No tasks created - cluster too small (%d volumes, need ≥%d)", len(metrics), minVolumeCount)
return nil, nil
}
// Analyze volume distribution across servers
serverVolumeCounts := make(map[string]int)
for _, metric := range metrics {
serverVolumeCounts[metric.Server]++
}
if len(serverVolumeCounts) < balanceConfig.MinServerCount {
glog.Infof("BALANCE: No tasks created - too few servers (%d servers, need ≥%d)", len(serverVolumeCounts), balanceConfig.MinServerCount)
return nil, nil
}
// Calculate balance metrics
totalVolumes := len(metrics)
avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
maxVolumes := 0
minVolumes := totalVolumes
maxServer := ""
minServer := ""
for server, count := range serverVolumeCounts {
if count > maxVolumes {
maxVolumes = count
maxServer = server
}
if count < minVolumes {
minVolumes = count
minServer = server
}
}
// Check if imbalance exceeds threshold
imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
glog.Infof("BALANCE: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
return nil, nil
}
// Select a volume from the overloaded server for balance
var selectedVolume *types.VolumeHealthMetrics
for _, metric := range metrics {
if metric.Server == maxServer {
selectedVolume = metric
break
}
}
if selectedVolume == nil {
glog.Warningf("BALANCE: Could not find volume on overloaded server %s", maxServer)
return nil, nil
}
// Create balance task with volume and destination planning info
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
// Generate task ID for ActiveTopology integration
taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())
task := &types.TaskDetectionResult{
TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeBalance,
VolumeID: selectedVolume.VolumeID,
Server: selectedVolume.Server,
Collection: selectedVolume.Collection,
Priority: types.TaskPriorityNormal,
Reason: reason,
ScheduleAt: time.Now(),
}
// Plan destination if ActiveTopology is available
if clusterInfo.ActiveTopology != nil {
destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
if err != nil {
glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
return nil, nil // Skip this task if destination planning fails
}
// Find the actual disk containing the volume on the source server
sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
if !found {
return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
}
// Create typed parameters with unified source and target information
task.TypedParams = &worker_pb.TaskParams{
TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: selectedVolume.VolumeID,
Collection: selectedVolume.Collection,
VolumeSize: selectedVolume.Size, // Store original volume size for tracking changes
// Unified sources and targets - the only way to specify locations
Sources: []*worker_pb.TaskSource{
{
Node: selectedVolume.Server,
DiskId: sourceDisk,
VolumeId: selectedVolume.VolumeID,
EstimatedSize: selectedVolume.Size,
DataCenter: selectedVolume.DataCenter,
Rack: selectedVolume.Rack,
},
},
Targets: []*worker_pb.TaskTarget{
{
Node: destinationPlan.TargetNode,
DiskId: destinationPlan.TargetDisk,
VolumeId: selectedVolume.VolumeID,
EstimatedSize: destinationPlan.ExpectedSize,
DataCenter: destinationPlan.TargetDC,
Rack: destinationPlan.TargetRack,
},
},
TaskParams: &worker_pb.TaskParams_BalanceParams{
BalanceParams: &worker_pb.BalanceTaskParams{
ForceMove: false,
TimeoutSeconds: 600, // 10 minutes default
},
},
}
glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s",
selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode)
// Add pending balance task to ActiveTopology for capacity management
targetDisk := destinationPlan.TargetDisk
err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
TaskID: taskID,
TaskType: topology.TaskTypeBalance,
VolumeID: selectedVolume.VolumeID,
VolumeSize: int64(selectedVolume.Size),
Sources: []topology.TaskSourceSpec{
{ServerID: selectedVolume.Server, DiskID: sourceDisk},
},
Destinations: []topology.TaskDestinationSpec{
{ServerID: destinationPlan.TargetNode, DiskID: targetDisk},
},
})
if err != nil {
return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
}
glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
} else {
glog.Warningf("No ActiveTopology available for destination planning in balance detection")
return nil, nil
}
return []*types.TaskDetectionResult{task}, nil
}
// planBalanceDestination plans the destination for a balance operation
// This function implements destination planning logic directly in the detection phase
func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics) (*topology.DestinationPlan, error) {
// Get source node information from topology
var sourceRack, sourceDC string
// Extract rack and DC from topology info
topologyInfo := activeTopology.GetTopologyInfo()
if topologyInfo != nil {
for _, dc := range topologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, dataNodeInfo := range rack.DataNodeInfos {
if dataNodeInfo.Id == selectedVolume.Server {
sourceDC = dc.Id
sourceRack = rack.Id
break
}
}
if sourceRack != "" {
break
}
}
if sourceDC != "" {
break
}
}
}
// Get available disks, excluding the source node
availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeBalance, selectedVolume.Server)
if len(availableDisks) == 0 {
return nil, fmt.Errorf("no available disks for balance operation")
}
// Find the best destination disk based on balance criteria
var bestDisk *topology.DiskInfo
bestScore := -1.0
for _, disk := range availableDisks {
score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size)
if score > bestScore {
bestScore = score
bestDisk = disk
}
}
if bestDisk == nil {
return nil, fmt.Errorf("no suitable destination found for balance operation")
}
return &topology.DestinationPlan{
TargetNode: bestDisk.NodeID,
TargetDisk: bestDisk.DiskID,
TargetRack: bestDisk.Rack,
TargetDC: bestDisk.DataCenter,
ExpectedSize: selectedVolume.Size,
PlacementScore: bestScore,
}, nil
}
// calculateBalanceScore calculates placement score for balance operations
func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64) float64 {
if disk.DiskInfo == nil {
return 0.0
}
score := 0.0
// Prefer disks with lower current volume count (better for balance)
if disk.DiskInfo.MaxVolumeCount > 0 {
utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
score += (1.0 - utilization) * 40.0 // Up to 40 points for low utilization
}
// Prefer different racks for better distribution
if disk.Rack != sourceRack {
score += 30.0
}
// Prefer different data centers for better distribution
if disk.DataCenter != sourceDC {
score += 20.0
}
// Prefer disks with lower current load
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
return score
}
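As a worked example of the removed scoring heuristic: a candidate disk at 25% volume utilization earns (1.0 - 0.25) * 40 = 30 points, plus 30 for being on a different rack, plus 20 for a different data center, plus 10 - LoadCount (10 with no in-flight tasks), for a total of 90 out of a possible 100.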

weed/worker/tasks/balance/execution.go (158)

@@ -1,158 +0,0 @@
package balance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// TypedTask implements balance operation with typed protobuf parameters
type TypedTask struct {
*base.BaseTypedTask
// Task state from protobuf
sourceServer string
destNode string
volumeID uint32
collection string
estimatedSize uint64
forceMove bool
timeoutSeconds int32
}
// NewTypedTask creates a new typed balance task
func NewTypedTask() types.TypedTaskInterface {
task := &TypedTask{
BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeBalance),
}
return task
}
// ValidateTyped validates the typed parameters for balance task
func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
// Basic validation from base class
if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
return err
}
// Check that we have balance-specific parameters
balanceParams := params.GetBalanceParams()
if balanceParams == nil {
return fmt.Errorf("balance_params is required for balance task")
}
// Validate sources and targets
if len(params.Sources) == 0 {
return fmt.Errorf("at least one source is required for balance task")
}
if len(params.Targets) == 0 {
return fmt.Errorf("at least one target is required for balance task")
}
// Validate that source and target have volume IDs
if params.Sources[0].VolumeId == 0 {
return fmt.Errorf("source volume_id is required for balance task")
}
if params.Targets[0].VolumeId == 0 {
return fmt.Errorf("target volume_id is required for balance task")
}
// Validate timeout
if balanceParams.TimeoutSeconds <= 0 {
return fmt.Errorf("timeout_seconds must be greater than 0")
}
return nil
}
// EstimateTimeTyped estimates the time needed for balance operation based on protobuf parameters
func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
balanceParams := params.GetBalanceParams()
if balanceParams != nil {
// Use the timeout from parameters if specified
if balanceParams.TimeoutSeconds > 0 {
return time.Duration(balanceParams.TimeoutSeconds) * time.Second
}
}
// Estimate based on volume size from sources (1 minute per GB)
if len(params.Sources) > 0 {
source := params.Sources[0]
if source.EstimatedSize > 0 {
gbSize := source.EstimatedSize / (1024 * 1024 * 1024)
return time.Duration(gbSize) * time.Minute
}
}
// Default estimation
return 10 * time.Minute
}
// ExecuteTyped implements the balance operation with typed parameters
func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
// Extract basic parameters
t.volumeID = params.VolumeId
t.collection = params.Collection
// Ensure sources and targets are present (should be guaranteed by validation)
if len(params.Sources) == 0 {
return fmt.Errorf("at least one source is required for balance task (ExecuteTyped)")
}
if len(params.Targets) == 0 {
return fmt.Errorf("at least one target is required for balance task (ExecuteTyped)")
}
// Extract source and target information
t.sourceServer = params.Sources[0].Node
t.estimatedSize = params.Sources[0].EstimatedSize
t.destNode = params.Targets[0].Node
// Extract balance-specific parameters
balanceParams := params.GetBalanceParams()
if balanceParams != nil {
t.forceMove = balanceParams.ForceMove
t.timeoutSeconds = balanceParams.TimeoutSeconds
}
glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)",
t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize)
// Simulate balance operation with progress updates
steps := []struct {
name string
duration time.Duration
progress float64
}{
{"Analyzing cluster state", 2 * time.Second, 15},
{"Verifying destination capacity", 1 * time.Second, 25},
{"Starting volume migration", 1 * time.Second, 35},
{"Moving volume data", 6 * time.Second, 75},
{"Updating cluster metadata", 2 * time.Second, 95},
{"Verifying balance completion", 1 * time.Second, 100},
}
for _, step := range steps {
if t.IsCancelled() {
return fmt.Errorf("balance task cancelled during: %s", step.name)
}
glog.V(1).Infof("Balance task step: %s", step.name)
t.SetProgress(step.progress)
// Simulate work
time.Sleep(step.duration)
}
glog.Infof("Typed balance task completed successfully for volume %d: %s -> %s",
t.volumeID, t.sourceServer, t.destNode)
return nil
}
// Register the typed task in the global registry
func init() {
types.RegisterGlobalTypedTask(types.TaskTypeBalance, NewTypedTask)
glog.V(1).Infof("Registered typed balance task")
}
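For the removed EstimateTimeTyped: a 5 GiB source with no timeout override estimates to 5 minutes (1 minute per GiB); because of the integer division, a source smaller than 1 GiB would have estimated to zero rather than falling back to the 10-minute default.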

weed/worker/tasks/balance/monitoring.go (138)

@@ -1,138 +0,0 @@
package balance
import (
"sync"
"time"
)
// BalanceMetrics contains balance-specific monitoring data
type BalanceMetrics struct {
// Execution metrics
VolumesBalanced int64 `json:"volumes_balanced"`
TotalDataTransferred int64 `json:"total_data_transferred"`
AverageImbalance float64 `json:"average_imbalance"`
LastBalanceTime time.Time `json:"last_balance_time"`
// Performance metrics
AverageTransferSpeed float64 `json:"average_transfer_speed_mbps"`
TotalExecutionTime int64 `json:"total_execution_time_seconds"`
SuccessfulOperations int64 `json:"successful_operations"`
FailedOperations int64 `json:"failed_operations"`
// Current task metrics
CurrentImbalanceScore float64 `json:"current_imbalance_score"`
PlannedDestinations int `json:"planned_destinations"`
mutex sync.RWMutex
}
// NewBalanceMetrics creates a new balance metrics instance
func NewBalanceMetrics() *BalanceMetrics {
return &BalanceMetrics{
LastBalanceTime: time.Now(),
}
}
// RecordVolumeBalanced records a successful volume balance operation
func (m *BalanceMetrics) RecordVolumeBalanced(volumeSize int64, transferTime time.Duration) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.VolumesBalanced++
m.TotalDataTransferred += volumeSize
m.SuccessfulOperations++
m.LastBalanceTime = time.Now()
m.TotalExecutionTime += int64(transferTime.Seconds())
// Calculate average transfer speed (MB/s)
if transferTime > 0 {
speedMBps := float64(volumeSize) / (1024 * 1024) / transferTime.Seconds()
if m.AverageTransferSpeed == 0 {
m.AverageTransferSpeed = speedMBps
} else {
// Exponential moving average
m.AverageTransferSpeed = 0.8*m.AverageTransferSpeed + 0.2*speedMBps
}
}
}
// RecordFailure records a failed balance operation
func (m *BalanceMetrics) RecordFailure() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.FailedOperations++
}
// UpdateImbalanceScore updates the current cluster imbalance score
func (m *BalanceMetrics) UpdateImbalanceScore(score float64) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.CurrentImbalanceScore = score
// Update average imbalance with exponential moving average
if m.AverageImbalance == 0 {
m.AverageImbalance = score
} else {
m.AverageImbalance = 0.9*m.AverageImbalance + 0.1*score
}
}
// SetPlannedDestinations sets the number of planned destinations
func (m *BalanceMetrics) SetPlannedDestinations(count int) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.PlannedDestinations = count
}
// GetMetrics returns a copy of the current metrics (without the mutex)
func (m *BalanceMetrics) GetMetrics() BalanceMetrics {
m.mutex.RLock()
defer m.mutex.RUnlock()
// Create a copy without the mutex to avoid copying lock value
return BalanceMetrics{
VolumesBalanced: m.VolumesBalanced,
TotalDataTransferred: m.TotalDataTransferred,
AverageImbalance: m.AverageImbalance,
LastBalanceTime: m.LastBalanceTime,
AverageTransferSpeed: m.AverageTransferSpeed,
TotalExecutionTime: m.TotalExecutionTime,
SuccessfulOperations: m.SuccessfulOperations,
FailedOperations: m.FailedOperations,
CurrentImbalanceScore: m.CurrentImbalanceScore,
PlannedDestinations: m.PlannedDestinations,
}
}
// GetSuccessRate returns the success rate as a percentage
func (m *BalanceMetrics) GetSuccessRate() float64 {
m.mutex.RLock()
defer m.mutex.RUnlock()
total := m.SuccessfulOperations + m.FailedOperations
if total == 0 {
return 100.0
}
return float64(m.SuccessfulOperations) / float64(total) * 100.0
}
// Reset resets all metrics to zero
func (m *BalanceMetrics) Reset() {
m.mutex.Lock()
defer m.mutex.Unlock()
*m = BalanceMetrics{
LastBalanceTime: time.Now(),
}
}
// Global metrics instance for balance tasks
var globalBalanceMetrics = NewBalanceMetrics()
// GetGlobalBalanceMetrics returns the global balance metrics instance
func GetGlobalBalanceMetrics() *BalanceMetrics {
return globalBalanceMetrics
}
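For the exponential moving averages above: with a previous AverageTransferSpeed of 100 MB/s and a new sample of 50 MB/s, the updated value is 0.8*100 + 0.2*50 = 90 MB/s; UpdateImbalanceScore applies the same idea with 0.9/0.1 weights.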

weed/worker/tasks/balance/register.go (86)

@@ -1,86 +0,0 @@
package balance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Global variable to hold the task definition for configuration updates
var globalTaskDef *base.TaskDefinition
// Auto-register this task when the package is imported
func init() {
RegisterBalanceTask()
// Register config updater
tasks.AutoRegisterConfigUpdater(types.TaskTypeBalance, UpdateConfigFromPersistence)
}
// RegisterBalanceTask registers the balance task with the new architecture
func RegisterBalanceTask() {
// Create configuration instance
config := NewDefaultConfig()
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeBalance,
Name: "balance",
DisplayName: "Volume Balance",
Description: "Balances volume distribution across servers",
Icon: "fas fa-balance-scale text-warning",
Capabilities: []string{"balance", "distribution"},
Config: config,
ConfigSpec: GetConfigSpec(),
CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
if params == nil {
return nil, fmt.Errorf("task parameters are required")
}
if len(params.Sources) == 0 {
return nil, fmt.Errorf("at least one source is required for balance task")
}
return NewBalanceTask(
fmt.Sprintf("balance-%d", params.VolumeId),
params.Sources[0].Node, // Use first source node
params.VolumeId,
params.Collection,
), nil
},
DetectionFunc: Detection,
ScanInterval: 30 * time.Minute,
SchedulingFunc: Scheduling,
MaxConcurrent: 1,
RepeatInterval: 2 * time.Hour,
}
// Store task definition globally for configuration updates
globalTaskDef = taskDef
// Register everything with a single function call!
base.RegisterTask(taskDef)
}
// UpdateConfigFromPersistence updates the balance configuration from persistence
func UpdateConfigFromPersistence(configPersistence interface{}) error {
if globalTaskDef == nil {
return fmt.Errorf("balance task not registered")
}
// Load configuration from persistence
newConfig := LoadConfigFromPersistence(configPersistence)
if newConfig == nil {
return fmt.Errorf("failed to load configuration from persistence")
}
// Update the task definition's config
globalTaskDef.Config = newConfig
glog.V(1).Infof("Updated balance task configuration from persistence")
return nil
}

weed/worker/tasks/balance/scheduling.go (37)

@@ -1,37 +0,0 @@
package balance
import (
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Scheduling implements the scheduling logic for balance tasks
func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
balanceConfig := config.(*Config)
// Count running balance tasks
runningBalanceCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeBalance {
runningBalanceCount++
}
}
// Check concurrency limit
if runningBalanceCount >= balanceConfig.MaxConcurrent {
return false
}
// Check if we have available workers
availableWorkerCount := 0
for _, worker := range availableWorkers {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeBalance {
availableWorkerCount++
break
}
}
}
return availableWorkerCount > 0
}

weed/worker/tasks/base/volume_utils.go (7)

@@ -9,15 +9,12 @@ import (
// Uses O(1) indexed lookup for optimal performance on large clusters.
//
// This is a shared utility function used by multiple task detection algorithms
// (balance, vacuum, etc.) to locate volumes efficiently.
// to locate volumes efficiently.
//
// Example usage:
//
// // In balance task: find source disk for a volume that needs to be moved
// // Find source disk for a volume that needs to be processed
// sourceDisk, found := base.FindVolumeDisk(topology, volumeID, collection, sourceServer)
//
// // In vacuum task: find disk containing volume that needs cleanup
// diskID, exists := base.FindVolumeDisk(topology, volumeID, collection, serverID)
func FindVolumeDisk(activeTopology *topology.ActiveTopology, volumeID uint32, collection string, serverID string) (uint32, bool) {
if activeTopology == nil {
return 0, false

weed/worker/tasks/vacuum/config.go (190)

@@ -1,190 +0,0 @@
package vacuum
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with vacuum-specific settings
type Config struct {
base.BaseConfig
GarbageThreshold float64 `json:"garbage_threshold"`
MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
MinIntervalSeconds int `json:"min_interval_seconds"`
}
// NewDefaultConfig creates a new default vacuum configuration
func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
MaxConcurrent: 2,
},
GarbageThreshold: 0.3, // 30%
MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
}
}
// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
return &worker_pb.TaskPolicy{
Enabled: c.Enabled,
MaxConcurrent: int32(c.MaxConcurrent),
RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: float64(c.GarbageThreshold),
MinVolumeAgeHours: int32(c.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
MinIntervalSeconds: int32(c.MinIntervalSeconds),
},
},
}
}
// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
if policy == nil {
return fmt.Errorf("policy is nil")
}
// Set general TaskPolicy fields
c.Enabled = policy.Enabled
c.MaxConcurrent = int(policy.MaxConcurrent)
c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
// Set vacuum-specific fields from the task config
if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil {
c.GarbageThreshold = float64(vacuumConfig.GarbageThreshold)
c.MinVolumeAgeSeconds = int(vacuumConfig.MinVolumeAgeHours * 3600) // Convert hours to seconds
c.MinIntervalSeconds = int(vacuumConfig.MinIntervalSeconds)
}
return nil
}
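A small round-trip sketch of the two conversions above; note that the minimum volume age survives only at hour granularity, because the protobuf policy stores MinVolumeAgeHours.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

func main() {
	original := vacuum.NewDefaultConfig()
	original.MinVolumeAgeSeconds = 90 * 60 // 1.5 hours

	policy := original.ToTaskPolicy()

	restored := vacuum.NewDefaultConfig()
	if err := restored.FromTaskPolicy(policy); err != nil {
		panic(err)
	}

	// Prints 3600: seconds -> hours -> seconds rounds down to whole hours.
	fmt.Println(restored.MinVolumeAgeSeconds)
}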
// LoadConfigFromPersistence loads configuration from the persistence layer if available
func LoadConfigFromPersistence(configPersistence interface{}) *Config {
config := NewDefaultConfig()
// Try to load from persistence if available
if persistence, ok := configPersistence.(interface {
LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error)
}); ok {
if policy, err := persistence.LoadVacuumTaskPolicy(); err == nil && policy != nil {
if err := config.FromTaskPolicy(policy); err == nil {
glog.V(1).Infof("Loaded vacuum configuration from persistence")
return config
}
}
}
glog.V(1).Infof("Using default vacuum configuration")
return config
}
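A minimal sketch of an object satisfying the interface that the type assertion above probes for; stubPersistence is hypothetical and stands in for the admin server's config persistence.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

// stubPersistence only implements the single method LoadConfigFromPersistence looks for.
type stubPersistence struct {
	policy *worker_pb.TaskPolicy
}

func (s stubPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) {
	return s.policy, nil
}

func main() {
	stored := vacuum.NewDefaultConfig()
	stored.GarbageThreshold = 0.5

	cfg := vacuum.LoadConfigFromPersistence(stubPersistence{policy: stored.ToTaskPolicy()})
	fmt.Println(cfg.GarbageThreshold) // 0.5, taken from the stored policy
}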
// GetConfigSpec returns the configuration schema for vacuum tasks
func GetConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Vacuum Tasks",
Description: "Whether vacuum tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic vacuum task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 2 * 60 * 60,
MinValue: 10 * 60,
MaxValue: 24 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volumes needing vacuum",
HelpText: "The system will check for volumes that need vacuuming at this interval",
Placeholder: "2",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 2,
MinValue: 1,
MaxValue: 10,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of vacuum tasks that can run simultaneously",
HelpText: "Limits the number of vacuum operations running at the same time to control system load",
Placeholder: "2 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "garbage_threshold",
JSONName: "garbage_threshold",
Type: config.FieldTypeFloat,
DefaultValue: 0.3,
MinValue: 0.0,
MaxValue: 1.0,
Required: true,
DisplayName: "Garbage Percentage Threshold",
Description: "Trigger vacuum when garbage ratio exceeds this percentage",
HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
Placeholder: "0.30 (30%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "min_volume_age_seconds",
JSONName: "min_volume_age_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 24 * 60 * 60,
MinValue: 1 * 60 * 60,
MaxValue: 7 * 24 * 60 * 60,
Required: true,
DisplayName: "Minimum Volume Age",
Description: "Only vacuum volumes older than this duration",
HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
Placeholder: "24",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "min_interval_seconds",
JSONName: "min_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 7 * 24 * 60 * 60,
MinValue: 1 * 24 * 60 * 60,
MaxValue: 30 * 24 * 60 * 60,
Required: true,
DisplayName: "Minimum Interval",
Description: "Minimum time between vacuum operations on the same volume",
HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
Placeholder: "7",
Unit: config.UnitDays,
InputType: "interval",
CSSClasses: "form-control",
},
},
}
}
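Since the spec is plain data, a form or validation layer can walk it generically; a quick sketch:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

func main() {
	for _, field := range vacuum.GetConfigSpec().Fields {
		fmt.Printf("%-24s default=%v required=%v\n", field.Name, field.DefaultValue, field.Required)
	}
}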

133
weed/worker/tasks/vacuum/detection.go

@@ -1,133 +0,0 @@
package vacuum
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Detection implements the detection logic for vacuum tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
vacuumConfig := config.(*Config)
var results []*types.TaskDetectionResult
minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
debugCount := 0
skippedDueToGarbage := 0
skippedDueToAge := 0
for _, metric := range metrics {
// Check if volume needs vacuum
if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
priority := types.TaskPriorityNormal
if metric.GarbageRatio > 0.6 {
priority = types.TaskPriorityHigh
}
// Generate task ID for future ActiveTopology integration
taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix())
result := &types.TaskDetectionResult{
TaskID: taskID, // For future ActiveTopology integration
TaskType: types.TaskTypeVacuum,
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Priority: priority,
Reason: "Volume has excessive garbage requiring vacuum",
ScheduleAt: time.Now(),
}
// Create typed parameters for vacuum task
result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo)
results = append(results, result)
} else {
// Debug why volume was not selected
if debugCount < 5 { // Limit debug output to first 5 volumes
if metric.GarbageRatio < vacuumConfig.GarbageThreshold {
skippedDueToGarbage++
}
if metric.Age < minVolumeAge {
skippedDueToAge++
}
}
debugCount++
}
}
// Log debug summary if no tasks were created
if len(results) == 0 && len(metrics) > 0 {
totalVolumes := len(metrics)
glog.Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. Skipped: %d (garbage<threshold), %d (age<minimum)",
totalVolumes, vacuumConfig.GarbageThreshold*100, minVolumeAge, skippedDueToGarbage, skippedDueToAge)
// Show details for first few volumes
for i, metric := range metrics {
if i >= 3 { // Limit to first 3 volumes
break
}
glog.Infof("VACUUM: Volume %d: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)",
metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100,
metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute))
}
}
return results, nil
}
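A sketch of feeding one synthetic metric through Detection; only the fields exercised by the checks above are set, and the cluster info is left empty because this code path does not read it.

package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	metrics := []*types.VolumeHealthMetrics{{
		VolumeID:     7,
		Server:       "10.0.0.5:8080",
		Collection:   "pictures",
		GarbageRatio: 0.45,           // above the 0.3 default threshold
		Age:          48 * time.Hour, // above the 24h default minimum age
	}}

	results, err := vacuum.Detection(metrics, &types.ClusterInfo{}, vacuum.NewDefaultConfig())
	if err != nil {
		panic(err)
	}
	for _, r := range results {
		fmt.Printf("%s volume %d on %s: %s\n", r.TaskType, r.VolumeID, r.Server, r.Reason)
	}
}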
// createVacuumTaskParams creates typed parameters for vacuum tasks
// This function is moved from MaintenanceIntegration.createVacuumTaskParams to the detection logic
func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams {
// Use configured values or defaults
garbageThreshold := 0.3 // Default 30%
verifyChecksum := true // Default to verify
batchSize := int32(1000) // Default batch size
workingDir := "/tmp/seaweedfs_vacuum_work" // Default working directory
if vacuumConfig != nil {
garbageThreshold = vacuumConfig.GarbageThreshold
// Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours, MinIntervalSeconds
// Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added
// to the protobuf definition if they should be configurable
}
// Use DC and rack information directly from VolumeHealthMetrics
sourceDC, sourceRack := metric.DataCenter, metric.Rack
// Create typed protobuf parameters with unified sources
return &worker_pb.TaskParams{
TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated)
VolumeId: task.VolumeID,
Collection: task.Collection,
VolumeSize: metric.Size, // Store original volume size for tracking changes
// Unified sources array
Sources: []*worker_pb.TaskSource{
{
Node: task.Server,
VolumeId: task.VolumeID,
EstimatedSize: metric.Size,
DataCenter: sourceDC,
Rack: sourceRack,
},
},
TaskParams: &worker_pb.TaskParams_VacuumParams{
VacuumParams: &worker_pb.VacuumTaskParams{
GarbageThreshold: garbageThreshold,
ForceVacuum: false,
BatchSize: batchSize,
WorkingDir: workingDir,
VerifyChecksum: verifyChecksum,
},
},
}
}

151
weed/worker/tasks/vacuum/monitoring.go

@@ -1,151 +0,0 @@
package vacuum
import (
"sync"
"time"
)
// VacuumMetrics contains vacuum-specific monitoring data
type VacuumMetrics struct {
// Execution metrics
VolumesVacuumed int64 `json:"volumes_vacuumed"`
TotalSpaceReclaimed int64 `json:"total_space_reclaimed"`
TotalFilesProcessed int64 `json:"total_files_processed"`
TotalGarbageCollected int64 `json:"total_garbage_collected"`
LastVacuumTime time.Time `json:"last_vacuum_time"`
// Performance metrics
AverageVacuumTime int64 `json:"average_vacuum_time_seconds"`
AverageGarbageRatio float64 `json:"average_garbage_ratio"`
SuccessfulOperations int64 `json:"successful_operations"`
FailedOperations int64 `json:"failed_operations"`
// Current task metrics
CurrentGarbageRatio float64 `json:"current_garbage_ratio"`
VolumesPendingVacuum int `json:"volumes_pending_vacuum"`
mutex sync.RWMutex
}
// NewVacuumMetrics creates a new vacuum metrics instance
func NewVacuumMetrics() *VacuumMetrics {
return &VacuumMetrics{
LastVacuumTime: time.Now(),
}
}
// RecordVolumeVacuumed records a successful volume vacuum operation
func (m *VacuumMetrics) RecordVolumeVacuumed(spaceReclaimed int64, filesProcessed int64, garbageCollected int64, vacuumTime time.Duration, garbageRatio float64) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.VolumesVacuumed++
m.TotalSpaceReclaimed += spaceReclaimed
m.TotalFilesProcessed += filesProcessed
m.TotalGarbageCollected += garbageCollected
m.SuccessfulOperations++
m.LastVacuumTime = time.Now()
// Update average vacuum time
if m.AverageVacuumTime == 0 {
m.AverageVacuumTime = int64(vacuumTime.Seconds())
} else {
// Exponential moving average
newTime := int64(vacuumTime.Seconds())
m.AverageVacuumTime = (m.AverageVacuumTime*4 + newTime) / 5
}
// Update average garbage ratio
if m.AverageGarbageRatio == 0 {
m.AverageGarbageRatio = garbageRatio
} else {
// Exponential moving average
m.AverageGarbageRatio = 0.8*m.AverageGarbageRatio + 0.2*garbageRatio
}
}
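Both running averages above are exponential moving averages with smoothing factor 0.2: new_avg = 0.8*old_avg + 0.2*sample, which for the vacuum-time average is written equivalently as (old_avg*4 + sample)/5.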
// RecordFailure records a failed vacuum operation
func (m *VacuumMetrics) RecordFailure() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.FailedOperations++
}
// UpdateCurrentGarbageRatio updates the current volume's garbage ratio
func (m *VacuumMetrics) UpdateCurrentGarbageRatio(ratio float64) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.CurrentGarbageRatio = ratio
}
// SetVolumesPendingVacuum sets the number of volumes pending vacuum
func (m *VacuumMetrics) SetVolumesPendingVacuum(count int) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.VolumesPendingVacuum = count
}
// GetMetrics returns a copy of the current metrics (without the mutex)
func (m *VacuumMetrics) GetMetrics() VacuumMetrics {
m.mutex.RLock()
defer m.mutex.RUnlock()
// Create a copy without the mutex to avoid copying lock value
return VacuumMetrics{
VolumesVacuumed: m.VolumesVacuumed,
TotalSpaceReclaimed: m.TotalSpaceReclaimed,
TotalFilesProcessed: m.TotalFilesProcessed,
TotalGarbageCollected: m.TotalGarbageCollected,
LastVacuumTime: m.LastVacuumTime,
AverageVacuumTime: m.AverageVacuumTime,
AverageGarbageRatio: m.AverageGarbageRatio,
SuccessfulOperations: m.SuccessfulOperations,
FailedOperations: m.FailedOperations,
CurrentGarbageRatio: m.CurrentGarbageRatio,
VolumesPendingVacuum: m.VolumesPendingVacuum,
}
}
// GetSuccessRate returns the success rate as a percentage
func (m *VacuumMetrics) GetSuccessRate() float64 {
m.mutex.RLock()
defer m.mutex.RUnlock()
total := m.SuccessfulOperations + m.FailedOperations
if total == 0 {
return 100.0
}
return float64(m.SuccessfulOperations) / float64(total) * 100.0
}
// GetAverageSpaceReclaimed returns the average space reclaimed per volume
func (m *VacuumMetrics) GetAverageSpaceReclaimed() float64 {
m.mutex.RLock()
defer m.mutex.RUnlock()
if m.VolumesVacuumed == 0 {
return 0
}
return float64(m.TotalSpaceReclaimed) / float64(m.VolumesVacuumed)
}
// Reset resets all metrics to zero
func (m *VacuumMetrics) Reset() {
m.mutex.Lock()
defer m.mutex.Unlock()
*m = VacuumMetrics{
LastVacuumTime: time.Now(),
}
}
// Global metrics instance for vacuum tasks
var globalVacuumMetrics = NewVacuumMetrics()
// GetGlobalVacuumMetrics returns the global vacuum metrics instance
func GetGlobalVacuumMetrics() *VacuumMetrics {
return globalVacuumMetrics
}
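A short usage sketch of the global metrics instance; the recorded numbers are made up.

package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

func main() {
	m := vacuum.GetGlobalVacuumMetrics()

	// One successful vacuum: 512 MiB reclaimed, 10k files processed,
	// 9.5k garbage entries collected, 90s runtime, 42% garbage ratio.
	m.RecordVolumeVacuumed(512<<20, 10_000, 9_500, 90*time.Second, 0.42)

	snapshot := m.GetMetrics()
	fmt.Printf("vacuumed=%d success=%.1f%% avg_reclaimed=%.0fB\n",
		snapshot.VolumesVacuumed, m.GetSuccessRate(), m.GetAverageSpaceReclaimed())
}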

86
weed/worker/tasks/vacuum/register.go

@@ -1,86 +0,0 @@
package vacuum
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Global variable to hold the task definition for configuration updates
var globalTaskDef *base.TaskDefinition
// Auto-register this task when the package is imported
func init() {
RegisterVacuumTask()
// Register config updater
tasks.AutoRegisterConfigUpdater(types.TaskTypeVacuum, UpdateConfigFromPersistence)
}
// RegisterVacuumTask registers the vacuum task with the new architecture
func RegisterVacuumTask() {
// Create configuration instance
config := NewDefaultConfig()
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeVacuum,
Name: "vacuum",
DisplayName: "Volume Vacuum",
Description: "Reclaims disk space by removing deleted files from volumes",
Icon: "fas fa-broom text-primary",
Capabilities: []string{"vacuum", "storage"},
Config: config,
ConfigSpec: GetConfigSpec(),
CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
if params == nil {
return nil, fmt.Errorf("task parameters are required")
}
if len(params.Sources) == 0 {
return nil, fmt.Errorf("at least one source is required for vacuum task")
}
return NewVacuumTask(
fmt.Sprintf("vacuum-%d", params.VolumeId),
params.Sources[0].Node, // Use first source node
params.VolumeId,
params.Collection,
), nil
},
DetectionFunc: Detection,
ScanInterval: 2 * time.Hour,
SchedulingFunc: Scheduling,
MaxConcurrent: 2,
RepeatInterval: 7 * 24 * time.Hour,
}
// Store task definition globally for configuration updates
globalTaskDef = taskDef
// Register everything with a single function call!
base.RegisterTask(taskDef)
}
// UpdateConfigFromPersistence updates the vacuum configuration from persistence
func UpdateConfigFromPersistence(configPersistence interface{}) error {
if globalTaskDef == nil {
return fmt.Errorf("vacuum task not registered")
}
// Load configuration from persistence
newConfig := LoadConfigFromPersistence(configPersistence)
if newConfig == nil {
return fmt.Errorf("failed to load configuration from persistence")
}
// Update the task definition's config
globalTaskDef.Config = newConfig
glog.V(1).Infof("Updated vacuum task configuration from persistence")
return nil
}

37
weed/worker/tasks/vacuum/scheduling.go

@@ -1,37 +0,0 @@
package vacuum
import (
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Scheduling implements the scheduling logic for vacuum tasks
func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
vacuumConfig := config.(*Config)
// Count running vacuum tasks
runningVacuumCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeVacuum {
runningVacuumCount++
}
}
// Check concurrency limit
if runningVacuumCount >= vacuumConfig.MaxConcurrent {
return false
}
// Check for available workers with vacuum capability
for _, worker := range availableWorkers {
if worker.CurrentLoad < worker.MaxConcurrent {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeVacuum {
return true
}
}
}
}
return false
}
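Unlike the balance variant above, this scheduler also requires spare per-worker capacity; a sketch with a fully loaded worker (the field values are illustrative):

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	busy := &types.WorkerData{
		Capabilities:  []types.TaskType{types.TaskTypeVacuum},
		CurrentLoad:   2,
		MaxConcurrent: 2, // already at capacity
	}

	ok := vacuum.Scheduling(&types.TaskInput{Type: types.TaskTypeVacuum},
		nil, []*types.WorkerData{busy}, vacuum.NewDefaultConfig())
	fmt.Println(ok) // false: no vacuum-capable worker has spare capacity
}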

244
weed/worker/tasks/vacuum/vacuum_task.go

@@ -1,244 +0,0 @@
package vacuum
import (
"context"
"fmt"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
"google.golang.org/grpc"
)
// VacuumTask implements the Task interface
type VacuumTask struct {
*base.BaseTask
server string
volumeID uint32
collection string
garbageThreshold float64
progress float64
}
// NewVacuumTask creates a new unified vacuum task instance
func NewVacuumTask(id string, server string, volumeID uint32, collection string) *VacuumTask {
return &VacuumTask{
BaseTask: base.NewBaseTask(id, types.TaskTypeVacuum),
server: server,
volumeID: volumeID,
collection: collection,
garbageThreshold: 0.3, // Default 30% threshold
}
}
// Execute implements the UnifiedTask interface
func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
if params == nil {
return fmt.Errorf("task parameters are required")
}
vacuumParams := params.GetVacuumParams()
if vacuumParams == nil {
return fmt.Errorf("vacuum parameters are required")
}
t.garbageThreshold = vacuumParams.GarbageThreshold
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
"server": t.server,
"collection": t.collection,
"garbage_threshold": t.garbageThreshold,
}).Info("Starting vacuum task")
// Step 1: Check volume status and garbage ratio
t.ReportProgress(10.0)
t.GetLogger().Info("Checking volume status")
eligible, currentGarbageRatio, err := t.checkVacuumEligibility()
if err != nil {
return fmt.Errorf("failed to check vacuum eligibility: %v", err)
}
if !eligible {
t.GetLogger().WithFields(map[string]interface{}{
"current_garbage_ratio": currentGarbageRatio,
"required_threshold": t.garbageThreshold,
}).Info("Volume does not meet vacuum criteria, skipping")
t.ReportProgress(100.0)
return nil
}
// Step 2: Perform vacuum operation
t.ReportProgress(50.0)
t.GetLogger().WithFields(map[string]interface{}{
"garbage_ratio": currentGarbageRatio,
"threshold": t.garbageThreshold,
}).Info("Performing vacuum operation")
if err := t.performVacuum(); err != nil {
return fmt.Errorf("failed to perform vacuum: %v", err)
}
// Step 3: Verify vacuum results
t.ReportProgress(90.0)
t.GetLogger().Info("Verifying vacuum results")
if err := t.verifyVacuumResults(); err != nil {
glog.Warningf("Vacuum verification failed: %v", err)
// Don't fail the task - vacuum operation itself succeeded
}
t.ReportProgress(100.0)
glog.Infof("Vacuum task completed successfully: volume %d from %s (garbage ratio was %.2f%%)",
t.volumeID, t.server, currentGarbageRatio*100)
return nil
}
// Validate implements the UnifiedTask interface
func (t *VacuumTask) Validate(params *worker_pb.TaskParams) error {
if params == nil {
return fmt.Errorf("task parameters are required")
}
vacuumParams := params.GetVacuumParams()
if vacuumParams == nil {
return fmt.Errorf("vacuum parameters are required")
}
if params.VolumeId != t.volumeID {
return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
}
// Validate that at least one source matches our server
found := false
for _, source := range params.Sources {
if source.Node == t.server {
found = true
break
}
}
if !found {
return fmt.Errorf("no source matches expected server %s", t.server)
}
if vacuumParams.GarbageThreshold < 0 || vacuumParams.GarbageThreshold > 1.0 {
return fmt.Errorf("invalid garbage threshold: %f (must be between 0.0 and 1.0)", vacuumParams.GarbageThreshold)
}
return nil
}
// EstimateTime implements the UnifiedTask interface
func (t *VacuumTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
// Basic estimate based on simulated steps
return 14 * time.Second // Sum of all step durations
}
// GetProgress returns current progress
func (t *VacuumTask) GetProgress() float64 {
return t.progress
}
// Helper methods for real vacuum operations
// checkVacuumEligibility checks if the volume meets vacuum criteria
func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
var garbageRatio float64
err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: t.volumeID,
})
if err != nil {
return fmt.Errorf("failed to check volume vacuum status: %v", err)
}
garbageRatio = resp.GarbageRatio
return nil
})
if err != nil {
return false, 0, err
}
eligible := garbageRatio >= t.garbageThreshold
glog.V(1).Infof("Volume %d garbage ratio: %.2f%%, threshold: %.2f%%, eligible: %v",
t.volumeID, garbageRatio*100, t.garbageThreshold*100, eligible)
return eligible, garbageRatio, nil
}
// performVacuum executes the actual vacuum operation
func (t *VacuumTask) performVacuum() error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
// Step 1: Compact the volume
t.GetLogger().Info("Compacting volume")
stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
VolumeId: t.volumeID,
})
if err != nil {
return fmt.Errorf("vacuum compact failed: %v", err)
}
// Read compact progress
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
}
return fmt.Errorf("vacuum compact stream error: %v", recvErr)
}
glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes)
}
// Step 2: Commit the vacuum
t.GetLogger().Info("Committing vacuum operation")
_, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: t.volumeID,
})
if err != nil {
return fmt.Errorf("vacuum commit failed: %v", err)
}
// Step 3: Cleanup old files
t.GetLogger().Info("Cleaning up vacuum files")
_, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumeId: t.volumeID,
})
if err != nil {
return fmt.Errorf("vacuum cleanup failed: %v", err)
}
glog.V(1).Infof("Volume %d vacuum operation completed successfully", t.volumeID)
return nil
})
}
// verifyVacuumResults checks the volume status after vacuum
func (t *VacuumTask) verifyVacuumResults() error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: t.volumeID,
})
if err != nil {
return fmt.Errorf("failed to verify vacuum results: %v", err)
}
postVacuumGarbageRatio := resp.GarbageRatio
glog.V(1).Infof("Volume %d post-vacuum garbage ratio: %.2f%%",
t.volumeID, postVacuumGarbageRatio*100)
return nil
})
}
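Putting the pieces together, a sketch of how a worker would drive this task end to end; the ids and address are illustrative, and Execute contacts a real volume server, so treat this as wiring pseudocode rather than something to run against production.

package main

import (
	"context"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

func main() {
	task := vacuum.NewVacuumTask("vacuum-7", "10.0.0.5:8080", 7, "pictures")

	params := &worker_pb.TaskParams{
		VolumeId:   7,
		Collection: "pictures",
		Sources:    []*worker_pb.TaskSource{{Node: "10.0.0.5:8080", VolumeId: 7}},
		TaskParams: &worker_pb.TaskParams_VacuumParams{
			VacuumParams: &worker_pb.VacuumTaskParams{
				GarbageThreshold: 0.3,
				BatchSize:        1000,
				WorkingDir:       "/tmp/seaweedfs_vacuum_work",
				VerifyChecksum:   true,
			},
		},
	}

	if err := task.Validate(params); err != nil {
		log.Fatalf("invalid params: %v", err)
	}
	if err := task.Execute(context.Background(), params); err != nil {
		log.Fatalf("vacuum failed: %v", err)
	}
}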

2
weed/worker/types/task_types.go

@@ -11,9 +11,7 @@ import (
type TaskType string
const (
TaskTypeVacuum TaskType = "vacuum"
TaskTypeErasureCoding TaskType = "erasure_coding"
TaskTypeBalance TaskType = "balance"
TaskTypeReplication TaskType = "replication"
)

2
weed/worker/worker.go

@@ -16,9 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/types"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// Worker represents a maintenance worker instance
