You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1177 lines
29 KiB

package plugin
import (
"encoding/json"
"fmt"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
)
// Retention limits and staleness thresholds used by the job monitor.
const (
// maxTrackedJobsTotal is the size of the in-memory job table above which
// terminal jobs are pruned, and the cap applied to the persisted job list.
maxTrackedJobsTotal = 1000
// maxActivityRecords caps the activity log, both in memory and on disk;
// the oldest records are dropped first.
maxActivityRecords = 4000
// maxRelatedJobs is the upper bound applied to the caller-supplied
// relatedLimit in BuildJobDetail.
maxRelatedJobs = 100
// stale active jobs block dedupe and scheduling; use generous defaults to
// avoid expiring legitimate long-running tasks.
// defaultStaleActiveJobTimeout applies to active jobs without a worker or
// whose worker is still registered.
defaultStaleActiveJobTimeout = 24 * time.Hour
// defaultOrphanedActiveJobTimeout applies to active jobs whose assigned
// worker is no longer found in the registry.
defaultOrphanedActiveJobTimeout = 15 * time.Minute
)
// Lower-cased names of the terminal plugin_pb.JobState enum values, in the
// same form that is stored in TrackedJob.State.
var (
// StateSucceeded is the state string of a job that completed successfully.
StateSucceeded = strings.ToLower(plugin_pb.JobState_JOB_STATE_SUCCEEDED.String())
// StateFailed is the state string of a job that completed with an error
// (including jobs expired by the monitor).
StateFailed = strings.ToLower(plugin_pb.JobState_JOB_STATE_FAILED.String())
// StateCanceled is the state string of a canceled job.
StateCanceled = strings.ToLower(plugin_pb.JobState_JOB_STATE_CANCELED.String())
)
// activeJobSnapshot is a value copy of the fields of an active tracked job
// that the stale-job sweep needs, so the sweep can work without holding the
// jobs lock.
type activeJobSnapshot struct {
// jobID, jobType, workerID and requestID are copied from the tracked job.
jobID string
jobType string
workerID string
requestID string
// lastUpdate is the job's UpdatedAt, falling back to CreatedAt; it is the
// zero time when neither is set.
lastUpdate time.Time
}
// activityLess is a newest-first comparator: it returns true when a's
// timestamp is strictly later than b's. An activity without OccurredAt is
// compared as if it happened at the zero time.
func activityLess(a, b JobActivity) bool {
	var left, right time.Time
	if a.OccurredAt != nil {
		left = *a.OccurredAt
	}
	if b.OccurredAt != nil {
		right = *b.OccurredAt
	}
	return right.Before(left)
}
// loadPersistedMonitorState restores tracked jobs and the activity log from
// the store into memory.
//
// For each persisted job with a non-blank ID it:
//   - folds a terminal state from the per-job detail snapshot into the
//     record when the record still looks active,
//   - migrates inline rich details (older tracked_jobs.json layout) into a
//     dedicated per-job detail file, and
//   - stores only the slimmed-down record in r.jobs.
//
// Failures to load or migrate an individual detail snapshot are logged and
// skipped. Only a failure to load the tracked jobs or the activities is
// returned.
func (r *Plugin) loadPersistedMonitorState() error {
	persistedJobs, err := r.store.LoadTrackedJobs()
	if err != nil {
		return err
	}
	persistedActivities, err := r.store.LoadActivities()
	if err != nil {
		return err
	}

	if len(persistedJobs) != 0 {
		r.jobsMu.Lock()
		for _, restored := range persistedJobs {
			if strings.TrimSpace(restored.JobID) == "" {
				continue
			}

			if isActiveTrackedJobState(restored.State) {
				detail, loadErr := r.store.LoadJobDetail(restored.JobID)
				switch {
				case loadErr != nil:
					glog.Warningf("Plugin failed to load detail snapshot for job %s: %v", restored.JobID, loadErr)
				case detail != nil:
					mergeTerminalDetailIntoTracked(&restored, detail)
				}
			}

			// Backward compatibility: move inline detail payloads out of
			// tracked_jobs.json into dedicated per-job detail files.
			if hasTrackedJobRichDetails(restored) {
				if saveErr := r.store.SaveJobDetail(restored); saveErr != nil {
					glog.Warningf("Plugin failed to migrate detail snapshot for job %s: %v", restored.JobID, saveErr)
				}
			}

			stripTrackedJobDetailFields(&restored)
			// Take a per-iteration copy so the map never aliases the loop variable.
			entry := restored
			r.jobs[entry.JobID] = &entry
		}
		r.pruneTrackedJobsLocked()
		r.jobsMu.Unlock()
	}

	// Keep only the newest maxActivityRecords entries (the tail of the log).
	if excess := len(persistedActivities) - maxActivityRecords; excess > 0 {
		persistedActivities = persistedActivities[excess:]
	}
	if len(persistedActivities) != 0 {
		r.activitiesMu.Lock()
		r.activities = append([]JobActivity(nil), persistedActivities...)
		r.activitiesMu.Unlock()
	}
	return nil
}
// ExpireJob force-fails an active job so it stops blocking scheduling.
//
// jobID is trimmed before use; a blank ID, or a job whose type cannot be
// determined from memory or from its detail snapshot, yields ErrJobNotFound.
// A blank reason is replaced by a default message.
//
// Returns the job's current record and whether it was expired by this call.
// When the job is not active, the existing record (from memory, else from the
// detail snapshot, else nil) is returned with expired == false and no error.
// An error loading the detail snapshot during lookup is returned as-is.
func (r *Plugin) ExpireJob(jobID, reason string) (*TrackedJob, bool, error) {
	id := strings.TrimSpace(jobID)
	if id == "" {
		return nil, false, ErrJobNotFound
	}
	if reason = strings.TrimSpace(reason); reason == "" {
		reason = "job expired by admin request"
	}

	var (
		jobType   string
		requestID string
		active    bool
	)

	r.jobsMu.RLock()
	if tracked := r.jobs[id]; tracked != nil {
		jobType, requestID = tracked.JobType, tracked.RequestID
		active = isActiveTrackedJobState(tracked.State)
	}
	r.jobsMu.RUnlock()

	// Consult the persisted detail snapshot for anything the in-memory
	// record could not provide.
	if jobType == "" || requestID == "" || !active {
		detail, err := r.store.LoadJobDetail(id)
		if err != nil {
			return nil, false, err
		}
		if detail != nil {
			if jobType == "" {
				jobType = detail.JobType
			}
			if requestID == "" {
				requestID = detail.RequestID
			}
			active = active || isActiveTrackedJobState(detail.State)
		}
	}

	if jobType == "" {
		return nil, false, ErrJobNotFound
	}

	if !active {
		// Nothing to expire; report whatever record exists.
		current, _ := r.GetTrackedJob(id)
		if current == nil {
			// Best effort: a load error here simply results in a nil record.
			if detail, err := r.store.LoadJobDetail(id); err == nil && detail != nil {
				clone := cloneTrackedJob(*detail)
				current = &clone
			}
		}
		return current, false, nil
	}

	expiredAt := time.Now().UTC()
	r.handleJobCompleted(&plugin_pb.JobCompleted{
		JobId:        id,
		JobType:      jobType,
		RequestId:    requestID,
		Success:      false,
		ErrorMessage: reason,
		CompletedAt:  timestamppb.New(expiredAt),
	})
	r.appendActivity(JobActivity{
		JobID:      id,
		JobType:    jobType,
		RequestID:  requestID,
		Source:     "admin_expire",
		Message:    reason,
		Stage:      "expired",
		OccurredAt: timeToPtr(expiredAt),
	})

	updated, _ := r.GetTrackedJob(id)
	return updated, true, nil
}
// expireStaleJobs fails every active job that has gone stale as of now and
// returns how many jobs were expired. A zero now means "use the current UTC
// time". Sweeps are serialized by staleJobsMu.
//
// Each candidate is checked twice: once against the snapshot taken up front
// and once more against the live record (confirmStaleReason), so a job that
// progressed or finished in the meantime is left alone.
func (r *Plugin) expireStaleJobs(now time.Time) int {
	if now.IsZero() {
		now = time.Now().UTC()
	}

	r.staleJobsMu.Lock()
	defer r.staleJobsMu.Unlock()

	expiredCount := 0
	for _, candidate := range r.snapshotActiveJobs() {
		// Without any timestamp there is nothing to measure staleness against.
		if candidate.lastUpdate.IsZero() {
			continue
		}
		looksStale, _, _ := r.evaluateStaleJob(now, candidate.workerID, candidate.lastUpdate)
		if !looksStale {
			continue
		}
		reason := r.confirmStaleReason(now, candidate.jobID)
		if reason == "" {
			continue
		}

		r.handleJobCompleted(&plugin_pb.JobCompleted{
			JobId:        candidate.jobID,
			JobType:      candidate.jobType,
			RequestId:    candidate.requestID,
			Success:      false,
			ErrorMessage: reason,
			CompletedAt:  timestamppb.New(now),
		})
		expiredCount++
	}
	return expiredCount
}
// snapshotActiveJobs returns a value snapshot of every tracked job that is
// currently in an active state, taken under the jobs read lock. It returns
// nil when no jobs are tracked at all, and an empty slice when jobs exist but
// none is active.
func (r *Plugin) snapshotActiveJobs() []activeJobSnapshot {
	r.jobsMu.RLock()
	defer r.jobsMu.RUnlock()

	total := len(r.jobs)
	if total == 0 {
		return nil
	}

	snapshots := make([]activeJobSnapshot, 0, total)
	for _, tracked := range r.jobs {
		if tracked != nil && isActiveTrackedJobState(tracked.State) {
			snapshots = append(snapshots, activeJobSnapshot{
				jobID:      tracked.JobID,
				jobType:    tracked.JobType,
				workerID:   tracked.WorkerID,
				requestID:  tracked.RequestID,
				lastUpdate: jobLastUpdated(tracked),
			})
		}
	}
	return snapshots
}
// jobLastUpdated returns the most meaningful "last touched" time of job:
// UpdatedAt when set and non-zero, otherwise CreatedAt when set and non-zero,
// otherwise the zero time (also for a nil job).
func jobLastUpdated(job *TrackedJob) time.Time {
	switch {
	case job == nil:
		return time.Time{}
	case job.UpdatedAt != nil && !job.UpdatedAt.IsZero():
		return *job.UpdatedAt
	case job.CreatedAt != nil && !job.CreatedAt.IsZero():
		return *job.CreatedAt
	default:
		return time.Time{}
	}
}
// evaluateStaleJob decides whether a job last updated at lastUpdate is stale
// as of now.
//
// The timeout depends on the worker: a job with no worker, or with a worker
// that is still registered, uses defaultStaleActiveJobTimeout; a job whose
// worker is no longer registered uses the shorter
// defaultOrphanedActiveJobTimeout.
//
// Returns (stale, timeout applied, human-readable expiry reason). The reason
// is populated even when the job is not stale. A zero lastUpdate is never
// stale and yields (false, 0, "").
func (r *Plugin) evaluateStaleJob(now time.Time, workerID string, lastUpdate time.Time) (bool, time.Duration, string) {
	if lastUpdate.IsZero() {
		return false, 0, ""
	}

	timeout := defaultStaleActiveJobTimeout
	var reason string
	switch assigned := strings.TrimSpace(workerID); {
	case assigned == "":
		reason = fmt.Sprintf("job expired after %s without executor assignment", timeout)
	case !r.isWorkerAvailable(assigned):
		timeout = defaultOrphanedActiveJobTimeout
		reason = fmt.Sprintf("job expired after %s without worker heartbeat (worker=%s)", timeout, assigned)
	default:
		reason = fmt.Sprintf("job expired after %s without progress", timeout)
	}

	return now.Sub(lastUpdate) >= timeout, timeout, reason
}
// confirmStaleReason re-checks staleness against the live record of jobID.
// It returns the expiry reason when the job still exists, is still active and
// is still stale as of now; otherwise it returns "".
func (r *Plugin) confirmStaleReason(now time.Time, jobID string) string {
	var (
		lastUpdate time.Time
		workerID   string
	)

	r.jobsMu.RLock()
	current := r.jobs[jobID]
	stillActive := current != nil && isActiveTrackedJobState(current.State)
	if stillActive {
		lastUpdate = jobLastUpdated(current)
		workerID = current.WorkerID
	}
	r.jobsMu.RUnlock()

	if !stillActive {
		return ""
	}
	// The worker lookup happens outside the jobs lock.
	if stale, _, reason := r.evaluateStaleJob(now, workerID, lastUpdate); stale {
		return reason
	}
	return ""
}
// isWorkerAvailable reports whether the (trimmed) workerID is present in the
// worker registry. A blank ID is never available.
func (r *Plugin) isWorkerAvailable(workerID string) bool {
	if id := strings.TrimSpace(workerID); id != "" {
		_, registered := r.registry.Get(id)
		return registered
	}
	return false
}
// isTerminalTrackedJobState reports whether state, compared case-insensitively
// and ignoring surrounding whitespace, is one of the terminal states
// (succeeded, failed, canceled).
func isTerminalTrackedJobState(state string) bool {
	candidate := strings.ToLower(strings.TrimSpace(state))
	return candidate == StateSucceeded ||
		candidate == StateFailed ||
		candidate == StateCanceled
}
// mergeTerminalDetailIntoTracked copies the outcome recorded in a terminal
// detail snapshot into a tracked record that still looks active.
//
// It does nothing unless detail is in a terminal state and tracked is in an
// active state. Only non-empty / non-zero detail values overwrite the tracked
// ones. If tracked ends up without UpdatedAt but with CompletedAt, UpdatedAt
// is set to CompletedAt.
func mergeTerminalDetailIntoTracked(tracked *TrackedJob, detail *TrackedJob) {
	if tracked == nil || detail == nil ||
		!isTerminalTrackedJobState(detail.State) ||
		!isActiveTrackedJobState(tracked.State) {
		return
	}

	// overwrite replaces *dst only when the detail carries a value.
	overwrite := func(dst *string, src string) {
		if src != "" {
			*dst = src
		}
	}
	overwrite(&tracked.State, detail.State)
	if detail.Progress != 0 {
		tracked.Progress = detail.Progress
	}
	overwrite(&tracked.Stage, detail.Stage)
	overwrite(&tracked.Message, detail.Message)
	overwrite(&tracked.ErrorMessage, detail.ErrorMessage)
	overwrite(&tracked.ResultSummary, detail.ResultSummary)

	if detail.CompletedAt != nil && !detail.CompletedAt.IsZero() {
		tracked.CompletedAt = detail.CompletedAt
	}
	if detail.UpdatedAt != nil && !detail.UpdatedAt.IsZero() {
		tracked.UpdatedAt = detail.UpdatedAt
	}
	if tracked.UpdatedAt == nil && tracked.CompletedAt != nil {
		tracked.UpdatedAt = tracked.CompletedAt
	}
}
// ListTrackedJobs returns copies of the tracked jobs, most recently updated
// first (ties broken by ascending JobID).
//
// jobType filters by exact job type after trimming the argument; state
// filters case-insensitively; an empty filter matches everything. A positive
// limit caps the number of returned jobs. The result is never nil.
func (r *Plugin) ListTrackedJobs(jobType string, state string, limit int) []TrackedJob {
	wantType := strings.TrimSpace(jobType)
	wantState := strings.TrimSpace(strings.ToLower(state))

	r.jobsMu.RLock()
	defer r.jobsMu.RUnlock()

	matched := make([]TrackedJob, 0, len(r.jobs))
	for _, tracked := range r.jobs {
		switch {
		case tracked == nil:
		case wantType != "" && tracked.JobType != wantType:
		case wantState != "" && strings.ToLower(tracked.State) != wantState:
		default:
			matched = append(matched, cloneTrackedJob(*tracked))
		}
	}

	// updatedAt treats a missing UpdatedAt as the zero time.
	updatedAt := func(job *TrackedJob) time.Time {
		if job.UpdatedAt == nil {
			return time.Time{}
		}
		return *job.UpdatedAt
	}
	sort.Slice(matched, func(a, b int) bool {
		ta, tb := updatedAt(&matched[a]), updatedAt(&matched[b])
		if ta.Equal(tb) {
			return matched[a].JobID < matched[b].JobID
		}
		return ta.After(tb)
	})

	if limit > 0 && limit < len(matched) {
		matched = matched[:limit]
	}
	return matched
}
// GetTrackedJob returns a copy of the tracked job stored under jobID and
// true, or (nil, false) when no such job is tracked. jobID is used as given,
// without trimming.
func (r *Plugin) GetTrackedJob(jobID string) (*TrackedJob, bool) {
	r.jobsMu.RLock()
	defer r.jobsMu.RUnlock()

	if tracked := r.jobs[jobID]; tracked != nil {
		clone := cloneTrackedJob(*tracked)
		return &clone, true
	}
	return nil, false
}
// ListActivities returns the recorded activities newest-first, optionally
// restricted to one job type (matched exactly against the trimmed argument).
// A positive limit keeps only the newest limit entries. The result is never
// nil.
func (r *Plugin) ListActivities(jobType string, limit int) []JobActivity {
	wantType := strings.TrimSpace(jobType)

	r.activitiesMu.RLock()
	defer r.activitiesMu.RUnlock()

	selected := make([]JobActivity, 0, len(r.activities))
	for i := range r.activities {
		if wantType == "" || r.activities[i].JobType == wantType {
			selected = append(selected, r.activities[i])
		}
	}

	sort.Slice(selected, func(a, b int) bool {
		return activityLess(selected[a], selected[b])
	})

	if limit > 0 && limit < len(selected) {
		selected = selected[:limit]
	}
	return selected
}
// ListJobActivities returns the activity timeline of one job, oldest-first.
//
// jobID is trimmed before matching; a blank ID yields nil. A positive limit
// keeps only the newest limit entries (the tail of the timeline). Activities
// that share a timestamp keep the order in which they were recorded.
func (r *Plugin) ListJobActivities(jobID string, limit int) []JobActivity {
	normalizedJobID := strings.TrimSpace(jobID)
	if normalizedJobID == "" {
		return nil
	}

	r.activitiesMu.RLock()
	activities := make([]JobActivity, 0, len(r.activities))
	for _, activity := range r.activities {
		if strings.TrimSpace(activity.JobID) != normalizedJobID {
			continue
		}
		activities = append(activities, activity)
	}
	r.activitiesMu.RUnlock()

	// The comparator must be a strict ordering: negating activityLess would
	// report "less" in both directions for equal timestamps, which violates
	// the sort package's contract. Swapping the arguments gives a strict
	// oldest-first order, and the stable sort preserves recording order for
	// events that carry the same timestamp.
	sort.SliceStable(activities, func(i, j int) bool {
		return activityLess(activities[j], activities[i])
	})

	if limit > 0 && len(activities) > limit {
		activities = activities[len(activities)-limit:]
	}
	return activities
}
// BuildJobDetail assembles the detail view of one job: the job record, its
// activity timeline, its run-history record (if any) and related jobs of the
// same type.
//
// Parameters:
//   - jobID: trimmed before use; a blank ID yields (nil, false, nil).
//   - activityLimit: a positive value keeps only the newest entries of the
//     job's timeline.
//   - relatedLimit: number of related jobs to include; values <= 0 disable
//     the section and values above maxRelatedJobs are clamped.
//
// Returns (detail, found, err). found is false when the job is known neither
// in memory nor in the detail store. A detail-store load error is returned
// with found == false; a run-history load error with found == true.
//
// Related jobs are the most recently updated jobs of the same type (ties
// broken by JobID), so repeated calls give a stable answer.
func (r *Plugin) BuildJobDetail(jobID string, activityLimit int, relatedLimit int) (*JobDetail, bool, error) {
	normalizedJobID := strings.TrimSpace(jobID)
	if normalizedJobID == "" {
		return nil, false, nil
	}

	// Clamp relatedLimit to a safe range to avoid excessive memory allocation from untrusted input.
	if relatedLimit < 0 {
		relatedLimit = 0
	} else if relatedLimit > maxRelatedJobs {
		relatedLimit = maxRelatedJobs
	}

	var trackedSnapshot *TrackedJob
	r.jobsMu.RLock()
	if tracked := r.jobs[normalizedJobID]; tracked != nil {
		candidate := cloneTrackedJob(*tracked)
		stripTrackedJobDetailFields(&candidate)
		trackedSnapshot = &candidate
	}
	r.jobsMu.RUnlock()

	detailJob, err := r.store.LoadJobDetail(normalizedJobID)
	if err != nil {
		return nil, false, err
	}
	switch {
	case detailJob == nil && trackedSnapshot == nil:
		return nil, false, nil
	case detailJob == nil:
		// No persisted detail: fall back to the in-memory record.
		clone := cloneTrackedJob(*trackedSnapshot)
		detailJob = &clone
	}
	if trackedSnapshot != nil {
		mergeTrackedStatusIntoDetail(detailJob, trackedSnapshot)
	}
	detailJob.Parameters = enrichTrackedJobParameters(detailJob.JobType, detailJob.Parameters)

	// Copy the activity log so filtering and sorting happen outside the lock.
	r.activitiesMu.RLock()
	activities := append([]JobActivity(nil), r.activities...)
	r.activitiesMu.RUnlock()

	detail := &JobDetail{
		Job:         detailJob,
		Activities:  filterJobActivitiesFromSlice(activities, normalizedJobID, activityLimit),
		LastUpdated: timeToPtr(time.Now().UTC()),
	}

	history, err := r.store.LoadRunHistory(detailJob.JobType)
	if err != nil {
		return nil, true, err
	}
	if history != nil {
		// Successful runs take precedence over error runs.
		for i := range history.SuccessfulRuns {
			if strings.TrimSpace(history.SuccessfulRuns[i].JobID) == normalizedJobID {
				record := history.SuccessfulRuns[i]
				detail.RunRecord = &record
				break
			}
		}
		if detail.RunRecord == nil {
			for i := range history.ErrorRuns {
				if strings.TrimSpace(history.ErrorRuns[i].JobID) == normalizedJobID {
					record := history.ErrorRuns[i]
					detail.RunRecord = &record
					break
				}
			}
		}
	}

	if relatedLimit > 0 {
		wantType := strings.TrimSpace(detailJob.JobType)

		r.jobsMu.RLock()
		candidates := make([]*TrackedJob, 0, len(r.jobs))
		for _, candidate := range r.jobs {
			// Guard against nil entries, as every other loop over r.jobs does.
			if candidate == nil {
				continue
			}
			if strings.TrimSpace(candidate.JobType) != wantType {
				continue
			}
			if strings.TrimSpace(candidate.JobID) == normalizedJobID {
				continue
			}
			candidates = append(candidates, candidate)
		}
		// Map iteration order is random; order the candidates so the limit
		// always keeps the most recently updated jobs.
		sort.Slice(candidates, func(i, j int) bool {
			ti, tj := jobLastUpdated(candidates[i]), jobLastUpdated(candidates[j])
			if !ti.Equal(tj) {
				return ti.After(tj)
			}
			return candidates[i].JobID < candidates[j].JobID
		})
		if len(candidates) > relatedLimit {
			candidates = candidates[:relatedLimit]
		}
		related := make([]TrackedJob, 0, len(candidates))
		for _, candidate := range candidates {
			cloned := cloneTrackedJob(*candidate)
			stripTrackedJobDetailFields(&cloned)
			related = append(related, cloned)
		}
		r.jobsMu.RUnlock()

		detail.RelatedJobs = related
	}

	return detail, true, nil
}
// filterJobActivitiesFromSlice extracts the timeline of one job from all,
// ordered oldest-first.
//
// jobID is trimmed before matching (activity IDs are trimmed as well); a
// blank jobID or an empty input yields nil. A positive limit keeps only the
// newest limit entries. Activities that share a timestamp keep their relative
// order from all.
func filterJobActivitiesFromSlice(all []JobActivity, jobID string, limit int) []JobActivity {
	// Normalize once so the emptiness check and the comparison agree.
	normalizedJobID := strings.TrimSpace(jobID)
	if normalizedJobID == "" || len(all) == 0 {
		return nil
	}

	activities := make([]JobActivity, 0, len(all))
	for _, activity := range all {
		if strings.TrimSpace(activity.JobID) != normalizedJobID {
			continue
		}
		activities = append(activities, activity)
	}

	// A sort comparator must be strict; negating activityLess would yield
	// "less" in both directions for equal timestamps. Swapping the arguments
	// gives a strict oldest-first order, and the stable sort keeps the
	// original order of same-timestamp events.
	sort.SliceStable(activities, func(i, j int) bool {
		return activityLess(activities[j], activities[i])
	})

	if limit > 0 && len(activities) > limit {
		activities = activities[len(activities)-limit:]
	}
	return activities
}
// stripTrackedJobDetailFields clears the rich detail fields (Detail,
// Parameters, Labels, ResultOutputValues) of job in place. A nil job is
// ignored.
func stripTrackedJobDetailFields(job *TrackedJob) {
	if job != nil {
		job.Detail, job.Parameters = "", nil
		job.Labels, job.ResultOutputValues = nil, nil
	}
}
// hasTrackedJobRichDetails reports whether job carries any rich detail data:
// a non-blank Detail, or at least one parameter, label or result output
// value.
func hasTrackedJobRichDetails(job TrackedJob) bool {
	switch {
	case strings.TrimSpace(job.Detail) != "":
		return true
	case len(job.Parameters) > 0:
		return true
	case len(job.Labels) > 0:
		return true
	default:
		return len(job.ResultOutputValues) > 0
	}
}
// mergeTrackedStatusIntoDetail folds the in-memory tracked record into a
// persisted detail record, in place. Nil arguments are ignored.
//
// Live status (State, Progress, Stage, Message, UpdatedAt, and Attempt /
// CompletedAt when set) is taken from tracked whenever tracked has a state
// and is at least as recent as detail. Progress updates only modify the
// in-memory record, so without this the detail view would pair a fresh
// Progress with the State/Stage/Message written when the job was queued or
// assigned.
//
// All remaining fields, and the status fields when tracked is not known to
// be current, are only filled in where detail has no value of its own.
func mergeTrackedStatusIntoDetail(detail *TrackedJob, tracked *TrackedJob) {
	if detail == nil || tracked == nil {
		return
	}

	trackedIsCurrent := tracked.State != "" &&
		tracked.UpdatedAt != nil && !tracked.UpdatedAt.IsZero() &&
		(detail.UpdatedAt == nil || !tracked.UpdatedAt.Before(*detail.UpdatedAt))
	if trackedIsCurrent {
		detail.State = tracked.State
		detail.Progress = tracked.Progress
		detail.Stage = tracked.Stage
		detail.Message = tracked.Message
		detail.UpdatedAt = tracked.UpdatedAt
		if tracked.Attempt != 0 {
			detail.Attempt = tracked.Attempt
		}
		if tracked.CompletedAt != nil && !tracked.CompletedAt.IsZero() {
			detail.CompletedAt = tracked.CompletedAt
		}
	}

	// Fill-only merge for everything the detail record does not have yet.
	if detail.JobType == "" {
		detail.JobType = tracked.JobType
	}
	if detail.RequestID == "" {
		detail.RequestID = tracked.RequestID
	}
	if detail.WorkerID == "" {
		detail.WorkerID = tracked.WorkerID
	}
	if detail.DedupeKey == "" {
		detail.DedupeKey = tracked.DedupeKey
	}
	if detail.Summary == "" {
		detail.Summary = tracked.Summary
	}
	if detail.State == "" {
		detail.State = tracked.State
	}
	if detail.Progress == 0 {
		detail.Progress = tracked.Progress
	}
	if detail.Stage == "" {
		detail.Stage = tracked.Stage
	}
	if detail.Message == "" {
		detail.Message = tracked.Message
	}
	if detail.Attempt == 0 {
		detail.Attempt = tracked.Attempt
	}
	if detail.CreatedAt == nil || detail.CreatedAt.IsZero() {
		detail.CreatedAt = tracked.CreatedAt
	}
	if detail.UpdatedAt == nil || detail.UpdatedAt.IsZero() {
		detail.UpdatedAt = tracked.UpdatedAt
	}
	if detail.CompletedAt == nil || detail.CompletedAt.IsZero() {
		detail.CompletedAt = tracked.CompletedAt
	}
	if detail.ErrorMessage == "" {
		detail.ErrorMessage = tracked.ErrorMessage
	}
	if detail.ResultSummary == "" {
		detail.ResultSummary = tracked.ResultSummary
	}
}
// handleJobProgressUpdate applies a progress report from a worker.
//
// When the update names a job, the tracked record is created if missing and
// its type, request, worker, state, progress, stage, message and UpdatedAt
// are refreshed. A worker already recorded on the job takes precedence over
// the reporting workerID. Activity events attached to the update are
// recorded, and a non-empty message or stage additionally produces one
// activity entry, with source "worker_progress" for job updates and
// "worker_detection" for updates without a job ID.
//
// A nil update is ignored.
func (r *Plugin) handleJobProgressUpdate(workerID string, update *plugin_pb.JobProgressUpdate) {
	if update == nil {
		return
	}

	receivedAt := time.Now().UTC()
	effectiveWorkerID := strings.TrimSpace(workerID)
	hasJobID := strings.TrimSpace(update.JobId) != ""

	if hasJobID {
		r.jobsMu.Lock()
		tracked := r.jobs[update.JobId]
		if tracked == nil {
			tracked = &TrackedJob{
				JobID:     update.JobId,
				JobType:   update.JobType,
				RequestID: update.RequestId,
				WorkerID:  effectiveWorkerID,
				CreatedAt: timeToPtr(receivedAt),
			}
			r.jobs[update.JobId] = tracked
		}

		if update.JobType != "" {
			tracked.JobType = update.JobType
		}
		if update.RequestId != "" {
			tracked.RequestID = update.RequestId
		}
		switch {
		case tracked.WorkerID != "":
			// The worker already recorded on the job wins.
			effectiveWorkerID = tracked.WorkerID
		case effectiveWorkerID != "":
			tracked.WorkerID = effectiveWorkerID
		}

		tracked.State = strings.ToLower(update.State.String())
		tracked.Progress = update.ProgressPercent
		tracked.Stage = update.Stage
		tracked.Message = update.Message
		tracked.UpdatedAt = timeToPtr(receivedAt)

		r.pruneTrackedJobsLocked()
		r.dirtyJobs = true
		r.jobsMu.Unlock()
	}

	r.trackWorkerActivities(update.JobType, update.JobId, update.RequestId, effectiveWorkerID, update.Activities)

	if update.Message == "" && update.Stage == "" {
		return
	}
	source := "worker_detection"
	if hasJobID {
		source = "worker_progress"
	}
	r.appendActivity(JobActivity{
		JobID:      update.JobId,
		JobType:    update.JobType,
		RequestID:  update.RequestId,
		WorkerID:   effectiveWorkerID,
		Source:     source,
		Message:    update.Message,
		Stage:      update.Stage,
		OccurredAt: timeToPtr(receivedAt),
	})
}
// trackExecutionStart records that job was assigned to workerID for the
// given attempt.
//
// It (1) creates or updates the in-memory record in the "assigned" state,
// (2) rewrites the persisted detail snapshot with the job's spec (detail
// text, parameters, labels) and the new status, and (3) appends an
// "admin_dispatch" activity. A nil job or a blank job ID is ignored.
func (r *Plugin) trackExecutionStart(requestID, workerID string, job *plugin_pb.JobSpec, attempt int32) {
	if job == nil || strings.TrimSpace(job.JobId) == "" {
		return
	}

	assignedAt := time.Now().UTC()

	r.jobsMu.Lock()
	record := r.jobs[job.JobId]
	if record == nil {
		record = &TrackedJob{
			JobID:     job.JobId,
			CreatedAt: timeToPtr(assignedAt),
		}
		r.jobs[job.JobId] = record
	}
	record.JobType = job.JobType
	record.RequestID = requestID
	record.WorkerID = workerID
	record.DedupeKey = job.DedupeKey
	record.Summary = job.Summary
	record.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_ASSIGNED.String())
	record.Progress = 0
	record.Stage = "assigned"
	record.Message = "job assigned to worker"
	record.Attempt = attempt
	if record.CreatedAt == nil || record.CreatedAt.IsZero() {
		record.CreatedAt = timeToPtr(assignedAt)
	}
	record.UpdatedAt = timeToPtr(assignedAt)
	// Copy the status while still holding the lock; it is used below after
	// the lock has been released.
	status := cloneTrackedJob(*record)
	r.pruneTrackedJobsLocked()
	r.dirtyJobs = true
	r.jobsMu.Unlock()

	r.persistJobDetailSnapshot(job.JobId, func(detail *TrackedJob) {
		detail.JobID = job.JobId
		detail.JobType = job.JobType
		detail.RequestID = requestID
		detail.WorkerID = workerID
		detail.DedupeKey = job.DedupeKey
		detail.Summary = job.Summary
		detail.Detail = job.Detail
		detail.Parameters = enrichTrackedJobParameters(job.JobType, configValueMapToPlain(job.Parameters))

		// Store a private copy of the labels, or nil when there are none.
		detail.Labels = nil
		if len(job.Labels) > 0 {
			copied := make(map[string]string, len(job.Labels))
			for name, value := range job.Labels {
				copied[name] = value
			}
			detail.Labels = copied
		}

		detail.State = status.State
		detail.Progress = status.Progress
		detail.Stage = status.Stage
		detail.Message = status.Message
		detail.Attempt = attempt
		if detail.CreatedAt == nil || detail.CreatedAt.IsZero() {
			detail.CreatedAt = status.CreatedAt
		}
		detail.UpdatedAt = status.UpdatedAt
	})

	r.appendActivity(JobActivity{
		JobID:      job.JobId,
		JobType:    job.JobType,
		RequestID:  requestID,
		WorkerID:   workerID,
		Source:     "admin_dispatch",
		Message:    "job assigned",
		Stage:      "assigned",
		OccurredAt: timeToPtr(assignedAt),
	})
}
// trackExecutionQueued records that job is waiting for an executor.
//
// It (1) creates or updates the in-memory record in the "pending" state,
// (2) rewrites the persisted detail snapshot with the job's spec (detail
// text, parameters, labels) and the new status, and (3) appends an
// "admin_scheduler" activity. A nil job or a blank job ID is ignored.
func (r *Plugin) trackExecutionQueued(job *plugin_pb.JobSpec) {
	if job == nil || strings.TrimSpace(job.JobId) == "" {
		return
	}

	queuedAt := time.Now().UTC()

	r.jobsMu.Lock()
	record := r.jobs[job.JobId]
	if record == nil {
		record = &TrackedJob{
			JobID:     job.JobId,
			CreatedAt: timeToPtr(queuedAt),
		}
		r.jobs[job.JobId] = record
	}
	record.JobType = job.JobType
	record.DedupeKey = job.DedupeKey
	record.Summary = job.Summary
	record.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_PENDING.String())
	record.Progress = 0
	record.Stage = "queued"
	record.Message = "waiting for available executor"
	if record.CreatedAt == nil || record.CreatedAt.IsZero() {
		record.CreatedAt = timeToPtr(queuedAt)
	}
	record.UpdatedAt = timeToPtr(queuedAt)
	// Copy the status while still holding the lock; it is used below after
	// the lock has been released.
	status := cloneTrackedJob(*record)
	r.pruneTrackedJobsLocked()
	r.dirtyJobs = true
	r.jobsMu.Unlock()

	r.persistJobDetailSnapshot(job.JobId, func(detail *TrackedJob) {
		detail.JobID = job.JobId
		detail.JobType = job.JobType
		detail.DedupeKey = job.DedupeKey
		detail.Summary = job.Summary
		detail.Detail = job.Detail
		detail.Parameters = enrichTrackedJobParameters(job.JobType, configValueMapToPlain(job.Parameters))

		// Store a private copy of the labels, or nil when there are none.
		detail.Labels = nil
		if len(job.Labels) > 0 {
			copied := make(map[string]string, len(job.Labels))
			for name, value := range job.Labels {
				copied[name] = value
			}
			detail.Labels = copied
		}

		detail.State = status.State
		detail.Progress = status.Progress
		detail.Stage = status.Stage
		detail.Message = status.Message
		if detail.CreatedAt == nil || detail.CreatedAt.IsZero() {
			detail.CreatedAt = status.CreatedAt
		}
		detail.UpdatedAt = status.UpdatedAt
	})

	r.appendActivity(JobActivity{
		JobID:      job.JobId,
		JobType:    job.JobType,
		Source:     "admin_scheduler",
		Message:    "job queued for execution",
		Stage:      "queued",
		OccurredAt: timeToPtr(queuedAt),
	})
}
// trackExecutionCompletion records the outcome of a job and returns a copy
// of the updated tracked record (nil for a nil message or a blank job ID).
//
// The completion time is completed.CompletedAt when provided, otherwise the
// current UTC time. The in-memory record is created if missing and moved to
// "succeeded" (progress 100, message = result summary or "completed") or
// "failed" (message = error message). The persisted detail snapshot receives
// the same status, and a "worker_completion" activity is appended.
//
// The detail snapshot's UpdatedAt and CompletedAt are always set to the
// completion time. The snapshot already carries an UpdatedAt from when the
// job was queued or assigned, and may carry a CompletedAt from an earlier
// attempt; keeping those would leave the snapshot with timestamps older than
// the outcome it describes.
func (r *Plugin) trackExecutionCompletion(completed *plugin_pb.JobCompleted) *TrackedJob {
	if completed == nil || strings.TrimSpace(completed.JobId) == "" {
		return nil
	}

	completedAt := time.Now().UTC()
	if completed.CompletedAt != nil {
		completedAt = completed.CompletedAt.AsTime().UTC()
	}

	r.jobsMu.Lock()
	tracked := r.jobs[completed.JobId]
	if tracked == nil {
		tracked = &TrackedJob{
			JobID:     completed.JobId,
			CreatedAt: timeToPtr(completedAt),
		}
		r.jobs[completed.JobId] = tracked
	}

	if completed.JobType != "" {
		tracked.JobType = completed.JobType
	}
	if completed.RequestId != "" {
		tracked.RequestID = completed.RequestId
	}
	if completed.Success {
		tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_SUCCEEDED.String())
		tracked.Progress = 100
		tracked.Stage = "completed"
		if completed.Result != nil {
			tracked.ResultSummary = completed.Result.Summary
		}
		tracked.Message = tracked.ResultSummary
		if tracked.Message == "" {
			tracked.Message = "completed"
		}
		tracked.ErrorMessage = ""
	} else {
		tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_FAILED.String())
		tracked.Stage = "failed"
		tracked.ErrorMessage = completed.ErrorMessage
		tracked.Message = completed.ErrorMessage
	}

	tracked.UpdatedAt = timeToPtr(completedAt)
	tracked.CompletedAt = timeToPtr(completedAt)
	r.pruneTrackedJobsLocked()
	// Copy while still holding the lock; the copy is used after unlocking.
	clone := cloneTrackedJob(*tracked)
	r.dirtyJobs = true
	r.jobsMu.Unlock()

	r.persistJobDetailSnapshot(completed.JobId, func(detail *TrackedJob) {
		detail.JobID = completed.JobId
		if completed.JobType != "" {
			detail.JobType = completed.JobType
		}
		if completed.RequestId != "" {
			detail.RequestID = completed.RequestId
		}
		detail.State = clone.State
		detail.Progress = clone.Progress
		detail.Stage = clone.Stage
		detail.Message = clone.Message
		detail.ErrorMessage = clone.ErrorMessage
		detail.ResultSummary = clone.ResultSummary
		if completed.Success && completed.Result != nil {
			detail.ResultOutputValues = configValueMapToPlain(completed.Result.OutputValues)
		} else {
			detail.ResultOutputValues = nil
		}
		if detail.CreatedAt == nil || detail.CreatedAt.IsZero() {
			detail.CreatedAt = clone.CreatedAt
		}
		// Always stamp the outcome with the completion time (see the
		// function comment for why this must not be fill-only).
		detail.UpdatedAt = clone.UpdatedAt
		detail.CompletedAt = clone.CompletedAt
	})

	r.appendActivity(JobActivity{
		JobID:      completed.JobId,
		JobType:    completed.JobType,
		RequestID:  completed.RequestId,
		WorkerID:   clone.WorkerID,
		Source:     "worker_completion",
		Message:    clone.Message,
		Stage:      clone.Stage,
		OccurredAt: timeToPtr(completedAt),
	})

	return &clone
}
// trackWorkerActivities appends one activity per non-nil worker event,
// tagged with the given job, request and worker identifiers. An event's own
// CreatedAt is used as its timestamp when present; otherwise the current UTC
// time is used.
func (r *Plugin) trackWorkerActivities(jobType, jobID, requestID, workerID string, events []*plugin_pb.ActivityEvent) {
	for _, event := range events {
		if event == nil {
			continue
		}

		occurredAt := time.Now().UTC()
		if event.CreatedAt != nil {
			occurredAt = event.CreatedAt.AsTime().UTC()
		}

		r.appendActivity(JobActivity{
			JobID:      jobID,
			JobType:    jobType,
			RequestID:  requestID,
			WorkerID:   workerID,
			Source:     strings.ToLower(event.Source.String()),
			Message:    event.Message,
			Stage:      event.Stage,
			Details:    configValueMapToPlain(event.Details),
			OccurredAt: timeToPtr(occurredAt),
		})
	}
}
// appendActivity adds activity to the in-memory log, stamping it with the
// current UTC time when it has no timestamp, trims the log to the newest
// maxActivityRecords entries, and marks the log as needing persistence.
func (r *Plugin) appendActivity(activity JobActivity) {
	if activity.OccurredAt == nil || activity.OccurredAt.IsZero() {
		activity.OccurredAt = timeToPtr(time.Now().UTC())
	}

	r.activitiesMu.Lock()
	defer r.activitiesMu.Unlock()

	r.activities = append(r.activities, activity)
	if overflow := len(r.activities) - maxActivityRecords; overflow > 0 {
		r.activities = r.activities[overflow:]
	}
	r.dirtyActivities = true
}
// pruneTrackedJobsLocked shrinks r.jobs back towards maxTrackedJobsTotal by
// deleting the least recently updated terminal jobs. Active jobs are never
// removed, so the table may stay above the limit when too few terminal jobs
// exist. Nil map entries carry no data and are dropped as well.
//
// The caller must hold r.jobsMu for writing.
func (r *Plugin) pruneTrackedJobsLocked() {
	if len(r.jobs) <= maxTrackedJobsTotal {
		return
	}

	type prunableJob struct {
		jobID     string
		updatedAt time.Time
	}

	terminalJobs := make([]prunableJob, 0, len(r.jobs))
	for jobID, job := range r.jobs {
		if job == nil {
			// Deleting during range is allowed for maps; a nil entry would
			// otherwise panic on the field access below.
			delete(r.jobs, jobID)
			continue
		}
		// Use the shared, normalising terminal-state check rather than
		// comparing raw strings.
		if !isTerminalTrackedJobState(job.State) {
			continue
		}
		var updatedAt time.Time
		if job.UpdatedAt != nil {
			updatedAt = *job.UpdatedAt
		}
		terminalJobs = append(terminalJobs, prunableJob{jobID: jobID, updatedAt: updatedAt})
	}

	toDelete := len(r.jobs) - maxTrackedJobsTotal
	if toDelete <= 0 || len(terminalJobs) == 0 {
		return
	}

	// Oldest first; the job ID breaks ties so the victims do not depend on
	// map iteration order.
	sort.Slice(terminalJobs, func(i, j int) bool {
		if !terminalJobs[i].updatedAt.Equal(terminalJobs[j].updatedAt) {
			return terminalJobs[i].updatedAt.Before(terminalJobs[j].updatedAt)
		}
		return terminalJobs[i].jobID < terminalJobs[j].jobID
	})

	if toDelete > len(terminalJobs) {
		toDelete = len(terminalJobs)
	}
	for _, victim := range terminalJobs[:toDelete] {
		delete(r.jobs, victim.jobID)
	}
}
// configValueMapToPlain converts a map of protobuf ConfigValue messages into
// plain JSON-style Go values by wrapping it in a ValueMap, marshalling that
// with protojson (proto field names) and decoding the "fields" object.
//
// It returns nil for an empty input and for any marshal/unmarshal failure or
// unexpected JSON shape.
func configValueMapToPlain(values map[string]*plugin_pb.ConfigValue) map[string]interface{} {
	if len(values) == 0 {
		return nil
	}

	marshaller := protojson.MarshalOptions{UseProtoNames: true}
	raw, err := marshaller.Marshal(&plugin_pb.ValueMap{Fields: values})
	if err != nil {
		return nil
	}

	envelope := make(map[string]interface{})
	if err = json.Unmarshal(raw, &envelope); err != nil {
		return nil
	}

	if fields, isObject := envelope["fields"].(map[string]interface{}); isObject {
		return fields
	}
	return nil
}
// persistTrackedJobsSnapshot writes the slimmed-down tracked jobs to the
// store and clears the dirty flag.
//
// Jobs are copied under the lock (without rich detail fields), then sorted
// most recently updated first (ties by JobID) and capped at
// maxTrackedJobsTotal before saving. Nothing is written when there are no
// jobs. A save failure is logged, not returned.
func (r *Plugin) persistTrackedJobsSnapshot() {
	r.jobsMu.Lock()
	r.dirtyJobs = false
	snapshot := make([]TrackedJob, 0, len(r.jobs))
	for _, tracked := range r.jobs {
		if tracked != nil && strings.TrimSpace(tracked.JobID) != "" {
			slim := cloneTrackedJob(*tracked)
			stripTrackedJobDetailFields(&slim)
			snapshot = append(snapshot, slim)
		}
	}
	r.jobsMu.Unlock()

	if len(snapshot) == 0 {
		return
	}

	// updatedAt treats a missing UpdatedAt as the zero time.
	updatedAt := func(job *TrackedJob) time.Time {
		if job.UpdatedAt == nil {
			return time.Time{}
		}
		return *job.UpdatedAt
	}
	sort.Slice(snapshot, func(a, b int) bool {
		ta, tb := updatedAt(&snapshot[a]), updatedAt(&snapshot[b])
		if ta.Equal(tb) {
			return snapshot[a].JobID < snapshot[b].JobID
		}
		return ta.After(tb)
	})
	if len(snapshot) > maxTrackedJobsTotal {
		snapshot = snapshot[:maxTrackedJobsTotal]
	}

	if err := r.store.SaveTrackedJobs(snapshot); err != nil {
		glog.Warningf("Plugin failed to persist tracked jobs: %v", err)
	}
}
// persistJobDetailSnapshot performs a load-modify-save cycle on the detail
// snapshot of jobID while holding jobDetailsMu.
//
// The ID is passed through sanitizeJobID first; an empty result aborts the
// call. A missing snapshot starts from a record containing only the job ID.
// apply (may be nil) mutates the record before it is saved. Load and save
// failures are logged, not returned; a load failure skips the save.
func (r *Plugin) persistJobDetailSnapshot(jobID string, apply func(detail *TrackedJob)) {
	normalizedJobID, _ := sanitizeJobID(jobID)
	if normalizedJobID == "" {
		return
	}

	r.jobDetailsMu.Lock()
	defer r.jobDetailsMu.Unlock()

	snapshot, loadErr := r.store.LoadJobDetail(normalizedJobID)
	if loadErr != nil {
		glog.Warningf("Plugin failed to load job detail snapshot for %s: %v", normalizedJobID, loadErr)
		return
	}
	if snapshot == nil {
		snapshot = &TrackedJob{JobID: normalizedJobID}
	}

	if apply != nil {
		apply(snapshot)
	}

	if saveErr := r.store.SaveJobDetail(*snapshot); saveErr != nil {
		glog.Warningf("Plugin failed to persist job detail snapshot for %s: %v", normalizedJobID, saveErr)
	}
}
// persistActivitiesSnapshot writes a copy of the activity log to the store
// and clears the dirty flag. At most the newest maxActivityRecords entries
// are saved; nothing is written when the log is empty. A save failure is
// logged, not returned.
func (r *Plugin) persistActivitiesSnapshot() {
	r.activitiesMu.Lock()
	r.dirtyActivities = false
	snapshot := append([]JobActivity(nil), r.activities...)
	r.activitiesMu.Unlock()

	if len(snapshot) == 0 {
		return
	}
	if overflow := len(snapshot) - maxActivityRecords; overflow > 0 {
		snapshot = snapshot[overflow:]
	}

	if err := r.store.SaveActivities(snapshot); err != nil {
		glog.Warningf("Plugin failed to persist activities: %v", err)
	}
}
// persistenceLoop runs until shutdownCh fires. On every persistTicker tick
// it flushes the tracked jobs and/or the activity log if they are marked
// dirty; on shutdown it flushes both unconditionally and returns. It signals
// r.wg when it exits.
func (r *Plugin) persistenceLoop() {
	defer r.wg.Done()

	jobsDirty := func() bool {
		r.jobsMu.RLock()
		defer r.jobsMu.RUnlock()
		return r.dirtyJobs
	}
	activitiesDirty := func() bool {
		r.activitiesMu.RLock()
		defer r.activitiesMu.RUnlock()
		return r.dirtyActivities
	}

	for {
		select {
		case <-r.shutdownCh:
			r.persistTrackedJobsSnapshot()
			r.persistActivitiesSnapshot()
			return
		case <-r.persistTicker.C:
			if jobsDirty() {
				r.persistTrackedJobsSnapshot()
			}
			if activitiesDirty() {
				r.persistActivitiesSnapshot()
			}
		}
	}
}