package plugin import ( "encoding/json" "sort" "strings" "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "google.golang.org/protobuf/encoding/protojson" ) const ( maxTrackedJobsTotal = 1000 maxActivityRecords = 4000 maxRelatedJobs = 100 ) var ( StateSucceeded = strings.ToLower(plugin_pb.JobState_JOB_STATE_SUCCEEDED.String()) StateFailed = strings.ToLower(plugin_pb.JobState_JOB_STATE_FAILED.String()) StateCanceled = strings.ToLower(plugin_pb.JobState_JOB_STATE_CANCELED.String()) ) // activityLess reports whether activity a occurred after activity b (newest-first order). // A nil OccurredAt is treated as the zero time. func activityLess(a, b JobActivity) bool { ta := time.Time{} if a.OccurredAt != nil { ta = *a.OccurredAt } tb := time.Time{} if b.OccurredAt != nil { tb = *b.OccurredAt } return ta.After(tb) } func (r *Plugin) loadPersistedMonitorState() error { trackedJobs, err := r.store.LoadTrackedJobs() if err != nil { return err } activities, err := r.store.LoadActivities() if err != nil { return err } if len(trackedJobs) > 0 { r.jobsMu.Lock() for i := range trackedJobs { job := trackedJobs[i] if strings.TrimSpace(job.JobID) == "" { continue } // Backward compatibility: migrate older inline detail payloads // out of tracked_jobs.json into dedicated per-job detail files. if hasTrackedJobRichDetails(job) { if err := r.store.SaveJobDetail(job); err != nil { glog.Warningf("Plugin failed to migrate detail snapshot for job %s: %v", job.JobID, err) } } stripTrackedJobDetailFields(&job) jobCopy := job r.jobs[job.JobID] = &jobCopy } r.pruneTrackedJobsLocked() r.jobsMu.Unlock() } if len(activities) > maxActivityRecords { activities = activities[len(activities)-maxActivityRecords:] } if len(activities) > 0 { r.activitiesMu.Lock() r.activities = append([]JobActivity(nil), activities...) r.activitiesMu.Unlock() } return nil } func (r *Plugin) ListTrackedJobs(jobType string, state string, limit int) []TrackedJob { r.jobsMu.RLock() defer r.jobsMu.RUnlock() normalizedJobType := strings.TrimSpace(jobType) normalizedState := strings.TrimSpace(strings.ToLower(state)) items := make([]TrackedJob, 0, len(r.jobs)) for _, job := range r.jobs { if job == nil { continue } if normalizedJobType != "" && job.JobType != normalizedJobType { continue } if normalizedState != "" && strings.ToLower(job.State) != normalizedState { continue } items = append(items, cloneTrackedJob(*job)) } sort.Slice(items, func(i, j int) bool { ti := time.Time{} if items[i].UpdatedAt != nil { ti = *items[i].UpdatedAt } tj := time.Time{} if items[j].UpdatedAt != nil { tj = *items[j].UpdatedAt } if !ti.Equal(tj) { return ti.After(tj) } return items[i].JobID < items[j].JobID }) if limit > 0 && len(items) > limit { items = items[:limit] } return items } func (r *Plugin) GetTrackedJob(jobID string) (*TrackedJob, bool) { r.jobsMu.RLock() defer r.jobsMu.RUnlock() job, ok := r.jobs[jobID] if !ok || job == nil { return nil, false } clone := cloneTrackedJob(*job) return &clone, true } func (r *Plugin) ListActivities(jobType string, limit int) []JobActivity { r.activitiesMu.RLock() defer r.activitiesMu.RUnlock() normalized := strings.TrimSpace(jobType) activities := make([]JobActivity, 0, len(r.activities)) for _, activity := range r.activities { if normalized != "" && activity.JobType != normalized { continue } activities = append(activities, activity) } sort.Slice(activities, func(i, j int) bool { return activityLess(activities[i], activities[j]) }) if limit > 0 && len(activities) > limit { activities = activities[:limit] } return activities } func (r *Plugin) ListJobActivities(jobID string, limit int) []JobActivity { normalizedJobID := strings.TrimSpace(jobID) if normalizedJobID == "" { return nil } r.activitiesMu.RLock() activities := make([]JobActivity, 0, len(r.activities)) for _, activity := range r.activities { if strings.TrimSpace(activity.JobID) != normalizedJobID { continue } activities = append(activities, activity) } r.activitiesMu.RUnlock() sort.Slice(activities, func(i, j int) bool { return !activityLess(activities[i], activities[j]) // oldest-first for job timeline }) if limit > 0 && len(activities) > limit { activities = activities[len(activities)-limit:] } return activities } func (r *Plugin) BuildJobDetail(jobID string, activityLimit int, relatedLimit int) (*JobDetail, bool, error) { normalizedJobID := strings.TrimSpace(jobID) if normalizedJobID == "" { return nil, false, nil } // Clamp relatedLimit to a safe range to avoid excessive memory allocation from untrusted input. if relatedLimit <= 0 { relatedLimit = 0 } else if relatedLimit > maxRelatedJobs { relatedLimit = maxRelatedJobs } r.jobsMu.RLock() trackedSnapshot, ok := r.jobs[normalizedJobID] if ok && trackedSnapshot != nil { candidate := cloneTrackedJob(*trackedSnapshot) stripTrackedJobDetailFields(&candidate) trackedSnapshot = &candidate } else { trackedSnapshot = nil } r.jobsMu.RUnlock() detailJob, err := r.store.LoadJobDetail(normalizedJobID) if err != nil { return nil, false, err } if trackedSnapshot == nil && detailJob == nil { return nil, false, nil } if detailJob == nil && trackedSnapshot != nil { clone := cloneTrackedJob(*trackedSnapshot) detailJob = &clone } if detailJob == nil { return nil, false, nil } if trackedSnapshot != nil { mergeTrackedStatusIntoDetail(detailJob, trackedSnapshot) } detailJob.Parameters = enrichTrackedJobParameters(detailJob.JobType, detailJob.Parameters) r.activitiesMu.RLock() activities := append([]JobActivity(nil), r.activities...) r.activitiesMu.RUnlock() detail := &JobDetail{ Job: detailJob, Activities: filterJobActivitiesFromSlice(activities, normalizedJobID, activityLimit), LastUpdated: timeToPtr(time.Now().UTC()), } if history, err := r.store.LoadRunHistory(detailJob.JobType); err != nil { return nil, true, err } else if history != nil { for i := range history.SuccessfulRuns { record := history.SuccessfulRuns[i] if strings.TrimSpace(record.JobID) == normalizedJobID { recordCopy := record detail.RunRecord = &recordCopy break } } if detail.RunRecord == nil { for i := range history.ErrorRuns { record := history.ErrorRuns[i] if strings.TrimSpace(record.JobID) == normalizedJobID { recordCopy := record detail.RunRecord = &recordCopy break } } } } if relatedLimit > 0 { related := make([]TrackedJob, 0, relatedLimit) r.jobsMu.RLock() for _, candidate := range r.jobs { if strings.TrimSpace(candidate.JobType) != strings.TrimSpace(detailJob.JobType) { continue } if strings.TrimSpace(candidate.JobID) == normalizedJobID { continue } cloned := cloneTrackedJob(*candidate) stripTrackedJobDetailFields(&cloned) related = append(related, cloned) if len(related) >= relatedLimit { break } } r.jobsMu.RUnlock() detail.RelatedJobs = related } return detail, true, nil } func filterJobActivitiesFromSlice(all []JobActivity, jobID string, limit int) []JobActivity { if strings.TrimSpace(jobID) == "" || len(all) == 0 { return nil } activities := make([]JobActivity, 0, len(all)) for _, activity := range all { if strings.TrimSpace(activity.JobID) != jobID { continue } activities = append(activities, activity) } sort.Slice(activities, func(i, j int) bool { return !activityLess(activities[i], activities[j]) // oldest-first for job timeline }) if limit > 0 && len(activities) > limit { activities = activities[len(activities)-limit:] } return activities } func stripTrackedJobDetailFields(job *TrackedJob) { if job == nil { return } job.Detail = "" job.Parameters = nil job.Labels = nil job.ResultOutputValues = nil } func hasTrackedJobRichDetails(job TrackedJob) bool { return strings.TrimSpace(job.Detail) != "" || len(job.Parameters) > 0 || len(job.Labels) > 0 || len(job.ResultOutputValues) > 0 } func mergeTrackedStatusIntoDetail(detail *TrackedJob, tracked *TrackedJob) { if detail == nil || tracked == nil { return } if detail.JobType == "" { detail.JobType = tracked.JobType } if detail.RequestID == "" { detail.RequestID = tracked.RequestID } if detail.WorkerID == "" { detail.WorkerID = tracked.WorkerID } if detail.DedupeKey == "" { detail.DedupeKey = tracked.DedupeKey } if detail.Summary == "" { detail.Summary = tracked.Summary } if detail.State == "" { detail.State = tracked.State } if detail.Progress == 0 { detail.Progress = tracked.Progress } if detail.Stage == "" { detail.Stage = tracked.Stage } if detail.Message == "" { detail.Message = tracked.Message } if detail.Attempt == 0 { detail.Attempt = tracked.Attempt } if detail.CreatedAt == nil || detail.CreatedAt.IsZero() { detail.CreatedAt = tracked.CreatedAt } if detail.UpdatedAt == nil || detail.UpdatedAt.IsZero() { detail.UpdatedAt = tracked.UpdatedAt } if detail.CompletedAt == nil || detail.CompletedAt.IsZero() { detail.CompletedAt = tracked.CompletedAt } if detail.ErrorMessage == "" { detail.ErrorMessage = tracked.ErrorMessage } if detail.ResultSummary == "" { detail.ResultSummary = tracked.ResultSummary } } func (r *Plugin) handleJobProgressUpdate(workerID string, update *plugin_pb.JobProgressUpdate) { if update == nil { return } now := time.Now().UTC() resolvedWorkerID := strings.TrimSpace(workerID) if strings.TrimSpace(update.JobId) != "" { r.jobsMu.Lock() job := r.jobs[update.JobId] if job == nil { job = &TrackedJob{ JobID: update.JobId, JobType: update.JobType, RequestID: update.RequestId, WorkerID: resolvedWorkerID, CreatedAt: timeToPtr(now), } r.jobs[update.JobId] = job } if update.JobType != "" { job.JobType = update.JobType } if update.RequestId != "" { job.RequestID = update.RequestId } if job.WorkerID != "" { resolvedWorkerID = job.WorkerID } else if resolvedWorkerID != "" { job.WorkerID = resolvedWorkerID } job.State = strings.ToLower(update.State.String()) job.Progress = update.ProgressPercent job.Stage = update.Stage job.Message = update.Message job.UpdatedAt = timeToPtr(now) r.pruneTrackedJobsLocked() r.dirtyJobs = true r.jobsMu.Unlock() } r.trackWorkerActivities(update.JobType, update.JobId, update.RequestId, resolvedWorkerID, update.Activities) if update.Message != "" || update.Stage != "" { source := "worker_progress" if strings.TrimSpace(update.JobId) == "" { source = "worker_detection" } r.appendActivity(JobActivity{ JobID: update.JobId, JobType: update.JobType, RequestID: update.RequestId, WorkerID: resolvedWorkerID, Source: source, Message: update.Message, Stage: update.Stage, OccurredAt: timeToPtr(now), }) } } func (r *Plugin) trackExecutionStart(requestID, workerID string, job *plugin_pb.JobSpec, attempt int32) { if job == nil || strings.TrimSpace(job.JobId) == "" { return } now := time.Now().UTC() r.jobsMu.Lock() tracked := r.jobs[job.JobId] if tracked == nil { tracked = &TrackedJob{ JobID: job.JobId, CreatedAt: timeToPtr(now), } r.jobs[job.JobId] = tracked } tracked.JobType = job.JobType tracked.RequestID = requestID tracked.WorkerID = workerID tracked.DedupeKey = job.DedupeKey tracked.Summary = job.Summary tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_ASSIGNED.String()) tracked.Progress = 0 tracked.Stage = "assigned" tracked.Message = "job assigned to worker" tracked.Attempt = attempt if tracked.CreatedAt == nil || tracked.CreatedAt.IsZero() { tracked.CreatedAt = timeToPtr(now) } tracked.UpdatedAt = timeToPtr(now) trackedSnapshot := cloneTrackedJob(*tracked) r.pruneTrackedJobsLocked() r.dirtyJobs = true r.jobsMu.Unlock() r.persistJobDetailSnapshot(job.JobId, func(detail *TrackedJob) { detail.JobID = job.JobId detail.JobType = job.JobType detail.RequestID = requestID detail.WorkerID = workerID detail.DedupeKey = job.DedupeKey detail.Summary = job.Summary detail.Detail = job.Detail detail.Parameters = enrichTrackedJobParameters(job.JobType, configValueMapToPlain(job.Parameters)) if len(job.Labels) > 0 { labels := make(map[string]string, len(job.Labels)) for key, value := range job.Labels { labels[key] = value } detail.Labels = labels } else { detail.Labels = nil } detail.State = trackedSnapshot.State detail.Progress = trackedSnapshot.Progress detail.Stage = trackedSnapshot.Stage detail.Message = trackedSnapshot.Message detail.Attempt = attempt if detail.CreatedAt == nil || detail.CreatedAt.IsZero() { detail.CreatedAt = trackedSnapshot.CreatedAt } detail.UpdatedAt = trackedSnapshot.UpdatedAt }) r.appendActivity(JobActivity{ JobID: job.JobId, JobType: job.JobType, RequestID: requestID, WorkerID: workerID, Source: "admin_dispatch", Message: "job assigned", Stage: "assigned", OccurredAt: timeToPtr(now), }) } func (r *Plugin) trackExecutionQueued(job *plugin_pb.JobSpec) { if job == nil || strings.TrimSpace(job.JobId) == "" { return } now := time.Now().UTC() r.jobsMu.Lock() tracked := r.jobs[job.JobId] if tracked == nil { tracked = &TrackedJob{ JobID: job.JobId, CreatedAt: timeToPtr(now), } r.jobs[job.JobId] = tracked } tracked.JobType = job.JobType tracked.DedupeKey = job.DedupeKey tracked.Summary = job.Summary tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_PENDING.String()) tracked.Progress = 0 tracked.Stage = "queued" tracked.Message = "waiting for available executor" if tracked.CreatedAt == nil || tracked.CreatedAt.IsZero() { tracked.CreatedAt = timeToPtr(now) } tracked.UpdatedAt = timeToPtr(now) trackedSnapshot := cloneTrackedJob(*tracked) r.pruneTrackedJobsLocked() r.dirtyJobs = true r.jobsMu.Unlock() r.persistJobDetailSnapshot(job.JobId, func(detail *TrackedJob) { detail.JobID = job.JobId detail.JobType = job.JobType detail.DedupeKey = job.DedupeKey detail.Summary = job.Summary detail.Detail = job.Detail detail.Parameters = enrichTrackedJobParameters(job.JobType, configValueMapToPlain(job.Parameters)) if len(job.Labels) > 0 { labels := make(map[string]string, len(job.Labels)) for key, value := range job.Labels { labels[key] = value } detail.Labels = labels } else { detail.Labels = nil } detail.State = trackedSnapshot.State detail.Progress = trackedSnapshot.Progress detail.Stage = trackedSnapshot.Stage detail.Message = trackedSnapshot.Message if detail.CreatedAt == nil || detail.CreatedAt.IsZero() { detail.CreatedAt = trackedSnapshot.CreatedAt } detail.UpdatedAt = trackedSnapshot.UpdatedAt }) r.appendActivity(JobActivity{ JobID: job.JobId, JobType: job.JobType, Source: "admin_scheduler", Message: "job queued for execution", Stage: "queued", OccurredAt: timeToPtr(now), }) } func (r *Plugin) trackExecutionCompletion(completed *plugin_pb.JobCompleted) *TrackedJob { if completed == nil || strings.TrimSpace(completed.JobId) == "" { return nil } now := time.Now().UTC() if completed.CompletedAt != nil { now = completed.CompletedAt.AsTime().UTC() } r.jobsMu.Lock() tracked := r.jobs[completed.JobId] if tracked == nil { tracked = &TrackedJob{ JobID: completed.JobId, CreatedAt: timeToPtr(now), } r.jobs[completed.JobId] = tracked } if completed.JobType != "" { tracked.JobType = completed.JobType } if completed.RequestId != "" { tracked.RequestID = completed.RequestId } if completed.Success { tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_SUCCEEDED.String()) tracked.Progress = 100 tracked.Stage = "completed" if completed.Result != nil { tracked.ResultSummary = completed.Result.Summary } tracked.Message = tracked.ResultSummary if tracked.Message == "" { tracked.Message = "completed" } tracked.ErrorMessage = "" } else { tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_FAILED.String()) tracked.Stage = "failed" tracked.ErrorMessage = completed.ErrorMessage tracked.Message = completed.ErrorMessage } tracked.UpdatedAt = timeToPtr(now) tracked.CompletedAt = timeToPtr(now) r.pruneTrackedJobsLocked() clone := cloneTrackedJob(*tracked) r.dirtyJobs = true r.jobsMu.Unlock() r.persistJobDetailSnapshot(completed.JobId, func(detail *TrackedJob) { detail.JobID = completed.JobId if completed.JobType != "" { detail.JobType = completed.JobType } if completed.RequestId != "" { detail.RequestID = completed.RequestId } detail.State = clone.State detail.Progress = clone.Progress detail.Stage = clone.Stage detail.Message = clone.Message detail.ErrorMessage = clone.ErrorMessage detail.ResultSummary = clone.ResultSummary if completed.Success && completed.Result != nil { detail.ResultOutputValues = configValueMapToPlain(completed.Result.OutputValues) } else { detail.ResultOutputValues = nil } if detail.CreatedAt == nil || detail.CreatedAt.IsZero() { detail.CreatedAt = clone.CreatedAt } if detail.UpdatedAt == nil || detail.UpdatedAt.IsZero() { detail.UpdatedAt = clone.UpdatedAt } if detail.CompletedAt == nil || detail.CompletedAt.IsZero() { detail.CompletedAt = clone.CompletedAt } }) r.appendActivity(JobActivity{ JobID: completed.JobId, JobType: completed.JobType, RequestID: completed.RequestId, WorkerID: clone.WorkerID, Source: "worker_completion", Message: clone.Message, Stage: clone.Stage, OccurredAt: timeToPtr(now), }) return &clone } func (r *Plugin) trackWorkerActivities(jobType, jobID, requestID, workerID string, events []*plugin_pb.ActivityEvent) { if len(events) == 0 { return } for _, event := range events { if event == nil { continue } timestamp := time.Now().UTC() if event.CreatedAt != nil { timestamp = event.CreatedAt.AsTime().UTC() } r.appendActivity(JobActivity{ JobID: jobID, JobType: jobType, RequestID: requestID, WorkerID: workerID, Source: strings.ToLower(event.Source.String()), Message: event.Message, Stage: event.Stage, Details: configValueMapToPlain(event.Details), OccurredAt: timeToPtr(timestamp), }) } } func (r *Plugin) appendActivity(activity JobActivity) { if activity.OccurredAt == nil || activity.OccurredAt.IsZero() { activity.OccurredAt = timeToPtr(time.Now().UTC()) } r.activitiesMu.Lock() r.activities = append(r.activities, activity) if len(r.activities) > maxActivityRecords { r.activities = r.activities[len(r.activities)-maxActivityRecords:] } r.dirtyActivities = true r.activitiesMu.Unlock() } func (r *Plugin) pruneTrackedJobsLocked() { if len(r.jobs) <= maxTrackedJobsTotal { return } type sortableJob struct { jobID string updatedAt time.Time } terminalJobs := make([]sortableJob, 0) for jobID, job := range r.jobs { if job.State == StateSucceeded || job.State == StateFailed || job.State == StateCanceled { updAt := time.Time{} if job.UpdatedAt != nil { updAt = *job.UpdatedAt } terminalJobs = append(terminalJobs, sortableJob{jobID, updAt}) } } if len(terminalJobs) == 0 { return } sort.Slice(terminalJobs, func(i, j int) bool { return terminalJobs[i].updatedAt.Before(terminalJobs[j].updatedAt) }) toDelete := len(r.jobs) - maxTrackedJobsTotal if toDelete <= 0 { return } if toDelete > len(terminalJobs) { toDelete = len(terminalJobs) } for i := 0; i < toDelete; i++ { delete(r.jobs, terminalJobs[i].jobID) } } func configValueMapToPlain(values map[string]*plugin_pb.ConfigValue) map[string]interface{} { if len(values) == 0 { return nil } payload, err := protojson.MarshalOptions{UseProtoNames: true}.Marshal(&plugin_pb.ValueMap{Fields: values}) if err != nil { return nil } decoded := map[string]interface{}{} if err := json.Unmarshal(payload, &decoded); err != nil { return nil } fields, ok := decoded["fields"].(map[string]interface{}) if !ok { return nil } return fields } func (r *Plugin) persistTrackedJobsSnapshot() { r.jobsMu.Lock() r.dirtyJobs = false jobs := make([]TrackedJob, 0, len(r.jobs)) for _, job := range r.jobs { if job == nil || strings.TrimSpace(job.JobID) == "" { continue } clone := cloneTrackedJob(*job) stripTrackedJobDetailFields(&clone) jobs = append(jobs, clone) } r.jobsMu.Unlock() if len(jobs) == 0 { return } sort.Slice(jobs, func(i, j int) bool { ti := time.Time{} if jobs[i].UpdatedAt != nil { ti = *jobs[i].UpdatedAt } tj := time.Time{} if jobs[j].UpdatedAt != nil { tj = *jobs[j].UpdatedAt } if !ti.Equal(tj) { return ti.After(tj) } return jobs[i].JobID < jobs[j].JobID }) if len(jobs) > maxTrackedJobsTotal { jobs = jobs[:maxTrackedJobsTotal] } if err := r.store.SaveTrackedJobs(jobs); err != nil { glog.Warningf("Plugin failed to persist tracked jobs: %v", err) } } func (r *Plugin) persistJobDetailSnapshot(jobID string, apply func(detail *TrackedJob)) { normalizedJobID, _ := sanitizeJobID(jobID) if normalizedJobID == "" { return } r.jobDetailsMu.Lock() defer r.jobDetailsMu.Unlock() detail, err := r.store.LoadJobDetail(normalizedJobID) if err != nil { glog.Warningf("Plugin failed to load job detail snapshot for %s: %v", normalizedJobID, err) return } if detail == nil { detail = &TrackedJob{ JobID: normalizedJobID, } } if apply != nil { apply(detail) } if err := r.store.SaveJobDetail(*detail); err != nil { glog.Warningf("Plugin failed to persist job detail snapshot for %s: %v", normalizedJobID, err) } } func (r *Plugin) persistActivitiesSnapshot() { r.activitiesMu.Lock() r.dirtyActivities = false activities := append([]JobActivity(nil), r.activities...) r.activitiesMu.Unlock() if len(activities) == 0 { return } if len(activities) > maxActivityRecords { activities = activities[len(activities)-maxActivityRecords:] } if err := r.store.SaveActivities(activities); err != nil { glog.Warningf("Plugin failed to persist activities: %v", err) } } func (r *Plugin) persistenceLoop() { defer r.wg.Done() for { select { case <-r.shutdownCh: r.persistTrackedJobsSnapshot() r.persistActivitiesSnapshot() return case <-r.persistTicker.C: r.jobsMu.RLock() needsJobsFlush := r.dirtyJobs r.jobsMu.RUnlock() if needsJobsFlush { r.persistTrackedJobsSnapshot() } r.activitiesMu.RLock() needsActivitiesFlush := r.dirtyActivities r.activitiesMu.RUnlock() if needsActivitiesFlush { r.persistActivitiesSnapshot() } } } }