Browse Source

fix: address Gemini code review feedback

- Critical: Add comment explaining job type handling in CreateTask
  (job type defaults to compaction for now; it is to be determined by the
  table scanner, and protobuf params to pass it explicitly are still to be added)
- High: Fix Execute error handling - failed tasks are now properly marked as Failed
  instead of being incorrectly marked as Completed by the deferred status update
- Medium: Align ScanIntervalSeconds with RepeatInterval (24 hours)
- Medium: Add warning logs when config persistence loading fails
- Medium: Remove duplicated default values in detection.go and
  use config values directly from the scanner
pull/8177/head
Chris Lu 2 weeks ago
parent
commit
e687177457
  1. 10
      weed/worker/tasks/table_maintenance/config.go
  2. 15
      weed/worker/tasks/table_maintenance/detection.go
  3. 10
      weed/worker/tasks/table_maintenance/register.go
  4. 30
      weed/worker/tasks/table_maintenance/table_maintenance_task.go

10
weed/worker/tasks/table_maintenance/config.go

@ -25,7 +25,7 @@ func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 30 * 60, // 30 minutes
ScanIntervalSeconds: 24 * 60 * 60, // 24 hours
MaxConcurrent: 2,
},
ScanIntervalMinutes: 30, // Scan every 30 minutes
@ -68,8 +68,12 @@ func LoadConfigFromPersistence(configPersistence interface{}) *Config {
if persistence, ok := configPersistence.(interface {
LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPolicy, error)
}); ok {
if policy, err := persistence.LoadTableMaintenanceTaskPolicy(); err == nil && policy != nil {
if err := config.FromTaskPolicy(policy); err == nil {
if policy, err := persistence.LoadTableMaintenanceTaskPolicy(); err != nil {
glog.Warningf("Failed to load table_maintenance configuration from persistence: %v", err)
} else if policy != nil {
if err := config.FromTaskPolicy(policy); err != nil {
glog.Warningf("Failed to parse table_maintenance configuration from persistence: %v", err)
} else {
glog.V(1).Infof("Loaded table_maintenance configuration from persistence")
return config
}

15
weed/worker/tasks/table_maintenance/detection.go

@ -152,25 +152,18 @@ func (s *TableMaintenanceScanner) checkTableMaintenanceNeeds(bucketName string,
// needsCompaction checks if a table needs compaction
func (s *TableMaintenanceScanner) needsCompaction(table TableInfo) bool {
threshold := 100 // Default threshold
if s.config != nil && s.config.CompactionFileThreshold > 0 {
threshold = s.config.CompactionFileThreshold
}
return table.DataFileCount > threshold
// Use config value directly - config is always set by NewTableMaintenanceScanner
return table.DataFileCount > s.config.CompactionFileThreshold
}
// needsSnapshotExpiration checks if a table has expired snapshots
func (s *TableMaintenanceScanner) needsSnapshotExpiration(table TableInfo) bool {
retentionDays := 7 // Default retention
if s.config != nil && s.config.SnapshotRetentionDays > 0 {
retentionDays = s.config.SnapshotRetentionDays
}
if table.SnapshotCount <= 1 {
return false // Keep at least one snapshot
}
cutoff := time.Now().AddDate(0, 0, -retentionDays)
// Use config value directly - config is always set by NewTableMaintenanceScanner
cutoff := time.Now().AddDate(0, 0, -s.config.SnapshotRetentionDays)
return table.OldestSnapshot.Before(cutoff)
}

10
weed/worker/tasks/table_maintenance/register.go

@ -50,9 +50,15 @@ func RegisterTableMaintenanceTask() {
tablePath := params.Sources[0].Node
tableBucket := params.Collection
// Create a default maintenance job (actual job type would come from queue)
// Parse job type from params if available
// TODO: Define TableMaintenanceTaskParams in protobuf to pass job type explicitly
// For now, default to compaction. In production, the job type would be determined
// by the table scanner based on the table's maintenance needs (see detection.go)
jobType := JobTypeCompaction
// Create a default maintenance job
job := &TableMaintenanceJob{
JobType: JobTypeCompaction,
JobType: jobType,
TableBucket: tableBucket,
TablePath: tablePath,
Priority: types.TaskPriorityNormal,

30
weed/worker/tasks/table_maintenance/table_maintenance_task.go

@ -122,26 +122,32 @@ func (t *TableMaintenanceTask) Execute(ctx context.Context, params *worker_pb.Ta
glog.Infof("Starting table maintenance task %s: %s on %s/%s/%s",
t.ID(), t.MaintenanceJob.JobType, t.TableBucket, t.Namespace, t.TableName)
defer func() {
if t.status == types.TaskStatusInProgress {
t.status = types.TaskStatusCompleted
}
glog.Infof("Table maintenance task %s completed with status: %s", t.ID(), t.status)
}()
// Execute the appropriate maintenance operation
var err error
switch t.MaintenanceJob.JobType {
case JobTypeCompaction:
return t.executeCompaction(ctx)
err = t.executeCompaction(ctx)
case JobTypeSnapshotExpiration:
return t.executeSnapshotExpiration(ctx)
err = t.executeSnapshotExpiration(ctx)
case JobTypeOrphanCleanup:
return t.executeOrphanCleanup(ctx)
err = t.executeOrphanCleanup(ctx)
case JobTypeManifestRewrite:
return t.executeManifestRewrite(ctx)
err = t.executeManifestRewrite(ctx)
default:
t.status = types.TaskStatusFailed
return fmt.Errorf("unknown job type: %s", t.MaintenanceJob.JobType)
err = fmt.Errorf("unknown job type: %s", t.MaintenanceJob.JobType)
}
// Set status based on execution result
if err != nil {
t.status = types.TaskStatusFailed
} else if t.status == types.TaskStatusInProgress {
// Only mark as completed if no error and still in progress
t.status = types.TaskStatusCompleted
}
glog.Infof("Table maintenance task %s completed with status: %s", t.ID(), t.status)
return err
}
// executeCompaction performs Iceberg table compaction

Loading…
Cancel
Save