diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 250bad1d4..1d39442e9 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -98,6 +98,7 @@ type AdminServer struct { // Maintenance system maintenanceManager *maintenance.MaintenanceManager plugin *adminplugin.Plugin + expireJobHandler func(jobID string, reason string) (*adminplugin.TrackedJob, bool, error) // Topic retention purger topicRetentionPurger *TopicRetentionPurger @@ -1020,6 +1021,17 @@ func (s *AdminServer) GetPluginJobDetail(jobID string, activityLimit, relatedLim return s.plugin.BuildJobDetail(jobID, activityLimit, relatedLimit) } +// ExpirePluginJob marks an active plugin job as failed so it no longer blocks scheduling. +func (s *AdminServer) ExpirePluginJob(jobID, reason string) (*adminplugin.TrackedJob, bool, error) { + if handler := s.expireJobHandler; handler != nil { + return handler(jobID, reason) + } + if s.plugin == nil { + return nil, false, fmt.Errorf("plugin is not enabled") + } + return s.plugin.ExpireJob(jobID, reason) +} + // ListPluginActivities returns plugin job activities for monitoring. func (s *AdminServer) ListPluginActivities(jobType string, limit int) []adminplugin.JobActivity { if s.plugin == nil { diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 65197a3fd..e97325647 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -130,6 +131,47 @@ func (s *AdminServer) GetPluginJobDetailAPI(w http.ResponseWriter, r *http.Reque writeJSON(w, http.StatusOK, detail) } +// ExpirePluginJobAPI marks a job as failed so it no longer blocks scheduling. +func (s *AdminServer) ExpirePluginJobAPI(w http.ResponseWriter, r *http.Request) { + jobID := strings.TrimSpace(mux.Vars(r)["jobId"]) + if jobID == "" { + writeJSONError(w, http.StatusBadRequest, "jobId is required") + return + } + + var req struct { + Reason string `json:"reason"` + } + + if err := decodeJSONBody(newJSONMaxReader(w, r), &req); err != nil && err != io.EOF { + writeJSONError(w, http.StatusBadRequest, "invalid request body: "+err.Error()) + return + } + + job, expired, err := s.ExpirePluginJob(jobID, req.Reason) + if err != nil { + if errors.Is(err, plugin.ErrJobNotFound) { + writeJSONError(w, http.StatusNotFound, err.Error()) + return + } + writeJSONError(w, http.StatusInternalServerError, err.Error()) + return + } + + response := map[string]interface{}{ + "job_id": jobID, + "expired": expired, + } + if job != nil { + response["job"] = job + } + if !expired { + response["message"] = "job is not active" + } + + writeJSON(w, http.StatusOK, response) +} + // GetPluginActivitiesAPI returns recent plugin activities. func (s *AdminServer) GetPluginActivitiesAPI(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() diff --git a/weed/admin/dash/plugin_api_test.go b/weed/admin/dash/plugin_api_test.go index c4f1b74e9..c6fcbb028 100644 --- a/weed/admin/dash/plugin_api_test.go +++ b/weed/admin/dash/plugin_api_test.go @@ -1,11 +1,120 @@ package dash import ( + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" "testing" + "github.com/gorilla/mux" + "github.com/seaweedfs/seaweedfs/weed/admin/plugin" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" ) +func TestExpirePluginJobAPI(t *testing.T) { + makeRequest := func(adminServer *AdminServer, jobID string, body io.Reader) *httptest.ResponseRecorder { + req := httptest.NewRequest(http.MethodPost, "/api/plugin/jobs/"+jobID+"/expire", body) + req = mux.SetURLVars(req, map[string]string{"jobId": jobID}) + recorder := httptest.NewRecorder() + adminServer.ExpirePluginJobAPI(recorder, req) + return recorder + } + + t.Run("empty job id", func(t *testing.T) { + recorder := makeRequest(&AdminServer{}, "", nil) + if recorder.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d", recorder.Code) + } + }) + + t.Run("invalid json", func(t *testing.T) { + recorder := makeRequest(&AdminServer{}, "job-id", strings.NewReader("{")) + if recorder.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d", recorder.Code) + } + }) + + t.Run("job not found", func(t *testing.T) { + adminServer := &AdminServer{ + expireJobHandler: func(jobID, reason string) (*plugin.TrackedJob, bool, error) { + return nil, false, plugin.ErrJobNotFound + }, + } + recorder := makeRequest(adminServer, "missing", strings.NewReader(`{"reason":"nope"}`)) + if recorder.Code != http.StatusNotFound { + t.Fatalf("expected 404, got %d", recorder.Code) + } + var payload map[string]any + if err := json.Unmarshal(recorder.Body.Bytes(), &payload); err != nil { + t.Fatalf("failed to unmarshal body: %v", err) + } + if payload["error"] == nil { + t.Fatalf("expected error payload, got %v", payload) + } + }) + + t.Run("successful expire", func(t *testing.T) { + expected := &plugin.TrackedJob{JobID: "foo", State: "assigned"} + adminServer := &AdminServer{ + expireJobHandler: func(jobID, reason string) (*plugin.TrackedJob, bool, error) { + if jobID != "foo" { + return nil, false, errors.New("unexpected") + } + return expected, true, nil + }, + } + recorder := makeRequest(adminServer, "foo", strings.NewReader(`{"reason":"cleanup"}`)) + if recorder.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", recorder.Code) + } + var payload map[string]any + if err := json.Unmarshal(recorder.Body.Bytes(), &payload); err != nil { + t.Fatalf("failed to decode payload: %v", err) + } + if payload["job_id"] != "foo" { + t.Fatalf("expected job_id foo, got %v", payload["job_id"]) + } + if expired, ok := payload["expired"].(bool); !ok || !expired { + t.Fatalf("expected expired=true, got %v", payload["expired"]) + } + jobData, ok := payload["job"].(map[string]any) + if !ok || jobData["job_id"] != "foo" { + t.Fatalf("expected job info with job_id, got %v", payload["job"]) + } + }) + + t.Run("non-active job", func(t *testing.T) { + adminServer := &AdminServer{ + expireJobHandler: func(jobID, reason string) (*plugin.TrackedJob, bool, error) { + return nil, false, nil + }, + } + recorder := makeRequest(adminServer, "bar", strings.NewReader(`{"reason":"ignore"}`)) + if recorder.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", recorder.Code) + } + var payload map[string]any + if err := json.Unmarshal(recorder.Body.Bytes(), &payload); err != nil { + t.Fatalf("failed to decode payload: %v", err) + } + if payload["job_id"] != "bar" { + t.Fatalf("expected job_id bar, got %v", payload["job_id"]) + } + if expired, ok := payload["expired"].(bool); !ok || expired { + t.Fatalf("expected expired=false, got %v", payload["expired"]) + } + if payload["message"] != "job is not active" { + t.Fatalf("expected message job is not active, got %v", payload["message"]) + } + if _, exists := payload["job"]; exists { + t.Fatalf("expected no job payload for non-active job, got %v", payload["job"]) + } + }) +} + func TestBuildJobSpecFromProposalDoesNotReuseProposalID(t *testing.T) { t.Parallel() diff --git a/weed/admin/handlers/admin_handlers.go b/weed/admin/handlers/admin_handlers.go index 357a30129..b548d329f 100644 --- a/weed/admin/handlers/admin_handlers.go +++ b/weed/admin/handlers/admin_handlers.go @@ -242,6 +242,7 @@ func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) { pluginApi.Handle("/job-types/{jobType}/detect", wrapWrite(h.adminServer.TriggerPluginDetectionAPI)).Methods(http.MethodPost) pluginApi.Handle("/job-types/{jobType}/run", wrapWrite(h.adminServer.RunPluginJobTypeAPI)).Methods(http.MethodPost) pluginApi.Handle("/jobs/execute", wrapWrite(h.adminServer.ExecutePluginJobAPI)).Methods(http.MethodPost) + pluginApi.Handle("/jobs/{jobId}/expire", wrapWrite(h.adminServer.ExpirePluginJobAPI)).Methods(http.MethodPost) mqApi := api.PathPrefix("/mq").Subrouter() mqApi.HandleFunc("/topics/{namespace}/{topic}", h.mqHandlers.GetTopicDetailsAPI).Methods(http.MethodGet) diff --git a/weed/admin/handlers/admin_handlers_routes_test.go b/weed/admin/handlers/admin_handlers_routes_test.go index ab33922e3..ff4632bf6 100644 --- a/weed/admin/handlers/admin_handlers_routes_test.go +++ b/weed/admin/handlers/admin_handlers_routes_test.go @@ -21,6 +21,9 @@ func TestSetupRoutes_RegistersPluginSchedulerStatesAPI_NoAuth(t *testing.T) { if !hasRoute(router, http.MethodGet, "/api/plugin/jobs/example/detail") { t.Fatalf("expected GET /api/plugin/jobs/:jobId/detail to be registered in no-auth mode") } + if !hasRoute(router, http.MethodPost, "/api/plugin/jobs/example/expire") { + t.Fatalf("expected POST /api/plugin/jobs/:jobId/expire to be registered in no-auth mode") + } } func TestSetupRoutes_RegistersPluginSchedulerStatesAPI_WithAuth(t *testing.T) { @@ -34,6 +37,9 @@ func TestSetupRoutes_RegistersPluginSchedulerStatesAPI_WithAuth(t *testing.T) { if !hasRoute(router, http.MethodGet, "/api/plugin/jobs/example/detail") { t.Fatalf("expected GET /api/plugin/jobs/:jobId/detail to be registered in auth mode") } + if !hasRoute(router, http.MethodPost, "/api/plugin/jobs/example/expire") { + t.Fatalf("expected POST /api/plugin/jobs/:jobId/expire to be registered in auth mode") + } } func TestSetupRoutes_RegistersPluginPages_NoAuth(t *testing.T) { diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index c1175b931..690777af3 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -80,6 +80,8 @@ type Plugin struct { jobsMu sync.RWMutex jobs map[string]*TrackedJob + // serialize stale job cleanup to avoid duplicate expirations + staleJobsMu sync.Mutex jobDetailsMu sync.Mutex diff --git a/weed/admin/plugin/plugin_monitor.go b/weed/admin/plugin/plugin_monitor.go index 8ced147ae..c66252518 100644 --- a/weed/admin/plugin/plugin_monitor.go +++ b/weed/admin/plugin/plugin_monitor.go @@ -2,6 +2,7 @@ package plugin import ( "encoding/json" + "fmt" "sort" "strings" "time" @@ -9,12 +10,18 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( maxTrackedJobsTotal = 1000 maxActivityRecords = 4000 maxRelatedJobs = 100 + + // stale active jobs block dedupe and scheduling; use generous defaults to + // avoid expiring legitimate long-running tasks. + defaultStaleActiveJobTimeout = 24 * time.Hour + defaultOrphanedActiveJobTimeout = 15 * time.Minute ) var ( @@ -23,6 +30,14 @@ var ( StateCanceled = strings.ToLower(plugin_pb.JobState_JOB_STATE_CANCELED.String()) ) +type activeJobSnapshot struct { + jobID string + jobType string + workerID string + requestID string + lastUpdate time.Time +} + // activityLess reports whether activity a occurred after activity b (newest-first order). // A nil OccurredAt is treated as the zero time. func activityLess(a, b JobActivity) bool { @@ -54,6 +69,13 @@ func (r *Plugin) loadPersistedMonitorState() error { if strings.TrimSpace(job.JobID) == "" { continue } + if isActiveTrackedJobState(job.State) { + if detail, detailErr := r.store.LoadJobDetail(job.JobID); detailErr != nil { + glog.Warningf("Plugin failed to load detail snapshot for job %s: %v", job.JobID, detailErr) + } else if detail != nil { + mergeTerminalDetailIntoTracked(&job, detail) + } + } // Backward compatibility: migrate older inline detail payloads // out of tracked_jobs.json into dedicated per-job detail files. if hasTrackedJobRichDetails(job) { @@ -81,6 +103,265 @@ func (r *Plugin) loadPersistedMonitorState() error { return nil } +// ExpireJob marks an active job as failed so it no longer blocks scheduling. +func (r *Plugin) ExpireJob(jobID, reason string) (*TrackedJob, bool, error) { + normalizedJobID := strings.TrimSpace(jobID) + if normalizedJobID == "" { + return nil, false, ErrJobNotFound + } + + reason = strings.TrimSpace(reason) + if reason == "" { + reason = "job expired by admin request" + } + + var jobType string + var requestID string + active := false + + r.jobsMu.RLock() + if tracked := r.jobs[normalizedJobID]; tracked != nil { + jobType = tracked.JobType + requestID = tracked.RequestID + active = isActiveTrackedJobState(tracked.State) + } + r.jobsMu.RUnlock() + + if jobType == "" || requestID == "" || !active { + if detail, err := r.store.LoadJobDetail(normalizedJobID); err != nil { + return nil, false, err + } else if detail != nil { + if jobType == "" { + jobType = detail.JobType + } + if requestID == "" { + requestID = detail.RequestID + } + if !active && isActiveTrackedJobState(detail.State) { + active = true + } + } + } + + if jobType == "" { + return nil, false, ErrJobNotFound + } + + if !active { + current, _ := r.GetTrackedJob(normalizedJobID) + if current == nil { + if detail, err := r.store.LoadJobDetail(normalizedJobID); err == nil && detail != nil { + clone := cloneTrackedJob(*detail) + current = &clone + } + } + return current, false, nil + } + + now := time.Now().UTC() + r.handleJobCompleted(&plugin_pb.JobCompleted{ + JobId: normalizedJobID, + JobType: jobType, + RequestId: requestID, + Success: false, + ErrorMessage: reason, + CompletedAt: timestamppb.New(now), + }) + r.appendActivity(JobActivity{ + JobID: normalizedJobID, + JobType: jobType, + RequestID: requestID, + Source: "admin_expire", + Message: reason, + Stage: "expired", + OccurredAt: timeToPtr(now), + }) + + updated, _ := r.GetTrackedJob(normalizedJobID) + return updated, true, nil +} + +// expireStaleJobs marks stale active jobs as failed so they stop blocking new work. +func (r *Plugin) expireStaleJobs(now time.Time) int { + if now.IsZero() { + now = time.Now().UTC() + } + + r.staleJobsMu.Lock() + defer r.staleJobsMu.Unlock() + + snapshots := r.snapshotActiveJobs() + if len(snapshots) == 0 { + return 0 + } + + expired := 0 + for _, snap := range snapshots { + if snap.lastUpdate.IsZero() { + continue + } + if stale, _, _ := r.evaluateStaleJob(now, snap.workerID, snap.lastUpdate); !stale { + continue + } + + reason := r.confirmStaleReason(now, snap.jobID) + if reason == "" { + continue + } + + r.handleJobCompleted(&plugin_pb.JobCompleted{ + JobId: snap.jobID, + JobType: snap.jobType, + RequestId: snap.requestID, + Success: false, + ErrorMessage: reason, + CompletedAt: timestamppb.New(now), + }) + expired++ + } + + return expired +} + +func (r *Plugin) snapshotActiveJobs() []activeJobSnapshot { + r.jobsMu.RLock() + defer r.jobsMu.RUnlock() + + if len(r.jobs) == 0 { + return nil + } + + out := make([]activeJobSnapshot, 0, len(r.jobs)) + for _, job := range r.jobs { + if job == nil { + continue + } + if !isActiveTrackedJobState(job.State) { + continue + } + out = append(out, activeJobSnapshot{ + jobID: job.JobID, + jobType: job.JobType, + workerID: job.WorkerID, + requestID: job.RequestID, + lastUpdate: jobLastUpdated(job), + }) + } + return out +} + +func jobLastUpdated(job *TrackedJob) time.Time { + if job == nil { + return time.Time{} + } + if job.UpdatedAt != nil && !job.UpdatedAt.IsZero() { + return *job.UpdatedAt + } + if job.CreatedAt != nil && !job.CreatedAt.IsZero() { + return *job.CreatedAt + } + return time.Time{} +} + +func (r *Plugin) evaluateStaleJob(now time.Time, workerID string, lastUpdate time.Time) (bool, time.Duration, string) { + if lastUpdate.IsZero() { + return false, 0, "" + } + + timeout := defaultStaleActiveJobTimeout + reason := fmt.Sprintf("job expired after %s without progress", timeout) + + workerID = strings.TrimSpace(workerID) + if workerID == "" { + reason = fmt.Sprintf("job expired after %s without executor assignment", timeout) + } else if !r.isWorkerAvailable(workerID) { + timeout = defaultOrphanedActiveJobTimeout + reason = fmt.Sprintf("job expired after %s without worker heartbeat (worker=%s)", timeout, workerID) + } + + if now.Sub(lastUpdate) < timeout { + return false, timeout, reason + } + return true, timeout, reason +} + +func (r *Plugin) confirmStaleReason(now time.Time, jobID string) string { + r.jobsMu.RLock() + job := r.jobs[jobID] + if job == nil || !isActiveTrackedJobState(job.State) { + r.jobsMu.RUnlock() + return "" + } + lastUpdate := jobLastUpdated(job) + workerID := job.WorkerID + r.jobsMu.RUnlock() + + stale, _, reason := r.evaluateStaleJob(now, workerID, lastUpdate) + if !stale { + return "" + } + return reason +} + +func (r *Plugin) isWorkerAvailable(workerID string) bool { + workerID = strings.TrimSpace(workerID) + if workerID == "" { + return false + } + _, ok := r.registry.Get(workerID) + return ok +} + +func isTerminalTrackedJobState(state string) bool { + normalized := strings.ToLower(strings.TrimSpace(state)) + switch normalized { + case StateSucceeded, StateFailed, StateCanceled: + return true + default: + return false + } +} + +func mergeTerminalDetailIntoTracked(tracked *TrackedJob, detail *TrackedJob) { + if tracked == nil || detail == nil { + return + } + if !isTerminalTrackedJobState(detail.State) { + return + } + if !isActiveTrackedJobState(tracked.State) { + return + } + + if detail.State != "" { + tracked.State = detail.State + } + if detail.Progress != 0 { + tracked.Progress = detail.Progress + } + if detail.Stage != "" { + tracked.Stage = detail.Stage + } + if detail.Message != "" { + tracked.Message = detail.Message + } + if detail.ErrorMessage != "" { + tracked.ErrorMessage = detail.ErrorMessage + } + if detail.ResultSummary != "" { + tracked.ResultSummary = detail.ResultSummary + } + if detail.CompletedAt != nil && !detail.CompletedAt.IsZero() { + tracked.CompletedAt = detail.CompletedAt + } + if detail.UpdatedAt != nil && !detail.UpdatedAt.IsZero() { + tracked.UpdatedAt = detail.UpdatedAt + } + if tracked.UpdatedAt == nil && tracked.CompletedAt != nil { + tracked.UpdatedAt = tracked.CompletedAt + } +} + func (r *Plugin) ListTrackedJobs(jobType string, state string, limit int) []TrackedJob { r.jobsMu.RLock() defer r.jobsMu.RUnlock() diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index e825e8069..040140413 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -61,6 +61,8 @@ func (r *Plugin) schedulerLoop() { } func (r *Plugin) runSchedulerTick() { + r.expireStaleJobs(time.Now().UTC()) + jobTypes := r.registry.DetectableJobTypes() if len(jobTypes) == 0 { return @@ -839,11 +841,16 @@ func waitForShutdownOrTimer(shutdown <-chan struct{}, duration time.Duration) bo } } +// filterProposalsWithActiveJobs removes proposals whose dedupe keys already have active jobs. +// It first expires stale tracked jobs via expireStaleJobs, which can mutate scheduler state, +// so callers should treat this method as a stateful operation. func (r *Plugin) filterProposalsWithActiveJobs(jobType string, proposals []*plugin_pb.JobProposal) ([]*plugin_pb.JobProposal, int) { if len(proposals) == 0 { return proposals, 0 } + r.expireStaleJobs(time.Now().UTC()) + activeKeys := make(map[string]struct{}) r.jobsMu.RLock() for _, job := range r.jobs { diff --git a/weed/admin/plugin/types.go b/weed/admin/plugin/types.go index 6a57a563c..cc23e17fe 100644 --- a/weed/admin/plugin/types.go +++ b/weed/admin/plugin/types.go @@ -1,6 +1,9 @@ package plugin -import "time" +import ( + "errors" + "time" +) const ( // Keep exactly the last 10 successful and last 10 error runs per job type. @@ -8,6 +11,8 @@ const ( MaxErrorRunHistory = 10 ) +var ErrJobNotFound = errors.New("job not found") + type RunOutcome string const ( diff --git a/weed/admin/view/app/plugin.templ b/weed/admin/view/app/plugin.templ index 252777286..5794f452c 100644 --- a/weed/admin/view/app/plugin.templ +++ b/weed/admin/view/app/plugin.templ @@ -485,6 +485,11 @@ templ Plugin(page string) {