From 90277ceed52b6e79c48b6872ecadcb007afe61aa Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 21 Mar 2026 23:18:42 +0200 Subject: [PATCH] admin/plugin: migrate inline job details asynchronously to avoid slow startup (#8721) loadPersistedMonitorState performed a backward-compatibility migration that wrote every job with inline rich detail fields to a dedicated per-job detail file synchronously during startup. On deployments with many historical jobs (e.g. 1000+) stored on distributed block storage (e.g. Longhorn), each individual file write requires an fsync round-trip, making startup disproportionately slow and causing readiness/liveness probe failures. The in-memory state is populated correctly before the goroutine is started because stripTrackedJobDetailFields is still called in-place; only the disk writes are deferred. A completion log message at V(1) is emitted once the background migration finishes. Co-authored-by: Anton Ustyugov --- weed/admin/plugin/plugin_monitor.go | 32 ++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/weed/admin/plugin/plugin_monitor.go b/weed/admin/plugin/plugin_monitor.go index 332738a0d..73202d0a9 100644 --- a/weed/admin/plugin/plugin_monitor.go +++ b/weed/admin/plugin/plugin_monitor.go @@ -63,6 +63,12 @@ func (r *Plugin) loadPersistedMonitorState() error { } if len(trackedJobs) > 0 { + // Collect jobs that still carry inline rich details (old format) so we + // can migrate them to dedicated per-job detail files asynchronously. + // Writing many files synchronously during startup can be extremely slow + // on distributed storage (e.g. Longhorn), so we defer those writes. + var jobsToMigrate []TrackedJob + r.jobsMu.Lock() for i := range trackedJobs { job := trackedJobs[i] @@ -78,10 +84,9 @@ func (r *Plugin) loadPersistedMonitorState() error { } // Backward compatibility: migrate older inline detail payloads // out of tracked_jobs.json into dedicated per-job detail files. + // Collect for async migration instead of writing synchronously. if hasTrackedJobRichDetails(job) { - if err := r.store.SaveJobDetail(job); err != nil { - glog.Warningf("Plugin failed to migrate detail snapshot for job %s: %v", job.JobID, err) - } + jobsToMigrate = append(jobsToMigrate, job) } stripTrackedJobDetailFields(&job) jobCopy := job @@ -89,6 +94,10 @@ func (r *Plugin) loadPersistedMonitorState() error { } r.pruneTrackedJobsLocked() r.jobsMu.Unlock() + + if len(jobsToMigrate) > 0 { + go r.migrateInlineJobDetails(jobsToMigrate) + } } if len(activities) > maxActivityRecords { @@ -103,6 +112,23 @@ func (r *Plugin) loadPersistedMonitorState() error { return nil } +// migrateInlineJobDetails writes inline detail payloads from old-format +// tracked_jobs.json entries to dedicated per-job detail files. It is called +// asynchronously during startup to avoid blocking plugin initialisation. +func (r *Plugin) migrateInlineJobDetails(jobs []TrackedJob) { + migrated := 0 + for _, job := range jobs { + if err := r.store.SaveJobDetail(job); err != nil { + glog.Warningf("Plugin failed to migrate detail snapshot for job %s: %v", job.JobID, err) + } else { + migrated++ + } + } + if migrated > 0 { + glog.V(1).Infof("Plugin migrated %d inline job details to dedicated files", migrated) + } +} + // ExpireJob marks an active job as failed so it no longer blocks scheduling. func (r *Plugin) ExpireJob(jobID, reason string) (*TrackedJob, bool, error) { normalizedJobID := strings.TrimSpace(jobID)