
Merge 4e1901d0bd into d88f6ed0af

pull/8177/merge
Chris Lu, 20 hours ago (committed by GitHub)
parent
commit 77f8168304
  1. weed/admin/dash/config_persistence.go (71)
  2. weed/admin/handlers/maintenance_handlers.go (10)
  3. weed/admin/maintenance/maintenance_worker.go (1)
  4. weed/pb/worker.proto (1)
  5. weed/pb/worker_pb/worker.pb.go (13)
  6. weed/worker/tasks/table_maintenance/config.go (196)
  7. weed/worker/tasks/table_maintenance/detection.go (192)
  8. weed/worker/tasks/table_maintenance/iceberg_ops.go (325)
  9. weed/worker/tasks/table_maintenance/register.go (139)
  10. weed/worker/tasks/table_maintenance/scheduling.go (85)
  11. weed/worker/tasks/table_maintenance/table_maintenance_integration_test.go (505)
  12. weed/worker/tasks/table_maintenance/table_maintenance_task.go (370)
  13. weed/worker/tasks/table_maintenance/table_maintenance_test.go (251)
  14. weed/worker/types/task_types.go (9)

weed/admin/dash/config_persistence.go (71)

@@ -24,18 +24,20 @@ const (
ConfigSubdir = "conf"
// Configuration file names (protobuf binary)
MaintenanceConfigFile = "maintenance.pb"
VacuumTaskConfigFile = "task_vacuum.pb"
ECTaskConfigFile = "task_erasure_coding.pb"
BalanceTaskConfigFile = "task_balance.pb"
ReplicationTaskConfigFile = "task_replication.pb"
MaintenanceConfigFile = "maintenance.pb"
VacuumTaskConfigFile = "task_vacuum.pb"
ECTaskConfigFile = "task_erasure_coding.pb"
BalanceTaskConfigFile = "task_balance.pb"
ReplicationTaskConfigFile = "task_replication.pb"
TableMaintenanceConfigFile = "task_table_maintenance.pb"
// JSON reference files
MaintenanceConfigJSONFile = "maintenance.json"
VacuumTaskConfigJSONFile = "task_vacuum.json"
ECTaskConfigJSONFile = "task_erasure_coding.json"
BalanceTaskConfigJSONFile = "task_balance.json"
ReplicationTaskConfigJSONFile = "task_replication.json"
MaintenanceConfigJSONFile = "maintenance.json"
VacuumTaskConfigJSONFile = "task_vacuum.json"
ECTaskConfigJSONFile = "task_erasure_coding.json"
BalanceTaskConfigJSONFile = "task_balance.json"
ReplicationTaskConfigJSONFile = "task_replication.json"
TableMaintenanceConfigJSONFile = "task_table_maintenance.json"
// Task persistence subdirectories and settings
TasksSubdir = "tasks"
@@ -520,6 +522,53 @@ func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, err
return nil, fmt.Errorf("failed to unmarshal balance task configuration")
}
// defaultTableMaintenanceTaskPolicy returns the default table maintenance task policy
func defaultTableMaintenanceTaskPolicy() *worker_pb.TaskPolicy {
return &worker_pb.TaskPolicy{
Enabled: true,
MaxConcurrent: 2,
RepeatIntervalSeconds: 30 * 60, // 30 minutes
CheckIntervalSeconds: 30 * 60, // 30 minutes
}
}
// SaveTableMaintenanceTaskPolicy saves table maintenance task policy to protobuf file
func (cp *ConfigPersistence) SaveTableMaintenanceTaskPolicy(policy *worker_pb.TaskPolicy) error {
return cp.saveTaskConfig(TableMaintenanceConfigFile, policy)
}
// LoadTableMaintenanceTaskPolicy loads table maintenance task policy from protobuf file
func (cp *ConfigPersistence) LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPolicy, error) {
if cp.dataDir == "" {
// Return default policy if no data directory
return defaultTableMaintenanceTaskPolicy(), nil
}
confDir := filepath.Join(cp.dataDir, ConfigSubdir)
configPath := filepath.Join(confDir, TableMaintenanceConfigFile)
// Check if file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
// Return default policy if file doesn't exist
return defaultTableMaintenanceTaskPolicy(), nil
}
// Read file
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to read table maintenance task config file: %w", err)
}
// Try to unmarshal as TaskPolicy
var policy worker_pb.TaskPolicy
if err := proto.Unmarshal(configData, &policy); err == nil {
glog.V(1).Infof("Loaded table maintenance task policy from %s", configPath)
return &policy, nil
}
return nil, fmt.Errorf("failed to unmarshal table maintenance task configuration")
}
// SaveReplicationTaskConfig saves replication task configuration to protobuf file
func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error {
return cp.saveTaskConfig(ReplicationTaskConfigFile, config)
@@ -631,6 +680,8 @@ func (cp *ConfigPersistence) SaveTaskPolicy(taskType string, policy *worker_pb.T
return cp.SaveBalanceTaskPolicy(policy)
case "replication":
return cp.SaveReplicationTaskPolicy(policy)
case "table_maintenance":
return cp.SaveTableMaintenanceTaskPolicy(policy)
}
return fmt.Errorf("unknown task type: %s", taskType)
}
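
For reference, a minimal usage sketch of the new persistence hooks (not part of the diff; the helper name and the cp variable are assumed). LoadTableMaintenanceTaskPolicy falls back to the built-in defaults when no data directory or config file exists, so the result is always usable:

// Sketch only, assumed to live alongside ConfigPersistence in package dash.
func adjustTableMaintenancePolicy(cp *ConfigPersistence) error {
	policy, err := cp.LoadTableMaintenanceTaskPolicy() // defaults if nothing persisted
	if err != nil {
		return err
	}
	policy.MaxConcurrent = 4 // example override
	return cp.SaveTableMaintenanceTaskPolicy(policy)
}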

weed/admin/handlers/maintenance_handlers.go (10)

@@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/table_maintenance"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
"github.com/seaweedfs/seaweedfs/weed/worker/types" "github.com/seaweedfs/seaweedfs/weed/worker/types"
) )
@@ -246,6 +247,8 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
config = &balance.Config{}
case types.TaskTypeErasureCoding:
config = &erasure_coding.Config{}
case types.TaskTypeTableMaintenance:
config = &table_maintenance.Config{}
default:
c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName})
return
@@ -305,6 +308,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
glog.V(1).Infof("Parsed balance config - Enabled: %v, MaxConcurrent: %d, ScanIntervalSeconds: %d, ImbalanceThreshold: %f, MinServerCount: %d",
balanceConfig.Enabled, balanceConfig.MaxConcurrent, balanceConfig.ScanIntervalSeconds, balanceConfig.ImbalanceThreshold, balanceConfig.MinServerCount)
}
case types.TaskTypeTableMaintenance:
if tmConfig, ok := config.(*table_maintenance.Config); ok {
glog.V(1).Infof("Parsed table_maintenance config - Enabled: %v, MaxConcurrent: %d, ScanIntervalMinutes: %d, CompactionFileThreshold: %d",
tmConfig.Enabled, tmConfig.MaxConcurrent, tmConfig.ScanIntervalMinutes, tmConfig.CompactionFileThreshold)
}
}
// Validate the configuration
@@ -544,6 +552,8 @@ func (h *MaintenanceHandlers) saveTaskConfigToProtobuf(taskType types.TaskType,
return configPersistence.SaveErasureCodingTaskPolicy(taskPolicy)
case types.TaskTypeBalance:
return configPersistence.SaveBalanceTaskPolicy(taskPolicy)
case types.TaskTypeTableMaintenance:
return configPersistence.SaveTableMaintenanceTaskPolicy(taskPolicy)
default:
return fmt.Errorf("unsupported task type for protobuf persistence: %s", taskType)
}

weed/admin/maintenance/maintenance_worker.go (1)

@@ -15,6 +15,7 @@ import (
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/table_maintenance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
) )

weed/pb/worker.proto (1)

@@ -289,6 +289,7 @@ message TaskPolicy {
int32 max_concurrent = 2;
int32 repeat_interval_seconds = 3; // Seconds to wait before repeating
int32 check_interval_seconds = 4; // Seconds between checks
string description = 9; // Generic field for task-specific config (JSON format)
// Typed task-specific configuration (replaces generic map)
oneof task_config {

weed/pb/worker_pb/worker.pb.go (13)

@@ -2379,6 +2379,7 @@ type TaskPolicy struct {
MaxConcurrent int32 `protobuf:"varint,2,opt,name=max_concurrent,json=maxConcurrent,proto3" json:"max_concurrent,omitempty"`
RepeatIntervalSeconds int32 `protobuf:"varint,3,opt,name=repeat_interval_seconds,json=repeatIntervalSeconds,proto3" json:"repeat_interval_seconds,omitempty"` // Seconds to wait before repeating
CheckIntervalSeconds int32 `protobuf:"varint,4,opt,name=check_interval_seconds,json=checkIntervalSeconds,proto3" json:"check_interval_seconds,omitempty"` // Seconds between checks
Description string `protobuf:"bytes,9,opt,name=description,proto3" json:"description,omitempty"` // Generic field for task-specific config (JSON format)
// Typed task-specific configuration (replaces generic map)
//
// Types that are valid to be assigned to TaskConfig:
@@ -2450,6 +2451,13 @@ func (x *TaskPolicy) GetCheckIntervalSeconds() int32 {
return 0
}
func (x *TaskPolicy) GetDescription() string {
if x != nil {
return x.Description
}
return ""
}
func (x *TaskPolicy) GetTaskConfig() isTaskPolicy_TaskConfig {
if x != nil {
return x.TaskConfig
@@ -3544,13 +3552,14 @@ const file_worker_proto_rawDesc = "" +
"\x1edefault_check_interval_seconds\x18\x04 \x01(\x05R\x1bdefaultCheckIntervalSeconds\x1aV\n" +
"\x11TaskPoliciesEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12+\n" +
"\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\x82\x04\n" +
"\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\xa4\x04\n" +
"\n" + "\n" +
"TaskPolicy\x12\x18\n" + "TaskPolicy\x12\x18\n" +
"\aenabled\x18\x01 \x01(\bR\aenabled\x12%\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12%\n" +
"\x0emax_concurrent\x18\x02 \x01(\x05R\rmaxConcurrent\x126\n" + "\x0emax_concurrent\x18\x02 \x01(\x05R\rmaxConcurrent\x126\n" +
"\x17repeat_interval_seconds\x18\x03 \x01(\x05R\x15repeatIntervalSeconds\x124\n" + "\x17repeat_interval_seconds\x18\x03 \x01(\x05R\x15repeatIntervalSeconds\x124\n" +
"\x16check_interval_seconds\x18\x04 \x01(\x05R\x14checkIntervalSeconds\x12B\n" +
"\x16check_interval_seconds\x18\x04 \x01(\x05R\x14checkIntervalSeconds\x12 \n" +
"\vdescription\x18\t \x01(\tR\vdescription\x12B\n" +
"\rvacuum_config\x18\x05 \x01(\v2\x1b.worker_pb.VacuumTaskConfigH\x00R\fvacuumConfig\x12X\n" + "\rvacuum_config\x18\x05 \x01(\v2\x1b.worker_pb.VacuumTaskConfigH\x00R\fvacuumConfig\x12X\n" +
"\x15erasure_coding_config\x18\x06 \x01(\v2\".worker_pb.ErasureCodingTaskConfigH\x00R\x13erasureCodingConfig\x12E\n" + "\x15erasure_coding_config\x18\x06 \x01(\v2\".worker_pb.ErasureCodingTaskConfigH\x00R\x13erasureCodingConfig\x12E\n" +
"\x0ebalance_config\x18\a \x01(\v2\x1c.worker_pb.BalanceTaskConfigH\x00R\rbalanceConfig\x12Q\n" + "\x0ebalance_config\x18\a \x01(\v2\x1c.worker_pb.BalanceTaskConfigH\x00R\rbalanceConfig\x12Q\n" +

weed/worker/tasks/table_maintenance/config.go (196)

@@ -0,0 +1,196 @@
package table_maintenance
import (
"encoding/json"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with table maintenance specific settings
type Config struct {
base.BaseConfig
// ScanIntervalMinutes is how often to scan for maintenance needs
ScanIntervalMinutes int `json:"scan_interval_minutes"`
// CompactionFileThreshold is the minimum number of data files before compaction is triggered
CompactionFileThreshold int `json:"compaction_file_threshold"`
// SnapshotRetentionDays is how long to keep snapshots before expiration
SnapshotRetentionDays int `json:"snapshot_retention_days"`
}
// NewDefaultConfig returns the default configuration for table maintenance
func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 24 * 60 * 60, // 24 hours
MaxConcurrent: 2,
},
ScanIntervalMinutes: 30, // Scan every 30 minutes
CompactionFileThreshold: 100, // Compact when > 100 small files
SnapshotRetentionDays: 7, // Keep snapshots for 7 days
}
}
// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
policy := &worker_pb.TaskPolicy{
Enabled: c.Enabled,
MaxConcurrent: int32(c.MaxConcurrent),
RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
CheckIntervalSeconds: int32(c.ScanIntervalMinutes * 60),
}
// Encode config-specific fields in Description as JSON
configData := map[string]interface{}{
"compaction_file_threshold": c.CompactionFileThreshold,
"snapshot_retention_days": c.SnapshotRetentionDays,
}
if data, err := json.Marshal(configData); err == nil {
policy.Description = string(data)
}
return policy
}
// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
if policy == nil {
return fmt.Errorf("policy is nil")
}
c.Enabled = policy.Enabled
c.MaxConcurrent = int(policy.MaxConcurrent)
c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
c.ScanIntervalMinutes = int(policy.CheckIntervalSeconds / 60)
// Decode config-specific fields from Description if present
if policy.Description != "" {
var configData map[string]interface{}
if err := json.Unmarshal([]byte(policy.Description), &configData); err == nil {
if val, ok := configData["compaction_file_threshold"]; ok {
if floatVal, ok := val.(float64); ok {
c.CompactionFileThreshold = int(floatVal)
}
}
if val, ok := configData["snapshot_retention_days"]; ok {
if floatVal, ok := val.(float64); ok {
c.SnapshotRetentionDays = int(floatVal)
}
}
}
}
return nil
}
// LoadConfigFromPersistence loads configuration from the persistence layer if available
func LoadConfigFromPersistence(configPersistence interface{}) *Config {
config := NewDefaultConfig()
// Try to load from persistence if available
if persistence, ok := configPersistence.(interface {
LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPolicy, error)
}); ok {
if policy, err := persistence.LoadTableMaintenanceTaskPolicy(); err != nil {
glog.Warningf("Failed to load table_maintenance configuration from persistence: %v", err)
} else if policy != nil {
if err := config.FromTaskPolicy(policy); err != nil {
glog.Warningf("Failed to parse table_maintenance configuration from persistence: %v", err)
} else {
glog.V(1).Infof("Loaded table_maintenance configuration from persistence")
return config
}
}
}
glog.V(1).Infof("Using default table_maintenance configuration")
return config
}
// GetConfigSpec returns the configuration specification for the UI
func GetConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Table Maintenance",
Description: "Whether table maintenance tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic table maintenance",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_minutes",
JSONName: "scan_interval_minutes",
Type: config.FieldTypeInt,
DefaultValue: 30,
MinValue: 5,
MaxValue: 1440,
Required: true,
DisplayName: "Scan Interval (minutes)",
Description: "How often to scan for tables needing maintenance",
HelpText: "Lower values mean faster detection but higher overhead",
Placeholder: "30",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "compaction_file_threshold",
JSONName: "compaction_file_threshold",
Type: config.FieldTypeInt,
DefaultValue: 100,
MinValue: 10,
MaxValue: 10000,
Required: true,
DisplayName: "Compaction File Threshold",
Description: "Number of small files that triggers compaction",
HelpText: "Tables with more small files than this will be scheduled for compaction",
Placeholder: "100",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "snapshot_retention_days",
JSONName: "snapshot_retention_days",
Type: config.FieldTypeInt,
DefaultValue: 7,
MinValue: 1,
MaxValue: 365,
Required: true,
DisplayName: "Snapshot Retention (days)",
Description: "How long to keep snapshots before expiration",
HelpText: "Snapshots older than this will be candidates for cleanup",
Placeholder: "7",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 2,
MinValue: 1,
MaxValue: 10,
Required: true,
DisplayName: "Max Concurrent Jobs",
Description: "Maximum number of concurrent maintenance jobs",
HelpText: "Limits the number of maintenance operations running at the same time",
Placeholder: "2",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
},
}
}
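
Since CompactionFileThreshold and SnapshotRetentionDays have no dedicated TaskPolicy fields, they ride through persistence as JSON in the new Description field. A round-trip sketch (hypothetical standalone program, not part of the diff):

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/table_maintenance"
)

func main() {
	cfg := table_maintenance.NewDefaultConfig()
	cfg.CompactionFileThreshold = 200
	cfg.SnapshotRetentionDays = 14

	policy := cfg.ToTaskPolicy() // extra fields are JSON-encoded into policy.Description

	restored := table_maintenance.NewDefaultConfig()
	if err := restored.FromTaskPolicy(policy); err != nil {
		panic(err)
	}
	fmt.Println(restored.CompactionFileThreshold, restored.SnapshotRetentionDays) // 200 14
}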

weed/worker/tasks/table_maintenance/detection.go (192)

@@ -0,0 +1,192 @@
package table_maintenance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// TableMaintenanceDetector implements types.TaskDetector for table maintenance
type TableMaintenanceDetector struct {
config *Config
}
// NewTableMaintenanceDetector creates a new table maintenance detector
func NewTableMaintenanceDetector(config *Config) *TableMaintenanceDetector {
return &TableMaintenanceDetector{
config: config,
}
}
// ScanInterval returns how often to scan for table maintenance needs
func (d *TableMaintenanceDetector) ScanInterval() time.Duration {
if d.config != nil && d.config.ScanIntervalMinutes > 0 {
return time.Duration(d.config.ScanIntervalMinutes) * time.Minute
}
return 30 * time.Minute // Default: scan every 30 minutes
}
// DetectTasks scans for tables that need maintenance
func (d *TableMaintenanceDetector) DetectTasks(metrics []*types.VolumeHealthMetrics) ([]*types.TaskDetectionResult, error) {
glog.V(2).Infof("Table maintenance detection starting")
// Table maintenance doesn't use volume metrics - it scans table buckets
// The actual scanning is done by the admin server's table scanner
// Workers pick up jobs from the admin server's queue
// This detector returns empty results because table maintenance jobs
// are created by the admin server's table scanner, not by volume metrics
return nil, nil
}
// Detection is the function signature required by the task registration system
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
if !config.IsEnabled() {
return nil, nil
}
tableConfig, ok := config.(*Config)
if !ok {
tableConfig = NewDefaultConfig()
}
detector := NewTableMaintenanceDetector(tableConfig)
return detector.DetectTasks(metrics)
}
// TableMaintenanceScanner scans table buckets for maintenance needs
// This is called by the admin server to populate the maintenance queue
type TableMaintenanceScanner struct {
config *Config
}
// NewTableMaintenanceScanner creates a new table maintenance scanner
func NewTableMaintenanceScanner(config *Config) *TableMaintenanceScanner {
return &TableMaintenanceScanner{
config: config,
}
}
// ScanTableBucket scans a table bucket for tables needing maintenance
// Returns a list of maintenance jobs that should be queued
func (s *TableMaintenanceScanner) ScanTableBucket(bucketName string, tables []TableInfo) ([]*TableMaintenanceJob, error) {
glog.V(1).Infof("Scanning table bucket %s for maintenance needs", bucketName)
var jobs []*TableMaintenanceJob
for _, table := range tables {
// Check each table for maintenance needs
tableJobs := s.checkTableMaintenanceNeeds(bucketName, table)
jobs = append(jobs, tableJobs...)
}
glog.V(1).Infof("Found %d maintenance jobs for table bucket %s", len(jobs), bucketName)
return jobs, nil
}
// TableInfo represents basic table information for scanning
type TableInfo struct {
Namespace string
TableName string
TablePath string
LastCompaction time.Time
DataFileCount int
SnapshotCount int
OldestSnapshot time.Time
TotalSizeBytes int64
DeletedFileCount int
}
// checkTableMaintenanceNeeds checks if a table needs maintenance
func (s *TableMaintenanceScanner) checkTableMaintenanceNeeds(bucketName string, table TableInfo) []*TableMaintenanceJob {
var jobs []*TableMaintenanceJob
now := time.Now()
// Check for compaction needs
if s.needsCompaction(table) {
jobs = append(jobs, &TableMaintenanceJob{
JobType: JobTypeCompaction,
TableBucket: bucketName,
Namespace: table.Namespace,
TableName: table.TableName,
TablePath: table.TablePath,
Priority: types.TaskPriorityNormal,
Reason: "Table has many small files that can be compacted",
CreatedAt: now,
})
}
// Check for snapshot expiration needs
if s.needsSnapshotExpiration(table) {
jobs = append(jobs, &TableMaintenanceJob{
JobType: JobTypeSnapshotExpiration,
TableBucket: bucketName,
Namespace: table.Namespace,
TableName: table.TableName,
TablePath: table.TablePath,
Priority: types.TaskPriorityLow,
Reason: "Table has expired snapshots that can be removed",
CreatedAt: now,
})
}
// Check for orphan cleanup needs
if s.needsOrphanCleanup(table) {
jobs = append(jobs, &TableMaintenanceJob{
JobType: JobTypeOrphanCleanup,
TableBucket: bucketName,
Namespace: table.Namespace,
TableName: table.TableName,
TablePath: table.TablePath,
Priority: types.TaskPriorityLow,
Reason: "Table has orphaned files that can be removed",
CreatedAt: now,
})
}
return jobs
}
// needsCompaction checks if a table needs compaction
func (s *TableMaintenanceScanner) needsCompaction(table TableInfo) bool {
// Use config value directly - config is always set by NewTableMaintenanceScanner
return table.DataFileCount > s.config.CompactionFileThreshold
}
// needsSnapshotExpiration checks if a table has expired snapshots
func (s *TableMaintenanceScanner) needsSnapshotExpiration(table TableInfo) bool {
if table.SnapshotCount <= 1 {
return false // Keep at least one snapshot
}
// Use config value directly - config is always set by NewTableMaintenanceScanner
cutoff := time.Now().AddDate(0, 0, -s.config.SnapshotRetentionDays)
return table.OldestSnapshot.Before(cutoff)
}
// needsOrphanCleanup checks if a table might have orphaned files
func (s *TableMaintenanceScanner) needsOrphanCleanup(table TableInfo) bool {
// Tables with deleted files might have orphans
return table.DeletedFileCount > 0
}
// CreateTaskParams creates task parameters for a maintenance job.
// The job type is encoded in the Node field as "job_type:table_path" format
// to allow the worker to know which maintenance operation to perform.
func CreateTaskParams(job *TableMaintenanceJob) *worker_pb.TaskParams {
// Encode job type in the node field: "job_type:table_path"
nodeValue := fmt.Sprintf("%s:%s", job.JobType, job.TablePath)
return &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: nodeValue,
},
},
VolumeId: 0, // Not volume-specific
Collection: job.TableBucket,
}
}
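
Because the job type rides in TaskParams.Sources[0].Node as "job_type:table_path", the consuming side has to split it back apart. A decoding sketch mirroring the parsing in register.go (helper name and example values are hypothetical, not part of the diff):

package main

import (
	"fmt"
	"strings"
)

// decodeJobSource splits the "job_type:table_path" convention produced by CreateTaskParams.
// With no prefix it falls back to compaction, matching the register.go behavior.
func decodeJobSource(node string) (jobType, tablePath string) {
	if idx := strings.Index(node, ":"); idx > 0 && idx < len(node)-1 {
		return node[:idx], node[idx+1:]
	}
	return "compaction", node
}

func main() {
	jobType, tablePath := decodeJobSource("snapshot_expiration:/table-buckets/bucket1/default/orders")
	fmt.Println(jobType, tablePath)
}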

weed/worker/tasks/table_maintenance/iceberg_ops.go (325)

@@ -0,0 +1,325 @@
package table_maintenance
import (
"context"
"encoding/json"
"fmt"
"io"
"path"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// IcebergTableMetadata represents the Iceberg table metadata structure
type IcebergTableMetadata struct {
FormatVersion int `json:"format-version"`
TableUUID string `json:"table-uuid"`
Location string `json:"location"`
Snapshots []IcebergSnapshot `json:"snapshots,omitempty"`
CurrentSnap int64 `json:"current-snapshot-id"`
Refs map[string]SnapRef `json:"refs,omitempty"`
}
// IcebergSnapshot represents an Iceberg table snapshot
type IcebergSnapshot struct {
SnapshotID int64 `json:"snapshot-id"`
TimestampMs int64 `json:"timestamp-ms"`
ManifestList string `json:"manifest-list"`
Summary map[string]string `json:"summary,omitempty"`
ParentSnapshotID *int64 `json:"parent-snapshot-id,omitempty"`
}
// SnapRef represents a snapshot reference (branch or tag)
type SnapRef struct {
SnapshotID int64 `json:"snapshot-id"`
Type string `json:"type"`
}
// IcebergManifestList represents an Iceberg manifest list
type IcebergManifestList struct {
Manifests []IcebergManifestEntry `json:"manifests"`
}
// IcebergManifestEntry represents an entry in a manifest list
type IcebergManifestEntry struct {
ManifestPath string `json:"manifest_path"`
ManifestLength int64 `json:"manifest_length"`
PartitionSpecID int `json:"partition_spec_id"`
AddedFilesCount int `json:"added_files_count"`
ExistingFiles int `json:"existing_files_count"`
DeletedFiles int `json:"deleted_files_count"`
}
// IcebergManifest represents an Iceberg manifest file containing data file entries
type IcebergManifest struct {
Entries []IcebergDataFileEntry `json:"entries"`
}
// IcebergDataFileEntry represents a data file entry in a manifest
type IcebergDataFileEntry struct {
Status int `json:"status"` // 0=existing, 1=added, 2=deleted
DataFile DataFile `json:"data_file"`
}
// DataFile represents an Iceberg data file
type DataFile struct {
FilePath string `json:"file_path"`
FileFormat string `json:"file_format"`
RecordCount int64 `json:"record_count"`
FileSizeInBytes int64 `json:"file_size_in_bytes"`
}
// TableMaintenanceContext provides context for table maintenance operations
type TableMaintenanceContext struct {
FilerClient filer_pb.SeaweedFilerClient
TablePath string
MetadataDir string
DataDir string
Config *Config
}
// NewTableMaintenanceContext creates a new maintenance context
func NewTableMaintenanceContext(client filer_pb.SeaweedFilerClient, tablePath string, config *Config) *TableMaintenanceContext {
return &TableMaintenanceContext{
FilerClient: client,
TablePath: tablePath,
MetadataDir: path.Join(tablePath, "metadata"),
DataDir: path.Join(tablePath, "data"),
Config: config,
}
}
// ReadTableMetadata reads the current Iceberg table metadata
func (mc *TableMaintenanceContext) ReadTableMetadata(ctx context.Context) (*IcebergTableMetadata, string, error) {
// Find the latest metadata file (v*.metadata.json)
metadataFiles, err := mc.listMetadataFiles(ctx)
if err != nil {
return nil, "", fmt.Errorf("failed to list metadata files: %w", err)
}
if len(metadataFiles) == 0 {
return nil, "", fmt.Errorf("no metadata files found in %s", mc.MetadataDir)
}
// Sort to get the latest version
sort.Strings(metadataFiles)
latestMetadataFile := metadataFiles[len(metadataFiles)-1]
metadataPath := path.Join(mc.MetadataDir, latestMetadataFile)
// Read the metadata file content
content, err := mc.readFileContent(ctx, metadataPath)
if err != nil {
return nil, "", fmt.Errorf("failed to read metadata file %s: %w", metadataPath, err)
}
var metadata IcebergTableMetadata
if err := json.Unmarshal(content, &metadata); err != nil {
return nil, "", fmt.Errorf("failed to parse metadata: %w", err)
}
return &metadata, metadataPath, nil
}
// listMetadataFiles lists all metadata files in the metadata directory
func (mc *TableMaintenanceContext) listMetadataFiles(ctx context.Context) ([]string, error) {
return mc.listFilesWithPattern(ctx, mc.MetadataDir, "v", ".metadata.json")
}
// listFilesWithPattern lists files matching a prefix and suffix pattern
func (mc *TableMaintenanceContext) listFilesWithPattern(ctx context.Context, dir, prefix, suffix string) ([]string, error) {
var files []string
resp, err := mc.FilerClient.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: dir,
Limit: 1000,
})
if err != nil {
return nil, err
}
for {
entry, err := resp.Recv()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
name := entry.Entry.Name
if strings.HasPrefix(name, prefix) && strings.HasSuffix(name, suffix) {
files = append(files, name)
}
}
return files, nil
}
// listAllFiles lists all files in a directory recursively
func (mc *TableMaintenanceContext) listAllFiles(ctx context.Context, dir string) ([]string, error) {
var files []string
resp, err := mc.FilerClient.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: dir,
Limit: 10000,
})
if err != nil {
return nil, err
}
for {
entry, err := resp.Recv()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
if entry.Entry.IsDirectory {
// Recurse into subdirectory
subFiles, err := mc.listAllFiles(ctx, path.Join(dir, entry.Entry.Name))
if err != nil {
glog.V(2).Infof("Failed to list subdirectory %s: %v", entry.Entry.Name, err)
continue
}
files = append(files, subFiles...)
} else {
files = append(files, path.Join(dir, entry.Entry.Name))
}
}
return files, nil
}
// readFileContent reads the content of a file.
// Note: This implementation handles inline content only. For chunked files (large metadata files),
// a full implementation would need to use the filer read interface to assemble chunks from volume servers.
// Iceberg metadata files are typically small (KB-MB range) and fit in inline content.
func (mc *TableMaintenanceContext) readFileContent(ctx context.Context, filePath string) ([]byte, error) {
dir, name := splitPath(filePath)
resp, err := filer_pb.LookupEntry(ctx, mc.FilerClient, &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
})
if err != nil {
return nil, err
}
// For small metadata files, the content may be inline
if len(resp.Entry.Content) > 0 {
return resp.Entry.Content, nil
}
// For chunked files, we need to read from volume servers using the chunk reading API.
// This requires access to volume server clients which would be passed via context.
// For now, return an error - the caller should use the filer's HTTP read API for large files.
return nil, fmt.Errorf("file %s requires chunk reading - use filer HTTP API for large files", filePath)
}
// deleteFile deletes a single file
func (mc *TableMaintenanceContext) deleteFile(ctx context.Context, filePath string) error {
dir, name := splitPath(filePath)
_, err := mc.FilerClient.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
Directory: dir,
Name: name,
IsDeleteData: true,
})
return err
}
// GetReferencedFiles returns all files referenced by the current table metadata.
// IMPORTANT: This is a partial implementation. Full implementation requires:
// 1. Parsing Avro manifest list files to extract manifest file paths
// 2. Parsing Avro manifest files to extract data file paths
// Currently only marks manifest list files as referenced. DO NOT use for orphan deletion
// until manifest parsing is implemented via an Avro library (e.g., goavro).
func (mc *TableMaintenanceContext) GetReferencedFiles(ctx context.Context, metadata *IcebergTableMetadata) (map[string]bool, error) {
referenced := make(map[string]bool)
for _, snapshot := range metadata.Snapshots {
// Add manifest list file
if snapshot.ManifestList != "" {
referenced[snapshot.ManifestList] = true
}
// NOTE: Full implementation would:
// 1. Read the manifest list Avro file
// 2. Extract all manifest file paths from it
// 3. For each manifest, read the Avro file
// 4. Extract all data file paths from the manifest entries
// This requires goavro or similar library for Avro deserialization
}
return referenced, nil
}
// GetExpiredSnapshots returns snapshots older than the retention period
func (mc *TableMaintenanceContext) GetExpiredSnapshots(metadata *IcebergTableMetadata, retentionDays int) []IcebergSnapshot {
var expired []IcebergSnapshot
cutoffTime := time.Now().AddDate(0, 0, -retentionDays).UnixMilli()
for _, snapshot := range metadata.Snapshots {
// Never expire the current snapshot
if snapshot.SnapshotID == metadata.CurrentSnap {
continue
}
// Check if referenced by any branch/tag
isReferenced := false
for _, ref := range metadata.Refs {
if ref.SnapshotID == snapshot.SnapshotID {
isReferenced = true
break
}
}
if !isReferenced && snapshot.TimestampMs < cutoffTime {
expired = append(expired, snapshot)
}
}
return expired
}
// GetSmallDataFiles returns data files smaller than the target size
func (mc *TableMaintenanceContext) GetSmallDataFiles(ctx context.Context, targetSizeBytes int64) ([]string, error) {
var smallFiles []string
// List all files in the data directory with their size information
resp, err := mc.FilerClient.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: mc.DataDir,
Limit: 10000,
})
if err != nil {
return nil, err
}
for {
entry, err := resp.Recv()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
if !entry.Entry.IsDirectory {
// Use the FileSize from the Entry attributes directly
if entry.Entry.Attributes != nil && entry.Entry.Attributes.FileSize < uint64(targetSizeBytes) {
smallFiles = append(smallFiles, path.Join(mc.DataDir, entry.Entry.Name))
}
}
}
return smallFiles, nil
}
// splitPath splits a path into directory and name components
func splitPath(p string) (dir, name string) {
dir = path.Dir(p)
name = path.Base(p)
return
}
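
A sketch of how a snapshot-expiration step could chain these helpers (not part of the diff; assumes a connected filer_pb.SeaweedFilerClient and that the function lives in this package):

// listExpiredSnapshots reads the latest table metadata and returns snapshots past
// the retention window, excluding the current snapshot and any branch/tag targets.
func listExpiredSnapshots(ctx context.Context, client filer_pb.SeaweedFilerClient,
	tablePath string, cfg *Config) ([]IcebergSnapshot, error) {
	mc := NewTableMaintenanceContext(client, tablePath, cfg)
	metadata, _, err := mc.ReadTableMetadata(ctx)
	if err != nil {
		return nil, err
	}
	return mc.GetExpiredSnapshots(metadata, cfg.SnapshotRetentionDays), nil
}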

weed/worker/tasks/table_maintenance/register.go (139)

@@ -0,0 +1,139 @@
package table_maintenance
import (
"fmt"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Global variable to hold the task definition for configuration updates
var globalTaskDef *base.TaskDefinition
// Auto-register this task when the package is imported
func init() {
RegisterTableMaintenanceTask()
// Register config updater
tasks.AutoRegisterConfigUpdater(types.TaskTypeTableMaintenance, UpdateConfigFromPersistence)
}
// RegisterTableMaintenanceTask registers the table maintenance task with the task system
func RegisterTableMaintenanceTask() {
// Create configuration instance
config := NewDefaultConfig()
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeTableMaintenance,
Name: "table_maintenance",
DisplayName: "Table Maintenance",
Description: "Performs maintenance operations on S3 Table Buckets including compaction, snapshot expiration, and orphan cleanup",
Icon: "fas fa-table text-info",
Capabilities: []string{"table_maintenance", "iceberg", "s3tables"},
Config: config,
ConfigSpec: GetConfigSpec(),
CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
if params == nil {
return nil, fmt.Errorf("task parameters are required")
}
if len(params.Sources) == 0 {
return nil, fmt.Errorf("at least one source (table path) is required")
}
// Parse table info from parameters
tablePath := params.Sources[0].Node
tableBucket := params.Collection
// Determine job type from source node format:
// Format: "job_type:table_path" (e.g., "compaction:/table-buckets/bucket/ns/table")
// If no prefix, default to compaction for backward compatibility.
// NOTE: A proper implementation would define TableMaintenanceTaskParams in
// weed/pb/worker.proto to pass job details explicitly, similar to VacuumTaskParams.
jobType := JobTypeCompaction
if colonIdx := strings.Index(tablePath, ":"); colonIdx > 0 && colonIdx < len(tablePath)-1 {
jobTypeStr := tablePath[:colonIdx]
tablePath = tablePath[colonIdx+1:]
switch TableMaintenanceJobType(jobTypeStr) {
case JobTypeCompaction:
jobType = JobTypeCompaction
case JobTypeSnapshotExpiration:
jobType = JobTypeSnapshotExpiration
case JobTypeOrphanCleanup:
jobType = JobTypeOrphanCleanup
case JobTypeManifestRewrite:
jobType = JobTypeManifestRewrite
default:
glog.Warningf("Unknown job type '%s', defaulting to compaction", jobTypeStr)
}
}
// Parse namespace and tableName from tablePath
// Expected format: /table-buckets/bucketName/namespaceName/tableName
namespace := ""
tableName := ""
parts := strings.Split(tablePath, "/")
if len(parts) > 3 {
namespace = parts[3]
}
if len(parts) > 4 {
tableName = parts[4]
}
// Create the maintenance job
job := &TableMaintenanceJob{
JobType: jobType,
TableBucket: tableBucket,
TablePath: tablePath,
Priority: types.TaskPriorityNormal,
CreatedAt: time.Now(),
}
return NewTableMaintenanceTask(
fmt.Sprintf("table-maintenance-%s-%d", tableBucket, time.Now().UnixNano()),
tableBucket,
namespace,
tableName,
job,
), nil
},
DetectionFunc: Detection,
ScanInterval: 30 * time.Minute,
SchedulingFunc: Scheduling,
MaxConcurrent: 2,
RepeatInterval: 24 * time.Hour,
}
// Store task definition globally for configuration updates
globalTaskDef = taskDef
// Register everything with a single function call
base.RegisterTask(taskDef)
glog.V(1).Infof("Registered table_maintenance task type")
}
// UpdateConfigFromPersistence updates the table maintenance configuration from persistence
func UpdateConfigFromPersistence(configPersistence interface{}) error {
if globalTaskDef == nil {
return fmt.Errorf("table_maintenance task not registered")
}
// Load configuration from persistence
newConfig := LoadConfigFromPersistence(configPersistence)
if newConfig == nil {
return fmt.Errorf("failed to load configuration from persistence")
}
// Update the task definition's config
globalTaskDef.Config = newConfig
glog.V(1).Infof("Updated table_maintenance task configuration from persistence")
return nil
}

weed/worker/tasks/table_maintenance/scheduling.go (85)

@@ -0,0 +1,85 @@
package table_maintenance
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// TableMaintenanceScheduler implements types.TaskScheduler for table maintenance
type TableMaintenanceScheduler struct {
config *Config
}
// NewTableMaintenanceScheduler creates a new scheduler
func NewTableMaintenanceScheduler(config *Config) *TableMaintenanceScheduler {
return &TableMaintenanceScheduler{
config: config,
}
}
// ShouldSchedule determines if a task should be scheduled based on the detection result
func (s *TableMaintenanceScheduler) ShouldSchedule(result *types.TaskDetectionResult, lastExecution time.Time) bool {
// Don't schedule if disabled
if s.config != nil && !s.config.Enabled {
return false
}
// Schedule if never executed
if lastExecution.IsZero() {
return true
}
// Schedule based on repeat interval
repeatInterval := s.GetDefaultRepeatInterval()
return time.Since(lastExecution) >= repeatInterval
}
// GetMaxConcurrent returns the maximum concurrent tasks
func (s *TableMaintenanceScheduler) GetMaxConcurrent() int {
if s.config != nil && s.config.MaxConcurrent > 0 {
return s.config.MaxConcurrent
}
return 2 // Default
}
// GetDefaultRepeatInterval returns the default repeat interval between task executions
func (s *TableMaintenanceScheduler) GetDefaultRepeatInterval() time.Duration {
// Table maintenance can run more frequently than volume maintenance
return 24 * time.Hour // Once per day per table
}
// Scheduling is the function signature required by the task registration system
func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
tableConfig, ok := config.(*Config)
if !ok {
tableConfig = NewDefaultConfig()
}
// Count running table maintenance tasks
runningCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskTypeTableMaintenance {
runningCount++
}
}
// Check concurrency limit
if runningCount >= tableConfig.MaxConcurrent {
return false
}
// Check for available workers with table maintenance capability
for _, worker := range availableWorkers {
if worker.CurrentLoad < worker.MaxConcurrent {
for _, capability := range worker.Capabilities {
if capability == types.TaskTypeTableMaintenance {
return true
}
}
}
}
return false
}

weed/worker/tasks/table_maintenance/table_maintenance_integration_test.go (505)

@@ -0,0 +1,505 @@
package table_maintenance
import (
"context"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Integration tests for table maintenance
// These tests verify the complete workflow of table maintenance operations
func TestTableMaintenanceWorkflow_Compaction(t *testing.T) {
t.Skip("TODO: Enable when actual executeCompaction implementation is complete")
// Test the complete compaction workflow
job := &TableMaintenanceJob{
JobType: JobTypeCompaction,
TableBucket: "test-bucket",
Namespace: "test-namespace",
TableName: "test-table",
TablePath: "/table-buckets/test-bucket/test-namespace/test-table",
Priority: types.TaskPriorityNormal,
Reason: "Table has many small files",
CreatedAt: time.Now(),
Params: map[string]string{
"target_file_size": "128MB",
},
}
task := NewTableMaintenanceTask("compaction-test-1", "test-bucket", "test-namespace", "test-table", job)
// Verify initial state
if task.GetStatus() != types.TaskStatusPending {
t.Errorf("Expected pending status, got %v", task.GetStatus())
}
// Execute the task
ctx := context.Background()
params := &worker_pb.TaskParams{
Collection: "test-bucket",
Sources: []*worker_pb.TaskSource{
{Node: "/table-buckets/test-bucket/test-namespace/test-table"},
},
}
err := task.Execute(ctx, params)
if err != nil {
t.Errorf("Unexpected error during compaction: %v", err)
}
// Verify completion
if task.GetStatus() != types.TaskStatusCompleted {
t.Errorf("Expected completed status, got %v", task.GetStatus())
}
if task.GetProgress() != 100 {
t.Errorf("Expected progress 100, got %v", task.GetProgress())
}
}
func TestTableMaintenanceWorkflow_SnapshotExpiration(t *testing.T) {
t.Skip("TODO: Enable when actual executeSnapshotExpiration implementation is complete")
job := &TableMaintenanceJob{
JobType: JobTypeSnapshotExpiration,
TableBucket: "test-bucket",
Namespace: "test-namespace",
TableName: "test-table",
TablePath: "/table-buckets/test-bucket/test-namespace/test-table",
Priority: types.TaskPriorityLow,
Reason: "Table has expired snapshots",
CreatedAt: time.Now(),
Params: map[string]string{
"retention_days": "7",
},
}
task := NewTableMaintenanceTask("snapshot-exp-test-1", "test-bucket", "test-namespace", "test-table", job)
ctx := context.Background()
params := &worker_pb.TaskParams{
Collection: "test-bucket",
}
err := task.Execute(ctx, params)
if err != nil {
t.Errorf("Unexpected error during snapshot expiration: %v", err)
}
if task.GetStatus() != types.TaskStatusCompleted {
t.Errorf("Expected completed status, got %v", task.GetStatus())
}
}
func TestTableMaintenanceWorkflow_OrphanCleanup(t *testing.T) {
t.Skip("TODO: Enable when actual executeOrphanCleanup implementation is complete")
job := &TableMaintenanceJob{
JobType: JobTypeOrphanCleanup,
TableBucket: "test-bucket",
Namespace: "test-namespace",
TableName: "test-table",
TablePath: "/table-buckets/test-bucket/test-namespace/test-table",
Priority: types.TaskPriorityLow,
Reason: "Table has orphan files",
CreatedAt: time.Now(),
Params: map[string]string{
"orphan_age_hours": "24",
},
}
task := NewTableMaintenanceTask("orphan-cleanup-test-1", "test-bucket", "test-namespace", "test-table", job)
ctx := context.Background()
params := &worker_pb.TaskParams{
Collection: "test-bucket",
}
err := task.Execute(ctx, params)
if err != nil {
t.Errorf("Unexpected error during orphan cleanup: %v", err)
}
if task.GetStatus() != types.TaskStatusCompleted {
t.Errorf("Expected completed status, got %v", task.GetStatus())
}
}
func TestTableMaintenanceWorkflow_ManifestRewrite(t *testing.T) {
t.Skip("TODO: Enable when actual executeManifestRewrite implementation is complete")
job := &TableMaintenanceJob{
JobType: JobTypeManifestRewrite,
TableBucket: "test-bucket",
Namespace: "test-namespace",
TableName: "test-table",
TablePath: "/table-buckets/test-bucket/test-namespace/test-table",
Priority: types.TaskPriorityLow,
Reason: "Table has fragmented manifests",
CreatedAt: time.Now(),
Params: map[string]string{
"target_manifest_entries": "1000",
},
}
task := NewTableMaintenanceTask("manifest-rewrite-test-1", "test-bucket", "test-namespace", "test-table", job)
ctx := context.Background()
params := &worker_pb.TaskParams{
Collection: "test-bucket",
}
err := task.Execute(ctx, params)
if err != nil {
t.Errorf("Unexpected error during manifest rewrite: %v", err)
}
if task.GetStatus() != types.TaskStatusCompleted {
t.Errorf("Expected completed status, got %v", task.GetStatus())
}
}
func TestTableMaintenanceWorkflow_CancellationDuringExecution(t *testing.T) {
job := &TableMaintenanceJob{
JobType: JobTypeCompaction,
TableBucket: "test-bucket",
Namespace: "test-namespace",
TableName: "test-table",
TablePath: "/table-buckets/test-bucket/test-namespace/test-table",
Priority: types.TaskPriorityNormal,
CreatedAt: time.Now(),
}
task := NewTableMaintenanceTask("cancel-test-1", "test-bucket", "test-namespace", "test-table", job)
// Cancel before execution
err := task.Cancel()
if err != nil {
t.Errorf("Unexpected error during cancellation: %v", err)
}
if task.GetStatus() != types.TaskStatusCancelled {
t.Errorf("Expected cancelled status, got %v", task.GetStatus())
}
if !task.IsCancellable() {
t.Error("Task should be cancellable")
}
}
func TestTableMaintenanceWorkflow_UnknownJobType(t *testing.T) {
job := &TableMaintenanceJob{
JobType: TableMaintenanceJobType("unknown_type"),
TableBucket: "test-bucket",
Namespace: "test-namespace",
TableName: "test-table",
CreatedAt: time.Now(),
}
task := NewTableMaintenanceTask("unknown-test-1", "test-bucket", "test-namespace", "test-table", job)
ctx := context.Background()
params := &worker_pb.TaskParams{}
err := task.Execute(ctx, params)
if err == nil {
t.Error("Expected error for unknown job type")
}
if task.GetStatus() != types.TaskStatusFailed {
t.Errorf("Expected failed status, got %v", task.GetStatus())
}
}
func TestDetectionToTaskWorkflow(t *testing.T) {
// Test the full detection to task creation workflow
config := NewDefaultConfig()
scanner := NewTableMaintenanceScanner(config)
// Simulate table info that needs maintenance
tables := []TableInfo{
{
Namespace: "default",
TableName: "large_table",
TablePath: "/table-buckets/bucket1/default/large_table",
DataFileCount: 150, // Above threshold
SnapshotCount: 10,
OldestSnapshot: time.Now().AddDate(0, 0, -30), // Old snapshots
TotalSizeBytes: 1024 * 1024 * 1024, // 1GB
DeletedFileCount: 5,
},
{
Namespace: "default",
TableName: "small_table",
TablePath: "/table-buckets/bucket1/default/small_table",
DataFileCount: 10, // Below threshold
SnapshotCount: 2,
OldestSnapshot: time.Now().AddDate(0, 0, -3), // Recent
TotalSizeBytes: 1024 * 1024, // 1MB
DeletedFileCount: 0,
},
}
jobs, err := scanner.ScanTableBucket("bucket1", tables)
if err != nil {
t.Fatalf("Unexpected error scanning table bucket: %v", err)
}
// Should find maintenance jobs for large_table only
if len(jobs) == 0 {
t.Error("Expected to find maintenance jobs for large_table")
}
// Verify job types
hasCompaction := false
hasSnapshotExpiration := false
hasOrphanCleanup := false
for _, job := range jobs {
switch job.JobType {
case JobTypeCompaction:
hasCompaction = true
if job.TableName != "large_table" {
t.Errorf("Expected compaction job for large_table, got %s", job.TableName)
}
case JobTypeSnapshotExpiration:
hasSnapshotExpiration = true
case JobTypeOrphanCleanup:
hasOrphanCleanup = true
}
}
if !hasCompaction {
t.Error("Expected compaction job for table with many files")
}
if !hasSnapshotExpiration {
t.Error("Expected snapshot expiration job for table with old snapshots")
}
if !hasOrphanCleanup {
t.Error("Expected orphan cleanup job for table with deleted files")
}
}
func TestSchedulingWithConcurrencyLimits(t *testing.T) {
config := NewDefaultConfig()
config.MaxConcurrent = 2
// Create running tasks at limit
runningTasks := []*types.TaskInput{
{Type: types.TaskTypeTableMaintenance},
{Type: types.TaskTypeTableMaintenance},
}
// Create available workers
workers := []*types.WorkerData{
{
ID: "worker1",
CurrentLoad: 0,
MaxConcurrent: 5,
Capabilities: []types.TaskType{types.TaskTypeTableMaintenance},
},
}
// New task to schedule
newTask := &types.TaskInput{
Type: types.TaskTypeTableMaintenance,
}
// Should not schedule when at max concurrent
result := Scheduling(newTask, runningTasks, workers, config)
if result {
t.Error("Should not schedule when at max concurrent limit")
}
// Reduce running tasks
runningTasks = []*types.TaskInput{
{Type: types.TaskTypeTableMaintenance},
}
// Should now schedule
result = Scheduling(newTask, runningTasks, workers, config)
if !result {
t.Error("Should schedule when under max concurrent limit")
}
}
func TestSchedulingWithNoCapableWorkers(t *testing.T) {
config := NewDefaultConfig()
runningTasks := []*types.TaskInput{}
// Workers without table maintenance capability
workers := []*types.WorkerData{
{
ID: "worker1",
CurrentLoad: 0,
MaxConcurrent: 5,
Capabilities: []types.TaskType{types.TaskTypeVacuum}, // Only vacuum
},
}
newTask := &types.TaskInput{
Type: types.TaskTypeTableMaintenance,
}
result := Scheduling(newTask, runningTasks, workers, config)
if result {
t.Error("Should not schedule when no workers have required capability")
}
}
func TestSchedulingWithOverloadedWorkers(t *testing.T) {
config := NewDefaultConfig()
runningTasks := []*types.TaskInput{}
// Worker at capacity
workers := []*types.WorkerData{
{
ID: "worker1",
CurrentLoad: 5, // At max
MaxConcurrent: 5,
Capabilities: []types.TaskType{types.TaskTypeTableMaintenance},
},
}
newTask := &types.TaskInput{
Type: types.TaskTypeTableMaintenance,
}
result := Scheduling(newTask, runningTasks, workers, config)
if result {
t.Error("Should not schedule when all workers are at capacity")
}
}
func TestConfigPersistence(t *testing.T) {
// Test config to policy conversion
config := NewDefaultConfig()
config.ScanIntervalMinutes = 60
config.CompactionFileThreshold = 200
config.SnapshotRetentionDays = 14
config.MaxConcurrent = 4
policy := config.ToTaskPolicy()
if policy == nil {
t.Fatal("ToTaskPolicy returned nil")
}
if !policy.Enabled {
t.Error("Expected enabled policy")
}
if policy.MaxConcurrent != 4 {
t.Errorf("Expected MaxConcurrent 4, got %d", policy.MaxConcurrent)
}
// Test round-trip
newConfig := NewDefaultConfig()
err := newConfig.FromTaskPolicy(policy)
if err != nil {
t.Fatalf("FromTaskPolicy failed: %v", err)
}
if newConfig.MaxConcurrent != 4 {
t.Errorf("Expected MaxConcurrent 4 after round-trip, got %d", newConfig.MaxConcurrent)
}
// Verify custom fields are preserved (the fix for the review comment)
if newConfig.CompactionFileThreshold != 200 {
t.Errorf("Expected CompactionFileThreshold 200 after round-trip, got %d", newConfig.CompactionFileThreshold)
}
if newConfig.SnapshotRetentionDays != 14 {
t.Errorf("Expected SnapshotRetentionDays 14 after round-trip, got %d", newConfig.SnapshotRetentionDays)
}
if newConfig.ScanIntervalMinutes != 60 {
t.Errorf("Expected ScanIntervalMinutes 60 after round-trip, got %d", newConfig.ScanIntervalMinutes)
}
}
func TestIcebergOps_GetExpiredSnapshots(t *testing.T) {
mc := &TableMaintenanceContext{
Config: NewDefaultConfig(),
}
now := time.Now()
metadata := &IcebergTableMetadata{
FormatVersion: 2,
CurrentSnap: 3,
Snapshots: []IcebergSnapshot{
{SnapshotID: 1, TimestampMs: now.AddDate(0, 0, -30).UnixMilli()}, // Old, not current
{SnapshotID: 2, TimestampMs: now.AddDate(0, 0, -10).UnixMilli()}, // Old, not current
{SnapshotID: 3, TimestampMs: now.AddDate(0, 0, -1).UnixMilli()}, // Current snapshot
},
Refs: map[string]SnapRef{
"main": {SnapshotID: 3, Type: "branch"},
},
}
expired := mc.GetExpiredSnapshots(metadata, 7)
// Should find snapshots 1 and 2 as expired (older than 7 days, not current, not referenced)
if len(expired) != 2 {
t.Errorf("Expected 2 expired snapshots, got %d", len(expired))
}
// Current snapshot should never be expired
for _, s := range expired {
if s.SnapshotID == 3 {
t.Error("Current snapshot should never be expired")
}
}
}
func TestParseBytes(t *testing.T) {
testCases := []struct {
input string
expected int64
}{
{"128MB", 128 * 1024 * 1024},
{"1GB", 1024 * 1024 * 1024},
{"512KB", 512 * 1024},
{"1024B", 1024},
{"1024", 1024},
{" 256 MB ", 256 * 1024 * 1024},
}
for _, tc := range testCases {
result, err := parseBytes(tc.input)
if err != nil {
t.Errorf("parseBytes(%q) failed: %v", tc.input, err)
continue
}
if result != tc.expected {
t.Errorf("parseBytes(%q) = %d, expected %d", tc.input, result, tc.expected)
}
}
}
func TestParseInt(t *testing.T) {
testCases := []struct {
input string
expected int
}{
{"42", 42},
{" 100 ", 100},
{"0", 0},
{"-5", -5},
}
for _, tc := range testCases {
result, err := parseInt(tc.input)
if err != nil {
t.Errorf("parseInt(%q) failed: %v", tc.input, err)
continue
}
if result != tc.expected {
t.Errorf("parseInt(%q) = %d, expected %d", tc.input, result, tc.expected)
}
}
}

weed/worker/tasks/table_maintenance/table_maintenance_task.go (370)

@@ -0,0 +1,370 @@
package table_maintenance
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
)
// TableMaintenanceTask handles maintenance operations for S3 Table Buckets
// This includes:
// - Iceberg table compaction
// - Snapshot expiration
// - Orphan file cleanup
// - Manifest optimization
type TableMaintenanceTask struct {
*base.BaseTask
TableBucket string
Namespace string
TableName string
MaintenanceJob *TableMaintenanceJob
status types.TaskStatus
startTime time.Time
progress float64
}
// TableMaintenanceJob represents a specific maintenance operation for a table
type TableMaintenanceJob struct {
JobType TableMaintenanceJobType `json:"job_type"`
TableBucket string `json:"table_bucket"`
Namespace string `json:"namespace"`
TableName string `json:"table_name"`
TablePath string `json:"table_path"`
Priority types.TaskPriority `json:"priority"`
Reason string `json:"reason"`
CreatedAt time.Time `json:"created_at"`
Params map[string]string `json:"params,omitempty"`
}
// TableMaintenanceJobType represents different table maintenance operations
type TableMaintenanceJobType string
const (
// JobTypeCompaction compacts small data files into larger ones
JobTypeCompaction TableMaintenanceJobType = "compaction"
// JobTypeSnapshotExpiration removes expired snapshots
JobTypeSnapshotExpiration TableMaintenanceJobType = "snapshot_expiration"
// JobTypeOrphanCleanup removes orphaned data and metadata files
JobTypeOrphanCleanup TableMaintenanceJobType = "orphan_cleanup"
// JobTypeManifestRewrite rewrites manifest files for optimization
JobTypeManifestRewrite TableMaintenanceJobType = "manifest_rewrite"
)
// NewTableMaintenanceTask creates a new table maintenance task
func NewTableMaintenanceTask(id, tableBucket, namespace, tableName string, job *TableMaintenanceJob) *TableMaintenanceTask {
return &TableMaintenanceTask{
BaseTask: base.NewBaseTask(id, types.TaskTypeTableMaintenance),
TableBucket: tableBucket,
Namespace: namespace,
TableName: tableName,
MaintenanceJob: job,
status: types.TaskStatusPending,
}
}
// Validate validates the task parameters
func (t *TableMaintenanceTask) Validate(params *worker_pb.TaskParams) error {
if params == nil {
return fmt.Errorf("task params cannot be nil")
}
return nil
}
// EstimateTime estimates the time needed for the task
func (t *TableMaintenanceTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
// Estimate based on job type
switch t.MaintenanceJob.JobType {
case JobTypeCompaction:
return 5 * time.Minute
case JobTypeSnapshotExpiration:
return 1 * time.Minute
case JobTypeOrphanCleanup:
return 3 * time.Minute
case JobTypeManifestRewrite:
return 2 * time.Minute
default:
return 5 * time.Minute
}
}
// GetID returns the task ID
func (t *TableMaintenanceTask) GetID() string {
return t.ID()
}
// GetType returns the task type
func (t *TableMaintenanceTask) GetType() types.TaskType {
return types.TaskTypeTableMaintenance
}
// GetStatus returns the current task status
func (t *TableMaintenanceTask) GetStatus() types.TaskStatus {
return t.status
}
// GetProgress returns the task progress (0-100)
func (t *TableMaintenanceTask) GetProgress() float64 {
return t.progress
}
// Execute runs the table maintenance task
func (t *TableMaintenanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
t.status = types.TaskStatusInProgress
t.startTime = time.Now()
glog.Infof("Starting table maintenance task %s: %s on %s/%s/%s",
t.ID(), t.MaintenanceJob.JobType, t.TableBucket, t.Namespace, t.TableName)
// Execute the appropriate maintenance operation
var err error
switch t.MaintenanceJob.JobType {
case JobTypeCompaction:
err = t.executeCompaction(ctx)
case JobTypeSnapshotExpiration:
err = t.executeSnapshotExpiration(ctx)
case JobTypeOrphanCleanup:
err = t.executeOrphanCleanup(ctx)
case JobTypeManifestRewrite:
err = t.executeManifestRewrite(ctx)
default:
t.status = types.TaskStatusFailed
err = fmt.Errorf("unknown job type: %s", t.MaintenanceJob.JobType)
}
// Set status based on execution result
if err != nil {
t.status = types.TaskStatusFailed
} else if t.status == types.TaskStatusInProgress {
// Only mark as completed if no error and still in progress
t.status = types.TaskStatusCompleted
}
glog.Infof("Table maintenance task %s completed with status: %s", t.ID(), t.status)
return err
}
// executeCompaction performs Iceberg table compaction
func (t *TableMaintenanceTask) executeCompaction(ctx context.Context) error {
t.progress = 10
glog.V(1).Infof("Executing compaction for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName)
// Compaction requires a filer client - check if we have one from params
// In production, this would be passed via the task execution context
t.progress = 20
t.ReportProgressWithStage(20, "Analyzing data files")
// Target file size for compaction (128MB default)
targetSizeBytes := int64(128 * 1024 * 1024)
if sizeStr, ok := t.MaintenanceJob.Params["target_file_size"]; ok {
if size, err := parseBytes(sizeStr); err == nil {
targetSizeBytes = size
}
}
t.progress = 40
t.ReportProgressWithStage(40, "Identifying small files")
// Log compaction plan (actual compaction requires reading/writing parquet files)
glog.V(1).Infof("Compaction plan for %s: target file size %d bytes",
t.MaintenanceJob.TablePath, targetSizeBytes)
t.progress = 60
t.ReportProgressWithStage(60, "Planning compaction groups")
// Compaction would involve:
// 1. Group small files by partition
// 2. Read parquet files in each group
// 3. Write combined parquet file
// 4. Create new manifest pointing to combined file
// 5. Create new metadata version
// This requires parquet library integration
t.progress = 80
t.ReportProgressWithStage(80, "Compaction analysis complete")
glog.Infof("Compaction analysis completed for table %s/%s/%s (full implementation requires parquet library)",
t.TableBucket, t.Namespace, t.TableName)
t.progress = 100
t.ReportProgressWithStage(100, "Completed")
return nil
}
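// Hedged illustration (not part of this change) of the grouping step outlined in
// executeCompaction above: bucket files smaller than the target size by partition and
// start a new group whenever adding a file would push the group past the target.
// compactionFile and planCompactionGroups are hypothetical names for this sketch only.
type compactionFile struct {
	Path      string
	SizeBytes int64
	Partition string
}

// planCompactionGroups returns, per partition, groups of small files that could be
// rewritten together into a single file close to targetSizeBytes.
func planCompactionGroups(files []compactionFile, targetSizeBytes int64) map[string][][]compactionFile {
	small := make(map[string][]compactionFile)
	for _, f := range files {
		if f.SizeBytes < targetSizeBytes {
			small[f.Partition] = append(small[f.Partition], f)
		}
	}
	groups := make(map[string][][]compactionFile)
	for partition, candidates := range small {
		var current []compactionFile
		var currentSize int64
		for _, f := range candidates {
			if len(current) > 0 && currentSize+f.SizeBytes > targetSizeBytes {
				groups[partition] = append(groups[partition], current)
				current, currentSize = nil, 0
			}
			current = append(current, f)
			currentSize += f.SizeBytes
		}
		if len(current) > 1 { // a group of one file gains nothing from compaction
			groups[partition] = append(groups[partition], current)
		}
	}
	return groups
}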
// executeSnapshotExpiration removes expired snapshots
func (t *TableMaintenanceTask) executeSnapshotExpiration(ctx context.Context) error {
t.progress = 10
t.ReportProgressWithStage(10, "Reading table metadata")
glog.V(1).Infof("Executing snapshot expiration for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName)
// Get retention period from params or use default
retentionDays := 7
if daysStr, ok := t.MaintenanceJob.Params["retention_days"]; ok {
if days, err := parseInt(daysStr); err == nil {
retentionDays = days
}
}
t.progress = 30
t.ReportProgressWithStage(30, "Analyzing snapshots")
// Snapshot expiration would involve:
// 1. Read current metadata to get snapshot list
// 2. Identify snapshots older than retention that are not referenced
// 3. Collect files only referenced by expired snapshots
// 4. Create new metadata without expired snapshots
// 5. Delete orphaned files
glog.V(1).Infof("Snapshot expiration plan for %s: retention %d days",
t.MaintenanceJob.TablePath, retentionDays)
t.progress = 60
t.ReportProgressWithStage(60, "Identifying expired snapshots")
// Track what would be expired
cutoffTime := time.Now().AddDate(0, 0, -retentionDays)
glog.V(1).Infof("Would expire snapshots older than %s", cutoffTime.Format(time.RFC3339))
t.progress = 80
t.ReportProgressWithStage(80, "Expiration analysis complete")
glog.Infof("Snapshot expiration analysis completed for table %s/%s/%s",
t.TableBucket, t.Namespace, t.TableName)
t.progress = 100
t.ReportProgressWithStage(100, "Completed")
return nil
}
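// Hedged illustration (not part of this change) of the expiration filter sketched in
// executeSnapshotExpiration above. snapshotRec and expiredSnapshots are hypothetical
// names; the actual check is GetExpiredSnapshots (exercised by the iceberg_ops tests).
type snapshotRec struct {
	SnapshotID  int64
	TimestampMs int64
}

// expiredSnapshots keeps snapshots older than the retention cutoff that are neither
// the current snapshot nor referenced by a branch or tag.
func expiredSnapshots(snaps []snapshotRec, currentID int64, referenced map[int64]bool, retentionDays int) []snapshotRec {
	cutoff := time.Now().AddDate(0, 0, -retentionDays)
	var expired []snapshotRec
	for _, s := range snaps {
		if s.SnapshotID == currentID || referenced[s.SnapshotID] {
			continue // never expire the current or a referenced snapshot
		}
		if time.UnixMilli(s.TimestampMs).Before(cutoff) {
			expired = append(expired, s)
		}
	}
	return expired
}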
// executeOrphanCleanup removes orphaned files
func (t *TableMaintenanceTask) executeOrphanCleanup(ctx context.Context) error {
t.progress = 10
t.ReportProgressWithStage(10, "Scanning table files")
glog.V(1).Infof("Executing orphan cleanup for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName)
t.progress = 30
t.ReportProgressWithStage(30, "Reading table metadata")
// Orphan cleanup would involve:
// 1. List all files in data/ and metadata/ directories
// 2. Parse current metadata to get all referenced files
// 3. Compute the set difference (files on disk - referenced files)
// 4. Delete orphaned files with safety checks
glog.V(1).Infof("Orphan cleanup analysis for %s", t.MaintenanceJob.TablePath)
t.progress = 50
t.ReportProgressWithStage(50, "Comparing file references")
// Safety: only delete files older than a certain age to avoid race conditions
// with concurrent writes
orphanAgeThresholdHours := 24
if ageStr, ok := t.MaintenanceJob.Params["orphan_age_hours"]; ok {
if age, err := parseInt(ageStr); err == nil {
orphanAgeThresholdHours = age
}
}
glog.V(1).Infof("Would delete orphan files older than %d hours", orphanAgeThresholdHours)
t.progress = 80
t.ReportProgressWithStage(80, "Orphan analysis complete")
glog.Infof("Orphan cleanup analysis completed for table %s/%s/%s",
t.TableBucket, t.Namespace, t.TableName)
t.progress = 100
t.ReportProgressWithStage(100, "Completed")
return nil
}
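// Hedged illustration (not part of this change) of the set-difference step with the
// age safety check described in executeOrphanCleanup above; findOrphans is a
// hypothetical helper that uses only the standard library.
func findOrphans(onDisk map[string]time.Time, referenced map[string]bool, orphanAgeThresholdHours int) []string {
	cutoff := time.Now().Add(-time.Duration(orphanAgeThresholdHours) * time.Hour)
	var orphans []string
	for path, modTime := range onDisk {
		if referenced[path] {
			continue // still reachable from current metadata
		}
		if modTime.Before(cutoff) { // old enough that no in-flight commit should reference it
			orphans = append(orphans, path)
		}
	}
	return orphans
}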
// executeManifestRewrite optimizes manifest files
func (t *TableMaintenanceTask) executeManifestRewrite(ctx context.Context) error {
t.progress = 10
t.ReportProgressWithStage(10, "Scanning manifests")
glog.V(1).Infof("Executing manifest rewrite for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName)
t.progress = 30
t.ReportProgressWithStage(30, "Reading manifest structure")
// Manifest rewrite would involve:
// 1. Read current manifest list and all manifests
// 2. Identify manifests that can be combined (small manifests)
// 3. Remove entries for deleted files from manifests
// 4. Write new optimized manifests
// 5. Create new manifest list and metadata version
// Get target manifest size
targetManifestEntries := 1000
if entriesStr, ok := t.MaintenanceJob.Params["target_manifest_entries"]; ok {
if entries, err := parseInt(entriesStr); err == nil {
targetManifestEntries = entries
}
}
glog.V(1).Infof("Manifest rewrite plan for %s: target %d entries per manifest",
t.MaintenanceJob.TablePath, targetManifestEntries)
t.progress = 60
t.ReportProgressWithStage(60, "Analyzing manifest optimization")
// Track optimization opportunities
glog.V(1).Infof("Analyzing manifests for optimization opportunities")
t.progress = 80
t.ReportProgressWithStage(80, "Manifest analysis complete")
glog.Infof("Manifest rewrite analysis completed for table %s/%s/%s (full implementation requires Avro library)",
t.TableBucket, t.Namespace, t.TableName)
t.progress = 100
t.ReportProgressWithStage(100, "Completed")
return nil
}
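// Hedged illustration (not part of this change) of the manifest grouping described in
// executeManifestRewrite above; manifestInfo and planManifestMerge are hypothetical names.
type manifestInfo struct {
	Path       string
	EntryCount int
}

// planManifestMerge combines consecutive small manifests so that each rewritten
// manifest holds at most targetEntries entries.
func planManifestMerge(manifests []manifestInfo, targetEntries int) [][]manifestInfo {
	var groups [][]manifestInfo
	var current []manifestInfo
	entries := 0
	for _, m := range manifests {
		if len(current) > 0 && entries+m.EntryCount > targetEntries {
			groups = append(groups, current)
			current, entries = nil, 0
		}
		current = append(current, m)
		entries += m.EntryCount
	}
	if len(current) > 1 { // a single manifest needs no rewrite
		groups = append(groups, current)
	}
	return groups
}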
// parseBytes parses a byte size string (e.g., "128MB") to bytes
func parseBytes(s string) (int64, error) {
s = strings.TrimSpace(strings.ToUpper(s))
multiplier := int64(1)
if strings.HasSuffix(s, "GB") {
multiplier = 1024 * 1024 * 1024
s = strings.TrimSuffix(s, "GB")
} else if strings.HasSuffix(s, "MB") {
multiplier = 1024 * 1024
s = strings.TrimSuffix(s, "MB")
} else if strings.HasSuffix(s, "KB") {
multiplier = 1024
s = strings.TrimSuffix(s, "KB")
} else if strings.HasSuffix(s, "B") {
s = strings.TrimSuffix(s, "B")
}
val, err := strconv.ParseInt(strings.TrimSpace(s), 10, 64)
if err != nil {
return 0, err
}
return val * multiplier, nil
}
// parseInt parses an integer string
func parseInt(s string) (int, error) {
return strconv.Atoi(strings.TrimSpace(s))
}
// Cancel cancels the task
func (t *TableMaintenanceTask) Cancel() error {
t.status = types.TaskStatusCancelled
return nil
}
// Cleanup performs cleanup after task completion
func (t *TableMaintenanceTask) Cleanup() error {
return nil
}
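// Hedged usage sketch (not part of this change), assuming the task is driven directly
// rather than through the worker scheduler; the bucket, namespace, and table names
// below are made up for the example.
func exampleRunSnapshotExpiration() error {
	job := &TableMaintenanceJob{
		JobType:     JobTypeSnapshotExpiration,
		TableBucket: "analytics", // hypothetical bucket
		Namespace:   "sales",     // hypothetical namespace
		TableName:   "orders",    // hypothetical table
		TablePath:   "/table-buckets/analytics/sales/orders",
		Priority:    types.TaskPriorityNormal,
		Reason:      "example run",
		CreatedAt:   time.Now(),
		Params:      map[string]string{"retention_days": "14"},
	}
	task := NewTableMaintenanceTask("example-job-1", job.TableBucket, job.Namespace, job.TableName, job)
	return task.Execute(context.Background(), &worker_pb.TaskParams{})
}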

251
weed/worker/tasks/table_maintenance/table_maintenance_test.go

@ -0,0 +1,251 @@
package table_maintenance
import (
"context"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
func TestNewTableMaintenanceTask(t *testing.T) {
job := &TableMaintenanceJob{
JobType: JobTypeCompaction,
TableBucket: "test-bucket",
Namespace: "test-namespace",
TableName: "test-table",
TablePath: "/table-buckets/test-bucket/test-namespace/test-table",
Priority: types.TaskPriorityNormal,
Reason: "Test job",
CreatedAt: time.Now(),
}
task := NewTableMaintenanceTask("test-id", "test-bucket", "test-namespace", "test-table", job)
if task == nil {
t.Fatal("Expected non-nil task")
}
if task.ID() != "test-id" {
t.Errorf("Expected ID 'test-id', got '%s'", task.ID())
}
if task.Type() != types.TaskTypeTableMaintenance {
t.Errorf("Expected type TableMaintenance, got %v", task.Type())
}
if task.TableBucket != "test-bucket" {
t.Errorf("Expected bucket 'test-bucket', got '%s'", task.TableBucket)
}
if task.GetStatus() != types.TaskStatusPending {
t.Errorf("Expected status Pending, got %v", task.GetStatus())
}
}
func TestTableMaintenanceTask_Validate(t *testing.T) {
job := &TableMaintenanceJob{
JobType: JobTypeCompaction,
TableBucket: "test-bucket",
}
task := NewTableMaintenanceTask("test-id", "test-bucket", "ns", "table", job)
// Test nil params
err := task.Validate(nil)
if err == nil {
t.Error("Expected error for nil params")
}
// Test valid params
params := &worker_pb.TaskParams{
Collection: "test-bucket",
}
err = task.Validate(params)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
func TestTableMaintenanceTask_EstimateTime(t *testing.T) {
testCases := []struct {
jobType TableMaintenanceJobType
expected time.Duration
}{
{JobTypeCompaction, 5 * time.Minute},
{JobTypeSnapshotExpiration, 1 * time.Minute},
{JobTypeOrphanCleanup, 3 * time.Minute},
{JobTypeManifestRewrite, 2 * time.Minute},
}
for _, tc := range testCases {
job := &TableMaintenanceJob{JobType: tc.jobType}
task := NewTableMaintenanceTask("test", "bucket", "ns", "table", job)
estimate := task.EstimateTime(nil)
if estimate != tc.expected {
t.Errorf("For job type %s: expected %v, got %v", tc.jobType, tc.expected, estimate)
}
}
}
func TestTableMaintenanceTask_Execute(t *testing.T) {
t.Skip("TODO: Enable when actual execute* implementations are complete")
job := &TableMaintenanceJob{
JobType: JobTypeCompaction,
TableBucket: "test-bucket",
Namespace: "test-namespace",
TableName: "test-table",
}
task := NewTableMaintenanceTask("test-id", "test-bucket", "test-namespace", "test-table", job)
ctx := context.Background()
params := &worker_pb.TaskParams{}
err := task.Execute(ctx, params)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if task.GetStatus() != types.TaskStatusCompleted {
t.Errorf("Expected status Completed, got %v", task.GetStatus())
}
if task.GetProgress() != 100 {
t.Errorf("Expected progress 100, got %v", task.GetProgress())
}
}
func TestTableMaintenanceTask_Cancel(t *testing.T) {
job := &TableMaintenanceJob{JobType: JobTypeCompaction}
task := NewTableMaintenanceTask("test", "bucket", "ns", "table", job)
err := task.Cancel()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if task.GetStatus() != types.TaskStatusCancelled {
t.Errorf("Expected status Cancelled, got %v", task.GetStatus())
}
}
func TestNewDefaultConfig(t *testing.T) {
config := NewDefaultConfig()
if config == nil {
t.Fatal("Expected non-nil config")
}
if !config.Enabled {
t.Error("Expected enabled by default")
}
if config.ScanIntervalMinutes != 30 {
t.Errorf("Expected 30 minutes, got %d", config.ScanIntervalMinutes)
}
if config.CompactionFileThreshold != 100 {
t.Errorf("Expected 100 files, got %d", config.CompactionFileThreshold)
}
if config.SnapshotRetentionDays != 7 {
t.Errorf("Expected 7 days, got %d", config.SnapshotRetentionDays)
}
}
func TestTableMaintenanceScanner_NeedsCompaction(t *testing.T) {
config := NewDefaultConfig()
scanner := NewTableMaintenanceScanner(config)
// Table with few files - no compaction needed
table1 := TableInfo{
DataFileCount: 50,
}
if scanner.needsCompaction(table1) {
t.Error("Should not need compaction with only 50 files")
}
// Table with many files - needs compaction
table2 := TableInfo{
DataFileCount: 150,
}
if !scanner.needsCompaction(table2) {
t.Error("Should need compaction with 150 files")
}
}
func TestTableMaintenanceScanner_NeedsSnapshotExpiration(t *testing.T) {
config := NewDefaultConfig()
scanner := NewTableMaintenanceScanner(config)
// Single snapshot - no expiration needed
table1 := TableInfo{
SnapshotCount: 1,
OldestSnapshot: time.Now().AddDate(0, 0, -30),
}
if scanner.needsSnapshotExpiration(table1) {
t.Error("Should not expire the only snapshot")
}
// Recent snapshots - no expiration needed
table2 := TableInfo{
SnapshotCount: 5,
OldestSnapshot: time.Now().AddDate(0, 0, -3),
}
if scanner.needsSnapshotExpiration(table2) {
t.Error("Should not expire recent snapshots")
}
// Old snapshots - expiration needed
table3 := TableInfo{
SnapshotCount: 5,
OldestSnapshot: time.Now().AddDate(0, 0, -30),
}
if !scanner.needsSnapshotExpiration(table3) {
t.Error("Should expire old snapshots")
}
}
func TestScheduling(t *testing.T) {
config := NewDefaultConfig()
// Test with available workers
task := &types.TaskInput{
Type: types.TaskTypeTableMaintenance,
}
runningTasks := []*types.TaskInput{}
workers := []*types.WorkerData{
{
ID: "worker1",
CurrentLoad: 0,
MaxConcurrent: 5,
Capabilities: []types.TaskType{types.TaskTypeTableMaintenance},
},
}
result := Scheduling(task, runningTasks, workers, config)
if !result {
t.Error("Should schedule when workers are available")
}
// Test at max concurrent
runningTasks = []*types.TaskInput{
{Type: types.TaskTypeTableMaintenance},
{Type: types.TaskTypeTableMaintenance},
}
result = Scheduling(task, runningTasks, workers, config)
if result {
t.Error("Should not schedule when at max concurrent")
}
}
func TestGetConfigSpec(t *testing.T) {
spec := GetConfigSpec()
if len(spec.Fields) == 0 {
t.Error("Expected non-empty config spec")
}
// Check for expected fields
fieldNames := make(map[string]bool)
for _, field := range spec.Fields {
fieldNames[field.Name] = true
}
expectedFields := []string{"enabled", "scan_interval_minutes", "compaction_file_threshold", "snapshot_retention_days", "max_concurrent"}
for _, name := range expectedFields {
if !fieldNames[name] {
t.Errorf("Missing expected field: %s", name)
}
}
}

9
weed/worker/types/task_types.go

@ -11,10 +11,11 @@ import (
type TaskType string
const (
TaskTypeVacuum TaskType = "vacuum"
TaskTypeErasureCoding TaskType = "erasure_coding"
TaskTypeBalance TaskType = "balance"
TaskTypeReplication TaskType = "replication"
TaskTypeVacuum TaskType = "vacuum"
TaskTypeErasureCoding TaskType = "erasure_coding"
TaskTypeBalance TaskType = "balance"
TaskTypeReplication TaskType = "replication"
TaskTypeTableMaintenance TaskType = "table_maintenance"
)
// TaskStatus represents the status of a maintenance task
