diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 897fb88a7..d8173e0f6 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -32,6 +32,42 @@ const ( maxPluginRunTimeout = 30 * time.Minute ) +func matchesPluginLane(jobType, laneFilter string) bool { + laneFilter = strings.TrimSpace(laneFilter) + if laneFilter == "" { + return true + } + return plugin.JobTypeLane(jobType) == plugin.SchedulerLane(laneFilter) +} + +func filterTrackedJobsByLane(jobs []plugin.TrackedJob, laneFilter string) []plugin.TrackedJob { + if strings.TrimSpace(laneFilter) == "" { + return jobs + } + + filtered := make([]plugin.TrackedJob, 0, len(jobs)) + for _, job := range jobs { + if matchesPluginLane(job.JobType, laneFilter) { + filtered = append(filtered, job) + } + } + return filtered +} + +func filterActivitiesByLane(activities []plugin.JobActivity, laneFilter string) []plugin.JobActivity { + if strings.TrimSpace(laneFilter) == "" { + return activities + } + + filtered := make([]plugin.JobActivity, 0, len(activities)) + for _, activity := range activities { + if matchesPluginLane(activity.JobType, laneFilter) { + filtered = append(filtered, activity) + } + } + return filtered +} + // GetPluginStatusAPI returns plugin status. func (s *AdminServer) GetPluginStatusAPI(w http.ResponseWriter, r *http.Request) { plugin := s.GetPlugin() @@ -53,16 +89,35 @@ func (s *AdminServer) GetPluginStatusAPI(w http.ResponseWriter, r *http.Request) } // GetPluginWorkersAPI returns currently connected plugin workers. +// Accepts an optional ?lane= query parameter to filter by scheduler lane. func (s *AdminServer) GetPluginWorkersAPI(w http.ResponseWriter, r *http.Request) { workers := s.GetPluginWorkers() if workers == nil { writeJSON(w, http.StatusOK, []interface{}{}) return } + + laneFilter := strings.TrimSpace(r.URL.Query().Get("lane")) + if laneFilter != "" { + lane := plugin.SchedulerLane(laneFilter) + filtered := make([]*plugin.WorkerSession, 0, len(workers)) + for _, worker := range workers { + for jobType := range worker.Capabilities { + if plugin.JobTypeLane(jobType) == lane { + filtered = append(filtered, worker) + break + } + } + } + writeJSON(w, http.StatusOK, filtered) + return + } + writeJSON(w, http.StatusOK, workers) } // GetPluginJobTypesAPI returns known plugin job types from workers and persisted data. +// Accepts an optional ?lane= query parameter to filter by scheduler lane. func (s *AdminServer) GetPluginJobTypesAPI(w http.ResponseWriter, r *http.Request) { jobTypes, err := s.ListPluginJobTypes() if err != nil { @@ -73,6 +128,20 @@ func (s *AdminServer) GetPluginJobTypesAPI(w http.ResponseWriter, r *http.Reques writeJSON(w, http.StatusOK, []interface{}{}) return } + + laneFilter := strings.TrimSpace(r.URL.Query().Get("lane")) + if laneFilter != "" { + lane := plugin.SchedulerLane(laneFilter) + filtered := make([]plugin.JobTypeInfo, 0, len(jobTypes)) + for _, jt := range jobTypes { + if plugin.JobTypeLane(jt.JobType) == lane { + filtered = append(filtered, jt) + } + } + writeJSON(w, http.StatusOK, filtered) + return + } + writeJSON(w, http.StatusOK, jobTypes) } @@ -81,13 +150,14 @@ func (s *AdminServer) GetPluginJobsAPI(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() jobType := strings.TrimSpace(query.Get("job_type")) state := strings.TrimSpace(query.Get("state")) + laneFilter := strings.TrimSpace(query.Get("lane")) limit := parsePositiveInt(query.Get("limit"), 200) jobs := s.ListPluginJobs(jobType, state, limit) if jobs == nil { writeJSON(w, http.StatusOK, []interface{}{}) return } - writeJSON(w, http.StatusOK, jobs) + writeJSON(w, http.StatusOK, filterTrackedJobsByLane(jobs, laneFilter)) } // GetPluginJobAPI returns one tracked job. @@ -176,18 +246,21 @@ func (s *AdminServer) ExpirePluginJobAPI(w http.ResponseWriter, r *http.Request) func (s *AdminServer) GetPluginActivitiesAPI(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() jobType := strings.TrimSpace(query.Get("job_type")) + laneFilter := strings.TrimSpace(query.Get("lane")) limit := parsePositiveInt(query.Get("limit"), 500) activities := s.ListPluginActivities(jobType, limit) if activities == nil { writeJSON(w, http.StatusOK, []interface{}{}) return } - writeJSON(w, http.StatusOK, activities) + writeJSON(w, http.StatusOK, filterActivitiesByLane(activities, laneFilter)) } // GetPluginSchedulerStatesAPI returns per-job-type scheduler status for monitoring. +// Accepts optional ?job_type= and ?lane= query parameters. func (s *AdminServer) GetPluginSchedulerStatesAPI(w http.ResponseWriter, r *http.Request) { jobTypeFilter := strings.TrimSpace(r.URL.Query().Get("job_type")) + laneFilter := strings.TrimSpace(r.URL.Query().Get("lane")) states, err := s.ListPluginSchedulerStates() if err != nil { @@ -195,12 +268,16 @@ func (s *AdminServer) GetPluginSchedulerStatesAPI(w http.ResponseWriter, r *http return } - if jobTypeFilter != "" { + if jobTypeFilter != "" || laneFilter != "" { filtered := make([]interface{}, 0, len(states)) for _, state := range states { - if state.JobType == jobTypeFilter { - filtered = append(filtered, state) + if jobTypeFilter != "" && state.JobType != jobTypeFilter { + continue } + if laneFilter != "" && state.Lane != laneFilter { + continue + } + filtered = append(filtered, state) } writeJSON(w, http.StatusOK, filtered) return @@ -215,6 +292,7 @@ func (s *AdminServer) GetPluginSchedulerStatesAPI(w http.ResponseWriter, r *http } // GetPluginSchedulerStatusAPI returns scheduler status including in-process jobs and lock state. +// Accepts optional ?lane= query parameter to scope to a specific lane. func (s *AdminServer) GetPluginSchedulerStatusAPI(w http.ResponseWriter, r *http.Request) { pluginSvc := s.GetPlugin() if pluginSvc == nil { @@ -224,6 +302,21 @@ func (s *AdminServer) GetPluginSchedulerStatusAPI(w http.ResponseWriter, r *http return } + laneFilter := strings.TrimSpace(r.URL.Query().Get("lane")) + + if laneFilter != "" { + lane := plugin.SchedulerLane(laneFilter) + response := map[string]interface{}{ + "enabled": true, + "scheduler": pluginSvc.GetLaneSchedulerStatus(lane), + } + if s.pluginLock != nil { + response["lock"] = s.pluginLock.Status() + } + writeJSON(w, http.StatusOK, response) + return + } + response := map[string]interface{}{ "enabled": true, "scheduler": pluginSvc.GetSchedulerStatus(), @@ -235,6 +328,28 @@ func (s *AdminServer) GetPluginSchedulerStatusAPI(w http.ResponseWriter, r *http writeJSON(w, http.StatusOK, response) } +// GetPluginLanesAPI returns all scheduler lanes and their current status. +func (s *AdminServer) GetPluginLanesAPI(w http.ResponseWriter, r *http.Request) { + pluginSvc := s.GetPlugin() + if pluginSvc == nil { + writeJSON(w, http.StatusOK, []interface{}{}) + return + } + + lanes := plugin.AllLanes() + result := make([]map[string]interface{}, 0, len(lanes)) + for _, lane := range lanes { + laneStatus := pluginSvc.GetLaneSchedulerStatus(lane) + result = append(result, map[string]interface{}{ + "lane": string(lane), + "idle_sleep_sec": int(plugin.LaneIdleSleep(lane) / time.Second), + "job_types": plugin.LaneJobTypes(lane), + "status": laneStatus, + }) + } + writeJSON(w, http.StatusOK, result) +} + // RequestPluginJobTypeSchemaAPI asks a worker for one job type schema. func (s *AdminServer) RequestPluginJobTypeSchemaAPI(w http.ResponseWriter, r *http.Request) { jobType := strings.TrimSpace(mux.Vars(r)["jobType"]) diff --git a/weed/admin/dash/plugin_api_test.go b/weed/admin/dash/plugin_api_test.go index 5e535382a..3637f3dab 100644 --- a/weed/admin/dash/plugin_api_test.go +++ b/weed/admin/dash/plugin_api_test.go @@ -220,3 +220,39 @@ func TestApplyDescriptorDefaultsToPersistedConfigReplacesBlankAdminScript(t *tes t.Fatalf("expected blank script to be replaced by default, got=%q", scriptKind.StringValue) } } + +func TestFilterTrackedJobsByLane(t *testing.T) { + t.Parallel() + + jobs := []plugin.TrackedJob{ + {JobID: "vacuum-1", JobType: "vacuum"}, + {JobID: "iceberg-1", JobType: "iceberg_maintenance"}, + {JobID: "lifecycle-1", JobType: "s3_lifecycle"}, + } + + filtered := filterTrackedJobsByLane(jobs, "iceberg") + if len(filtered) != 1 { + t.Fatalf("expected 1 iceberg job, got %d", len(filtered)) + } + if filtered[0].JobID != "iceberg-1" { + t.Fatalf("expected iceberg job to be retained, got %+v", filtered[0]) + } +} + +func TestFilterActivitiesByLane(t *testing.T) { + t.Parallel() + + activities := []plugin.JobActivity{ + {JobID: "vacuum-1", JobType: "vacuum"}, + {JobID: "iceberg-1", JobType: "iceberg_maintenance"}, + {JobID: "lifecycle-1", JobType: "s3_lifecycle"}, + } + + filtered := filterActivitiesByLane(activities, "lifecycle") + if len(filtered) != 1 { + t.Fatalf("expected 1 lifecycle activity, got %d", len(filtered)) + } + if filtered[0].JobID != "lifecycle-1" { + t.Fatalf("expected lifecycle activity to be retained, got %+v", filtered[0]) + } +} diff --git a/weed/admin/handlers/admin_handlers.go b/weed/admin/handlers/admin_handlers.go index 6594a0e0f..650df45cc 100644 --- a/weed/admin/handlers/admin_handlers.go +++ b/weed/admin/handlers/admin_handlers.go @@ -147,6 +147,13 @@ func (h *AdminHandlers) registerUIRoutes(r *mux.Router) { r.HandleFunc("/plugin/detection", h.pluginHandlers.ShowPluginDetection).Methods(http.MethodGet) r.HandleFunc("/plugin/execution", h.pluginHandlers.ShowPluginExecution).Methods(http.MethodGet) r.HandleFunc("/plugin/monitoring", h.pluginHandlers.ShowPluginMonitoring).Methods(http.MethodGet) + r.HandleFunc("/plugin/lanes/{lane}", h.pluginHandlers.ShowPluginLane).Methods(http.MethodGet) + r.HandleFunc("/plugin/lanes/{lane}/configuration", h.pluginHandlers.ShowPluginLaneConfiguration).Methods(http.MethodGet) + r.HandleFunc("/plugin/lanes/{lane}/queue", h.pluginHandlers.ShowPluginLaneQueue).Methods(http.MethodGet) + r.HandleFunc("/plugin/lanes/{lane}/detection", h.pluginHandlers.ShowPluginLaneDetection).Methods(http.MethodGet) + r.HandleFunc("/plugin/lanes/{lane}/execution", h.pluginHandlers.ShowPluginLaneExecution).Methods(http.MethodGet) + r.HandleFunc("/plugin/lanes/{lane}/monitoring", h.pluginHandlers.ShowPluginLaneMonitoring).Methods(http.MethodGet) + r.HandleFunc("/plugin/lanes/{lane}/workers", h.pluginHandlers.ShowPluginLaneWorkers).Methods(http.MethodGet) } func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) { @@ -245,6 +252,7 @@ func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) { pluginApi := api.PathPrefix("/plugin").Subrouter() pluginApi.HandleFunc("/status", h.adminServer.GetPluginStatusAPI).Methods(http.MethodGet) + pluginApi.HandleFunc("/lanes", h.adminServer.GetPluginLanesAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/workers", h.adminServer.GetPluginWorkersAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/job-types", h.adminServer.GetPluginJobTypesAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/jobs", h.adminServer.GetPluginJobsAPI).Methods(http.MethodGet) diff --git a/weed/admin/handlers/admin_handlers_routes_test.go b/weed/admin/handlers/admin_handlers_routes_test.go index ff4632bf6..a34102d7e 100644 --- a/weed/admin/handlers/admin_handlers_routes_test.go +++ b/weed/admin/handlers/admin_handlers_routes_test.go @@ -53,6 +53,16 @@ func TestSetupRoutes_RegistersPluginPages_NoAuth(t *testing.T) { assertHasRoute(t, router, http.MethodGet, "/plugin/detection") assertHasRoute(t, router, http.MethodGet, "/plugin/execution") assertHasRoute(t, router, http.MethodGet, "/plugin/monitoring") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/default") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/default/configuration") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/default/queue") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/default/detection") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/default/execution") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/default/monitoring") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/default/workers") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/lifecycle") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/lifecycle/configuration") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/lifecycle/workers") } func TestSetupRoutes_RegistersPluginPages_WithAuth(t *testing.T) { @@ -66,6 +76,13 @@ func TestSetupRoutes_RegistersPluginPages_WithAuth(t *testing.T) { assertHasRoute(t, router, http.MethodGet, "/plugin/detection") assertHasRoute(t, router, http.MethodGet, "/plugin/execution") assertHasRoute(t, router, http.MethodGet, "/plugin/monitoring") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/iceberg") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/iceberg/configuration") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/iceberg/queue") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/iceberg/detection") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/iceberg/execution") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/iceberg/monitoring") + assertHasRoute(t, router, http.MethodGet, "/plugin/lanes/iceberg/workers") } func newRouteTestAdminHandlers() *AdminHandlers { diff --git a/weed/admin/handlers/plugin_handlers.go b/weed/admin/handlers/plugin_handlers.go index b4d0ec74b..1c6964abe 100644 --- a/weed/admin/handlers/plugin_handlers.go +++ b/weed/admin/handlers/plugin_handlers.go @@ -4,7 +4,9 @@ import ( "bytes" "net/http" + "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/admin/dash" + adminplugin "github.com/seaweedfs/seaweedfs/weed/admin/plugin" "github.com/seaweedfs/seaweedfs/weed/admin/view/app" "github.com/seaweedfs/seaweedfs/weed/admin/view/layout" ) @@ -52,9 +54,73 @@ func (h *PluginHandlers) ShowPluginMonitoring(w http.ResponseWriter, r *http.Req h.renderPluginPage(w, r, "detection") } +// ShowPluginLane displays a lane overview page using the shared plugin UI. +func (h *PluginHandlers) ShowPluginLane(w http.ResponseWriter, r *http.Request) { + h.renderPluginPageWithLane(w, r, "overview") +} + +// ShowPluginLaneConfiguration displays a lane-specific configuration page. +func (h *PluginHandlers) ShowPluginLaneConfiguration(w http.ResponseWriter, r *http.Request) { + h.renderPluginPageWithLane(w, r, "configuration") +} + +// ShowPluginLaneQueue displays a lane-specific queue page. +func (h *PluginHandlers) ShowPluginLaneQueue(w http.ResponseWriter, r *http.Request) { + h.renderPluginPageWithLane(w, r, "queue") +} + +// ShowPluginLaneDetection displays a lane-specific detection page. +func (h *PluginHandlers) ShowPluginLaneDetection(w http.ResponseWriter, r *http.Request) { + h.renderPluginPageWithLane(w, r, "detection") +} + +// ShowPluginLaneExecution displays a lane-specific execution page. +func (h *PluginHandlers) ShowPluginLaneExecution(w http.ResponseWriter, r *http.Request) { + h.renderPluginPageWithLane(w, r, "execution") +} + +// ShowPluginLaneMonitoring displays a lane-specific monitoring page. +func (h *PluginHandlers) ShowPluginLaneMonitoring(w http.ResponseWriter, r *http.Request) { + // Backward-compatible alias for the old monitoring URL. + h.renderPluginPageWithLane(w, r, "detection") +} + +// ShowPluginLaneWorkers displays workers filtered to a specific scheduler lane. +func (h *PluginHandlers) ShowPluginLaneWorkers(w http.ResponseWriter, r *http.Request) { + // Backward-compatible alias for the old lane overview URL. + h.renderPluginPageWithLane(w, r, "overview") +} + +func (h *PluginHandlers) renderPluginPageWithLane(w http.ResponseWriter, r *http.Request, page string) { + initialJob := r.URL.Query().Get("job") + lane := mux.Vars(r)["lane"] + component := app.Plugin(page, initialJob, lane) + viewCtx := layout.NewViewContext(r, dash.UsernameFromContext(r.Context()), dash.CSRFTokenFromContext(r.Context())) + layoutComponent := layout.Layout(viewCtx, component) + + var buf bytes.Buffer + if err := layoutComponent.Render(r.Context(), &buf); err != nil { + writeJSONError(w, http.StatusInternalServerError, "Failed to render template: "+err.Error()) + return + } + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(buf.Bytes()) +} + func (h *PluginHandlers) renderPluginPage(w http.ResponseWriter, r *http.Request, page string) { initialJob := r.URL.Query().Get("job") - component := app.Plugin(page, initialJob) + lane := r.URL.Query().Get("lane") + if lane == "" && initialJob != "" { + // Derive lane from job type so that e.g. ?job=iceberg_maintenance + // scopes the page to the iceberg lane automatically. + lane = string(adminplugin.JobTypeLane(initialJob)) + } + if lane == "" { + lane = "default" + } + component := app.Plugin(page, initialJob, lane) viewCtx := layout.NewViewContext(r, dash.UsernameFromContext(r.Context()), dash.CSRFTokenFromContext(r.Context())) layoutComponent := layout.Layout(viewCtx, component) diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index 4351c2172..e14e7ae41 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -69,6 +69,8 @@ type Plugin struct { detectorLeaseMu sync.Mutex detectorLeases map[string]string + lanes map[SchedulerLane]*schedulerLaneState + schedulerExecMu sync.Mutex schedulerExecReservations map[string]int adminScriptRunMu sync.RWMutex @@ -158,6 +160,11 @@ func New(options Options) (*Plugin, error) { schedulerTick = defaultSchedulerTick } + lanes := make(map[SchedulerLane]*schedulerLaneState, len(AllLanes())) + for _, lane := range AllLanes() { + lanes[lane] = newLaneState(lane) + } + plugin := &Plugin{ store: store, registry: NewRegistry(), @@ -167,6 +174,7 @@ func New(options Options) (*Plugin, error) { clusterContextProvider: options.ClusterContextProvider, configDefaultsProvider: options.ConfigDefaultsProvider, lockManager: options.LockManager, + lanes: lanes, sessions: make(map[string]*streamSession), pendingSchema: make(map[string]chan *plugin_pb.ConfigSchemaResponse), pendingDetection: make(map[string]*pendingDetectionState), @@ -191,8 +199,10 @@ func New(options Options) (*Plugin, error) { } if plugin.clusterContextProvider != nil { - plugin.wg.Add(1) - go plugin.schedulerLoop() + for _, ls := range plugin.lanes { + plugin.wg.Add(1) + go plugin.laneSchedulerLoop(ls) + } } plugin.wg.Add(1) go plugin.persistenceLoop() diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 06bab7e83..27a16f7b9 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -48,7 +48,9 @@ type schedulerPolicy struct { ExecutorReserveBackoff time.Duration } -func (r *Plugin) schedulerLoop() { +// laneSchedulerLoop is the main scheduling goroutine for a single lane. +// Each lane runs independently with its own timing, lock scope, and wake channel. +func (r *Plugin) laneSchedulerLoop(ls *schedulerLaneState) { defer r.wg.Done() for { select { @@ -57,16 +59,16 @@ func (r *Plugin) schedulerLoop() { default: } - hadJobs := r.runSchedulerIteration() - r.recordSchedulerIterationComplete(hadJobs) + hadJobs := r.runLaneSchedulerIteration(ls) + r.recordLaneIterationComplete(ls, hadJobs) if hadJobs { continue } - r.setSchedulerLoopState("", "sleeping") - idleSleep := defaultSchedulerIdleSleep - if nextRun := r.earliestNextDetectionAt(); !nextRun.IsZero() { + r.setLaneLoopState(ls, "", "sleeping") + idleSleep := LaneIdleSleep(ls.lane) + if nextRun := r.earliestLaneDetectionAt(ls.lane); !nextRun.IsZero() { if until := time.Until(nextRun); until <= 0 { idleSleep = 0 } else if until < idleSleep { @@ -82,7 +84,7 @@ func (r *Plugin) schedulerLoop() { case <-r.shutdownCh: timer.Stop() return - case <-r.schedulerWakeCh: + case <-ls.wakeCh: if !timer.Stop() { <-timer.C } @@ -92,7 +94,90 @@ func (r *Plugin) schedulerLoop() { } } +// schedulerLoop is kept for backward compatibility; it delegates to +// laneSchedulerLoop with the default lane. New code should not call this. +func (r *Plugin) schedulerLoop() { + ls := r.lanes[LaneDefault] + if ls == nil { + ls = newLaneState(LaneDefault) + } + r.laneSchedulerLoop(ls) +} + +// runLaneSchedulerIteration runs one scheduling pass for a single lane, +// processing only the job types assigned to that lane. +func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool { + r.expireStaleJobs(time.Now().UTC()) + + allJobTypes := r.registry.DetectableJobTypes() + // Filter to only job types belonging to this lane. + var jobTypes []string + for _, jt := range allJobTypes { + if JobTypeLane(jt) == ls.lane { + jobTypes = append(jobTypes, jt) + } + } + if len(jobTypes) == 0 { + r.setLaneLoopState(ls, "", "idle") + return false + } + + r.setLaneLoopState(ls, "", "waiting_for_lock") + lockName := fmt.Sprintf("plugin scheduler:%s", ls.lane) + releaseLock, err := r.acquireAdminLock(lockName) + if err != nil { + glog.Warningf("Plugin scheduler [%s] failed to acquire lock: %v", ls.lane, err) + r.setLaneLoopState(ls, "", "idle") + return false + } + if releaseLock != nil { + defer releaseLock() + } + + active := make(map[string]struct{}, len(jobTypes)) + hadJobs := false + + for _, jobType := range jobTypes { + active[jobType] = struct{}{} + + policy, enabled, err := r.loadSchedulerPolicy(jobType) + if err != nil { + glog.Warningf("Plugin scheduler [%s] failed to load policy for %s: %v", ls.lane, jobType, err) + continue + } + if !enabled { + r.clearSchedulerJobType(jobType) + continue + } + initialDelay := time.Duration(0) + if runInfo := r.snapshotSchedulerRun(jobType); runInfo.lastRunStartedAt.IsZero() { + initialDelay = 5 * time.Second + } + if !r.markDetectionDue(jobType, policy.DetectionInterval, initialDelay) { + continue + } + + detected := r.runJobTypeIteration(jobType, policy) + if detected { + hadJobs = true + } + } + + r.pruneSchedulerState(active) + r.pruneDetectorLeases(active) + r.setLaneLoopState(ls, "", "idle") + return hadJobs +} + +// runSchedulerIteration is kept for backward compatibility. It runs a +// single iteration across ALL job types (equivalent to the old single-loop +// behavior). It is only used by the legacy schedulerLoop() fallback. func (r *Plugin) runSchedulerIteration() bool { + ls := r.lanes[LaneDefault] + if ls == nil { + ls = newLaneState(LaneDefault) + } + // For backward compat, the old function processes all job types. r.expireStaleJobs(time.Now().UTC()) jobTypes := r.registry.DetectableJobTypes() @@ -147,16 +232,38 @@ func (r *Plugin) runSchedulerIteration() bool { return hadJobs } -func (r *Plugin) wakeScheduler() { +// wakeLane wakes the scheduler goroutine for a specific lane. +func (r *Plugin) wakeLane(lane SchedulerLane) { if r == nil { return } - select { - case r.schedulerWakeCh <- struct{}{}: - default: + if ls, ok := r.lanes[lane]; ok { + select { + case ls.wakeCh <- struct{}{}: + default: + } + } +} + +// wakeAllLanes wakes all lane scheduler goroutines. +func (r *Plugin) wakeAllLanes() { + if r == nil { + return + } + for _, ls := range r.lanes { + select { + case ls.wakeCh <- struct{}{}: + default: + } } } +// wakeScheduler wakes the lane that owns the given job type, or all lanes +// if no job type is specified. Kept for backward compatibility. +func (r *Plugin) wakeScheduler() { + r.wakeAllLanes() +} + func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) bool { r.recordSchedulerRunStart(jobType) r.clearWaitingJobQueue(jobType) @@ -454,6 +561,7 @@ func (r *Plugin) ListSchedulerStates() ([]SchedulerJobTypeState, error) { jobType := jobTypeInfo.JobType state := SchedulerJobTypeState{ JobType: jobType, + Lane: string(JobTypeLane(jobType)), DetectionInFlight: detectionInFlight[jobType], } @@ -573,6 +681,35 @@ func (r *Plugin) markDetectionDue(jobType string, interval, initialDelay time.Du return true } +// earliestLaneDetectionAt returns the earliest next detection time among +// job types that belong to the given lane. +func (r *Plugin) earliestLaneDetectionAt(lane SchedulerLane) time.Time { + if r == nil { + return time.Time{} + } + + r.schedulerMu.Lock() + defer r.schedulerMu.Unlock() + + var earliest time.Time + for jobType, nextRun := range r.nextDetectionAt { + if JobTypeLane(jobType) != lane { + continue + } + if nextRun.IsZero() { + continue + } + if earliest.IsZero() || nextRun.Before(earliest) { + earliest = nextRun + } + } + + return earliest +} + +// earliestNextDetectionAt returns the earliest next detection time across +// all job types regardless of lane. Kept for backward compatibility and +// the global scheduler status API. func (r *Plugin) earliestNextDetectionAt() time.Time { if r == nil { return time.Time{} @@ -868,6 +1005,17 @@ func (r *Plugin) tryReserveExecutorCapacity( executor *WorkerSession, jobType string, policy schedulerPolicy, +) (func(), bool) { + return r.tryReserveExecutorCapacityForLane(executor, jobType, policy, JobTypeLane(jobType)) +} + +// tryReserveExecutorCapacityForLane reserves an execution slot on the +// per-lane reservation pool so that lanes cannot starve each other. +func (r *Plugin) tryReserveExecutorCapacityForLane( + executor *WorkerSession, + jobType string, + policy schedulerPolicy, + lane SchedulerLane, ) (func(), bool) { if executor == nil || strings.TrimSpace(executor.WorkerID) == "" { return nil, false @@ -884,21 +1032,60 @@ func (r *Plugin) tryReserveExecutorCapacity( workerID := strings.TrimSpace(executor.WorkerID) - r.schedulerExecMu.Lock() - reserved := r.schedulerExecReservations[workerID] - if heartbeatUsed+reserved >= limit { + ls := r.lanes[lane] + if ls == nil { + // Fallback to global reservations if lane state is missing. + r.schedulerExecMu.Lock() + reserved := r.schedulerExecReservations[workerID] + if heartbeatUsed+reserved >= limit { + r.schedulerExecMu.Unlock() + return nil, false + } + r.schedulerExecReservations[workerID] = reserved + 1 r.schedulerExecMu.Unlock() + release := func() { r.releaseExecutorCapacity(workerID) } + return release, true + } + + ls.execMu.Lock() + reserved := ls.execRes[workerID] + if heartbeatUsed+reserved >= limit { + ls.execMu.Unlock() return nil, false } - r.schedulerExecReservations[workerID] = reserved + 1 - r.schedulerExecMu.Unlock() + ls.execRes[workerID] = reserved + 1 + ls.execMu.Unlock() release := func() { - r.releaseExecutorCapacity(workerID) + r.releaseExecutorCapacityForLane(workerID, lane) } return release, true } +// releaseExecutorCapacityForLane releases a reservation from the per-lane pool. +func (r *Plugin) releaseExecutorCapacityForLane(workerID string, lane SchedulerLane) { + workerID = strings.TrimSpace(workerID) + if workerID == "" { + return + } + + ls := r.lanes[lane] + if ls == nil { + r.releaseExecutorCapacity(workerID) + return + } + + ls.execMu.Lock() + defer ls.execMu.Unlock() + + current := ls.execRes[workerID] + if current <= 1 { + delete(ls.execRes, workerID) + return + } + ls.execRes[workerID] = current - 1 +} + func (r *Plugin) releaseExecutorCapacity(workerID string) { workerID = strings.TrimSpace(workerID) if workerID == "" { diff --git a/weed/admin/plugin/scheduler_lane.go b/weed/admin/plugin/scheduler_lane.go new file mode 100644 index 000000000..efa808e73 --- /dev/null +++ b/weed/admin/plugin/scheduler_lane.go @@ -0,0 +1,109 @@ +package plugin + +import ( + "sync" + "time" +) + +// SchedulerLane identifies an independent scheduling track. Each lane runs +// its own goroutine, maintains its own detection timing, and acquires its +// own admin lock so that workloads in different lanes never block each other. +type SchedulerLane string + +const ( + // LaneDefault handles volume management operations (vacuum, balance, + // erasure coding) and admin scripts. It is the fallback lane for any + // job type that is not explicitly mapped elsewhere. + LaneDefault SchedulerLane = "default" + + // LaneIceberg handles table-bucket Iceberg compaction and maintenance. + LaneIceberg SchedulerLane = "iceberg" + + // LaneLifecycle handles S3 object store lifecycle management + // (expiration, transition, abort incomplete multipart uploads). + LaneLifecycle SchedulerLane = "lifecycle" +) + +// AllLanes returns every defined scheduler lane in a stable order. +func AllLanes() []SchedulerLane { + return []SchedulerLane{LaneDefault, LaneIceberg, LaneLifecycle} +} + +// laneIdleSleep maps each lane to its default idle sleep duration. +// Each lane can sleep for a different amount when no work is detected, +// independent of the per-job-type DetectionInterval. +var laneIdleSleep = map[SchedulerLane]time.Duration{ + LaneDefault: 61 * time.Second, + LaneIceberg: 61 * time.Second, + LaneLifecycle: 5 * time.Minute, +} + +// LaneIdleSleep returns the idle sleep duration for the given lane, +// falling back to defaultSchedulerIdleSleep if the lane is unknown. +func LaneIdleSleep(lane SchedulerLane) time.Duration { + if d, ok := laneIdleSleep[lane]; ok { + return d + } + return defaultSchedulerIdleSleep +} + +// jobTypeLaneMap is the hardcoded mapping from job type to scheduler lane. +// Job types not present here are assigned to LaneDefault. +var jobTypeLaneMap = map[string]SchedulerLane{ + // Volume management (default lane) + "vacuum": LaneDefault, + "volume_balance": LaneDefault, + "ec_balance": LaneDefault, + "erasure_coding": LaneDefault, + "admin_script": LaneDefault, + + // Iceberg table maintenance + "iceberg_maintenance": LaneIceberg, + + // S3 lifecycle management + "s3_lifecycle": LaneLifecycle, +} + +// JobTypeLane returns the scheduler lane for the given job type. +// Unknown job types are assigned to LaneDefault. +func JobTypeLane(jobType string) SchedulerLane { + if lane, ok := jobTypeLaneMap[jobType]; ok { + return lane + } + return LaneDefault +} + +// LaneJobTypes returns the set of known job types assigned to the given lane. +func LaneJobTypes(lane SchedulerLane) []string { + var result []string + for jobType, l := range jobTypeLaneMap { + if l == lane { + result = append(result, jobType) + } + } + return result +} + +// schedulerLaneState holds the per-lane runtime state used by the scheduler. +type schedulerLaneState struct { + lane SchedulerLane + wakeCh chan struct{} + + loopMu sync.Mutex + loop schedulerLoopState + + // Per-lane execution reservation pool. Each lane tracks how many + // execution slots it has reserved on each worker independently, + // so lanes cannot starve each other. + execMu sync.Mutex + execRes map[string]int +} + +// newLaneState creates a schedulerLaneState for the given lane. +func newLaneState(lane SchedulerLane) *schedulerLaneState { + return &schedulerLaneState{ + lane: lane, + wakeCh: make(chan struct{}, 1), + execRes: make(map[string]int), + } +} diff --git a/weed/admin/plugin/scheduler_lane_test.go b/weed/admin/plugin/scheduler_lane_test.go new file mode 100644 index 000000000..6217e7f22 --- /dev/null +++ b/weed/admin/plugin/scheduler_lane_test.go @@ -0,0 +1,47 @@ +package plugin + +import ( + "testing" +) + +func TestJobTypeLaneMapCoversKnownTypes(t *testing.T) { + // Every job type in the map must resolve to a valid lane. + for jobType, lane := range jobTypeLaneMap { + if lane != LaneDefault && lane != LaneIceberg && lane != LaneLifecycle { + t.Errorf("jobTypeLaneMap[%q] = %q, want a known lane", jobType, lane) + } + } +} + +func TestJobTypeLaneFallsBackToDefault(t *testing.T) { + if got := JobTypeLane("unknown_job_type"); got != LaneDefault { + t.Errorf("JobTypeLane(unknown) = %q, want %q", got, LaneDefault) + } +} + +func TestAllLanesHaveIdleSleep(t *testing.T) { + for _, lane := range AllLanes() { + if d := LaneIdleSleep(lane); d <= 0 { + t.Errorf("LaneIdleSleep(%q) = %v, want > 0", lane, d) + } + } +} + +func TestKnownJobTypesInMap(t *testing.T) { + // Ensure the well-known job types are mapped. This catches drift + // if a handler's job type string changes without updating the map. + expected := map[string]SchedulerLane{ + "vacuum": LaneDefault, + "volume_balance": LaneDefault, + "ec_balance": LaneDefault, + "erasure_coding": LaneDefault, + "admin_script": LaneDefault, + "iceberg_maintenance": LaneIceberg, + "s3_lifecycle": LaneLifecycle, + } + for jobType, wantLane := range expected { + if got := JobTypeLane(jobType); got != wantLane { + t.Errorf("JobTypeLane(%q) = %q, want %q", jobType, got, wantLane) + } + } +} diff --git a/weed/admin/plugin/scheduler_status.go b/weed/admin/plugin/scheduler_status.go index 75ae55aa9..e448a2800 100644 --- a/weed/admin/plugin/scheduler_status.go +++ b/weed/admin/plugin/scheduler_status.go @@ -8,6 +8,7 @@ import ( type SchedulerStatus struct { Now time.Time `json:"now"` + Lane string `json:"lane,omitempty"` SchedulerTickSeconds int `json:"scheduler_tick_seconds"` IdleSleepSeconds int `json:"idle_sleep_seconds,omitempty"` NextDetectionAt *time.Time `json:"next_detection_at,omitempty"` @@ -213,21 +214,104 @@ func (r *Plugin) snapshotSchedulerLoopState() schedulerLoopState { return r.schedulerLoopState } -func (r *Plugin) GetSchedulerStatus() SchedulerStatus { +// aggregateLaneLoopStates merges per-lane loop states into a single +// schedulerLoopState for the aggregate GetSchedulerStatus API. It picks +// the most recent iteration completion, any currently-active job type, +// and a phase that reflects whether any lane is actively working. +func (r *Plugin) aggregateLaneLoopStates() schedulerLoopState { + if r == nil || len(r.lanes) == 0 { + return r.snapshotSchedulerLoopState() + } + + var agg schedulerLoopState + for _, ls := range r.lanes { + snap := r.snapshotLaneLoopState(ls) + if snap.lastIterationCompleted.After(agg.lastIterationCompleted) { + agg.lastIterationCompleted = snap.lastIterationCompleted + } + if snap.lastIterationHadJobs { + agg.lastIterationHadJobs = true + } + // Prefer showing an active phase over idle/sleeping. + if snap.currentJobType != "" { + agg.currentJobType = snap.currentJobType + agg.currentPhase = snap.currentPhase + } + } + // If no lane is actively processing, show the most interesting phase. + if agg.currentPhase == "" { + for _, ls := range r.lanes { + snap := r.snapshotLaneLoopState(ls) + if snap.currentPhase != "" { + agg.currentPhase = snap.currentPhase + break + } + } + } + return agg +} + +// --- Per-lane loop state helpers --- + +func (r *Plugin) setLaneLoopState(ls *schedulerLaneState, jobType, phase string) { + if r == nil || ls == nil { + return + } + ls.loopMu.Lock() + ls.loop.currentJobType = jobType + ls.loop.currentPhase = phase + ls.loopMu.Unlock() +} + +func (r *Plugin) recordLaneIterationComplete(ls *schedulerLaneState, hadJobs bool) { + if r == nil || ls == nil { + return + } + ls.loopMu.Lock() + ls.loop.lastIterationHadJobs = hadJobs + ls.loop.lastIterationCompleted = time.Now().UTC() + ls.loopMu.Unlock() +} + +func (r *Plugin) snapshotLaneLoopState(ls *schedulerLaneState) schedulerLoopState { + if r == nil || ls == nil { + return schedulerLoopState{} + } + ls.loopMu.Lock() + defer ls.loopMu.Unlock() + return ls.loop +} + +// GetLaneSchedulerStatus returns scheduler status scoped to a single lane. +func (r *Plugin) GetLaneSchedulerStatus(lane SchedulerLane) SchedulerStatus { + ls := r.lanes[lane] + if ls == nil { + return SchedulerStatus{Now: time.Now().UTC()} + } now := time.Now().UTC() - loopState := r.snapshotSchedulerLoopState() + loopState := r.snapshotLaneLoopState(ls) + idleSleep := LaneIdleSleep(lane) + allInProcess := r.listInProcessJobs(now) + laneInProcess := make([]SchedulerJobStatus, 0, len(allInProcess)) + for _, job := range allInProcess { + if JobTypeLane(job.JobType) == lane { + laneInProcess = append(laneInProcess, job) + } + } + status := SchedulerStatus{ Now: now, + Lane: string(lane), SchedulerTickSeconds: int(secondsFromDuration(r.schedulerTick)), - InProcessJobs: r.listInProcessJobs(now), - IdleSleepSeconds: int(defaultSchedulerIdleSleep / time.Second), + InProcessJobs: laneInProcess, + IdleSleepSeconds: int(idleSleep / time.Second), CurrentJobType: loopState.currentJobType, CurrentPhase: loopState.currentPhase, LastIterationHadJobs: loopState.lastIterationHadJobs, } - nextDetectionAt := r.earliestNextDetectionAt() + nextDetectionAt := r.earliestLaneDetectionAt(lane) if nextDetectionAt.IsZero() && loopState.currentPhase == "sleeping" && !loopState.lastIterationCompleted.IsZero() { - nextDetectionAt = loopState.lastIterationCompleted.Add(defaultSchedulerIdleSleep) + nextDetectionAt = loopState.lastIterationCompleted.Add(idleSleep) } if !nextDetectionAt.IsZero() { at := nextDetectionAt @@ -243,6 +327,92 @@ func (r *Plugin) GetSchedulerStatus() SchedulerStatus { return status } + waiting := make([]SchedulerWaitingStatus, 0) + jobTypes := make([]SchedulerJobTypeStatus, 0) + + for _, state := range states { + if JobTypeLane(state.JobType) != lane { + continue + } + jobType := state.JobType + info := r.snapshotSchedulerDetection(jobType) + + jobStatus := SchedulerJobTypeStatus{ + JobType: jobType, + Enabled: state.Enabled, + DetectionInFlight: state.DetectionInFlight, + NextDetectionAt: state.NextDetectionAt, + DetectionIntervalSeconds: state.DetectionIntervalSeconds, + } + if !info.lastDetectedAt.IsZero() { + jobStatus.LastDetectedAt = timeToPtr(info.lastDetectedAt) + jobStatus.LastDetectedCount = info.lastDetectedCount + } + if info.lastError != "" { + jobStatus.LastDetectionError = info.lastError + } + if info.lastSkippedReason != "" { + jobStatus.LastDetectionSkipped = info.lastSkippedReason + } + jobTypes = append(jobTypes, jobStatus) + + if state.DetectionInFlight { + waiting = append(waiting, SchedulerWaitingStatus{ + Reason: "detection_in_flight", + JobType: jobType, + }) + } else if state.Enabled && state.NextDetectionAt != nil && now.Before(*state.NextDetectionAt) { + waiting = append(waiting, SchedulerWaitingStatus{ + Reason: "next_detection_at", + JobType: jobType, + Until: state.NextDetectionAt, + }) + } + } + + sort.Slice(jobTypes, func(i, j int) bool { + return jobTypes[i].JobType < jobTypes[j].JobType + }) + + status.Waiting = waiting + status.JobTypes = jobTypes + return status +} + +func (r *Plugin) GetSchedulerStatus() SchedulerStatus { + now := time.Now().UTC() + + // Aggregate loop state across all lanes instead of reading the + // legacy single-loop state which is no longer updated. + aggregated := r.aggregateLaneLoopStates() + + status := SchedulerStatus{ + Now: now, + SchedulerTickSeconds: int(secondsFromDuration(r.schedulerTick)), + InProcessJobs: r.listInProcessJobs(now), + IdleSleepSeconds: int(defaultSchedulerIdleSleep / time.Second), + CurrentJobType: aggregated.currentJobType, + CurrentPhase: aggregated.currentPhase, + LastIterationHadJobs: aggregated.lastIterationHadJobs, + } + nextDetectionAt := r.earliestNextDetectionAt() + if nextDetectionAt.IsZero() && aggregated.currentPhase == "sleeping" && !aggregated.lastIterationCompleted.IsZero() { + nextDetectionAt = aggregated.lastIterationCompleted.Add(defaultSchedulerIdleSleep) + } + if !nextDetectionAt.IsZero() { + at := nextDetectionAt + status.NextDetectionAt = &at + } + if !aggregated.lastIterationCompleted.IsZero() { + at := aggregated.lastIterationCompleted + status.LastIterationDoneAt = &at + } + + states, err := r.ListSchedulerStates() + if err != nil { + return status + } + waiting := make([]SchedulerWaitingStatus, 0) jobTypes := make([]SchedulerJobTypeStatus, 0, len(states)) diff --git a/weed/admin/plugin/types.go b/weed/admin/plugin/types.go index fcac0b2c6..d4804ac95 100644 --- a/weed/admin/plugin/types.go +++ b/weed/admin/plugin/types.go @@ -83,6 +83,7 @@ type JobDetail struct { type SchedulerJobTypeState struct { JobType string `json:"job_type"` + Lane string `json:"lane"` Enabled bool `json:"enabled"` PolicyError string `json:"policy_error,omitempty"` DetectionInFlight bool `json:"detection_in_flight"` diff --git a/weed/admin/view/app/plugin.templ b/weed/admin/view/app/plugin.templ index e1b897057..25aa883f7 100644 --- a/weed/admin/view/app/plugin.templ +++ b/weed/admin/view/app/plugin.templ @@ -1,19 +1,25 @@ package app -templ Plugin(page string, initialJob string) { +templ Plugin(page string, initialJob string, lane string) { {{ currentPage := page if currentPage == "" { currentPage = "overview" } + currentLane := lane + if currentLane == "" { + currentLane = "default" + } + laneTitle := pluginLaneTitle(currentLane) + laneDescription := pluginLaneDescription(currentLane) }} -
Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.
+{ laneDescription }