Browse Source

feat(s3tables): add table maintenance job scanning and worker pickup

This implements a table maintenance task type for S3 Table Buckets that:

- Adds TaskTypeTableMaintenance to the worker task type system
- Creates table_maintenance package with complete task implementation:
  - Config: Configuration with scan interval, compaction threshold, retention
  - Detection: Scans table buckets for tables needing maintenance
  - Scheduling: Manages concurrent job limits and worker selection
  - Task execution: Handles compaction, snapshot expiration, orphan cleanup
- Registers table maintenance with the maintenance worker system
- Workers can pick up table maintenance jobs from admin server's queue

Maintenance job types supported:
- Compaction: Combine small data files into larger ones
- Snapshot expiration: Remove old snapshots past retention
- Orphan cleanup: Remove unreferenced data and metadata files
- Manifest rewrite: Optimize manifest files

Includes comprehensive test coverage for all components.
pull/8177/head
Chris Lu 2 weeks ago
parent
commit
5013bcec1a
  1. 1
      weed/admin/maintenance/maintenance_worker.go
  2. 165
      weed/worker/tasks/table_maintenance/config.go
  3. 194
      weed/worker/tasks/table_maintenance/detection.go
  4. 103
      weed/worker/tasks/table_maintenance/register.go
  5. 99
      weed/worker/tasks/table_maintenance/scheduling.go
  6. 220
      weed/worker/tasks/table_maintenance/table_maintenance_task.go
  7. 250
      weed/worker/tasks/table_maintenance/table_maintenance_test.go
  8. 9
      weed/worker/types/task_types.go

1
weed/admin/maintenance/maintenance_worker.go

@ -15,6 +15,7 @@ import (
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/table_maintenance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

165
weed/worker/tasks/table_maintenance/config.go

@ -0,0 +1,165 @@
package table_maintenance
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with table maintenance specific settings.
//
// NOTE(review): ScanIntervalMinutes partially overlaps with the embedded
// BaseConfig.ScanIntervalSeconds; they are serialized to different
// TaskPolicy fields (CheckIntervalSeconds vs RepeatIntervalSeconds) in
// ToTaskPolicy — confirm both are intended to exist.
type Config struct {
	base.BaseConfig
	// ScanIntervalMinutes is how often to scan for maintenance needs
	ScanIntervalMinutes int `json:"scan_interval_minutes"`
	// CompactionFileThreshold is the minimum number of data files before compaction is triggered
	CompactionFileThreshold int `json:"compaction_file_threshold"`
	// SnapshotRetentionDays is how long to keep snapshots before expiration
	SnapshotRetentionDays int `json:"snapshot_retention_days"`
}
// NewDefaultConfig builds the default table maintenance configuration:
// enabled, scanning every 30 minutes, compaction past 100 small files,
// a one-week snapshot retention, and at most two concurrent jobs.
func NewDefaultConfig() *Config {
	cfg := &Config{}
	cfg.Enabled = true
	cfg.ScanIntervalSeconds = 30 * 60 // 30 minutes
	cfg.MaxConcurrent = 2
	cfg.ScanIntervalMinutes = 30      // Scan every 30 minutes
	cfg.CompactionFileThreshold = 100 // Compact when > 100 small files
	cfg.SnapshotRetentionDays = 7     // Keep snapshots for 7 days
	return cfg
}
// ToTaskPolicy converts the configuration to a TaskPolicy protobuf message.
// Table maintenance has no task-specific protobuf config yet; adding one
// would require a TableMaintenanceTaskConfig message in worker_pb.
func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
	policy := &worker_pb.TaskPolicy{}
	policy.Enabled = c.Enabled
	policy.MaxConcurrent = int32(c.MaxConcurrent)
	policy.RepeatIntervalSeconds = int32(c.ScanIntervalSeconds)
	// The check interval is stored in seconds on the wire.
	policy.CheckIntervalSeconds = int32(c.ScanIntervalMinutes * 60)
	return policy
}
// FromTaskPolicy populates the configuration from a TaskPolicy protobuf
// message. It is the inverse of ToTaskPolicy: the wire-level check interval
// (seconds) is converted back to minutes.
func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
	if policy == nil {
		return fmt.Errorf("policy is nil")
	}
	c.Enabled = policy.Enabled
	c.MaxConcurrent = int(policy.MaxConcurrent)
	c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
	c.ScanIntervalMinutes = int(policy.CheckIntervalSeconds) / 60
	return nil
}
// LoadConfigFromPersistence returns the table maintenance configuration,
// preferring a persisted task policy when the persistence layer exposes one
// and falling back to defaults otherwise.
func LoadConfigFromPersistence(configPersistence interface{}) *Config {
	cfg := NewDefaultConfig()
	// The persistence layer is duck-typed: it only needs to expose a
	// LoadTableMaintenanceTaskPolicy method.
	type policyLoader interface {
		LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPolicy, error)
	}
	if loader, ok := configPersistence.(policyLoader); ok {
		if policy, err := loader.LoadTableMaintenanceTaskPolicy(); err == nil && policy != nil {
			if applyErr := cfg.FromTaskPolicy(policy); applyErr == nil {
				glog.V(1).Infof("Loaded table_maintenance configuration from persistence")
				return cfg
			}
		}
	}
	glog.V(1).Infof("Using default table_maintenance configuration")
	return cfg
}
// GetConfigSpec returns the configuration specification for the UI.
// Field names mirror the json tags on Config so that saved forms round-trip
// through ToTaskPolicy/FromTaskPolicy.
// NOTE(review): the minute/day valued fields use config.UnitCount — confirm
// whether dedicated duration units exist in the config package.
func GetConfigSpec() base.ConfigSpec {
	return base.ConfigSpec{
		Fields: []*config.Field{
			{
				Name:         "enabled",
				JSONName:     "enabled",
				Type:         config.FieldTypeBool,
				DefaultValue: true,
				Required:     false,
				DisplayName:  "Enable Table Maintenance",
				Description:  "Whether table maintenance tasks should be automatically created",
				HelpText:     "Toggle this to enable or disable automatic table maintenance",
				InputType:    "checkbox",
				CSSClasses:   "form-check-input",
			},
			{
				Name:         "scan_interval_minutes",
				JSONName:     "scan_interval_minutes",
				Type:         config.FieldTypeInt,
				DefaultValue: 30,
				MinValue:     5,
				MaxValue:     1440,
				Required:     true,
				DisplayName:  "Scan Interval (minutes)",
				Description:  "How often to scan for tables needing maintenance",
				HelpText:     "Lower values mean faster detection but higher overhead",
				Placeholder:  "30",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			{
				Name:         "compaction_file_threshold",
				JSONName:     "compaction_file_threshold",
				Type:         config.FieldTypeInt,
				DefaultValue: 100,
				MinValue:     10,
				MaxValue:     10000,
				Required:     true,
				DisplayName:  "Compaction File Threshold",
				Description:  "Number of small files that triggers compaction",
				HelpText:     "Tables with more small files than this will be scheduled for compaction",
				Placeholder:  "100",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			{
				Name:         "snapshot_retention_days",
				JSONName:     "snapshot_retention_days",
				Type:         config.FieldTypeInt,
				DefaultValue: 7,
				MinValue:     1,
				MaxValue:     365,
				Required:     true,
				DisplayName:  "Snapshot Retention (days)",
				Description:  "How long to keep snapshots before expiration",
				HelpText:     "Snapshots older than this will be candidates for cleanup",
				Placeholder:  "7",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
			{
				Name:         "max_concurrent",
				JSONName:     "max_concurrent",
				Type:         config.FieldTypeInt,
				DefaultValue: 2,
				MinValue:     1,
				MaxValue:     10,
				Required:     true,
				DisplayName:  "Max Concurrent Jobs",
				Description:  "Maximum number of concurrent maintenance jobs",
				HelpText:     "Limits the number of maintenance operations running at the same time",
				Placeholder:  "2",
				Unit:         config.UnitCount,
				InputType:    "number",
				CSSClasses:   "form-control",
			},
		},
	}
}

194
weed/worker/tasks/table_maintenance/detection.go

@ -0,0 +1,194 @@
package table_maintenance
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// TableMaintenanceDetector implements types.TaskDetector for table
// maintenance. It is intentionally inert: see DetectTasks for why it never
// produces results from volume metrics.
type TableMaintenanceDetector struct {
	// config controls the scan interval; may be nil, in which case defaults apply.
	config *Config
}

// NewTableMaintenanceDetector creates a new table maintenance detector.
func NewTableMaintenanceDetector(config *Config) *TableMaintenanceDetector {
	return &TableMaintenanceDetector{
		config: config,
	}
}
// ScanInterval reports how often the detector should run, falling back to a
// 30-minute default when no positive value is configured.
func (d *TableMaintenanceDetector) ScanInterval() time.Duration {
	const fallback = 30 * time.Minute
	if d.config == nil || d.config.ScanIntervalMinutes <= 0 {
		return fallback
	}
	return time.Duration(d.config.ScanIntervalMinutes) * time.Minute
}
// DetectTasks scans for tables that need maintenance.
//
// Table maintenance does not use volume metrics: the actual scanning is done
// by the admin server's table scanner (see TableMaintenanceScanner), and
// workers pick up jobs from the admin server's queue. This detector therefore
// always returns no results.
func (d *TableMaintenanceDetector) DetectTasks(metrics []*types.VolumeHealthMetrics) ([]*types.TaskDetectionResult, error) {
	glog.V(2).Infof("Table maintenance detection starting")
	return nil, nil
}
// Detection adapts the detector to the function signature required by the
// task registration system. It short-circuits when the task type is disabled
// and otherwise delegates to a TableMaintenanceDetector built from the
// supplied configuration (or defaults if the type assertion fails).
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}
	cfg, ok := config.(*Config)
	if !ok {
		cfg = NewDefaultConfig()
	}
	return NewTableMaintenanceDetector(cfg).DetectTasks(metrics)
}
// TableMaintenanceScanner scans table buckets for maintenance needs.
// This is called by the admin server to populate the maintenance queue
// (in contrast to TableMaintenanceDetector, which runs on workers and is a
// no-op).
type TableMaintenanceScanner struct {
	// config supplies thresholds; may be nil, in which case defaults apply.
	config *Config
}

// NewTableMaintenanceScanner creates a new table maintenance scanner.
func NewTableMaintenanceScanner(config *Config) *TableMaintenanceScanner {
	return &TableMaintenanceScanner{
		config: config,
	}
}
// ScanTableBucket walks every table in a bucket and collects the maintenance
// jobs that should be queued for it.
func (s *TableMaintenanceScanner) ScanTableBucket(bucketName string, tables []TableInfo) ([]*TableMaintenanceJob, error) {
	glog.V(1).Infof("Scanning table bucket %s for maintenance needs", bucketName)
	var jobs []*TableMaintenanceJob
	for i := range tables {
		// A single table may need several kinds of maintenance at once.
		jobs = append(jobs, s.checkTableMaintenanceNeeds(bucketName, tables[i])...)
	}
	glog.V(1).Infof("Found %d maintenance jobs for table bucket %s", len(jobs), bucketName)
	return jobs, nil
}
// TableInfo represents basic table information for scanning.
// It is a snapshot of the statistics the scanner's heuristics consume.
type TableInfo struct {
	// Namespace and TableName identify the table within its bucket.
	Namespace string
	TableName string
	// TablePath is the table's storage path (used as the task source).
	TablePath string
	// LastCompaction is when the table was last compacted.
	LastCompaction time.Time
	// DataFileCount is the number of data files (drives compaction).
	DataFileCount int
	// SnapshotCount and OldestSnapshot drive snapshot expiration.
	SnapshotCount  int
	OldestSnapshot time.Time
	// TotalSizeBytes is the table's total data size.
	TotalSizeBytes int64
	// DeletedFileCount drives the orphan-cleanup heuristic.
	DeletedFileCount int
}
// checkTableMaintenanceNeeds evaluates a single table and returns the
// maintenance jobs (possibly several) that should be queued for it.
func (s *TableMaintenanceScanner) checkTableMaintenanceNeeds(bucketName string, table TableInfo) []*TableMaintenanceJob {
	var jobs []*TableMaintenanceJob
	now := time.Now()
	// newJob builds a job for this table; only the job type, priority and
	// human-readable reason vary between the checks below.
	newJob := func(jobType TableMaintenanceJobType, priority types.TaskPriority, reason string) *TableMaintenanceJob {
		return &TableMaintenanceJob{
			JobType:     jobType,
			TableBucket: bucketName,
			Namespace:   table.Namespace,
			TableName:   table.TableName,
			TablePath:   table.TablePath,
			Priority:    priority,
			Reason:      reason,
			CreatedAt:   now,
		}
	}
	// Check for compaction needs.
	if s.needsCompaction(table) {
		jobs = append(jobs, newJob(JobTypeCompaction, types.TaskPriorityNormal,
			"Table has many small files that can be compacted"))
	}
	// Check for snapshot expiration needs.
	if s.needsSnapshotExpiration(table) {
		jobs = append(jobs, newJob(JobTypeSnapshotExpiration, types.TaskPriorityLow,
			"Table has expired snapshots that can be removed"))
	}
	// Check for orphan cleanup needs.
	if s.needsOrphanCleanup(table) {
		jobs = append(jobs, newJob(JobTypeOrphanCleanup, types.TaskPriorityLow,
			"Table has orphaned files that can be removed"))
	}
	return jobs
}
// needsCompaction reports whether the table's data file count exceeds the
// configured compaction threshold (default 100).
func (s *TableMaintenanceScanner) needsCompaction(table TableInfo) bool {
	threshold := 100 // default when unconfigured
	if cfg := s.config; cfg != nil && cfg.CompactionFileThreshold > 0 {
		threshold = cfg.CompactionFileThreshold
	}
	return table.DataFileCount > threshold
}
// needsSnapshotExpiration reports whether the table holds snapshots older
// than the retention window (default 7 days). A table's last remaining
// snapshot is never expired, regardless of age.
func (s *TableMaintenanceScanner) needsSnapshotExpiration(table TableInfo) bool {
	if table.SnapshotCount <= 1 {
		return false // always keep at least one snapshot
	}
	retentionDays := 7 // default when unconfigured
	if cfg := s.config; cfg != nil && cfg.SnapshotRetentionDays > 0 {
		retentionDays = cfg.SnapshotRetentionDays
	}
	cutoff := time.Now().AddDate(0, 0, -retentionDays)
	return table.OldestSnapshot.Before(cutoff)
}
// needsOrphanCleanup checks if a table might have orphaned files.
// This is a cheap heuristic: any table with deleted files is a candidate;
// actual orphan detection is deferred to job execution.
func (s *TableMaintenanceScanner) needsOrphanCleanup(table TableInfo) bool {
	// Tables with deleted files might have orphans
	return table.DeletedFileCount > 0
}
// CreateTaskParams creates worker task parameters for a maintenance job.
// The table path travels as the single task source and the table bucket as
// the collection, matching CreateTaskFromDetectionResult in scheduling.go.
// VolumeId is left at its zero value: table maintenance is not
// volume-specific.
func CreateTaskParams(job *TableMaintenanceJob) *worker_pb.TaskParams {
	return &worker_pb.TaskParams{
		Sources: []*worker_pb.TaskSource{
			{
				Node: job.TablePath,
			},
		},
		Collection: job.TableBucket,
	}
}

103
weed/worker/tasks/table_maintenance/register.go

@ -0,0 +1,103 @@
package table_maintenance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// globalTaskDef holds the registered task definition so that
// UpdateConfigFromPersistence can swap its configuration at runtime.
var globalTaskDef *base.TaskDefinition

// Auto-register this task when the package is imported
// (triggered by the blank import in maintenance_worker.go).
func init() {
	RegisterTableMaintenanceTask()
	// Register config updater so persisted settings can be re-applied later.
	tasks.AutoRegisterConfigUpdater(types.TaskTypeTableMaintenance, UpdateConfigFromPersistence)
}
// RegisterTableMaintenanceTask registers the table maintenance task with the task system
func RegisterTableMaintenanceTask() {
// Create configuration instance
config := NewDefaultConfig()
// Create complete task definition
taskDef := &base.TaskDefinition{
Type: types.TaskTypeTableMaintenance,
Name: "table_maintenance",
DisplayName: "Table Maintenance",
Description: "Performs maintenance operations on S3 Table Buckets including compaction, snapshot expiration, and orphan cleanup",
Icon: "fas fa-table text-info",
Capabilities: []string{"table_maintenance", "iceberg", "s3tables"},
Config: config,
ConfigSpec: GetConfigSpec(),
CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
if params == nil {
return nil, fmt.Errorf("task parameters are required")
}
if len(params.Sources) == 0 {
return nil, fmt.Errorf("at least one source (table path) is required")
}
// Parse table info from parameters
tablePath := params.Sources[0].Node
tableBucket := params.Collection
// Create a default maintenance job (actual job type would come from queue)
job := &TableMaintenanceJob{
JobType: JobTypeCompaction,
TableBucket: tableBucket,
TablePath: tablePath,
Priority: types.TaskPriorityNormal,
CreatedAt: time.Now(),
}
return NewTableMaintenanceTask(
fmt.Sprintf("table-maintenance-%s-%d", tableBucket, time.Now().UnixNano()),
tableBucket,
"", // Namespace parsed from path if needed
"", // Table name parsed from path if needed
job,
), nil
},
DetectionFunc: Detection,
ScanInterval: 30 * time.Minute,
SchedulingFunc: Scheduling,
MaxConcurrent: 2,
RepeatInterval: 24 * time.Hour,
}
// Store task definition globally for configuration updates
globalTaskDef = taskDef
// Register everything with a single function call
base.RegisterTask(taskDef)
glog.V(1).Infof("Registered table_maintenance task type")
}
// UpdateConfigFromPersistence refreshes the registered task definition's
// configuration from the persistence layer. It fails when the task has not
// been registered yet.
func UpdateConfigFromPersistence(configPersistence interface{}) error {
	if globalTaskDef == nil {
		return fmt.Errorf("table_maintenance task not registered")
	}
	// Load configuration from persistence (falls back to defaults inside).
	cfg := LoadConfigFromPersistence(configPersistence)
	if cfg == nil {
		return fmt.Errorf("failed to load configuration from persistence")
	}
	// Swap in the freshly loaded configuration.
	globalTaskDef.Config = cfg
	glog.V(1).Infof("Updated table_maintenance task configuration from persistence")
	return nil
}

99
weed/worker/tasks/table_maintenance/scheduling.go

@ -0,0 +1,99 @@
package table_maintenance
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// TableMaintenanceScheduler implements types.TaskScheduler for table
// maintenance, deciding when detected tasks may run.
type TableMaintenanceScheduler struct {
	// config supplies the enabled flag and concurrency limit; may be nil.
	config *Config
}

// NewTableMaintenanceScheduler creates a new scheduler.
func NewTableMaintenanceScheduler(config *Config) *TableMaintenanceScheduler {
	return &TableMaintenanceScheduler{
		config: config,
	}
}
// ShouldSchedule reports whether a detected task should run now: never when
// the config disables the task type, always on first execution, and
// otherwise only once the repeat interval has elapsed.
func (s *TableMaintenanceScheduler) ShouldSchedule(result *types.TaskDetectionResult, lastExecution time.Time) bool {
	if s.config != nil && !s.config.Enabled {
		return false
	}
	if lastExecution.IsZero() {
		return true // never executed before
	}
	return time.Since(lastExecution) >= s.GetDefaultRepeatInterval()
}
// GetMaxConcurrent returns the configured concurrency limit, defaulting to 2
// when unset or non-positive.
func (s *TableMaintenanceScheduler) GetMaxConcurrent() int {
	if s.config == nil || s.config.MaxConcurrent <= 0 {
		return 2 // Default
	}
	return s.config.MaxConcurrent
}
// GetDefaultRepeatInterval returns the default repeat interval between task
// executions for the same table: once per day.
func (s *TableMaintenanceScheduler) GetDefaultRepeatInterval() time.Duration {
	// Table maintenance can run more frequently than volume maintenance
	return 24 * time.Hour // Once per day per table
}
// Scheduling is the function signature required by the task registration
// system. It allows a table maintenance task to run when the task type is
// enabled, the concurrency limit is not yet reached, and at least one
// available worker advertises the table maintenance capability.
func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
	tableConfig, ok := config.(*Config)
	if !ok {
		tableConfig = NewDefaultConfig()
	}
	// Respect the enabled flag, mirroring Detection (IsEnabled) and
	// TableMaintenanceScheduler.ShouldSchedule.
	if !tableConfig.Enabled {
		return false
	}
	// Count running table maintenance tasks.
	runningCount := 0
	for _, runningTask := range runningTasks {
		if runningTask.Type == types.TaskTypeTableMaintenance {
			runningCount++
		}
	}
	// Check concurrency limit.
	if runningCount >= tableConfig.MaxConcurrent {
		return false
	}
	// Check for an available worker with the table maintenance capability.
	for _, worker := range availableWorkers {
		if worker.CurrentLoad < worker.MaxConcurrent {
			for _, capability := range worker.Capabilities {
				if capability == types.TaskTypeTableMaintenance {
					return true
				}
			}
		}
	}
	return false
}
// CreateTaskFromDetectionResult builds typed task parameters from a
// detection result: the result's Server field carries the table path and its
// Collection field the table bucket name.
func CreateTaskFromDetectionResult(result *types.TaskDetectionResult) *worker_pb.TaskParams {
	source := &worker_pb.TaskSource{
		Node: result.Server, // Table path
	}
	return &worker_pb.TaskParams{
		Sources:    []*worker_pb.TaskSource{source},
		Collection: result.Collection, // Table bucket name
	}
}

220
weed/worker/tasks/table_maintenance/table_maintenance_task.go

@ -0,0 +1,220 @@
package table_maintenance
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
)
// TableMaintenanceTask handles maintenance operations for S3 Table Buckets
// This includes:
// - Iceberg table compaction
// - Snapshot expiration
// - Orphan file cleanup
// - Manifest optimization
type TableMaintenanceTask struct {
	*base.BaseTask
	// TableBucket is the bucket containing the table.
	TableBucket string
	// Namespace is the table's namespace within the bucket.
	Namespace string
	// TableName is the table's name within the namespace.
	TableName string
	// MaintenanceJob describes the specific operation to perform.
	MaintenanceJob *TableMaintenanceJob
	// status tracks the task lifecycle (pending -> in progress -> terminal).
	// NOTE(review): written by Execute/Cancel and read by GetStatus without
	// synchronization — confirm single-goroutine access is guaranteed.
	status types.TaskStatus
	// startTime records when Execute began.
	startTime time.Time
	// progress is a 0-100 percentage updated by the execute helpers.
	progress float64
}
// TableMaintenanceJob represents a specific maintenance operation for a table.
// Jobs are produced by the admin server's scanner and consumed by workers.
type TableMaintenanceJob struct {
	// JobType selects which maintenance operation to run.
	JobType TableMaintenanceJobType `json:"job_type"`
	// TableBucket, Namespace and TableName identify the target table.
	TableBucket string `json:"table_bucket"`
	Namespace   string `json:"namespace"`
	TableName   string `json:"table_name"`
	// TablePath is the table's storage path.
	TablePath string `json:"table_path"`
	// Priority orders this job relative to other maintenance work.
	Priority types.TaskPriority `json:"priority"`
	// Reason is a human-readable explanation of why the job was created.
	Reason string `json:"reason"`
	// CreatedAt is when the scanner produced this job.
	CreatedAt time.Time `json:"created_at"`
	// Params carries optional job-type-specific parameters.
	Params map[string]string `json:"params,omitempty"`
}
// TableMaintenanceJobType represents different table maintenance operations.
type TableMaintenanceJobType string

// Supported maintenance job types.
const (
	// JobTypeCompaction compacts small data files into larger ones
	JobTypeCompaction TableMaintenanceJobType = "compaction"
	// JobTypeSnapshotExpiration removes expired snapshots
	JobTypeSnapshotExpiration TableMaintenanceJobType = "snapshot_expiration"
	// JobTypeOrphanCleanup removes orphaned data and metadata files
	JobTypeOrphanCleanup TableMaintenanceJobType = "orphan_cleanup"
	// JobTypeManifestRewrite rewrites manifest files for optimization
	JobTypeManifestRewrite TableMaintenanceJobType = "manifest_rewrite"
)
// NewTableMaintenanceTask constructs a pending table maintenance task for
// the given table identity and maintenance job.
func NewTableMaintenanceTask(id, tableBucket, namespace, tableName string, job *TableMaintenanceJob) *TableMaintenanceTask {
	t := &TableMaintenanceTask{
		BaseTask:       base.NewBaseTask(id, types.TaskTypeTableMaintenance),
		MaintenanceJob: job,
		status:         types.TaskStatusPending,
	}
	t.TableBucket = tableBucket
	t.Namespace = namespace
	t.TableName = tableName
	return t
}
// Validate validates the task parameters. Only a nil check is performed
// here; per-job validation happens when the task is created from the queue.
func (t *TableMaintenanceTask) Validate(params *worker_pb.TaskParams) error {
	if params != nil {
		return nil
	}
	return fmt.Errorf("task params cannot be nil")
}
// EstimateTime returns a rough duration expectation for the task based on
// its job type. Compaction — and any unrecognized job type — is estimated
// at five minutes.
func (t *TableMaintenanceTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
	switch t.MaintenanceJob.JobType {
	case JobTypeSnapshotExpiration:
		return 1 * time.Minute
	case JobTypeManifestRewrite:
		return 2 * time.Minute
	case JobTypeOrphanCleanup:
		return 3 * time.Minute
	default:
		// JobTypeCompaction and unknown types share the largest estimate.
		return 5 * time.Minute
	}
}
// GetID returns the task ID.
// NOTE(review): delegates to the embedded BaseTask's ID(); presumably kept
// to satisfy an interface expecting Get-prefixed accessors — confirm.
func (t *TableMaintenanceTask) GetID() string {
	return t.ID()
}

// GetType returns the task type (always TaskTypeTableMaintenance).
func (t *TableMaintenanceTask) GetType() types.TaskType {
	return types.TaskTypeTableMaintenance
}

// GetStatus returns the current task status.
// NOTE(review): reads t.status without synchronization while Execute/Cancel
// may be writing — confirm single-goroutine access.
func (t *TableMaintenanceTask) GetStatus() types.TaskStatus {
	return t.status
}

// GetProgress returns the task progress as a percentage (0-100).
func (t *TableMaintenanceTask) GetProgress() float64 {
	return t.progress
}
// Execute runs the table maintenance task, dispatching to the handler for
// the job's type. The task status is updated as the job progresses:
// InProgress while running, Completed on success, Failed on error.
func (t *TableMaintenanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) (err error) {
	t.status = types.TaskStatusInProgress
	t.startTime = time.Now()
	glog.Infof("Starting table maintenance task %s: %s on %s/%s/%s",
		t.ID(), t.MaintenanceJob.JobType, t.TableBucket, t.Namespace, t.TableName)
	defer func() {
		// Resolve a still-InProgress status from the outcome. Without the
		// err check, a failing job handler would be reported as Completed.
		if t.status == types.TaskStatusInProgress {
			if err != nil {
				t.status = types.TaskStatusFailed
			} else {
				t.status = types.TaskStatusCompleted
			}
		}
		glog.Infof("Table maintenance task %s completed with status: %s", t.ID(), t.status)
	}()
	switch t.MaintenanceJob.JobType {
	case JobTypeCompaction:
		return t.executeCompaction(ctx)
	case JobTypeSnapshotExpiration:
		return t.executeSnapshotExpiration(ctx)
	case JobTypeOrphanCleanup:
		return t.executeOrphanCleanup(ctx)
	case JobTypeManifestRewrite:
		return t.executeManifestRewrite(ctx)
	default:
		t.status = types.TaskStatusFailed
		return fmt.Errorf("unknown job type: %s", t.MaintenanceJob.JobType)
	}
}
// executeCompaction performs Iceberg table compaction.
// Currently a stub: it only advances progress from 10 to 100 and succeeds.
func (t *TableMaintenanceTask) executeCompaction(ctx context.Context) error {
	t.progress = 10
	glog.V(1).Infof("Executing compaction for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName)
	// TODO: Implement actual Iceberg compaction logic
	// This would:
	// 1. Read current table metadata
	// 2. Identify small data files that should be compacted
	// 3. Create new compacted files
	// 4. Update metadata to point to new files
	// 5. Mark old files for deletion
	t.progress = 100
	return nil
}
// executeSnapshotExpiration removes expired snapshots.
// Currently a stub: it only advances progress from 10 to 100 and succeeds.
func (t *TableMaintenanceTask) executeSnapshotExpiration(ctx context.Context) error {
	t.progress = 10
	glog.V(1).Infof("Executing snapshot expiration for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName)
	// TODO: Implement snapshot expiration logic
	// This would:
	// 1. Read current table metadata
	// 2. Identify snapshots older than retention period
	// 3. Remove expired snapshot metadata
	// 4. Identify files only referenced by expired snapshots
	// 5. Mark those files for deletion
	t.progress = 100
	return nil
}
// executeOrphanCleanup removes orphaned files.
// Currently a stub: it only advances progress from 10 to 100 and succeeds.
func (t *TableMaintenanceTask) executeOrphanCleanup(ctx context.Context) error {
	t.progress = 10
	glog.V(1).Infof("Executing orphan cleanup for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName)
	// TODO: Implement orphan file cleanup logic
	// This would:
	// 1. List all files in data/ and metadata/ directories
	// 2. Read current table metadata to get referenced files
	// 3. Delete files not referenced by any snapshot
	t.progress = 100
	return nil
}
// executeManifestRewrite optimizes manifest files.
// Currently a stub: it only advances progress from 10 to 100 and succeeds.
func (t *TableMaintenanceTask) executeManifestRewrite(ctx context.Context) error {
	t.progress = 10
	glog.V(1).Infof("Executing manifest rewrite for table %s/%s/%s", t.TableBucket, t.Namespace, t.TableName)
	// TODO: Implement manifest rewrite logic
	// This would:
	// 1. Read current manifest files
	// 2. Combine small manifests
	// 3. Remove deleted file entries from manifests
	// 4. Write optimized manifests
	// 5. Update metadata to point to new manifests
	t.progress = 100
	return nil
}
// Cancel cancels the task.
// NOTE(review): this only flips the status field; it does not interrupt an
// in-flight Execute (no context cancellation is wired up) — confirm the
// intended semantics. Execute's deferred handler leaves a Cancelled status
// untouched since it only resolves an InProgress status.
func (t *TableMaintenanceTask) Cancel() error {
	t.status = types.TaskStatusCancelled
	return nil
}
// Cleanup performs cleanup after task completion. The task holds no external
// resources, so this is currently a no-op.
func (t *TableMaintenanceTask) Cleanup() error {
	return nil
}

250
weed/worker/tasks/table_maintenance/table_maintenance_test.go

@ -0,0 +1,250 @@
package table_maintenance
import (
"context"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// TestNewTableMaintenanceTask verifies the constructor wires up identity,
// type, bucket and the initial pending status.
func TestNewTableMaintenanceTask(t *testing.T) {
	job := &TableMaintenanceJob{
		JobType:     JobTypeCompaction,
		TableBucket: "test-bucket",
		Namespace:   "test-namespace",
		TableName:   "test-table",
		TablePath:   "/table-buckets/test-bucket/test-namespace/test-table",
		Priority:    types.TaskPriorityNormal,
		Reason:      "Test job",
		CreatedAt:   time.Now(),
	}
	task := NewTableMaintenanceTask("test-id", "test-bucket", "test-namespace", "test-table", job)
	if task == nil {
		t.Fatal("Expected non-nil task")
	}
	if got := task.ID(); got != "test-id" {
		t.Errorf("Expected ID 'test-id', got '%s'", got)
	}
	if got := task.Type(); got != types.TaskTypeTableMaintenance {
		t.Errorf("Expected type TableMaintenance, got %v", got)
	}
	if task.TableBucket != "test-bucket" {
		t.Errorf("Expected bucket 'test-bucket', got '%s'", task.TableBucket)
	}
	if got := task.GetStatus(); got != types.TaskStatusPending {
		t.Errorf("Expected status Pending, got %v", got)
	}
}
// TestTableMaintenanceTask_Validate covers the nil-params error and the
// accepted non-nil case.
func TestTableMaintenanceTask_Validate(t *testing.T) {
	job := &TableMaintenanceJob{JobType: JobTypeCompaction, TableBucket: "test-bucket"}
	task := NewTableMaintenanceTask("test-id", "test-bucket", "ns", "table", job)
	// nil params must be rejected.
	if task.Validate(nil) == nil {
		t.Error("Expected error for nil params")
	}
	// Non-nil params are accepted.
	validParams := &worker_pb.TaskParams{Collection: "test-bucket"}
	if err := task.Validate(validParams); err != nil {
		t.Errorf("Unexpected error: %v", err)
	}
}
// TestTableMaintenanceTask_EstimateTime pins the per-job-type estimates.
func TestTableMaintenanceTask_EstimateTime(t *testing.T) {
	for _, tc := range []struct {
		jobType  TableMaintenanceJobType
		expected time.Duration
	}{
		{JobTypeCompaction, 5 * time.Minute},
		{JobTypeSnapshotExpiration, 1 * time.Minute},
		{JobTypeOrphanCleanup, 3 * time.Minute},
		{JobTypeManifestRewrite, 2 * time.Minute},
	} {
		task := NewTableMaintenanceTask("test", "bucket", "ns", "table",
			&TableMaintenanceJob{JobType: tc.jobType})
		if got := task.EstimateTime(nil); got != tc.expected {
			t.Errorf("For job type %s: expected %v, got %v", tc.jobType, tc.expected, got)
		}
	}
}
// TestTableMaintenanceTask_Execute runs a compaction job end to end and
// checks the terminal status and final progress.
func TestTableMaintenanceTask_Execute(t *testing.T) {
	job := &TableMaintenanceJob{
		JobType:     JobTypeCompaction,
		TableBucket: "test-bucket",
		Namespace:   "test-namespace",
		TableName:   "test-table",
	}
	task := NewTableMaintenanceTask("test-id", "test-bucket", "test-namespace", "test-table", job)
	if err := task.Execute(context.Background(), &worker_pb.TaskParams{}); err != nil {
		t.Errorf("Unexpected error: %v", err)
	}
	if got := task.GetStatus(); got != types.TaskStatusCompleted {
		t.Errorf("Expected status Completed, got %v", got)
	}
	if got := task.GetProgress(); got != 100 {
		t.Errorf("Expected progress 100, got %v", got)
	}
}
func TestTableMaintenanceTask_Cancel(t *testing.T) {
job := &TableMaintenanceJob{JobType: JobTypeCompaction}
task := NewTableMaintenanceTask("test", "bucket", "ns", "table", job)
err := task.Cancel()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if task.GetStatus() != types.TaskStatusCancelled {
t.Errorf("Expected status Cancelled, got %v", task.GetStatus())
}
}
// TestNewDefaultConfig pins the documented default values.
func TestNewDefaultConfig(t *testing.T) {
	cfg := NewDefaultConfig()
	if cfg == nil {
		t.Fatal("Expected non-nil config")
	}
	if !cfg.Enabled {
		t.Error("Expected enabled by default")
	}
	if cfg.ScanIntervalMinutes != 30 {
		t.Errorf("Expected 30 minutes, got %d", cfg.ScanIntervalMinutes)
	}
	if cfg.CompactionFileThreshold != 100 {
		t.Errorf("Expected 100 files, got %d", cfg.CompactionFileThreshold)
	}
	if cfg.SnapshotRetentionDays != 7 {
		t.Errorf("Expected 7 days, got %d", cfg.SnapshotRetentionDays)
	}
}
func TestTableMaintenanceScanner_NeedsCompaction(t *testing.T) {
config := NewDefaultConfig()
scanner := NewTableMaintenanceScanner(config)
// Table with few files - no compaction needed
table1 := TableInfo{
DataFileCount: 50,
}
if scanner.needsCompaction(table1) {
t.Error("Should not need compaction with only 50 files")
}
// Table with many files - needs compaction
table2 := TableInfo{
DataFileCount: 150,
}
if !scanner.needsCompaction(table2) {
t.Error("Should need compaction with 150 files")
}
}
func TestTableMaintenanceScanner_NeedsSnapshotExpiration(t *testing.T) {
config := NewDefaultConfig()
scanner := NewTableMaintenanceScanner(config)
// Single snapshot - no expiration needed
table1 := TableInfo{
SnapshotCount: 1,
OldestSnapshot: time.Now().AddDate(0, 0, -30),
}
if scanner.needsSnapshotExpiration(table1) {
t.Error("Should not expire the only snapshot")
}
// Recent snapshots - no expiration needed
table2 := TableInfo{
SnapshotCount: 5,
OldestSnapshot: time.Now().AddDate(0, 0, -3),
}
if scanner.needsSnapshotExpiration(table2) {
t.Error("Should not expire recent snapshots")
}
// Old snapshots - expiration needed
table3 := TableInfo{
SnapshotCount: 5,
OldestSnapshot: time.Now().AddDate(0, 0, -30),
}
if !scanner.needsSnapshotExpiration(table3) {
t.Error("Should expire old snapshots")
}
}
// TestScheduling verifies worker availability and the concurrency cap.
func TestScheduling(t *testing.T) {
	cfg := NewDefaultConfig()
	task := &types.TaskInput{Type: types.TaskTypeTableMaintenance}
	workers := []*types.WorkerData{
		{
			ID:            "worker1",
			CurrentLoad:   0,
			MaxConcurrent: 5,
			Capabilities:  []types.TaskType{types.TaskTypeTableMaintenance},
		},
	}
	// Capable worker, nothing running: schedulable.
	if !Scheduling(task, []*types.TaskInput{}, workers, cfg) {
		t.Error("Should schedule when workers are available")
	}
	// Two running tasks hit the default MaxConcurrent of 2.
	atLimit := []*types.TaskInput{
		{Type: types.TaskTypeTableMaintenance},
		{Type: types.TaskTypeTableMaintenance},
	}
	if Scheduling(task, atLimit, workers, cfg) {
		t.Error("Should not schedule when at max concurrent")
	}
}
// TestGetConfigSpec checks that the spec exposes every expected field name.
func TestGetConfigSpec(t *testing.T) {
	spec := GetConfigSpec()
	if len(spec.Fields) == 0 {
		t.Error("Expected non-empty config spec")
	}
	present := make(map[string]bool, len(spec.Fields))
	for _, f := range spec.Fields {
		present[f.Name] = true
	}
	for _, want := range []string{"enabled", "scan_interval_minutes", "compaction_file_threshold", "snapshot_retention_days", "max_concurrent"} {
		if !present[want] {
			t.Errorf("Missing expected field: %s", want)
		}
	}
}

9
weed/worker/types/task_types.go

@ -11,10 +11,11 @@ import (
// TaskType identifies a kind of maintenance task a worker can perform.
type TaskType string

// Known maintenance task types. The diff rendering in this capture showed
// both the pre- and post-change const lines; this is the deduplicated final
// state including the new table maintenance type.
const (
	TaskTypeVacuum           TaskType = "vacuum"
	TaskTypeErasureCoding    TaskType = "erasure_coding"
	TaskTypeBalance          TaskType = "balance"
	TaskTypeReplication      TaskType = "replication"
	TaskTypeTableMaintenance TaskType = "table_maintenance"
)
// TaskStatus represents the status of a maintenance task

Loading…
Cancel
Save