diff --git a/weed/admin/plugin/plugin_monitor.go b/weed/admin/plugin/plugin_monitor.go index 332738a0d..73202d0a9 100644 --- a/weed/admin/plugin/plugin_monitor.go +++ b/weed/admin/plugin/plugin_monitor.go @@ -63,6 +63,12 @@ func (r *Plugin) loadPersistedMonitorState() error { } if len(trackedJobs) > 0 { + // Collect jobs that still carry inline rich details (old format) so we + // can migrate them to dedicated per-job detail files asynchronously. + // Writing many files synchronously during startup can be extremely slow + // on distributed storage (e.g. Longhorn), so we defer those writes. + var jobsToMigrate []TrackedJob + r.jobsMu.Lock() for i := range trackedJobs { job := trackedJobs[i] @@ -78,10 +84,9 @@ func (r *Plugin) loadPersistedMonitorState() error { } // Backward compatibility: migrate older inline detail payloads // out of tracked_jobs.json into dedicated per-job detail files. + // Collect for async migration instead of writing synchronously. if hasTrackedJobRichDetails(job) { - if err := r.store.SaveJobDetail(job); err != nil { - glog.Warningf("Plugin failed to migrate detail snapshot for job %s: %v", job.JobID, err) - } + jobsToMigrate = append(jobsToMigrate, job) } stripTrackedJobDetailFields(&job) jobCopy := job @@ -89,6 +94,10 @@ func (r *Plugin) loadPersistedMonitorState() error { } r.pruneTrackedJobsLocked() r.jobsMu.Unlock() + + if len(jobsToMigrate) > 0 { + go r.migrateInlineJobDetails(jobsToMigrate) + } } if len(activities) > maxActivityRecords { @@ -103,6 +112,23 @@ func (r *Plugin) loadPersistedMonitorState() error { return nil } +// migrateInlineJobDetails writes inline detail payloads from old-format +// tracked_jobs.json entries to dedicated per-job detail files. It is called +// asynchronously during startup to avoid blocking plugin initialisation. +func (r *Plugin) migrateInlineJobDetails(jobs []TrackedJob) { + migrated := 0 + for _, job := range jobs { + if err := r.store.SaveJobDetail(job); err != nil { + glog.Warningf("Plugin failed to migrate detail snapshot for job %s: %v", job.JobID, err) + } else { + migrated++ + } + } + if migrated > 0 { + glog.V(1).Infof("Plugin migrated %d inline job details to dedicated files", migrated) + } +} + // ExpireJob marks an active job as failed so it no longer blocks scheduling. func (r *Plugin) ExpireJob(jobID, reason string) (*TrackedJob, bool, error) { normalizedJobID := strings.TrimSpace(jobID)