diff --git a/weed/admin/maintenance/maintenance_worker.go b/weed/admin/maintenance/maintenance_worker.go index e4a6b4cf6..fcc6a211e 100644 --- a/weed/admin/maintenance/maintenance_worker.go +++ b/weed/admin/maintenance/maintenance_worker.go @@ -15,6 +15,7 @@ import ( // Import task packages to trigger their auto-registration _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" + _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/table_maintenance" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" ) diff --git a/weed/worker/tasks/table_maintenance/config.go b/weed/worker/tasks/table_maintenance/config.go new file mode 100644 index 000000000..396d19e30 --- /dev/null +++ b/weed/worker/tasks/table_maintenance/config.go @@ -0,0 +1,165 @@ +package table_maintenance + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with table maintenance specific settings +type Config struct { + base.BaseConfig + // ScanIntervalMinutes is how often to scan for maintenance needs + ScanIntervalMinutes int `json:"scan_interval_minutes"` + // CompactionFileThreshold is the minimum number of data files before compaction is triggered + CompactionFileThreshold int `json:"compaction_file_threshold"` + // SnapshotRetentionDays is how long to keep snapshots before expiration + SnapshotRetentionDays int `json:"snapshot_retention_days"` +} + +// NewDefaultConfig returns the default configuration for table maintenance +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 30 * 60, // 30 minutes + MaxConcurrent: 2, + }, + ScanIntervalMinutes: 30, // Scan every 30 minutes + CompactionFileThreshold: 100, // Compact when > 100 small files + 
SnapshotRetentionDays: 7, // Keep snapshots for 7 days + } +} + +// ToTaskPolicy converts configuration to a TaskPolicy protobuf message +func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { + return &worker_pb.TaskPolicy{ + Enabled: c.Enabled, + MaxConcurrent: int32(c.MaxConcurrent), + RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), + CheckIntervalSeconds: int32(c.ScanIntervalMinutes * 60), + // Table maintenance doesn't have a specific protobuf config yet + // Would need to add TableMaintenanceTaskConfig to worker_pb + } +} + +// FromTaskPolicy loads configuration from a TaskPolicy protobuf message +func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { + if policy == nil { + return fmt.Errorf("policy is nil") + } + + c.Enabled = policy.Enabled + c.MaxConcurrent = int(policy.MaxConcurrent) + c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) + c.ScanIntervalMinutes = int(policy.CheckIntervalSeconds / 60) + + return nil +} + +// LoadConfigFromPersistence loads configuration from the persistence layer if available +func LoadConfigFromPersistence(configPersistence interface{}) *Config { + config := NewDefaultConfig() + + // Try to load from persistence if available + if persistence, ok := configPersistence.(interface { + LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPolicy, error) + }); ok { + if policy, err := persistence.LoadTableMaintenanceTaskPolicy(); err == nil && policy != nil { + if err := config.FromTaskPolicy(policy); err == nil { + glog.V(1).Infof("Loaded table_maintenance configuration from persistence") + return config + } + } + } + + glog.V(1).Infof("Using default table_maintenance configuration") + return config +} + +// GetConfigSpec returns the configuration specification for the UI +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Table 
Maintenance", + Description: "Whether table maintenance tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic table maintenance", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_minutes", + JSONName: "scan_interval_minutes", + Type: config.FieldTypeInt, + DefaultValue: 30, + MinValue: 5, + MaxValue: 1440, + Required: true, + DisplayName: "Scan Interval (minutes)", + Description: "How often to scan for tables needing maintenance", + HelpText: "Lower values mean faster detection but higher overhead", + Placeholder: "30", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "compaction_file_threshold", + JSONName: "compaction_file_threshold", + Type: config.FieldTypeInt, + DefaultValue: 100, + MinValue: 10, + MaxValue: 10000, + Required: true, + DisplayName: "Compaction File Threshold", + Description: "Number of small files that triggers compaction", + HelpText: "Tables with more small files than this will be scheduled for compaction", + Placeholder: "100", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "snapshot_retention_days", + JSONName: "snapshot_retention_days", + Type: config.FieldTypeInt, + DefaultValue: 7, + MinValue: 1, + MaxValue: 365, + Required: true, + DisplayName: "Snapshot Retention (days)", + Description: "How long to keep snapshots before expiration", + HelpText: "Snapshots older than this will be candidates for cleanup", + Placeholder: "7", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 1, + MaxValue: 10, + Required: true, + DisplayName: "Max Concurrent Jobs", + Description: "Maximum number of concurrent maintenance jobs", + HelpText: "Limits the number of maintenance operations running at the same time", + Placeholder: 
"2", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + }, + } +} diff --git a/weed/worker/tasks/table_maintenance/detection.go b/weed/worker/tasks/table_maintenance/detection.go new file mode 100644 index 000000000..275219ec8 --- /dev/null +++ b/weed/worker/tasks/table_maintenance/detection.go @@ -0,0 +1,194 @@ +package table_maintenance + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// TableMaintenanceDetector implements types.TaskDetector for table maintenance +type TableMaintenanceDetector struct { + config *Config +} + +// NewTableMaintenanceDetector creates a new table maintenance detector +func NewTableMaintenanceDetector(config *Config) *TableMaintenanceDetector { + return &TableMaintenanceDetector{ + config: config, + } +} + +// ScanInterval returns how often to scan for table maintenance needs +func (d *TableMaintenanceDetector) ScanInterval() time.Duration { + if d.config != nil && d.config.ScanIntervalMinutes > 0 { + return time.Duration(d.config.ScanIntervalMinutes) * time.Minute + } + return 30 * time.Minute // Default: scan every 30 minutes +} + +// DetectTasks scans for tables that need maintenance +func (d *TableMaintenanceDetector) DetectTasks(metrics []*types.VolumeHealthMetrics) ([]*types.TaskDetectionResult, error) { + glog.V(2).Infof("Table maintenance detection starting") + + // Table maintenance doesn't use volume metrics - it scans table buckets + // The actual scanning is done by the admin server's table scanner + // Workers pick up jobs from the admin server's queue + + // This detector returns empty results because table maintenance jobs + // are created by the admin server's table scanner, not by volume metrics + return nil, nil +} + +// Detection is the function signature required by the task registration system 
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + tableConfig, ok := config.(*Config) + if !ok { + tableConfig = NewDefaultConfig() + } + + detector := NewTableMaintenanceDetector(tableConfig) + return detector.DetectTasks(metrics) +} + +// TableMaintenanceScanner scans table buckets for maintenance needs +// This is called by the admin server to populate the maintenance queue +type TableMaintenanceScanner struct { + config *Config +} + +// NewTableMaintenanceScanner creates a new table maintenance scanner +func NewTableMaintenanceScanner(config *Config) *TableMaintenanceScanner { + return &TableMaintenanceScanner{ + config: config, + } +} + +// ScanTableBucket scans a table bucket for tables needing maintenance +// Returns a list of maintenance jobs that should be queued +func (s *TableMaintenanceScanner) ScanTableBucket(bucketName string, tables []TableInfo) ([]*TableMaintenanceJob, error) { + glog.V(1).Infof("Scanning table bucket %s for maintenance needs", bucketName) + + var jobs []*TableMaintenanceJob + + for _, table := range tables { + // Check each table for maintenance needs + tableJobs := s.checkTableMaintenanceNeeds(bucketName, table) + jobs = append(jobs, tableJobs...) 
+ } + + glog.V(1).Infof("Found %d maintenance jobs for table bucket %s", len(jobs), bucketName) + return jobs, nil +} + +// TableInfo represents basic table information for scanning +type TableInfo struct { + Namespace string + TableName string + TablePath string + LastCompaction time.Time + DataFileCount int + SnapshotCount int + OldestSnapshot time.Time + TotalSizeBytes int64 + DeletedFileCount int +} + +// checkTableMaintenanceNeeds checks if a table needs maintenance +func (s *TableMaintenanceScanner) checkTableMaintenanceNeeds(bucketName string, table TableInfo) []*TableMaintenanceJob { + var jobs []*TableMaintenanceJob + now := time.Now() + + // Check for compaction needs + if s.needsCompaction(table) { + jobs = append(jobs, &TableMaintenanceJob{ + JobType: JobTypeCompaction, + TableBucket: bucketName, + Namespace: table.Namespace, + TableName: table.TableName, + TablePath: table.TablePath, + Priority: types.TaskPriorityNormal, + Reason: "Table has many small files that can be compacted", + CreatedAt: now, + }) + } + + // Check for snapshot expiration needs + if s.needsSnapshotExpiration(table) { + jobs = append(jobs, &TableMaintenanceJob{ + JobType: JobTypeSnapshotExpiration, + TableBucket: bucketName, + Namespace: table.Namespace, + TableName: table.TableName, + TablePath: table.TablePath, + Priority: types.TaskPriorityLow, + Reason: "Table has expired snapshots that can be removed", + CreatedAt: now, + }) + } + + // Check for orphan cleanup needs + if s.needsOrphanCleanup(table) { + jobs = append(jobs, &TableMaintenanceJob{ + JobType: JobTypeOrphanCleanup, + TableBucket: bucketName, + Namespace: table.Namespace, + TableName: table.TableName, + TablePath: table.TablePath, + Priority: types.TaskPriorityLow, + Reason: "Table has orphaned files that can be removed", + CreatedAt: now, + }) + } + + return jobs +} + +// needsCompaction checks if a table needs compaction +func (s *TableMaintenanceScanner) needsCompaction(table TableInfo) bool { + threshold := 100 
// Default threshold + if s.config != nil && s.config.CompactionFileThreshold > 0 { + threshold = s.config.CompactionFileThreshold + } + return table.DataFileCount > threshold +} + +// needsSnapshotExpiration checks if a table has expired snapshots +func (s *TableMaintenanceScanner) needsSnapshotExpiration(table TableInfo) bool { + retentionDays := 7 // Default retention + if s.config != nil && s.config.SnapshotRetentionDays > 0 { + retentionDays = s.config.SnapshotRetentionDays + } + + if table.SnapshotCount <= 1 { + return false // Keep at least one snapshot + } + + cutoff := time.Now().AddDate(0, 0, -retentionDays) + return table.OldestSnapshot.Before(cutoff) +} + +// needsOrphanCleanup checks if a table might have orphaned files +func (s *TableMaintenanceScanner) needsOrphanCleanup(table TableInfo) bool { + // Tables with deleted files might have orphans + return table.DeletedFileCount > 0 +} + +// CreateTaskParams creates task parameters for a maintenance job +func CreateTaskParams(job *TableMaintenanceJob) *worker_pb.TaskParams { + return &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: job.TablePath, + }, + }, + VolumeId: 0, // Not volume-specific + Collection: job.TableBucket, + } +} diff --git a/weed/worker/tasks/table_maintenance/register.go b/weed/worker/tasks/table_maintenance/register.go new file mode 100644 index 000000000..e2a8b5f21 --- /dev/null +++ b/weed/worker/tasks/table_maintenance/register.go @@ -0,0 +1,103 @@ +package table_maintenance + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Global variable to hold the task definition for configuration updates +var globalTaskDef *base.TaskDefinition + +// Auto-register this task when the package is imported +func init() { + 
RegisterTableMaintenanceTask() + + // Register config updater + tasks.AutoRegisterConfigUpdater(types.TaskTypeTableMaintenance, UpdateConfigFromPersistence) +} + +// RegisterTableMaintenanceTask registers the table maintenance task with the task system +func RegisterTableMaintenanceTask() { + // Create configuration instance + config := NewDefaultConfig() + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeTableMaintenance, + Name: "table_maintenance", + DisplayName: "Table Maintenance", + Description: "Performs maintenance operations on S3 Table Buckets including compaction, snapshot expiration, and orphan cleanup", + Icon: "fas fa-table text-info", + Capabilities: []string{"table_maintenance", "iceberg", "s3tables"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) { + if params == nil { + return nil, fmt.Errorf("task parameters are required") + } + if len(params.Sources) == 0 { + return nil, fmt.Errorf("at least one source (table path) is required") + } + + // Parse table info from parameters + tablePath := params.Sources[0].Node + tableBucket := params.Collection + + // Create a default maintenance job (actual job type would come from queue) + job := &TableMaintenanceJob{ + JobType: JobTypeCompaction, + TableBucket: tableBucket, + TablePath: tablePath, + Priority: types.TaskPriorityNormal, + CreatedAt: time.Now(), + } + + return NewTableMaintenanceTask( + fmt.Sprintf("table-maintenance-%s-%d", tableBucket, time.Now().UnixNano()), + tableBucket, + "", // Namespace parsed from path if needed + "", // Table name parsed from path if needed + job, + ), nil + }, + DetectionFunc: Detection, + ScanInterval: 30 * time.Minute, + SchedulingFunc: Scheduling, + MaxConcurrent: 2, + RepeatInterval: 24 * time.Hour, + } + + // Store task definition globally for configuration updates + globalTaskDef = taskDef + + // Register everything with a single function call + 
base.RegisterTask(taskDef) + + glog.V(1).Infof("Registered table_maintenance task type") +} + +// UpdateConfigFromPersistence updates the table maintenance configuration from persistence +func UpdateConfigFromPersistence(configPersistence interface{}) error { + if globalTaskDef == nil { + return fmt.Errorf("table_maintenance task not registered") + } + + // Load configuration from persistence + newConfig := LoadConfigFromPersistence(configPersistence) + if newConfig == nil { + return fmt.Errorf("failed to load configuration from persistence") + } + + // Update the task definition's config + globalTaskDef.Config = newConfig + + glog.V(1).Infof("Updated table_maintenance task configuration from persistence") + return nil +} diff --git a/weed/worker/tasks/table_maintenance/scheduling.go b/weed/worker/tasks/table_maintenance/scheduling.go new file mode 100644 index 000000000..a2c49dacd --- /dev/null +++ b/weed/worker/tasks/table_maintenance/scheduling.go @@ -0,0 +1,99 @@ +package table_maintenance + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// TableMaintenanceScheduler implements types.TaskScheduler for table maintenance +type TableMaintenanceScheduler struct { + config *Config +} + +// NewTableMaintenanceScheduler creates a new scheduler +func NewTableMaintenanceScheduler(config *Config) *TableMaintenanceScheduler { + return &TableMaintenanceScheduler{ + config: config, + } +} + +// ShouldSchedule determines if a task should be scheduled based on the detection result +func (s *TableMaintenanceScheduler) ShouldSchedule(result *types.TaskDetectionResult, lastExecution time.Time) bool { + // Don't schedule if disabled + if s.config != nil && !s.config.Enabled { + return false + } + + // Schedule if never executed + if lastExecution.IsZero() { + return true + } + + // Schedule based on repeat interval + repeatInterval := 
s.GetDefaultRepeatInterval() + return time.Since(lastExecution) >= repeatInterval +} + +// GetMaxConcurrent returns the maximum concurrent tasks +func (s *TableMaintenanceScheduler) GetMaxConcurrent() int { + if s.config != nil && s.config.MaxConcurrent > 0 { + return s.config.MaxConcurrent + } + return 2 // Default +} + +// GetDefaultRepeatInterval returns the default repeat interval between task executions +func (s *TableMaintenanceScheduler) GetDefaultRepeatInterval() time.Duration { + // Table maintenance can run more frequently than volume maintenance + return 24 * time.Hour // Once per day per table +} + +// Scheduling is the function signature required by the task registration system +func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool { + tableConfig, ok := config.(*Config) + if !ok { + tableConfig = NewDefaultConfig() + } + + // Count running table maintenance tasks + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeTableMaintenance { + runningCount++ + } + } + + // Check concurrency limit + if runningCount >= tableConfig.MaxConcurrent { + return false + } + + // Check for available workers with table maintenance capability + for _, worker := range availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeTableMaintenance { + return true + } + } + } + } + + return false +} + +// CreateTaskFromDetectionResult creates typed task parameters from a detection result +func CreateTaskFromDetectionResult(result *types.TaskDetectionResult) *worker_pb.TaskParams { + // For table maintenance, the source is the table path + return &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: result.Server, // Table path + }, + }, + Collection: result.Collection, // Table bucket name + } +} diff --git 
a/weed/worker/tasks/table_maintenance/table_maintenance_task.go b/weed/worker/tasks/table_maintenance/table_maintenance_task.go new file mode 100644 index 000000000..c4675b3cd --- /dev/null +++ b/weed/worker/tasks/table_maintenance/table_maintenance_task.go @@ -0,0 +1,220 @@ +package table_maintenance + +import ( + "context" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/types" + "github.com/seaweedfs/seaweedfs/weed/worker/types/base" +) + +// TableMaintenanceTask handles maintenance operations for S3 Table Buckets +// This includes: +// - Iceberg table compaction +// - Snapshot expiration +// - Orphan file cleanup +// - Manifest optimization +type TableMaintenanceTask struct { + *base.BaseTask + TableBucket string + Namespace string + TableName string + MaintenanceJob *TableMaintenanceJob + status types.TaskStatus + startTime time.Time + progress float64 +} + +// TableMaintenanceJob represents a specific maintenance operation for a table +type TableMaintenanceJob struct { + JobType TableMaintenanceJobType `json:"job_type"` + TableBucket string `json:"table_bucket"` + Namespace string `json:"namespace"` + TableName string `json:"table_name"` + TablePath string `json:"table_path"` + Priority types.TaskPriority `json:"priority"` + Reason string `json:"reason"` + CreatedAt time.Time `json:"created_at"` + Params map[string]string `json:"params,omitempty"` +} + +// TableMaintenanceJobType represents different table maintenance operations +type TableMaintenanceJobType string + +const ( + // JobTypeCompaction compacts small data files into larger ones + JobTypeCompaction TableMaintenanceJobType = "compaction" + // JobTypeSnapshotExpiration removes expired snapshots + JobTypeSnapshotExpiration TableMaintenanceJobType = "snapshot_expiration" + // JobTypeOrphanCleanup removes orphaned data and metadata files + JobTypeOrphanCleanup TableMaintenanceJobType = 
"orphan_cleanup" + // JobTypeManifestRewrite rewrites manifest files for optimization + JobTypeManifestRewrite TableMaintenanceJobType = "manifest_rewrite" +) + +// NewTableMaintenanceTask creates a new table maintenance task +func NewTableMaintenanceTask(id, tableBucket, namespace, tableName string, job *TableMaintenanceJob) *TableMaintenanceTask { + return &TableMaintenanceTask{ + BaseTask: base.NewBaseTask(id, types.TaskTypeTableMaintenance), + TableBucket: tableBucket, + Namespace: namespace, + TableName: tableName, + MaintenanceJob: job, + status: types.TaskStatusPending, + } +} + +// Validate validates the task parameters +func (t *TableMaintenanceTask) Validate(params *worker_pb.TaskParams) error { + if params == nil { + return fmt.Errorf("task params cannot be nil") + } + return nil +} + +// EstimateTime estimates the time needed for the task +func (t *TableMaintenanceTask) EstimateTime(params *worker_pb.TaskParams) time.Duration { + // Estimate based on job type + switch t.MaintenanceJob.JobType { + case JobTypeCompaction: + return 5 * time.Minute + case JobTypeSnapshotExpiration: + return 1 * time.Minute + case JobTypeOrphanCleanup: + return 3 * time.Minute + case JobTypeManifestRewrite: + return 2 * time.Minute + default: + return 5 * time.Minute + } +} + +// GetID returns the task ID +func (t *TableMaintenanceTask) GetID() string { + return t.ID() +} + +// GetType returns the task type +func (t *TableMaintenanceTask) GetType() types.TaskType { + return types.TaskTypeTableMaintenance +} + +// GetStatus returns the current task status +func (t *TableMaintenanceTask) GetStatus() types.TaskStatus { + return t.status +} + +// GetProgress returns the task progress (0-100) +func (t *TableMaintenanceTask) GetProgress() float64 { + return t.progress +} + +// Execute runs the table maintenance task +func (t *TableMaintenanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error { + t.status = types.TaskStatusInProgress + t.startTime = time.Now() + 
+ glog.Infof("Starting table maintenance task %s: %s on %s/%s/%s", + t.ID(), t.MaintenanceJob.JobType, t.TableBucket, t.Namespace, t.TableName) + + defer func() { + if t.status == types.TaskStatusInProgress { + t.status = types.TaskStatusCompleted + } + glog.Infof("Table maintenance task %s completed with status: %s", t.ID(), t.status) + }() + + switch t.MaintenanceJob.JobType { + case JobTypeCompaction: + return t.executeCompaction(ctx) + case JobTypeSnapshotExpiration: + return t.executeSnapshotExpiration(ctx) + case JobTypeOrphanCleanup: + return t.executeOrphanCleanup(ctx) + case JobTypeManifestRewrite: + return t.executeManifestRewrite(ctx) + default: + t.status = types.TaskStatusFailed + return fmt.Errorf("unknown job type: %s", t.MaintenanceJob.JobType) + } +} + +// executeCompaction performs Iceberg table compaction +func (t *TableMaintenanceTask) executeCompaction(ctx context.Context) error { + t.progress = 10 + glog.V(1).Infof("Executing compaction for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName) + + // TODO: Implement actual Iceberg compaction logic + // This would: + // 1. Read current table metadata + // 2. Identify small data files that should be compacted + // 3. Create new compacted files + // 4. Update metadata to point to new files + // 5. Mark old files for deletion + + t.progress = 100 + return nil +} + +// executeSnapshotExpiration removes expired snapshots +func (t *TableMaintenanceTask) executeSnapshotExpiration(ctx context.Context) error { + t.progress = 10 + glog.V(1).Infof("Executing snapshot expiration for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName) + + // TODO: Implement snapshot expiration logic + // This would: + // 1. Read current table metadata + // 2. Identify snapshots older than retention period + // 3. Remove expired snapshot metadata + // 4. Identify files only referenced by expired snapshots + // 5. 
Mark those files for deletion + + t.progress = 100 + return nil +} + +// executeOrphanCleanup removes orphaned files +func (t *TableMaintenanceTask) executeOrphanCleanup(ctx context.Context) error { + t.progress = 10 + glog.V(1).Infof("Executing orphan cleanup for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName) + + // TODO: Implement orphan file cleanup logic + // This would: + // 1. List all files in data/ and metadata/ directories + // 2. Read current table metadata to get referenced files + // 3. Delete files not referenced by any snapshot + + t.progress = 100 + return nil +} + +// executeManifestRewrite optimizes manifest files +func (t *TableMaintenanceTask) executeManifestRewrite(ctx context.Context) error { + t.progress = 10 + glog.V(1).Infof("Executing manifest rewrite for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName) + + // TODO: Implement manifest rewrite logic + // This would: + // 1. Read current manifest files + // 2. Combine small manifests + // 3. Remove deleted file entries from manifests + // 4. Write optimized manifests + // 5. 
Update metadata to point to new manifests + + t.progress = 100 + return nil +} + +// Cancel cancels the task +func (t *TableMaintenanceTask) Cancel() error { + t.status = types.TaskStatusCancelled + return nil +} + +// Cleanup performs cleanup after task completion +func (t *TableMaintenanceTask) Cleanup() error { + return nil +} diff --git a/weed/worker/tasks/table_maintenance/table_maintenance_test.go b/weed/worker/tasks/table_maintenance/table_maintenance_test.go new file mode 100644 index 000000000..85616b9c3 --- /dev/null +++ b/weed/worker/tasks/table_maintenance/table_maintenance_test.go @@ -0,0 +1,250 @@ +package table_maintenance + +import ( + "context" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +func TestNewTableMaintenanceTask(t *testing.T) { + job := &TableMaintenanceJob{ + JobType: JobTypeCompaction, + TableBucket: "test-bucket", + Namespace: "test-namespace", + TableName: "test-table", + TablePath: "/table-buckets/test-bucket/test-namespace/test-table", + Priority: types.TaskPriorityNormal, + Reason: "Test job", + CreatedAt: time.Now(), + } + + task := NewTableMaintenanceTask("test-id", "test-bucket", "test-namespace", "test-table", job) + + if task == nil { + t.Fatal("Expected non-nil task") + } + if task.ID() != "test-id" { + t.Errorf("Expected ID 'test-id', got '%s'", task.ID()) + } + if task.Type() != types.TaskTypeTableMaintenance { + t.Errorf("Expected type TableMaintenance, got %v", task.Type()) + } + if task.TableBucket != "test-bucket" { + t.Errorf("Expected bucket 'test-bucket', got '%s'", task.TableBucket) + } + if task.GetStatus() != types.TaskStatusPending { + t.Errorf("Expected status Pending, got %v", task.GetStatus()) + } +} + +func TestTableMaintenanceTask_Validate(t *testing.T) { + job := &TableMaintenanceJob{ + JobType: JobTypeCompaction, + TableBucket: "test-bucket", + } + task := NewTableMaintenanceTask("test-id", "test-bucket", "ns", 
"table", job)

	// A nil params value must be rejected.
	err := task.Validate(nil)
	if err == nil {
		t.Error("Expected error for nil params")
	}

	// Any non-nil params value passes validation.
	params := &worker_pb.TaskParams{
		Collection: "test-bucket",
	}
	err = task.Validate(params)
	if err != nil {
		t.Errorf("Unexpected error: %v", err)
	}
}

func TestTableMaintenanceTask_EstimateTime(t *testing.T) {
	// Each job type maps to a fixed duration estimate.
	testCases := []struct {
		jobType  TableMaintenanceJobType
		expected time.Duration
	}{
		{JobTypeCompaction, 5 * time.Minute},
		{JobTypeSnapshotExpiration, 1 * time.Minute},
		{JobTypeOrphanCleanup, 3 * time.Minute},
		{JobTypeManifestRewrite, 2 * time.Minute},
	}

	for _, tc := range testCases {
		job := &TableMaintenanceJob{JobType: tc.jobType}
		task := NewTableMaintenanceTask("test", "bucket", "ns", "table", job)

		if estimate := task.EstimateTime(nil); estimate != tc.expected {
			t.Errorf("For job type %s: expected %v, got %v", tc.jobType, tc.expected, estimate)
		}
	}
}

func TestTableMaintenanceTask_Execute(t *testing.T) {
	job := &TableMaintenanceJob{
		JobType:     JobTypeCompaction,
		TableBucket: "test-bucket",
		Namespace:   "test-namespace",
		TableName:   "test-table",
	}
	task := NewTableMaintenanceTask("test-id", "test-bucket", "test-namespace", "test-table", job)

	// A successful run must finish Completed at 100% progress.
	if err := task.Execute(context.Background(), &worker_pb.TaskParams{}); err != nil {
		t.Errorf("Unexpected error: %v", err)
	}
	if task.GetStatus() != types.TaskStatusCompleted {
		t.Errorf("Expected status Completed, got %v", task.GetStatus())
	}
	if task.GetProgress() != 100 {
		t.Errorf("Expected progress 100, got %v", task.GetProgress())
	}
}

func TestTableMaintenanceTask_Cancel(t *testing.T) {
	job := &TableMaintenanceJob{JobType: JobTypeCompaction}
	task := NewTableMaintenanceTask("test", "bucket", "ns", "table", job)

	if err := task.Cancel(); err != nil {
		t.Errorf("Unexpected error: %v", err)
	}
	if task.GetStatus() != types.TaskStatusCancelled {
		t.Errorf("Expected status Cancelled, got %v", task.GetStatus())
	}
}

func TestNewDefaultConfig(t *testing.T) {
	config := NewDefaultConfig()

	if config == nil {
		t.Fatal("Expected non-nil config")
	}
	if !config.Enabled {
		t.Error("Expected enabled by default")
	}
	if config.ScanIntervalMinutes != 30 {
		t.Errorf("Expected 30 minutes, got %d", config.ScanIntervalMinutes)
	}
	if config.CompactionFileThreshold != 100 {
		t.Errorf("Expected 100 files, got %d", config.CompactionFileThreshold)
	}
	if config.SnapshotRetentionDays != 7 {
		t.Errorf("Expected 7 days, got %d", config.SnapshotRetentionDays)
	}
}

func TestTableMaintenanceScanner_NeedsCompaction(t *testing.T) {
	scanner := NewTableMaintenanceScanner(NewDefaultConfig())

	// Below the default threshold of 100 files: no compaction.
	if scanner.needsCompaction(TableInfo{DataFileCount: 50}) {
		t.Error("Should not need compaction with only 50 files")
	}

	// Above the threshold: compaction required.
	if !scanner.needsCompaction(TableInfo{DataFileCount: 150}) {
		t.Error("Should need compaction with 150 files")
	}
}

func TestTableMaintenanceScanner_NeedsSnapshotExpiration(t *testing.T) {
	scanner := NewTableMaintenanceScanner(NewDefaultConfig())

	// The sole remaining snapshot is never expired, regardless of age.
	soleSnapshot := TableInfo{
		SnapshotCount:  1,
		OldestSnapshot: time.Now().AddDate(0, 0, -30),
	}
	if scanner.needsSnapshotExpiration(soleSnapshot) {
		t.Error("Should not expire the only snapshot")
	}

	// Snapshots within the retention window stay.
	recent := TableInfo{
		SnapshotCount:  5,
		OldestSnapshot: time.Now().AddDate(0, 0, -3),
	}
	if scanner.needsSnapshotExpiration(recent) {
		t.Error("Should not expire recent snapshots")
	}

	// Snapshots past the retention window are flagged.
	stale := TableInfo{
		SnapshotCount:  5,
		OldestSnapshot: time.Now().AddDate(0, 0, -30),
	}
	if !scanner.needsSnapshotExpiration(stale) {
		t.Error("Should expire old snapshots")
	}
}

func TestScheduling(t *testing.T) {
	config := NewDefaultConfig()

	task := &types.TaskInput{
		Type: types.TaskTypeTableMaintenance,
	}
	workers := []*types.WorkerData{
		{
			ID:            "worker1",
			CurrentLoad:   0,
			MaxConcurrent: 5,
			Capabilities:  []types.TaskType{types.TaskTypeTableMaintenance},
		},
	}

	// With capacity and a capable worker, scheduling proceeds.
	if !Scheduling(task, nil, workers, config) {
		t.Error("Should schedule when workers are available")
	}

	// At the default MaxConcurrent of 2, scheduling is refused.
	saturated := []*types.TaskInput{
		{Type: types.TaskTypeTableMaintenance},
		{Type: types.TaskTypeTableMaintenance},
	}
	if Scheduling(task, saturated, workers, config) {
		t.Error("Should not schedule when at max concurrent")
	}
}

func TestGetConfigSpec(t *testing.T) {
	spec := GetConfigSpec()

	if len(spec.Fields) == 0 {
		t.Error("Expected non-empty config spec")
	}

	// Every UI field the config declares must be present in the spec.
	fieldNames := make(map[string]bool, len(spec.Fields))
	for _, field := range spec.Fields {
		fieldNames[field.Name] = true
	}

	for _, name := range []string{"enabled", "scan_interval_minutes", "compaction_file_threshold", "snapshot_retention_days", "max_concurrent"} {
		if !fieldNames[name] {
			t.Errorf("Missing expected field: %s", name)
		}
	}
}

// ----- weed/worker/types/task_types.go (existing file, modified hunk) -----
// The diff extends the TaskType constants with the new table maintenance type:
//
//	const (
//		TaskTypeVacuum           TaskType = "vacuum"
//		TaskTypeErasureCoding    TaskType = "erasure_coding"
//		TaskTypeBalance          TaskType = "balance"
//		TaskTypeReplication      TaskType = "replication"
//		TaskTypeTableMaintenance TaskType = "table_maintenance"
//	)