From ae170f1fbb13af7b80314b73a495d512aa185b0c Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 21 Mar 2026 23:09:31 +0200 Subject: [PATCH] admin: fix manual job run to use scheduler dispatch with capacity management and retry (#8720) RunPluginJobTypeAPI previously executed proposals with a naive sequential loop calling ExecutePluginJob per proposal. This had two bugs: 1. Double-lock: RunPluginJobTypeAPI held pluginLock while calling ExecutePluginJob, which tried to re-acquire the same lock for every job in the loop. 2. No capacity management: proposals were fired directly at workers without reserveScheduledExecutor, so every job beyond the worker concurrency limit received an immediate at_capacity error with no retry or backoff. Fix: add Plugin.DispatchProposals which reuses dispatchScheduledProposals - the same code path the scheduler loop uses - with executor reservation, configurable concurrency, and per-job retry with backoff. RunPluginJobTypeAPI now calls DispatchPluginProposals (a thin AdminServer wrapper) after holding pluginLock once. Co-authored-by: Anton Ustyugov --- weed/admin/dash/admin_server.go | 17 +++++++++++ weed/admin/dash/plugin_api.go | 42 ++++----------------------- weed/admin/plugin/plugin_scheduler.go | 29 ++++++++++++++++++ 3 files changed, 52 insertions(+), 36 deletions(-) diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 666dcb1b0..d4dac31b4 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -1284,6 +1284,23 @@ func (s *AdminServer) RunPluginDetectionWithReport( return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults) } +// DispatchPluginProposals dispatches a batch of proposals using the same +// capacity-aware dispatch logic as the scheduler loop (executor reservation with +// backoff, per-job retry on transient errors). The plugin lock must already be +// held by the caller. 
+func (s *AdminServer) DispatchPluginProposals( + ctx context.Context, + jobType string, + proposals []*plugin_pb.JobProposal, + clusterContext *plugin_pb.ClusterContext, +) (successCount, errorCount, canceledCount int, err error) { + if s.plugin == nil { + return 0, 0, 0, fmt.Errorf("plugin is not enabled") + } + sc, ec, cc := s.plugin.DispatchProposals(ctx, jobType, proposals, clusterContext) + return sc, ec, cc, nil +} + // ExecutePluginJob dispatches one job to a capable worker and waits for completion. func (s *AdminServer) ExecutePluginJob( ctx context.Context, diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index f6756d4b3..897fb88a7 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -526,40 +526,10 @@ func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request return } - type executionResult struct { - JobID string `json:"job_id"` - Success bool `json:"success"` - Error string `json:"error,omitempty"` - Completion map[string]interface{} `json:"completion,omitempty"` - } - - results := make([]executionResult, 0, len(filteredProposals)) - successCount := 0 - errorCount := 0 - - for index, proposal := range filteredProposals { - job := buildJobSpecFromProposal(jobType, proposal, index) - completed, execErr := s.ExecutePluginJob(ctx, job, clusterContext, req.Attempt) - - result := executionResult{ - JobID: job.JobId, - Success: execErr == nil, - } - - if completed != nil { - if payload, marshalErr := protoMessageToMap(completed); marshalErr == nil { - result.Completion = payload - } - } - - if execErr != nil { - result.Error = execErr.Error() - errorCount++ - } else { - successCount++ - } - - results = append(results, result) + successCount, errorCount, canceledCount, dispatchErr := s.DispatchPluginProposals(ctx, jobType, filteredProposals, clusterContext) + if dispatchErr != nil { + writeJSONError(w, http.StatusInternalServerError, dispatchErr.Error()) + return } writeJSON(w, 
http.StatusOK, map[string]interface{}{ @@ -567,10 +537,10 @@ func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request "detected_count": detectedCount, "ready_to_execute_count": len(filteredProposals), "skipped_active_count": skippedActiveCount, - "executed_count": len(results), + "executed_count": successCount + errorCount + canceledCount, "success_count": successCount, "error_count": errorCount, - "execution_results": results, + "canceled_count": canceledCount, }) } diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 00c43283e..06bab7e83 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -1287,6 +1287,35 @@ func isWaitingTrackedJobState(state string) bool { return normalized == "pending" || normalized == "job_state_pending" } +// DispatchProposals dispatches a batch of proposals using the same capacity-aware +// dispatch logic as the scheduler loop: concurrent execution, executor reservation +// with backoff, and per-job retry on transient errors. The scheduler policy is +// loaded from the persisted job type config; if loading fails or the job type is +// not enabled, a default policy is used instead so manual runs always work. 
+func (r *Plugin) DispatchProposals( + ctx context.Context, + jobType string, + proposals []*plugin_pb.JobProposal, + clusterContext *plugin_pb.ClusterContext, +) (successCount, errorCount, canceledCount int) { + if len(proposals) == 0 { + return 0, 0, 0 + } + + policy, enabled, err := r.loadSchedulerPolicy(jobType) + if err != nil || !enabled { + policy = schedulerPolicy{ + ExecutionConcurrency: defaultScheduledExecutionConcurrency, + PerWorkerConcurrency: defaultScheduledPerWorkerConcurrency, + ExecutionTimeout: defaultScheduledExecutionTimeout, + RetryBackoff: defaultScheduledRetryBackoff, + ExecutorReserveBackoff: 200 * time.Millisecond, + } + } + + return r.dispatchScheduledProposals(ctx, jobType, proposals, clusterContext, policy) +} + func (r *Plugin) filterScheduledProposals(proposals []*plugin_pb.JobProposal) []*plugin_pb.JobProposal { filtered := make([]*plugin_pb.JobProposal, 0, len(proposals)) seenInRun := make(map[string]struct{}, len(proposals))