From e687177457ee74a20600d5ae2ab15af761f91d21 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 31 Jan 2026 15:30:06 -0800 Subject: [PATCH] fix: address Gemini code review feedback - Critical: Add comment explaining job type handling in CreateTask (job type determined by table scanner, protobuf params to be added) - High: Fix Execute error handling - failed tasks now properly marked as Failed instead of being incorrectly marked as Completed by defer - Medium: Align ScanIntervalSeconds with RepeatInterval (24 hours) - Medium: Add warning logs when config persistence loading fails - Medium: Remove duplicated default values in detection.go, use config values directly from scanner --- weed/worker/tasks/table_maintenance/config.go | 10 +++++-- .../tasks/table_maintenance/detection.go | 15 +++------- .../tasks/table_maintenance/register.go | 10 +++++-- .../table_maintenance_task.go | 30 +++++++++++-------- 4 files changed, 37 insertions(+), 28 deletions(-) diff --git a/weed/worker/tasks/table_maintenance/config.go b/weed/worker/tasks/table_maintenance/config.go index 396d19e30..2c7e171e4 100644 --- a/weed/worker/tasks/table_maintenance/config.go +++ b/weed/worker/tasks/table_maintenance/config.go @@ -25,7 +25,7 @@ func NewDefaultConfig() *Config { return &Config{ BaseConfig: base.BaseConfig{ Enabled: true, - ScanIntervalSeconds: 30 * 60, // 30 minutes + ScanIntervalSeconds: 24 * 60 * 60, // 24 hours MaxConcurrent: 2, }, ScanIntervalMinutes: 30, // Scan every 30 minutes @@ -68,8 +68,12 @@ func LoadConfigFromPersistence(configPersistence interface{}) *Config { if persistence, ok := configPersistence.(interface { LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPolicy, error) }); ok { - if policy, err := persistence.LoadTableMaintenanceTaskPolicy(); err == nil && policy != nil { - if err := config.FromTaskPolicy(policy); err == nil { + if policy, err := persistence.LoadTableMaintenanceTaskPolicy(); err != nil { + glog.Warningf("Failed to load table_maintenance configuration from persistence: %v", err) + } else if policy != nil { + if err := config.FromTaskPolicy(policy); err != nil { + glog.Warningf("Failed to parse table_maintenance configuration from persistence: %v", err) + } else { glog.V(1).Infof("Loaded table_maintenance configuration from persistence") return config } diff --git a/weed/worker/tasks/table_maintenance/detection.go b/weed/worker/tasks/table_maintenance/detection.go index 275219ec8..b251b5918 100644 --- a/weed/worker/tasks/table_maintenance/detection.go +++ b/weed/worker/tasks/table_maintenance/detection.go @@ -152,25 +152,18 @@ func (s *TableMaintenanceScanner) checkTableMaintenanceNeeds(bucketName string, // needsCompaction checks if a table needs compaction func (s *TableMaintenanceScanner) needsCompaction(table TableInfo) bool { - threshold := 100 // Default threshold - if s.config != nil && s.config.CompactionFileThreshold > 0 { - threshold = s.config.CompactionFileThreshold - } - return table.DataFileCount > threshold + // Use config value directly - config is always set by NewTableMaintenanceScanner + return table.DataFileCount > s.config.CompactionFileThreshold } // needsSnapshotExpiration checks if a table has expired snapshots func (s *TableMaintenanceScanner) needsSnapshotExpiration(table TableInfo) bool { - retentionDays := 7 // Default retention - if s.config != nil && s.config.SnapshotRetentionDays > 0 { - retentionDays = s.config.SnapshotRetentionDays - } - if table.SnapshotCount <= 1 { return false // Keep at least one snapshot } - cutoff := time.Now().AddDate(0, 0, -retentionDays) + // Use config value directly - config is always set by NewTableMaintenanceScanner + cutoff := time.Now().AddDate(0, 0, -s.config.SnapshotRetentionDays) return table.OldestSnapshot.Before(cutoff) } diff --git a/weed/worker/tasks/table_maintenance/register.go b/weed/worker/tasks/table_maintenance/register.go index e2a8b5f21..a1cfb9e70 100644 --- a/weed/worker/tasks/table_maintenance/register.go +++ b/weed/worker/tasks/table_maintenance/register.go @@ -50,9 +50,15 @@ func RegisterTableMaintenanceTask() { tablePath := params.Sources[0].Node tableBucket := params.Collection - // Create a default maintenance job (actual job type would come from queue) + // Parse job type from params if available + // TODO: Define TableMaintenanceTaskParams in protobuf to pass job type explicitly + // For now, default to compaction. In production, the job type would be determined + // by the table scanner based on the table's maintenance needs (see detection.go) + jobType := JobTypeCompaction + + // Create a default maintenance job job := &TableMaintenanceJob{ - JobType: JobTypeCompaction, + JobType: jobType, TableBucket: tableBucket, TablePath: tablePath, Priority: types.TaskPriorityNormal, diff --git a/weed/worker/tasks/table_maintenance/table_maintenance_task.go b/weed/worker/tasks/table_maintenance/table_maintenance_task.go index f9f82986e..f71ee5027 100644 --- a/weed/worker/tasks/table_maintenance/table_maintenance_task.go +++ b/weed/worker/tasks/table_maintenance/table_maintenance_task.go @@ -122,26 +122,32 @@ func (t *TableMaintenanceTask) Execute(ctx context.Context, params *worker_pb.Ta glog.Infof("Starting table maintenance task %s: %s on %s/%s/%s", t.ID(), t.MaintenanceJob.JobType, t.TableBucket, t.Namespace, t.TableName) - defer func() { - if t.status == types.TaskStatusInProgress { - t.status = types.TaskStatusCompleted - } - glog.Infof("Table maintenance task %s completed with status: %s", t.ID(), t.status) - }() - + // Execute the appropriate maintenance operation + var err error switch t.MaintenanceJob.JobType { case JobTypeCompaction: - return t.executeCompaction(ctx) + err = t.executeCompaction(ctx) case JobTypeSnapshotExpiration: - return t.executeSnapshotExpiration(ctx) + err = t.executeSnapshotExpiration(ctx) case JobTypeOrphanCleanup: - return t.executeOrphanCleanup(ctx) + err = t.executeOrphanCleanup(ctx) case JobTypeManifestRewrite: - return t.executeManifestRewrite(ctx) + err = t.executeManifestRewrite(ctx) default: t.status = types.TaskStatusFailed - return fmt.Errorf("unknown job type: %s", t.MaintenanceJob.JobType) + err = fmt.Errorf("unknown job type: %s", t.MaintenanceJob.JobType) } + + // Set status based on execution result + if err != nil { + t.status = types.TaskStatusFailed + } else if t.status == types.TaskStatusInProgress { + // Only mark as completed if no error and still in progress + t.status = types.TaskStatusCompleted + } + + glog.Infof("Table maintenance task %s completed with status: %s", t.ID(), t.status) + return err } // executeCompaction performs Iceberg table compaction