From 18ccc9b773d84aa641d1050b2ad0ae133db71011 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 3 Mar 2026 23:09:49 -0800 Subject: [PATCH] Plugin scheduler: sequential iterations with max runtime (#8496) * pb: add job type max runtime setting * plugin: default job type max runtime * plugin: redesign scheduler loop * admin ui: update scheduler settings * plugin: fix scheduler loop state name * plugin scheduler: restore backlog skip * plugin scheduler: drop legacy detection helper * admin api: require scheduler config body * admin ui: preserve detection interval on save * plugin scheduler: use job context and drain cancels * plugin scheduler: respect detection intervals * plugin scheduler: gate runs and drain queue * ec test: reuse req/resp vars * ec test: add scheduler debug logs * Adjust scheduler idle sleep and initial run delay * Clear pending job queue before scheduler runs * Log next detection time in EC integration test * Improve plugin scheduler debug logging in EC test * Expose scheduler next detection time * Log scheduler next detection time in EC test * Wake scheduler on config or worker updates * Expose scheduler sleep interval in UI * Fix scheduler sleep save value selection * Set scheduler idle sleep default to 613s * Show scheduler next run time in plugin UI --------- Co-authored-by: Copilot --- .../admin_dockertest/ec_integration_test.go | 164 ++++- weed/admin/dash/plugin_api.go | 50 ++ weed/admin/handlers/admin_handlers.go | 2 + weed/admin/plugin/config_store.go | 56 ++ weed/admin/plugin/plugin.go | 57 +- weed/admin/plugin/plugin_monitor.go | 73 +++ weed/admin/plugin/plugin_scheduler.go | 613 +++++++++++++----- weed/admin/plugin/plugin_scheduler_test.go | 23 +- weed/admin/plugin/scheduler_config.go | 31 + weed/admin/plugin/scheduler_status.go | 112 ++++ weed/admin/plugin/types.go | 4 + weed/admin/view/app/plugin.templ | 211 +++++- weed/admin/view/app/plugin_templ.go | 4 +- weed/pb/plugin.proto | 2 + weed/pb/plugin_pb/plugin.pb.go | 24 +- weed/plugin/worker/admin_script_handler.go | 1 + weed/plugin/worker/erasure_coding_handler.go | 1 + weed/plugin/worker/vacuum_handler.go | 1 + weed/plugin/worker/volume_balance_handler.go | 1 + 19 files changed, 1240 insertions(+), 190 deletions(-) create mode 100644 weed/admin/plugin/scheduler_config.go diff --git a/test/erasure_coding/admin_dockertest/ec_integration_test.go b/test/erasure_coding/admin_dockertest/ec_integration_test.go index 21f872ea1..28856f4f1 100644 --- a/test/erasure_coding/admin_dockertest/ec_integration_test.go +++ b/test/erasure_coding/admin_dockertest/ec_integration_test.go @@ -153,6 +153,38 @@ func waitForUrl(t *testing.T, url string, retries int) { t.Fatalf("Timeout waiting for %s", url) } +func fetchJSON(url string, out interface{}) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("status %d: %s", resp.StatusCode, string(body)) + } + return json.NewDecoder(resp.Body).Decode(out) +} + +func mapField(obj map[string]interface{}, key string) (interface{}, bool) { + if obj == nil { + return nil, false + } + if value, ok := obj[key]; ok { + return value, true + } + return nil, false +} + +func mapFieldAny(obj map[string]interface{}, keys ...string) (interface{}, bool) { + for _, key := range keys { + if value, ok := mapField(obj, key); ok { + return value, true + } + } + return nil, false +} + func TestEcEndToEnd(t *testing.T) { defer cleanup() ensureEnvironment(t) @@ -162,6 +194,28 @@ func TestEcEndToEnd(t *testing.T) { // 1. Configure plugin job types for fast EC detection/execution. t.Log("Configuring plugin job types via API...") + schedulerConfig := map[string]interface{}{ + "idle_sleep_seconds": 1, + } + jsonBody, err := json.Marshal(schedulerConfig) + if err != nil { + t.Fatalf("Failed to marshal scheduler config: %v", err) + } + req, err := http.NewRequest("PUT", AdminUrl+"/api/plugin/scheduler-config", bytes.NewBuffer(jsonBody)) + if err != nil { + t.Fatalf("Failed to create scheduler config request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Failed to update scheduler config: %v", err) + } + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("Failed to update scheduler config (status %d): %s", resp.StatusCode, string(body)) + } + resp.Body.Close() + // Disable volume balance to reduce interference for this EC-focused test. balanceConfig := map[string]interface{}{ "job_type": "volume_balance", @@ -169,16 +223,16 @@ func TestEcEndToEnd(t *testing.T) { "enabled": false, }, } - jsonBody, err := json.Marshal(balanceConfig) + jsonBody, err = json.Marshal(balanceConfig) if err != nil { t.Fatalf("Failed to marshal volume_balance config: %v", err) } - req, err := http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/volume_balance/config", bytes.NewBuffer(jsonBody)) + req, err = http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/volume_balance/config", bytes.NewBuffer(jsonBody)) if err != nil { t.Fatalf("Failed to create volume_balance config request: %v", err) } req.Header.Set("Content-Type", "application/json") - resp, err := client.Do(req) + resp, err = client.Do(req) if err != nil { t.Fatalf("Failed to update volume_balance config: %v", err) } @@ -275,6 +329,7 @@ func TestEcEndToEnd(t *testing.T) { startTime := time.Now() ecVerified := false var lastBody []byte + debugTick := 0 for time.Since(startTime) < 300*time.Second { // 3.1 Check Master Topology @@ -300,25 +355,104 @@ func TestEcEndToEnd(t *testing.T) { } } - // 3.2 Debug: Check workers and jobs - wResp, wErr := http.Get(AdminUrl + "/api/plugin/workers") + // 3.2 Debug: Check workers, jobs, and scheduler status + debugTick++ + + var workers []map[string]interface{} workerCount := 0 - if wErr == nil { - var workers []interface{} - json.NewDecoder(wResp.Body).Decode(&workers) - wResp.Body.Close() + ecDetectorCount := 0 + ecExecutorCount := 0 + if err := fetchJSON(AdminUrl+"/api/plugin/workers", &workers); err == nil { workerCount = len(workers) + for _, worker := range workers { + capsValue, ok := mapFieldAny(worker, "capabilities", "Capabilities") + if !ok { + continue + } + caps, ok := capsValue.(map[string]interface{}) + if !ok { + continue + } + if capValue, ok := caps["erasure_coding"].(map[string]interface{}); ok { + if capValue["can_detect"] == true { + ecDetectorCount++ + } + if capValue["can_execute"] == true { + ecExecutorCount++ + } + } + } } - tResp, tErr := http.Get(AdminUrl + "/api/plugin/jobs?limit=1000") + var tasks []map[string]interface{} taskCount := 0 - if tErr == nil { - var tasks []interface{} - json.NewDecoder(tResp.Body).Decode(&tasks) - tResp.Body.Close() + ecTaskCount := 0 + ecTaskStates := map[string]int{} + if err := fetchJSON(AdminUrl+"/api/plugin/jobs?limit=1000", &tasks); err == nil { taskCount = len(tasks) + for _, task := range tasks { + jobType, _ := task["job_type"].(string) + state, _ := task["state"].(string) + if jobType == "erasure_coding" { + ecTaskCount++ + ecTaskStates[state]++ + } + } + } + + t.Logf("Waiting for EC... (Workers: %d det=%d exec=%d, Tasks: %d ec=%d, EC States: %+v)", + workerCount, ecDetectorCount, ecExecutorCount, taskCount, ecTaskCount, ecTaskStates) + + if debugTick%3 == 0 { + var pluginStatus map[string]interface{} + if err := fetchJSON(AdminUrl+"/api/plugin/status", &pluginStatus); err == nil { + t.Logf("Plugin status: enabled=%v worker_count=%v worker_grpc_port=%v configured=%v", + pluginStatus["enabled"], pluginStatus["worker_count"], pluginStatus["worker_grpc_port"], pluginStatus["configured"]) + } + + var schedulerStatus map[string]interface{} + if err := fetchJSON(AdminUrl+"/api/plugin/scheduler-status", &schedulerStatus); err == nil { + if schedValue, ok := schedulerStatus["scheduler"].(map[string]interface{}); ok { + t.Logf("Scheduler status: current_job_type=%v phase=%v last_iteration_had_jobs=%v idle_sleep_seconds=%v last_iteration_done_at=%v next_detection_at=%v", + schedValue["current_job_type"], schedValue["current_phase"], + schedValue["last_iteration_had_jobs"], schedValue["idle_sleep_seconds"], schedValue["last_iteration_done_at"], schedValue["next_detection_at"]) + } else { + t.Logf("Scheduler status: %v", schedulerStatus) + } + } + + var schedulerStates []map[string]interface{} + if err := fetchJSON(AdminUrl+"/api/plugin/scheduler-states", &schedulerStates); err == nil { + for _, state := range schedulerStates { + if state["job_type"] == "erasure_coding" { + t.Logf("EC scheduler state: enabled=%v detection_in_flight=%v detector_available=%v executor_workers=%v next_detection_at=%v last_run_status=%v last_run_started_at=%v last_run_completed_at=%v", + state["enabled"], state["detection_in_flight"], state["detector_available"], + state["executor_worker_count"], state["next_detection_at"], state["last_run_status"], + state["last_run_started_at"], state["last_run_completed_at"]) + break + } + } + } + + var jobTypes []map[string]interface{} + if err := fetchJSON(AdminUrl+"/api/plugin/job-types", &jobTypes); err == nil { + var names []string + for _, jobType := range jobTypes { + if name, ok := jobType["job_type"].(string); ok && name != "" { + names = append(names, name) + } + } + t.Logf("Plugin job types: %v", names) + } + + var activities []map[string]interface{} + if err := fetchJSON(AdminUrl+"/api/plugin/activities?job_type=erasure_coding&limit=5", &activities); err == nil { + for i := len(activities) - 1; i >= 0; i-- { + act := activities[i] + t.Logf("EC activity: stage=%v message=%v occurred_at=%v", act["stage"], act["message"], act["occurred_at"]) + } + } } - t.Logf("Waiting for EC... (Workers: %d, Active Tasks: %d)", workerCount, taskCount) time.Sleep(10 * time.Second) } diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 79d78e254..632800405 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -235,6 +235,53 @@ func (s *AdminServer) GetPluginSchedulerStatusAPI(w http.ResponseWriter, r *http writeJSON(w, http.StatusOK, response) } +// GetPluginSchedulerConfigAPI returns scheduler configuration. +func (s *AdminServer) GetPluginSchedulerConfigAPI(w http.ResponseWriter, r *http.Request) { + pluginSvc := s.GetPlugin() + if pluginSvc == nil { + writeJSONError(w, http.StatusNotFound, "plugin is not enabled") + return + } + + writeJSON(w, http.StatusOK, pluginSvc.GetSchedulerConfig()) +} + +// UpdatePluginSchedulerConfigAPI updates scheduler configuration. +func (s *AdminServer) UpdatePluginSchedulerConfigAPI(w http.ResponseWriter, r *http.Request) { + pluginSvc := s.GetPlugin() + if pluginSvc == nil { + writeJSONError(w, http.StatusNotFound, "plugin is not enabled") + return + } + + var req struct { + IdleSleepSeconds *int32 `json:"idle_sleep_seconds"` + } + + if err := decodeJSONBody(newJSONMaxReader(w, r), &req); err != nil { + if errors.Is(err, io.EOF) { + writeJSONError(w, http.StatusBadRequest, "request body is required") + return + } + writeJSONError(w, http.StatusBadRequest, "invalid request body: "+err.Error()) + return + } + if req.IdleSleepSeconds == nil { + writeJSONError(w, http.StatusBadRequest, "idle_sleep_seconds is required") + return + } + + updated, err := pluginSvc.UpdateSchedulerConfig(plugin.SchedulerConfig{ + IdleSleepSeconds: *req.IdleSleepSeconds, + }) + if err != nil { + writeJSONError(w, http.StatusInternalServerError, err.Error()) + return + } + + writeJSON(w, http.StatusOK, updated) +} + // RequestPluginJobTypeSchemaAPI asks a worker for one job type schema. func (s *AdminServer) RequestPluginJobTypeSchemaAPI(w http.ResponseWriter, r *http.Request) { jobType := strings.TrimSpace(mux.Vars(r)["jobType"]) @@ -867,6 +914,9 @@ func applyDescriptorDefaultsToPersistedConfig( if runtime.PerWorkerExecutionConcurrency <= 0 { runtime.PerWorkerExecutionConcurrency = defaults.PerWorkerExecutionConcurrency } + if runtime.JobTypeMaxRuntimeSeconds <= 0 { + runtime.JobTypeMaxRuntimeSeconds = defaults.JobTypeMaxRuntimeSeconds + } if runtime.RetryBackoffSeconds <= 0 { runtime.RetryBackoffSeconds = defaults.RetryBackoffSeconds } diff --git a/weed/admin/handlers/admin_handlers.go b/weed/admin/handlers/admin_handlers.go index 3ce607150..ff0d8651a 100644 --- a/weed/admin/handlers/admin_handlers.go +++ b/weed/admin/handlers/admin_handlers.go @@ -229,6 +229,8 @@ func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) { pluginApi.HandleFunc("/status", h.adminServer.GetPluginStatusAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/workers", h.adminServer.GetPluginWorkersAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/job-types", h.adminServer.GetPluginJobTypesAPI).Methods(http.MethodGet) + pluginApi.HandleFunc("/scheduler-config", h.adminServer.GetPluginSchedulerConfigAPI).Methods(http.MethodGet) + pluginApi.Handle("/scheduler-config", wrapWrite(h.adminServer.UpdatePluginSchedulerConfigAPI)).Methods(http.MethodPut) pluginApi.HandleFunc("/jobs", h.adminServer.GetPluginJobsAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/jobs/{jobId}", h.adminServer.GetPluginJobAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/jobs/{jobId}/detail", h.adminServer.GetPluginJobDetailAPI).Methods(http.MethodGet) diff --git a/weed/admin/plugin/config_store.go b/weed/admin/plugin/config_store.go index 263050d84..9a2b8484e 100644 --- a/weed/admin/plugin/config_store.go +++ b/weed/admin/plugin/config_store.go @@ -30,6 +30,7 @@ const ( runsJSONFileName = "runs.json" trackedJobsJSONFileName = "tracked_jobs.json" activitiesJSONFileName = "activities.json" + schedulerJSONFileName = "scheduler.json" defaultDirPerm = 0o755 defaultFilePerm = 0o644 ) @@ -53,6 +54,7 @@ type ConfigStore struct { memTrackedJobs []TrackedJob memActivities []JobActivity memJobDetails map[string]TrackedJob + memScheduler *SchedulerConfig } func NewConfigStore(adminDataDir string) (*ConfigStore, error) { @@ -93,6 +95,60 @@ func (s *ConfigStore) BaseDir() string { return s.baseDir } +func (s *ConfigStore) LoadSchedulerConfig() (*SchedulerConfig, error) { + s.mu.RLock() + if !s.configured { + cfg := s.memScheduler + s.mu.RUnlock() + if cfg == nil { + return nil, nil + } + clone := *cfg + return &clone, nil + } + s.mu.RUnlock() + + path := filepath.Join(s.baseDir, schedulerJSONFileName) + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, fmt.Errorf("read scheduler config: %w", err) + } + + var cfg SchedulerConfig + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("unmarshal scheduler config: %w", err) + } + return &cfg, nil +} + +func (s *ConfigStore) SaveSchedulerConfig(config *SchedulerConfig) error { + if config == nil { + return fmt.Errorf("scheduler config is nil") + } + normalized := normalizeSchedulerConfig(*config) + + s.mu.Lock() + if !s.configured { + s.memScheduler = &normalized + s.mu.Unlock() + return nil + } + s.mu.Unlock() + + payload, err := json.MarshalIndent(normalized, "", " ") + if err != nil { + return fmt.Errorf("marshal scheduler config: %w", err) + } + path := filepath.Join(s.baseDir, schedulerJSONFileName) + if err := os.WriteFile(path, payload, defaultFilePerm); err != nil { + return fmt.Errorf("save scheduler config: %w", err) + } + return nil +} + func (s *ConfigStore) SaveDescriptor(jobType string, descriptor *plugin_pb.JobTypeDescriptor) error { if descriptor == nil { return fmt.Errorf("descriptor is nil") diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index 6ec49612c..aecf44757 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -68,6 +68,13 @@ type Plugin struct { adminScriptRunMu sync.RWMutex schedulerDetectionMu sync.Mutex schedulerDetection map[string]*schedulerDetectionInfo + schedulerRunMu sync.Mutex + schedulerRun map[string]*schedulerRunInfo + schedulerLoopMu sync.Mutex + schedulerLoopState schedulerLoopState + schedulerConfigMu sync.RWMutex + schedulerConfig SchedulerConfig + schedulerWakeCh chan struct{} dedupeMu sync.Mutex recentDedupeByType map[string]map[string]time.Time @@ -164,14 +171,31 @@ func New(options Options) (*Plugin, error) { detectorLeases: make(map[string]string), schedulerExecReservations: make(map[string]int), schedulerDetection: make(map[string]*schedulerDetectionInfo), + schedulerRun: make(map[string]*schedulerRunInfo), recentDedupeByType: make(map[string]map[string]time.Time), jobs: make(map[string]*TrackedJob), activities: make([]JobActivity, 0, 256), persistTicker: time.NewTicker(2 * time.Second), + schedulerWakeCh: make(chan struct{}, 1), shutdownCh: make(chan struct{}), } plugin.ctx, plugin.ctxCancel = context.WithCancel(context.Background()) + if cfg, err := plugin.store.LoadSchedulerConfig(); err != nil { + glog.Warningf("Plugin failed to load scheduler config: %v", err) + plugin.schedulerConfig = DefaultSchedulerConfig() + } else if cfg == nil { + defaults := DefaultSchedulerConfig() + plugin.schedulerConfig = defaults + if plugin.store.IsConfigured() { + if err := plugin.store.SaveSchedulerConfig(&defaults); err != nil { + glog.Warningf("Plugin failed to persist scheduler defaults: %v", err) + } + } + } else { + plugin.schedulerConfig = normalizeSchedulerConfig(*cfg) + } + if err := plugin.loadPersistedMonitorState(); err != nil { glog.Warningf("Plugin failed to load persisted monitoring state: %v", err) } @@ -371,7 +395,11 @@ func (r *Plugin) LoadJobTypeConfig(jobType string) (*plugin_pb.PersistedJobTypeC } func (r *Plugin) SaveJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig) error { - return r.store.SaveJobTypeConfig(config) + if err := r.store.SaveJobTypeConfig(config); err != nil { + return err + } + r.wakeScheduler() + return nil } func (r *Plugin) LoadDescriptor(jobType string) (*plugin_pb.JobTypeDescriptor, error) { @@ -390,6 +418,31 @@ func (r *Plugin) BaseDir() string { return r.store.BaseDir() } +func (r *Plugin) GetSchedulerConfig() SchedulerConfig { + if r == nil { + return DefaultSchedulerConfig() + } + r.schedulerConfigMu.RLock() + cfg := r.schedulerConfig + r.schedulerConfigMu.RUnlock() + return normalizeSchedulerConfig(cfg) +} + +func (r *Plugin) UpdateSchedulerConfig(cfg SchedulerConfig) (SchedulerConfig, error) { + if r == nil { + return DefaultSchedulerConfig(), fmt.Errorf("plugin is not initialized") + } + normalized := normalizeSchedulerConfig(cfg) + if err := r.store.SaveSchedulerConfig(&normalized); err != nil { + return SchedulerConfig{}, err + } + r.schedulerConfigMu.Lock() + r.schedulerConfig = normalized + r.schedulerConfigMu.Unlock() + r.wakeScheduler() + return normalized, nil +} + func (r *Plugin) acquireAdminLock(reason string) (func(), error) { if r == nil || r.lockManager == nil { return func() {}, nil @@ -912,6 +965,7 @@ func (r *Plugin) handleWorkerMessage(workerID string, message *plugin_pb.WorkerT switch body := message.Body.(type) { case *plugin_pb.WorkerToAdminMessage_Hello: r.registry.UpsertFromHello(body.Hello) + r.wakeScheduler() case *plugin_pb.WorkerToAdminMessage_Heartbeat: r.registry.UpdateHeartbeat(workerID, body.Heartbeat) case *plugin_pb.WorkerToAdminMessage_ConfigSchemaResponse: @@ -1011,6 +1065,7 @@ func (r *Plugin) ensureJobTypeConfigFromDescriptor(jobType string, descriptor *p PerWorkerExecutionConcurrency: defaults.PerWorkerExecutionConcurrency, RetryLimit: defaults.RetryLimit, RetryBackoffSeconds: defaults.RetryBackoffSeconds, + JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds, } } diff --git a/weed/admin/plugin/plugin_monitor.go b/weed/admin/plugin/plugin_monitor.go index c66252518..332738a0d 100644 --- a/weed/admin/plugin/plugin_monitor.go +++ b/weed/admin/plugin/plugin_monitor.go @@ -861,6 +861,79 @@ func (r *Plugin) trackExecutionQueued(job *plugin_pb.JobSpec) { }) } +func (r *Plugin) cancelQueuedJob(job *plugin_pb.JobSpec, cause error) { + reason := "job canceled" + if cause != nil { + reason = cause.Error() + } + r.markJobCanceled(job, reason) +} + +func (r *Plugin) markJobCanceled(job *plugin_pb.JobSpec, reason string) { + if job == nil || strings.TrimSpace(job.JobId) == "" { + return + } + + now := time.Now().UTC() + if strings.TrimSpace(reason) == "" { + reason = "job canceled" + } + + r.jobsMu.Lock() + tracked := r.jobs[job.JobId] + if tracked == nil { + tracked = &TrackedJob{ + JobID: job.JobId, + CreatedAt: timeToPtr(now), + } + r.jobs[job.JobId] = tracked + } + + if job.JobType != "" { + tracked.JobType = job.JobType + } + tracked.State = StateCanceled + tracked.Stage = "canceled" + tracked.Message = reason + tracked.ErrorMessage = reason + tracked.Progress = 0 + if tracked.CreatedAt == nil || tracked.CreatedAt.IsZero() { + tracked.CreatedAt = timeToPtr(now) + } + tracked.UpdatedAt = timeToPtr(now) + tracked.CompletedAt = timeToPtr(now) + trackedSnapshot := cloneTrackedJob(*tracked) + r.pruneTrackedJobsLocked() + r.dirtyJobs = true + r.jobsMu.Unlock() + + r.persistJobDetailSnapshot(job.JobId, func(detail *TrackedJob) { + detail.JobID = job.JobId + if job.JobType != "" { + detail.JobType = job.JobType + } + detail.State = trackedSnapshot.State + detail.Stage = trackedSnapshot.Stage + detail.Message = trackedSnapshot.Message + detail.ErrorMessage = trackedSnapshot.ErrorMessage + detail.Progress = trackedSnapshot.Progress + if detail.CreatedAt == nil || detail.CreatedAt.IsZero() { + detail.CreatedAt = trackedSnapshot.CreatedAt + } + detail.UpdatedAt = trackedSnapshot.UpdatedAt + detail.CompletedAt = trackedSnapshot.CompletedAt + }) + + r.appendActivity(JobActivity{ + JobID: job.JobId, + JobType: job.JobType, + Source: "admin_scheduler", + Message: reason, + Stage: "canceled", + OccurredAt: timeToPtr(now), + }) +} + func (r *Plugin) trackExecutionCompletion(completed *plugin_pb.JobCompleted) *TrackedJob { if completed == nil || strings.TrimSpace(completed.JobId) == "" { return nil diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 67e463c56..2a91f4e07 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -13,13 +13,17 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -var errExecutorAtCapacity = errors.New("executor is at capacity") +var ( + errExecutorAtCapacity = errors.New("executor is at capacity") + errSchedulerShutdown = errors.New("scheduler shutdown") +) const ( defaultSchedulerTick = 5 * time.Second defaultScheduledDetectionInterval = 300 * time.Second defaultScheduledDetectionTimeout = 45 * time.Second defaultScheduledExecutionTimeout = 90 * time.Second + defaultScheduledJobTypeMaxRuntime = 30 * time.Minute defaultScheduledMaxResults int32 = 1000 defaultScheduledExecutionConcurrency = 1 defaultScheduledPerWorkerConcurrency = 1 @@ -34,6 +38,7 @@ type schedulerPolicy struct { DetectionInterval time.Duration DetectionTimeout time.Duration ExecutionTimeout time.Duration + JobTypeMaxRuntime time.Duration RetryBackoff time.Duration MaxResults int32 ExecutionConcurrency int @@ -44,31 +49,72 @@ type schedulerPolicy struct { func (r *Plugin) schedulerLoop() { defer r.wg.Done() - ticker := time.NewTicker(r.schedulerTick) - defer ticker.Stop() + for { + select { + case <-r.shutdownCh: + return + default: + } - // Try once immediately on startup. - r.runSchedulerTick() + hadJobs := r.runSchedulerIteration() + r.recordSchedulerIterationComplete(hadJobs) - for { + if hadJobs { + continue + } + + r.setSchedulerLoopState("", "sleeping") + idleSleep := r.GetSchedulerConfig().IdleSleepDuration() + if nextRun := r.earliestNextDetectionAt(); !nextRun.IsZero() { + if until := time.Until(nextRun); until <= 0 { + idleSleep = 0 + } else if until < idleSleep { + idleSleep = until + } + } + if idleSleep <= 0 { + continue + } + + timer := time.NewTimer(idleSleep) select { case <-r.shutdownCh: + timer.Stop() return - case <-ticker.C: - r.runSchedulerTick() + case <-r.schedulerWakeCh: + if !timer.Stop() { + <-timer.C + } + continue + case <-timer.C: } } } -func (r *Plugin) runSchedulerTick() { +func (r *Plugin) runSchedulerIteration() bool { r.expireStaleJobs(time.Now().UTC()) jobTypes := r.registry.DetectableJobTypes() if len(jobTypes) == 0 { - return + r.setSchedulerLoopState("", "idle") + return false + } + + r.setSchedulerLoopState("", "waiting_for_lock") + releaseLock, err := r.acquireAdminLock("plugin scheduler iteration") + if err != nil { + glog.Warningf("Plugin scheduler failed to acquire lock: %v", err) + r.setSchedulerLoopState("", "idle") + return false + } + if releaseLock != nil { + defer releaseLock() } active := make(map[string]struct{}, len(jobTypes)) + schedulerIdleSleep := r.GetSchedulerConfig().IdleSleepDuration() + hadJobs := false + for _, jobType := range jobTypes { active[jobType] = struct{}{} @@ -81,20 +127,212 @@ func (r *Plugin) runSchedulerTick() { r.clearSchedulerJobType(jobType) continue } - - if !r.markDetectionDue(jobType, policy.DetectionInterval) { + initialDelay := time.Duration(0) + if runInfo := r.snapshotSchedulerRun(jobType); runInfo.lastRunStartedAt.IsZero() { + initialDelay = schedulerIdleSleep / 2 + } + if !r.markDetectionDue(jobType, policy.DetectionInterval, initialDelay) { continue } - r.wg.Add(1) - go func(jt string, p schedulerPolicy) { - defer r.wg.Done() - r.runScheduledDetection(jt, p) - }(jobType, policy) + detected := r.runJobTypeIteration(jobType, policy) + if detected { + hadJobs = true + } } r.pruneSchedulerState(active) r.pruneDetectorLeases(active) + r.setSchedulerLoopState("", "idle") + return hadJobs +} + +func (r *Plugin) wakeScheduler() { + if r == nil { + return + } + select { + case r.schedulerWakeCh <- struct{}{}: + default: + } +} + +func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) bool { + r.recordSchedulerRunStart(jobType) + r.clearWaitingJobQueue(jobType) + r.setSchedulerLoopState(jobType, "detecting") + r.markJobTypeInFlight(jobType) + defer r.finishDetection(jobType) + + start := time.Now().UTC() + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: "scheduled detection started", + Stage: "detecting", + OccurredAt: timeToPtr(start), + }) + + if skip, waitingCount, waitingThreshold := r.shouldSkipDetectionForWaitingJobs(jobType, policy); skip { + r.recordSchedulerDetectionSkip(jobType, fmt.Sprintf("waiting backlog %d reached threshold %d", waitingCount, waitingThreshold)) + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled detection skipped: waiting backlog %d reached threshold %d", waitingCount, waitingThreshold), + Stage: "skipped_waiting_backlog", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + r.recordSchedulerRunComplete(jobType, "skipped") + return false + } + + maxRuntime := policy.JobTypeMaxRuntime + if maxRuntime <= 0 { + maxRuntime = defaultScheduledJobTypeMaxRuntime + } + jobCtx, cancel := context.WithTimeout(context.Background(), maxRuntime) + defer cancel() + + clusterContext, err := r.loadSchedulerClusterContext(jobCtx) + if err != nil { + r.recordSchedulerDetectionError(jobType, err) + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled detection aborted: %v", err), + Stage: "failed", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + r.recordSchedulerRunComplete(jobType, "error") + return false + } + + detectionTimeout := policy.DetectionTimeout + remaining := time.Until(start.Add(maxRuntime)) + if remaining <= 0 { + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: "scheduled run timed out before detection", + Stage: "timeout", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + r.recordSchedulerRunComplete(jobType, "timeout") + return false + } + if detectionTimeout <= 0 { + detectionTimeout = defaultScheduledDetectionTimeout + } + if detectionTimeout > remaining { + detectionTimeout = remaining + } + + detectCtx, cancelDetect := context.WithTimeout(jobCtx, detectionTimeout) + proposals, err := r.RunDetection(detectCtx, jobType, clusterContext, policy.MaxResults) + cancelDetect() + if err != nil { + r.recordSchedulerDetectionError(jobType, err) + stage := "failed" + status := "error" + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + stage = "timeout" + status = "timeout" + } + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled detection failed: %v", err), + Stage: stage, + OccurredAt: timeToPtr(time.Now().UTC()), + }) + r.recordSchedulerRunComplete(jobType, status) + return false + } + + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled detection completed: %d proposal(s)", len(proposals)), + Stage: "detected", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + r.recordSchedulerDetectionSuccess(jobType, len(proposals)) + + detected := len(proposals) > 0 + + filteredByActive, skippedActive := r.filterProposalsWithActiveJobs(jobType, proposals) + if skippedActive > 0 { + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled detection skipped %d proposal(s) due to active assigned/running jobs", skippedActive), + Stage: "deduped_active_jobs", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + } + + if len(filteredByActive) == 0 { + r.recordSchedulerRunComplete(jobType, "success") + return detected + } + + filtered := r.filterScheduledProposals(filteredByActive) + if len(filtered) != len(filteredByActive) { + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled detection deduped %d proposal(s) within this run", len(filteredByActive)-len(filtered)), + Stage: "deduped", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + } + + if len(filtered) == 0 { + r.recordSchedulerRunComplete(jobType, "success") + return detected + } + + r.setSchedulerLoopState(jobType, "executing") + + remaining = time.Until(start.Add(maxRuntime)) + if remaining <= 0 { + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: "scheduled execution skipped: job type max runtime reached", + Stage: "timeout", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + r.recordSchedulerRunComplete(jobType, "timeout") + return detected + } + + execPolicy := policy + if execPolicy.ExecutionTimeout <= 0 { + execPolicy.ExecutionTimeout = defaultScheduledExecutionTimeout + } + if execPolicy.ExecutionTimeout > remaining { + execPolicy.ExecutionTimeout = remaining + } + + successCount, errorCount, canceledCount := r.dispatchScheduledProposals(jobCtx, jobType, filtered, clusterContext, execPolicy) + + status := "success" + if jobCtx.Err() != nil { + status = "timeout" + } else if errorCount > 0 || canceledCount > 0 { + status = "error" + } + + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled execution finished: success=%d error=%d canceled=%d", successCount, errorCount, canceledCount), + Stage: "executed", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + r.recordSchedulerRunComplete(jobType, status) + return detected } func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, error) { @@ -119,6 +357,7 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err DetectionInterval: durationFromSeconds(adminRuntime.DetectionIntervalSeconds, defaultScheduledDetectionInterval), DetectionTimeout: durationFromSeconds(adminRuntime.DetectionTimeoutSeconds, defaultScheduledDetectionTimeout), ExecutionTimeout: defaultScheduledExecutionTimeout, + JobTypeMaxRuntime: durationFromSeconds(adminRuntime.JobTypeMaxRuntimeSeconds, defaultScheduledJobTypeMaxRuntime), RetryBackoff: durationFromSeconds(adminRuntime.RetryBackoffSeconds, defaultScheduledRetryBackoff), MaxResults: adminRuntime.MaxJobsPerDetection, ExecutionConcurrency: int(adminRuntime.GlobalExecutionConcurrency), @@ -148,6 +387,9 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err if policy.RetryLimit < 0 { policy.RetryLimit = 0 } + if policy.JobTypeMaxRuntime <= 0 { + policy.JobTypeMaxRuntime = defaultScheduledJobTypeMaxRuntime + } // Plugin protocol currently has only detection timeout in admin settings. execTimeout := time.Duration(adminRuntime.DetectionTimeoutSeconds*2) * time.Second @@ -199,6 +441,7 @@ func (r *Plugin) ListSchedulerStates() ([]SchedulerJobTypeState, error) { state.DetectionIntervalSeconds = secondsFromDuration(policy.DetectionInterval) state.DetectionTimeoutSeconds = secondsFromDuration(policy.DetectionTimeout) state.ExecutionTimeoutSeconds = secondsFromDuration(policy.ExecutionTimeout) + state.JobTypeMaxRuntimeSeconds = secondsFromDuration(policy.JobTypeMaxRuntime) state.MaxJobsPerDetection = policy.MaxResults state.GlobalExecutionConcurrency = policy.ExecutionConcurrency state.PerWorkerExecutionConcurrency = policy.PerWorkerConcurrency @@ -207,6 +450,19 @@ func (r *Plugin) ListSchedulerStates() ([]SchedulerJobTypeState, error) { } } + runInfo := r.snapshotSchedulerRun(jobType) + if !runInfo.lastRunStartedAt.IsZero() { + at := runInfo.lastRunStartedAt + state.LastRunStartedAt = &at + } + if !runInfo.lastRunCompletedAt.IsZero() { + at := runInfo.lastRunCompletedAt + state.LastRunCompletedAt = &at + } + if runInfo.lastRunStatus != "" { + state.LastRunStatus = runInfo.lastRunStatus + } + leasedWorkerID := r.getDetectorLease(jobType) if leasedWorkerID != "" { state.DetectorWorkerID = leasedWorkerID @@ -258,10 +514,11 @@ func deriveSchedulerAdminRuntime( PerWorkerExecutionConcurrency: defaults.PerWorkerExecutionConcurrency, RetryLimit: defaults.RetryLimit, RetryBackoffSeconds: defaults.RetryBackoffSeconds, + JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds, } } -func (r *Plugin) markDetectionDue(jobType string, interval time.Duration) bool { +func (r *Plugin) markDetectionDue(jobType string, interval, initialDelay time.Duration) bool { now := time.Now().UTC() r.schedulerMu.Lock() @@ -275,12 +532,43 @@ func (r *Plugin) markDetectionDue(jobType string, interval time.Duration) bool { if exists && now.Before(nextRun) { return false } + if !exists && initialDelay > 0 { + r.nextDetectionAt[jobType] = now.Add(initialDelay) + return false + } r.nextDetectionAt[jobType] = now.Add(interval) r.detectionInFlight[jobType] = true return true } +func (r *Plugin) earliestNextDetectionAt() time.Time { + if r == nil { + return time.Time{} + } + + r.schedulerMu.Lock() + defer r.schedulerMu.Unlock() + + var earliest time.Time + for _, nextRun := range r.nextDetectionAt { + if nextRun.IsZero() { + continue + } + if earliest.IsZero() || nextRun.Before(earliest) { + earliest = nextRun + } + } + + return earliest +} + +func (r *Plugin) markJobTypeInFlight(jobType string) { + r.schedulerMu.Lock() + r.detectionInFlight[jobType] = true + r.schedulerMu.Unlock() +} + func (r *Plugin) finishDetection(jobType string) { r.schedulerMu.Lock() delete(r.detectionInFlight, jobType) @@ -318,125 +606,18 @@ func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) { } } -func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) { - defer r.finishDetection(jobType) - - releaseLock, lockErr := r.acquireAdminLock(fmt.Sprintf("plugin scheduled detection %s", jobType)) - if lockErr != nil { - r.recordSchedulerDetectionError(jobType, lockErr) - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: fmt.Sprintf("scheduled detection aborted: failed to acquire lock: %v", lockErr), - Stage: "failed", - OccurredAt: timeToPtr(time.Now().UTC()), - }) - return - } - if releaseLock != nil { - defer releaseLock() - } - - start := time.Now().UTC() - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: "scheduled detection started", - Stage: "detecting", - OccurredAt: timeToPtr(start), - }) - - if skip, waitingCount, waitingThreshold := r.shouldSkipDetectionForWaitingJobs(jobType, policy); skip { - r.recordSchedulerDetectionSkip(jobType, fmt.Sprintf("waiting backlog %d reached threshold %d", waitingCount, waitingThreshold)) - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: fmt.Sprintf("scheduled detection skipped: waiting backlog %d reached threshold %d", waitingCount, waitingThreshold), - Stage: "skipped_waiting_backlog", - OccurredAt: timeToPtr(time.Now().UTC()), - }) - return - } - - clusterContext, err := r.loadSchedulerClusterContext() - if err != nil { - r.recordSchedulerDetectionError(jobType, err) - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: fmt.Sprintf("scheduled detection aborted: %v", err), - Stage: "failed", - OccurredAt: timeToPtr(time.Now().UTC()), - }) - return - } - - ctx, cancel := context.WithTimeout(context.Background(), policy.DetectionTimeout) - proposals, err := r.RunDetection(ctx, jobType, clusterContext, policy.MaxResults) - cancel() - if err != nil { - r.recordSchedulerDetectionError(jobType, err) - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: fmt.Sprintf("scheduled detection failed: %v", err), - Stage: "failed", - OccurredAt: timeToPtr(time.Now().UTC()), - }) - return - } - - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: fmt.Sprintf("scheduled detection completed: %d proposal(s)", len(proposals)), - Stage: "detected", - OccurredAt: timeToPtr(time.Now().UTC()), - }) - r.recordSchedulerDetectionSuccess(jobType, len(proposals)) - - filteredByActive, skippedActive := r.filterProposalsWithActiveJobs(jobType, proposals) - if skippedActive > 0 { - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: fmt.Sprintf("scheduled detection skipped %d proposal(s) due to active assigned/running jobs", skippedActive), - Stage: "deduped_active_jobs", - OccurredAt: timeToPtr(time.Now().UTC()), - }) - } - - if len(filteredByActive) == 0 { - return - } - - filtered := r.filterScheduledProposals(filteredByActive) - if len(filtered) != len(filteredByActive) { - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: fmt.Sprintf("scheduled detection deduped %d proposal(s) within this run", len(filteredByActive)-len(filtered)), - Stage: "deduped", - OccurredAt: timeToPtr(time.Now().UTC()), - }) - } - - if len(filtered) == 0 { - return - } - - r.dispatchScheduledProposals(jobType, filtered, clusterContext, policy) -} - -func (r *Plugin) loadSchedulerClusterContext() (*plugin_pb.ClusterContext, error) { +func (r *Plugin) loadSchedulerClusterContext(ctx context.Context) (*plugin_pb.ClusterContext, error) { if r.clusterContextProvider == nil { return nil, fmt.Errorf("cluster context provider is not configured") } - ctx, cancel := context.WithTimeout(context.Background(), defaultClusterContextTimeout) + if ctx == nil { + ctx = context.Background() + } + clusterCtx, cancel := context.WithTimeout(ctx, defaultClusterContextTimeout) defer cancel() - clusterContext, err := r.clusterContextProvider(ctx) + clusterContext, err := r.clusterContextProvider(clusterCtx) if err != nil { return nil, err } @@ -447,11 +628,16 @@ func (r *Plugin) loadSchedulerClusterContext() (*plugin_pb.ClusterContext, error } func (r *Plugin) dispatchScheduledProposals( + ctx context.Context, jobType string, proposals []*plugin_pb.JobProposal, clusterContext *plugin_pb.ClusterContext, policy schedulerPolicy, -) { +) (int, int, int) { + if ctx == nil { + ctx = context.Background() + } + jobQueue := make(chan *plugin_pb.JobSpec, len(proposals)) for index, proposal := range proposals { job := buildScheduledJobSpec(jobType, proposal, index) @@ -459,7 +645,7 @@ func (r *Plugin) dispatchScheduledProposals( select { case <-r.shutdownCh: close(jobQueue) - return + return 0, 0, 0 default: jobQueue <- job } @@ -470,6 +656,7 @@ func (r *Plugin) dispatchScheduledProposals( var statsMu sync.Mutex successCount := 0 errorCount := 0 + canceledCount := 0 workerCount := policy.ExecutionConcurrency if workerCount < 1 { @@ -481,6 +668,7 @@ func (r *Plugin) dispatchScheduledProposals( go func() { defer wg.Done() + jobLoop: for job := range jobQueue { select { case <-r.shutdownCh: @@ -488,19 +676,36 @@ func (r *Plugin) dispatchScheduledProposals( default: } + if ctx.Err() != nil { + r.cancelQueuedJob(job, ctx.Err()) + statsMu.Lock() + canceledCount++ + statsMu.Unlock() + continue + } + for { select { case <-r.shutdownCh: return default: } + if ctx.Err() != nil { + r.cancelQueuedJob(job, ctx.Err()) + statsMu.Lock() + canceledCount++ + statsMu.Unlock() + continue jobLoop + } - executor, release, reserveErr := r.reserveScheduledExecutor(jobType, policy) + executor, release, reserveErr := r.reserveScheduledExecutor(ctx, jobType, policy) if reserveErr != nil { - select { - case <-r.shutdownCh: - return - default: + if ctx.Err() != nil { + r.cancelQueuedJob(job, ctx.Err()) + statsMu.Lock() + canceledCount++ + statsMu.Unlock() + continue jobLoop } statsMu.Lock() errorCount++ @@ -515,16 +720,23 @@ func (r *Plugin) dispatchScheduledProposals( break } - err := r.executeScheduledJobWithExecutor(executor, job, clusterContext, policy) + err := r.executeScheduledJobWithExecutor(ctx, executor, job, clusterContext, policy) release() if errors.Is(err, errExecutorAtCapacity) { r.trackExecutionQueued(job) - if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) { + if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) { return } continue } if err != nil { + if ctx.Err() != nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + r.cancelQueuedJob(job, err) + statsMu.Lock() + canceledCount++ + statsMu.Unlock() + continue jobLoop + } statsMu.Lock() errorCount++ statsMu.Unlock() @@ -550,23 +762,34 @@ func (r *Plugin) dispatchScheduledProposals( wg.Wait() - r.appendActivity(JobActivity{ - JobType: jobType, - Source: "admin_scheduler", - Message: fmt.Sprintf("scheduled execution finished: success=%d error=%d", successCount, errorCount), - Stage: "executed", - OccurredAt: timeToPtr(time.Now().UTC()), - }) + drainErr := ctx.Err() + if drainErr == nil { + drainErr = errSchedulerShutdown + } + for job := range jobQueue { + r.cancelQueuedJob(job, drainErr) + canceledCount++ + } + + return successCount, errorCount, canceledCount } func (r *Plugin) reserveScheduledExecutor( + ctx context.Context, jobType string, policy schedulerPolicy, ) (*WorkerSession, func(), error) { + if ctx == nil { + ctx = context.Background() + } + deadline := time.Now().Add(policy.ExecutionTimeout) if policy.ExecutionTimeout <= 0 { deadline = time.Now().Add(10 * time.Minute) // Default cap } + if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) { + deadline = ctxDeadline + } for { select { @@ -574,6 +797,9 @@ func (r *Plugin) reserveScheduledExecutor( return nil, nil, fmt.Errorf("plugin is shutting down") default: } + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } if time.Now().After(deadline) { return nil, nil, fmt.Errorf("timed out waiting for executor capacity for %s", jobType) @@ -581,7 +807,10 @@ func (r *Plugin) reserveScheduledExecutor( executors, err := r.registry.ListExecutors(jobType) if err != nil { - if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) { + if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) { + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } return nil, nil, fmt.Errorf("plugin is shutting down") } continue @@ -595,7 +824,10 @@ func (r *Plugin) reserveScheduledExecutor( return executor, release, nil } - if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) { + if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) { + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } return nil, nil, fmt.Errorf("plugin is shutting down") } } @@ -680,6 +912,7 @@ func schedulerWorkerExecutionLimit(executor *WorkerSession, jobType string, poli } func (r *Plugin) executeScheduledJobWithExecutor( + ctx context.Context, executor *WorkerSession, job *plugin_pb.JobSpec, clusterContext *plugin_pb.ClusterContext, @@ -697,8 +930,15 @@ func (r *Plugin) executeScheduledJobWithExecutor( return fmt.Errorf("plugin is shutting down") default: } + if ctx != nil && ctx.Err() != nil { + return ctx.Err() + } - execCtx, cancel := context.WithTimeout(context.Background(), policy.ExecutionTimeout) + parent := ctx + if parent == nil { + parent = context.Background() + } + execCtx, cancel := context.WithTimeout(parent, policy.ExecutionTimeout) _, err := r.executeJobWithExecutor(execCtx, executor, job, clusterContext, int32(attempt)) cancel() if err == nil { @@ -718,7 +958,10 @@ func (r *Plugin) executeScheduledJobWithExecutor( Stage: "retry", OccurredAt: timeToPtr(time.Now().UTC()), }) - if !waitForShutdownOrTimer(r.shutdownCh, policy.RetryBackoff) { + if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.RetryBackoff) { + if ctx != nil && ctx.Err() != nil { + return ctx.Err() + } return fmt.Errorf("plugin is shutting down") } } @@ -764,6 +1007,53 @@ func (r *Plugin) countWaitingTrackedJobs(jobType string) int { return waiting } +func (r *Plugin) clearWaitingJobQueue(jobType string) int { + normalizedJobType := strings.TrimSpace(jobType) + if normalizedJobType == "" { + return 0 + } + + jobIDs := make([]string, 0) + seen := make(map[string]struct{}) + + r.jobsMu.RLock() + for _, job := range r.jobs { + if job == nil { + continue + } + if strings.TrimSpace(job.JobType) != normalizedJobType { + continue + } + if !isWaitingTrackedJobState(job.State) { + continue + } + jobID := strings.TrimSpace(job.JobID) + if jobID == "" { + continue + } + if _, ok := seen[jobID]; ok { + continue + } + seen[jobID] = struct{}{} + jobIDs = append(jobIDs, jobID) + } + r.jobsMu.RUnlock() + + if len(jobIDs) == 0 { + return 0 + } + + reason := fmt.Sprintf("cleared queued job before %s run", normalizedJobType) + for _, jobID := range jobIDs { + r.markJobCanceled(&plugin_pb.JobSpec{ + JobId: jobID, + JobType: normalizedJobType, + }, reason) + } + + return len(jobIDs) +} + func waitingBacklogThreshold(policy schedulerPolicy) int { concurrency := policy.ExecutionConcurrency if concurrency <= 0 { @@ -861,6 +1151,27 @@ func waitForShutdownOrTimer(shutdown <-chan struct{}, duration time.Duration) bo } } +func waitForShutdownOrTimerWithContext(shutdown <-chan struct{}, ctx context.Context, duration time.Duration) bool { + if duration <= 0 { + return true + } + if ctx == nil { + ctx = context.Background() + } + + timer := time.NewTimer(duration) + defer timer.Stop() + + select { + case <-shutdown: + return false + case <-ctx.Done(): + return false + case <-timer.C: + return true + } +} + // filterProposalsWithActiveJobs removes proposals whose dedupe keys already have active jobs. // It first expires stale tracked jobs via expireStaleJobs, which can mutate scheduler state, // so callers should treat this method as a stateful operation. diff --git a/weed/admin/plugin/plugin_scheduler_test.go b/weed/admin/plugin/plugin_scheduler_test.go index 3fa9067c2..171c5a33f 100644 --- a/weed/admin/plugin/plugin_scheduler_test.go +++ b/weed/admin/plugin/plugin_scheduler_test.go @@ -1,6 +1,7 @@ package plugin import ( + "context" "fmt" "testing" "time" @@ -28,6 +29,7 @@ func TestLoadSchedulerPolicyUsesAdminConfig(t *testing.T) { PerWorkerExecutionConcurrency: 2, RetryLimit: 4, RetryBackoffSeconds: 7, + JobTypeMaxRuntimeSeconds: 1800, }, }) if err != nil { @@ -53,6 +55,9 @@ func TestLoadSchedulerPolicyUsesAdminConfig(t *testing.T) { if policy.RetryLimit != 4 { t.Fatalf("unexpected retry limit: got=%d", policy.RetryLimit) } + if policy.JobTypeMaxRuntime != 30*time.Minute { + t.Fatalf("unexpected max runtime: got=%v", policy.JobTypeMaxRuntime) + } } func TestLoadSchedulerPolicyUsesDescriptorDefaultsWhenConfigMissing(t *testing.T) { @@ -75,6 +80,7 @@ func TestLoadSchedulerPolicyUsesDescriptorDefaultsWhenConfigMissing(t *testing.T PerWorkerExecutionConcurrency: 2, RetryLimit: 3, RetryBackoffSeconds: 6, + JobTypeMaxRuntimeSeconds: 1200, }, }) if err != nil { @@ -97,6 +103,9 @@ func TestLoadSchedulerPolicyUsesDescriptorDefaultsWhenConfigMissing(t *testing.T if policy.PerWorkerConcurrency != 2 { t.Fatalf("unexpected per-worker concurrency: got=%d", policy.PerWorkerConcurrency) } + if policy.JobTypeMaxRuntime != 20*time.Minute { + t.Fatalf("unexpected max runtime: got=%v", policy.JobTypeMaxRuntime) + } } func TestReserveScheduledExecutorRespectsPerWorkerLimit(t *testing.T) { @@ -126,13 +135,13 @@ func TestReserveScheduledExecutorRespectsPerWorkerLimit(t *testing.T) { ExecutorReserveBackoff: time.Millisecond, } - executor1, release1, err := pluginSvc.reserveScheduledExecutor("balance", policy) + executor1, release1, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy) if err != nil { t.Fatalf("reserve executor 1: %v", err) } defer release1() - executor2, release2, err := pluginSvc.reserveScheduledExecutor("balance", policy) + executor2, release2, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy) if err != nil { t.Fatalf("reserve executor 2: %v", err) } @@ -254,7 +263,7 @@ func TestReserveScheduledExecutorTimesOutWhenNoExecutor(t *testing.T) { start := time.Now() pluginSvc.Shutdown() - _, _, err = pluginSvc.reserveScheduledExecutor("missing-job-type", policy) + _, _, err = pluginSvc.reserveScheduledExecutor(context.Background(), "missing-job-type", policy) if err == nil { t.Fatalf("expected reservation shutdown error") } @@ -285,7 +294,7 @@ func TestReserveScheduledExecutorWaitsForWorkerCapacity(t *testing.T) { ExecutorReserveBackoff: 5 * time.Millisecond, } - _, release1, err := pluginSvc.reserveScheduledExecutor("balance", policy) + _, release1, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy) if err != nil { t.Fatalf("reserve executor 1: %v", err) } @@ -296,7 +305,7 @@ func TestReserveScheduledExecutorWaitsForWorkerCapacity(t *testing.T) { } secondReserveCh := make(chan reserveResult, 1) go func() { - _, release2, reserveErr := pluginSvc.reserveScheduledExecutor("balance", policy) + _, release2, reserveErr := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy) if release2 != nil { release2() } @@ -394,6 +403,7 @@ func TestListSchedulerStatesIncludesPolicyAndState(t *testing.T) { PerWorkerExecutionConcurrency: 2, RetryLimit: 1, RetryBackoffSeconds: 9, + JobTypeMaxRuntimeSeconds: 900, }, }) if err != nil { @@ -446,6 +456,9 @@ func TestListSchedulerStatesIncludesPolicyAndState(t *testing.T) { if state.ExecutionTimeoutSeconds != 90 { t.Fatalf("unexpected execution timeout: got=%d", state.ExecutionTimeoutSeconds) } + if state.JobTypeMaxRuntimeSeconds != 900 { + t.Fatalf("unexpected job type max runtime: got=%d", state.JobTypeMaxRuntimeSeconds) + } if state.MaxJobsPerDetection != 80 { t.Fatalf("unexpected max jobs per detection: got=%d", state.MaxJobsPerDetection) } diff --git a/weed/admin/plugin/scheduler_config.go b/weed/admin/plugin/scheduler_config.go new file mode 100644 index 000000000..1feb1a736 --- /dev/null +++ b/weed/admin/plugin/scheduler_config.go @@ -0,0 +1,31 @@ +package plugin + +import "time" + +const ( + defaultSchedulerIdleSleep = 613 * time.Second +) + +type SchedulerConfig struct { + IdleSleepSeconds int32 `json:"idle_sleep_seconds"` +} + +func DefaultSchedulerConfig() SchedulerConfig { + return SchedulerConfig{ + IdleSleepSeconds: int32(defaultSchedulerIdleSleep / time.Second), + } +} + +func normalizeSchedulerConfig(cfg SchedulerConfig) SchedulerConfig { + if cfg.IdleSleepSeconds <= 0 { + return DefaultSchedulerConfig() + } + return cfg +} + +func (c SchedulerConfig) IdleSleepDuration() time.Duration { + if c.IdleSleepSeconds <= 0 { + return defaultSchedulerIdleSleep + } + return time.Duration(c.IdleSleepSeconds) * time.Second +} diff --git a/weed/admin/plugin/scheduler_status.go b/weed/admin/plugin/scheduler_status.go index 673afa232..a2d9e621b 100644 --- a/weed/admin/plugin/scheduler_status.go +++ b/weed/admin/plugin/scheduler_status.go @@ -9,6 +9,12 @@ import ( type SchedulerStatus struct { Now time.Time `json:"now"` SchedulerTickSeconds int `json:"scheduler_tick_seconds"` + IdleSleepSeconds int `json:"idle_sleep_seconds,omitempty"` + NextDetectionAt *time.Time `json:"next_detection_at,omitempty"` + CurrentJobType string `json:"current_job_type,omitempty"` + CurrentPhase string `json:"current_phase,omitempty"` + LastIterationHadJobs bool `json:"last_iteration_had_jobs,omitempty"` + LastIterationDoneAt *time.Time `json:"last_iteration_done_at,omitempty"` Waiting []SchedulerWaitingStatus `json:"waiting,omitempty"` InProcessJobs []SchedulerJobStatus `json:"in_process_jobs,omitempty"` JobTypes []SchedulerJobTypeStatus `json:"job_types,omitempty"` @@ -56,6 +62,19 @@ type schedulerDetectionInfo struct { lastSkippedReason string } +type schedulerRunInfo struct { + lastRunStartedAt time.Time + lastRunCompletedAt time.Time + lastRunStatus string +} + +type schedulerLoopState struct { + currentJobType string + currentPhase string + lastIterationHadJobs bool + lastIterationCompleted time.Time +} + func (r *Plugin) recordSchedulerDetectionSuccess(jobType string, count int) { if r == nil { return @@ -122,12 +141,105 @@ func (r *Plugin) snapshotSchedulerDetection(jobType string) schedulerDetectionIn return *info } +func (r *Plugin) recordSchedulerRunStart(jobType string) { + if r == nil { + return + } + r.schedulerRunMu.Lock() + defer r.schedulerRunMu.Unlock() + info := r.schedulerRun[jobType] + if info == nil { + info = &schedulerRunInfo{} + r.schedulerRun[jobType] = info + } + info.lastRunStartedAt = time.Now().UTC() + info.lastRunStatus = "" +} + +func (r *Plugin) recordSchedulerRunComplete(jobType, status string) { + if r == nil { + return + } + r.schedulerRunMu.Lock() + defer r.schedulerRunMu.Unlock() + info := r.schedulerRun[jobType] + if info == nil { + info = &schedulerRunInfo{} + r.schedulerRun[jobType] = info + } + info.lastRunCompletedAt = time.Now().UTC() + info.lastRunStatus = status +} + +func (r *Plugin) snapshotSchedulerRun(jobType string) schedulerRunInfo { + if r == nil { + return schedulerRunInfo{} + } + r.schedulerRunMu.Lock() + defer r.schedulerRunMu.Unlock() + info := r.schedulerRun[jobType] + if info == nil { + return schedulerRunInfo{} + } + return *info +} + +func (r *Plugin) setSchedulerLoopState(jobType, phase string) { + if r == nil { + return + } + r.schedulerLoopMu.Lock() + r.schedulerLoopState.currentJobType = jobType + r.schedulerLoopState.currentPhase = phase + r.schedulerLoopMu.Unlock() +} + +func (r *Plugin) recordSchedulerIterationComplete(hadJobs bool) { + if r == nil { + return + } + r.schedulerLoopMu.Lock() + r.schedulerLoopState.lastIterationHadJobs = hadJobs + r.schedulerLoopState.lastIterationCompleted = time.Now().UTC() + r.schedulerLoopMu.Unlock() +} + +func (r *Plugin) snapshotSchedulerLoopState() schedulerLoopState { + if r == nil { + return schedulerLoopState{} + } + r.schedulerLoopMu.Lock() + defer r.schedulerLoopMu.Unlock() + return r.schedulerLoopState +} + func (r *Plugin) GetSchedulerStatus() SchedulerStatus { now := time.Now().UTC() + loopState := r.snapshotSchedulerLoopState() + schedulerConfig := r.GetSchedulerConfig() status := SchedulerStatus{ Now: now, SchedulerTickSeconds: int(secondsFromDuration(r.schedulerTick)), InProcessJobs: r.listInProcessJobs(now), + IdleSleepSeconds: int(schedulerConfig.IdleSleepSeconds), + CurrentJobType: loopState.currentJobType, + CurrentPhase: loopState.currentPhase, + LastIterationHadJobs: loopState.lastIterationHadJobs, + } + nextDetectionAt := r.earliestNextDetectionAt() + if nextDetectionAt.IsZero() && loopState.currentPhase == "sleeping" && !loopState.lastIterationCompleted.IsZero() { + idleSleep := schedulerConfig.IdleSleepDuration() + if idleSleep > 0 { + nextDetectionAt = loopState.lastIterationCompleted.Add(idleSleep) + } + } + if !nextDetectionAt.IsZero() { + at := nextDetectionAt + status.NextDetectionAt = &at + } + if !loopState.lastIterationCompleted.IsZero() { + at := loopState.lastIterationCompleted + status.LastIterationDoneAt = &at } states, err := r.ListSchedulerStates() diff --git a/weed/admin/plugin/types.go b/weed/admin/plugin/types.go index cc23e17fe..fcac0b2c6 100644 --- a/weed/admin/plugin/types.go +++ b/weed/admin/plugin/types.go @@ -90,6 +90,7 @@ type SchedulerJobTypeState struct { DetectionIntervalSeconds int32 `json:"detection_interval_seconds,omitempty"` DetectionTimeoutSeconds int32 `json:"detection_timeout_seconds,omitempty"` ExecutionTimeoutSeconds int32 `json:"execution_timeout_seconds,omitempty"` + JobTypeMaxRuntimeSeconds int32 `json:"job_type_max_runtime_seconds,omitempty"` MaxJobsPerDetection int32 `json:"max_jobs_per_detection,omitempty"` GlobalExecutionConcurrency int `json:"global_execution_concurrency,omitempty"` PerWorkerExecutionConcurrency int `json:"per_worker_execution_concurrency,omitempty"` @@ -98,6 +99,9 @@ type SchedulerJobTypeState struct { DetectorAvailable bool `json:"detector_available"` DetectorWorkerID string `json:"detector_worker_id,omitempty"` ExecutorWorkerCount int `json:"executor_worker_count"` + LastRunStartedAt *time.Time `json:"last_run_started_at,omitempty"` + LastRunCompletedAt *time.Time `json:"last_run_completed_at,omitempty"` + LastRunStatus string `json:"last_run_status,omitempty"` } func timeToPtr(t time.Time) *time.Time { diff --git a/weed/admin/view/app/plugin.templ b/weed/admin/view/app/plugin.templ index 63dc16f05..dbd8684b4 100644 --- a/weed/admin/view/app/plugin.templ +++ b/weed/admin/view/app/plugin.templ @@ -120,7 +120,7 @@ templ Plugin(page string) {
Scheduler State
- Per job type detection schedule and execution limits + Sequential scheduler with per-job runtime limits
@@ -131,12 +131,12 @@ templ Plugin(page string) { Enabled Detector In Flight - Next Detection - Interval + Max Runtime Exec Global Exec/Worker Executor Workers Effective Exec + Last Run @@ -148,6 +148,38 @@ templ Plugin(page string) {
+
+
+
+
+
Scheduler Settings
+ Global +
+
+
+ + +
Used when no jobs are detected.
+
+ +
+
+
+
+
+
+
Next Run
+ Scheduler +
+
+
-
+
Not scheduled
+
+
+
+
@@ -242,14 +274,14 @@ templ Plugin(page string) {
-
- - -
+
+ + +
@@ -273,6 +305,33 @@ templ Plugin(page string) {
+ +
+
+
Scheduler Settings
+
+
+
+ + +
Used when no jobs are detected.
+
+ +
+
+ +
+
+
Next Run
+ Scheduler +
+
+
-
+
Not scheduled
+
+
@@ -572,6 +631,9 @@ templ Plugin(page string) { jobs: [], activities: [], schedulerStates: [], + schedulerStatus: null, + schedulerConfig: null, + schedulerConfigLoaded: false, allJobs: [], allActivities: [], loadedJobType: '', @@ -1442,8 +1504,8 @@ templ Plugin(page string) { var enabled = !!item.enabled; var inFlight = !!item.detection_in_flight; var detector = item.detector_available ? textOrDash(item.detector_worker_id) : 'No detector'; - var intervalSeconds = Number(item.detection_interval_seconds || 0); - var intervalText = intervalSeconds > 0 ? (String(intervalSeconds) + 's') : '-'; + var maxRuntimeSeconds = Number(item.job_type_max_runtime_seconds || 0); + var maxRuntimeText = maxRuntimeSeconds > 0 ? (String(maxRuntimeSeconds) + 's') : '-'; var globalExec = Number(item.global_execution_concurrency || 0); var perWorkerExec = Number(item.per_worker_execution_concurrency || 0); var executorWorkers = Number(item.executor_worker_count || 0); @@ -1452,6 +1514,13 @@ templ Plugin(page string) { var perWorkerExecText = enabled ? String(perWorkerExec) : '-'; var executorWorkersText = enabled ? String(executorWorkers) : '-'; var effectiveExecText = enabled ? String(effectiveExec) : '-'; + var lastRunStatus = textOrDash(item.last_run_status); + var lastRunTime = parseTime(item.last_run_completed_at); + var lastRunText = '-'; + if (lastRunStatus !== '-' || lastRunTime) { + var statusLabel = lastRunStatus !== '-' ? lastRunStatus : 'run'; + lastRunText = lastRunTime ? (statusLabel + ' @ ' + lastRunTime) : statusLabel; + } var enabledBadge = enabled ? 'Enabled' : 'Disabled'; var inFlightBadge = inFlight ? 'Yes' : 'No'; @@ -1465,18 +1534,38 @@ templ Plugin(page string) { '' + enabledBadge + '' + '' + escapeHtml(detector) + '' + '' + inFlightBadge + '' + - '' + escapeHtml(parseTime(item.next_detection_at) || '-') + '' + - '' + escapeHtml(intervalText) + '' + + '' + escapeHtml(maxRuntimeText) + '' + '' + escapeHtml(globalExecText) + '' + '' + escapeHtml(perWorkerExecText) + '' + '' + escapeHtml(executorWorkersText) + '' + '' + escapeHtml(effectiveExecText) + '' + + '' + escapeHtml(lastRunText) + '' + ''; } tbody.innerHTML = rows; } + function renderSchedulerStatus() { + var valueNodes = document.querySelectorAll('.plugin-scheduler-next-run'); + if (!valueNodes.length) { + return; + } + var metaNodes = document.querySelectorAll('.plugin-scheduler-next-run-meta'); + var status = state.schedulerStatus || {}; + var nextRun = parseTime(status.next_detection_at); + var display = nextRun || '-'; + valueNodes.forEach(function(node) { + node.textContent = display; + }); + var metaText = nextRun ? 'Local time' : 'Not scheduled'; + if (metaNodes.length) { + metaNodes.forEach(function(node) { + node.textContent = metaText; + }); + } + } + function renderWorkers() { var tbody = document.getElementById('plugin-workers-table-body'); if (!state.workers.length) { @@ -2372,8 +2461,8 @@ templ Plugin(page string) { } document.getElementById('plugin-admin-enabled').checked = pickBool('enabled'); - document.getElementById('plugin-admin-detection-interval').value = String(pickNumber('detection_interval_seconds')); document.getElementById('plugin-admin-detection-timeout').value = String(pickNumber('detection_timeout_seconds')); + document.getElementById('plugin-admin-max-runtime').value = String(pickNumber('job_type_max_runtime_seconds')); document.getElementById('plugin-admin-max-results').value = String(pickNumber('max_jobs_per_detection')); document.getElementById('plugin-admin-global-exec').value = String(pickNumber('global_execution_concurrency')); document.getElementById('plugin-admin-per-worker-exec').value = String(pickNumber('per_worker_execution_concurrency')); @@ -2382,6 +2471,9 @@ templ Plugin(page string) { } function collectAdminSettings() { + var existingRuntime = (state.config && state.config.admin_runtime) ? state.config.admin_runtime : {}; + var existingDetectionInterval = Number(existingRuntime.detection_interval_seconds || 0); + function getInt(id) { var raw = String(document.getElementById(id).value || '').trim(); if (!raw) { @@ -2396,8 +2488,9 @@ templ Plugin(page string) { return { enabled: !!document.getElementById('plugin-admin-enabled').checked, - detection_interval_seconds: getInt('plugin-admin-detection-interval'), + detection_interval_seconds: existingDetectionInterval, detection_timeout_seconds: getInt('plugin-admin-detection-timeout'), + job_type_max_runtime_seconds: getInt('plugin-admin-max-runtime'), max_jobs_per_detection: getInt('plugin-admin-max-results'), global_execution_concurrency: getInt('plugin-admin-global-exec'), per_worker_execution_concurrency: getInt('plugin-admin-per-worker-exec'), @@ -2713,6 +2806,75 @@ templ Plugin(page string) { } } + async function loadSchedulerConfig(forceRefresh) { + if (state.schedulerConfigLoaded && !forceRefresh) { + return; + } + var idleInputs = [ + document.getElementById('plugin-scheduler-idle-sleep'), + document.getElementById('plugin-scheduler-idle-sleep-overview'), + ].filter(Boolean); + if (idleInputs.length === 0) { + return; + } + try { + var cfg = await pluginRequest('GET', '/api/plugin/scheduler-config'); + state.schedulerConfig = cfg || {}; + state.schedulerConfigLoaded = true; + var idleSeconds = Number((cfg && cfg.idle_sleep_seconds) || 0); + idleInputs.forEach(function(input) { + input.value = idleSeconds > 0 ? String(idleSeconds) : ''; + }); + } catch (e) { + notify('Failed to load scheduler config: ' + e.message, 'error'); + } + } + + async function saveSchedulerConfig(sourceInput) { + var idleInputs = [ + document.getElementById('plugin-scheduler-idle-sleep'), + document.getElementById('plugin-scheduler-idle-sleep-overview'), + ].filter(Boolean); + if (idleInputs.length === 0) { + return; + } + var raw = ''; + if (sourceInput) { + raw = String(sourceInput.value || '').trim(); + } + if (!raw) { + for (var i = 0; i < idleInputs.length; i++) { + if (idleInputs[i] === sourceInput) { + continue; + } + var candidate = String(idleInputs[i].value || '').trim(); + if (candidate) { + raw = candidate; + break; + } + } + } + var parsed = raw ? parseInt(raw, 10) : 0; + if (Number.isNaN(parsed) || parsed < 0) { + notify('Invalid idle sleep value', 'error'); + return; + } + try { + var updated = await pluginRequest('PUT', '/api/plugin/scheduler-config', { + idle_sleep_seconds: parsed, + }); + state.schedulerConfig = updated || {}; + state.schedulerConfigLoaded = true; + var idleSeconds = Number((updated && updated.idle_sleep_seconds) || 0); + idleInputs.forEach(function(input) { + input.value = idleSeconds > 0 ? String(idleSeconds) : ''; + }); + notify('Scheduler settings saved', 'success'); + } catch (e) { + notify('Failed to save scheduler config: ' + e.message, 'error'); + } + } + function getMaxResults() { var raw = String(document.getElementById('plugin-admin-max-results').value || '').trim(); if (!raw) { @@ -2788,21 +2950,30 @@ templ Plugin(page string) { var allJobsPromise = pluginRequest('GET', '/api/plugin/jobs?limit=500'); var allActivitiesPromise = pluginRequest('GET', '/api/plugin/activities?limit=500'); var schedulerPromise = pluginRequest('GET', '/api/plugin/scheduler-states'); + var schedulerStatusPromise = pluginRequest('GET', '/api/plugin/scheduler-status'); var allJobs = await allJobsPromise; var allActivities = await allActivitiesPromise; var schedulerStates = await schedulerPromise; + var schedulerStatus = null; + try { + schedulerStatus = await schedulerStatusPromise; + } catch (e) { + schedulerStatus = null; + } state.jobs = Array.isArray(allJobs) ? allJobs : []; state.activities = Array.isArray(allActivities) ? allActivities : []; state.allJobs = state.jobs; state.allActivities = state.activities; state.schedulerStates = Array.isArray(schedulerStates) ? schedulerStates : []; + state.schedulerStatus = schedulerStatus && schedulerStatus.scheduler ? schedulerStatus.scheduler : null; renderQueueJobs(); renderDetectionJobs(); renderExecutionJobs(); renderExecutionActivities(); renderSchedulerStates(); + renderSchedulerStatus(); renderStatus(); renderJobTypeSummary(); } @@ -2880,6 +3051,19 @@ templ Plugin(page string) { saveConfig(); }); + var saveSchedulerBtn = document.getElementById('plugin-save-scheduler-btn'); + if (saveSchedulerBtn) { + saveSchedulerBtn.addEventListener('click', function() { + saveSchedulerConfig(document.getElementById('plugin-scheduler-idle-sleep')); + }); + } + var saveSchedulerBtnOverview = document.getElementById('plugin-save-scheduler-btn-overview'); + if (saveSchedulerBtnOverview) { + saveSchedulerBtnOverview.addEventListener('click', function() { + saveSchedulerConfig(document.getElementById('plugin-scheduler-idle-sleep-overview')); + }); + } + document.getElementById('plugin-trigger-detection-btn').addEventListener('click', function() { runDetection(); }); @@ -2964,6 +3148,7 @@ templ Plugin(page string) { ensureActiveNavigation(); renderNavigationState(); await refreshAll(); + await loadSchedulerConfig(false); state.refreshTimer = setInterval(function() { refreshAll(); diff --git a/weed/admin/view/app/plugin_templ.go b/weed/admin/view/app/plugin_templ.go index 5f3c5c2b6..e71b5abd1 100644 --- a/weed/admin/view/app/plugin_templ.go +++ b/weed/admin/view/app/plugin_templ.go @@ -40,13 +40,13 @@ func Plugin(page string) templ.Component { var templ_7745c5c3_Var2 string templ_7745c5c3_Var2, templ_7745c5c3_Err = templ.JoinStringErrs(currentPage) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 10, Col: 80} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `plugin.templ`, Line: 10, Col: 80} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var2)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "\">

Workers

Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.

Workers

0

Active Jobs

0

Activities (recent)

0

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Per job type detection schedule and execution limits
Job TypeEnabledDetectorIn FlightNext DetectionIntervalExec GlobalExec/WorkerExecutor WorkersEffective Exec
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "\">

Workers

Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.

Workers

0

Active Jobs

0

Activities (recent)

0

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Scheduler Settings
Global
Used when no jobs are detected.
Next Run
Scheduler
-
Not scheduled
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
Scheduler Settings
Used when no jobs are detected.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } diff --git a/weed/pb/plugin.proto b/weed/pb/plugin.proto index c2296c453..6a4bc7a3c 100644 --- a/weed/pb/plugin.proto +++ b/weed/pb/plugin.proto @@ -232,6 +232,7 @@ message AdminRuntimeDefaults { int32 per_worker_execution_concurrency = 6; int32 retry_limit = 7; int32 retry_backoff_seconds = 8; + int32 job_type_max_runtime_seconds = 9; } message AdminRuntimeConfig { @@ -243,6 +244,7 @@ message AdminRuntimeConfig { int32 per_worker_execution_concurrency = 6; int32 retry_limit = 7; int32 retry_backoff_seconds = 8; + int32 job_type_max_runtime_seconds = 9; } message RunDetectionRequest { diff --git a/weed/pb/plugin_pb/plugin.pb.go b/weed/pb/plugin_pb/plugin.pb.go index 5dac6afa8..e89a01084 100644 --- a/weed/pb/plugin_pb/plugin.pb.go +++ b/weed/pb/plugin_pb/plugin.pb.go @@ -2492,6 +2492,7 @@ type AdminRuntimeDefaults struct { PerWorkerExecutionConcurrency int32 `protobuf:"varint,6,opt,name=per_worker_execution_concurrency,json=perWorkerExecutionConcurrency,proto3" json:"per_worker_execution_concurrency,omitempty"` RetryLimit int32 `protobuf:"varint,7,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"` RetryBackoffSeconds int32 `protobuf:"varint,8,opt,name=retry_backoff_seconds,json=retryBackoffSeconds,proto3" json:"retry_backoff_seconds,omitempty"` + JobTypeMaxRuntimeSeconds int32 `protobuf:"varint,9,opt,name=job_type_max_runtime_seconds,json=jobTypeMaxRuntimeSeconds,proto3" json:"job_type_max_runtime_seconds,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2582,6 +2583,13 @@ func (x *AdminRuntimeDefaults) GetRetryBackoffSeconds() int32 { return 0 } +func (x *AdminRuntimeDefaults) GetJobTypeMaxRuntimeSeconds() int32 { + if x != nil { + return x.JobTypeMaxRuntimeSeconds + } + return 0 +} + type AdminRuntimeConfig struct { state protoimpl.MessageState `protogen:"open.v1"` Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` @@ -2592,6 +2600,7 @@ type AdminRuntimeConfig struct { PerWorkerExecutionConcurrency int32 `protobuf:"varint,6,opt,name=per_worker_execution_concurrency,json=perWorkerExecutionConcurrency,proto3" json:"per_worker_execution_concurrency,omitempty"` RetryLimit int32 `protobuf:"varint,7,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"` RetryBackoffSeconds int32 `protobuf:"varint,8,opt,name=retry_backoff_seconds,json=retryBackoffSeconds,proto3" json:"retry_backoff_seconds,omitempty"` + JobTypeMaxRuntimeSeconds int32 `protobuf:"varint,9,opt,name=job_type_max_runtime_seconds,json=jobTypeMaxRuntimeSeconds,proto3" json:"job_type_max_runtime_seconds,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2682,6 +2691,13 @@ func (x *AdminRuntimeConfig) GetRetryBackoffSeconds() int32 { return 0 } +func (x *AdminRuntimeConfig) GetJobTypeMaxRuntimeSeconds() int32 { + if x != nil { + return x.JobTypeMaxRuntimeSeconds + } + return 0 +} + type RunDetectionRequest struct { state protoimpl.MessageState `protogen:"open.v1"` RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` @@ -4075,7 +4091,7 @@ const file_plugin_proto_rawDesc = "" + "\x06fields\x18\x01 \x03(\v2\x1c.plugin.ValueMap.FieldsEntryR\x06fields\x1aN\n" + "\vFieldsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + - "\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xbf\x03\n" + + "\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xff\x03\n" + "\x14AdminRuntimeDefaults\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12<\n" + "\x1adetection_interval_seconds\x18\x02 \x01(\x05R\x18detectionIntervalSeconds\x12:\n" + @@ -4085,7 +4101,8 @@ const file_plugin_proto_rawDesc = "" + " per_worker_execution_concurrency\x18\x06 \x01(\x05R\x1dperWorkerExecutionConcurrency\x12\x1f\n" + "\vretry_limit\x18\a \x01(\x05R\n" + "retryLimit\x122\n" + - "\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\"\xbd\x03\n" + + "\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\x12>\n" + + "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\"\xfd\x03\n" + "\x12AdminRuntimeConfig\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12<\n" + "\x1adetection_interval_seconds\x18\x02 \x01(\x05R\x18detectionIntervalSeconds\x12:\n" + @@ -4095,7 +4112,8 @@ const file_plugin_proto_rawDesc = "" + " per_worker_execution_concurrency\x18\x06 \x01(\x05R\x1dperWorkerExecutionConcurrency\x12\x1f\n" + "\vretry_limit\x18\a \x01(\x05R\n" + "retryLimit\x122\n" + - "\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\"\xef\x05\n" + + "\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\x12>\n" + + "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\"\xef\x05\n" + "\x13RunDetectionRequest\x12\x1d\n" + "\n" + "request_id\x18\x01 \x01(\tR\trequestId\x12\x19\n" + diff --git a/weed/plugin/worker/admin_script_handler.go b/weed/plugin/worker/admin_script_handler.go index a653b7e6b..0a4f7bfc9 100644 --- a/weed/plugin/worker/admin_script_handler.go +++ b/weed/plugin/worker/admin_script_handler.go @@ -111,6 +111,7 @@ func (h *AdminScriptHandler) Descriptor() *plugin_pb.JobTypeDescriptor { PerWorkerExecutionConcurrency: 1, RetryLimit: 0, RetryBackoffSeconds: 30, + JobTypeMaxRuntimeSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{}, } diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index 42cc5c427..15f9c9a83 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -167,6 +167,7 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor { PerWorkerExecutionConcurrency: 4, RetryLimit: 1, RetryBackoffSeconds: 30, + JobTypeMaxRuntimeSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "quiet_for_seconds": { diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index e3e9a7052..ad5b7bdf7 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -152,6 +152,7 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor { PerWorkerExecutionConcurrency: 4, RetryLimit: 1, RetryBackoffSeconds: 10, + JobTypeMaxRuntimeSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "garbage_threshold": { diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 1a31f8c24..d040bbef8 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -144,6 +144,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { PerWorkerExecutionConcurrency: 4, RetryLimit: 1, RetryBackoffSeconds: 15, + JobTypeMaxRuntimeSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "imbalance_threshold": {