From b17e2b411a183369bc21eb4b37e57cc90309efb8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Mar 2026 13:48:42 -0700 Subject: [PATCH] Add dynamic timeouts to plugin worker vacuum gRPC calls (#8593) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add dynamic timeouts to plugin worker vacuum gRPC calls All vacuum gRPC calls used context.Background() with no deadline, so the plugin scheduler's execution timeout could kill a job while a large volume compact was still in progress. Use volume-size-scaled timeouts matching the topology vacuum approach: 3 min/GB for compact, 1 min/GB for check, commit, and cleanup. Fixes #8591 * scale scheduler execution timeout by volume size The scheduler's per-job execution timeout (default 240s) would kill vacuum jobs on large volumes before they finish. Three changes: 1. Vacuum detection now includes estimated_runtime_seconds in job proposals, computed as 5 min/GB of volume size. 2. The scheduler checks for estimated_runtime_seconds in job parameters and uses it as the execution timeout when larger than the default — a generic mechanism any handler can use. 3. Vacuum task gRPC calls now use the passed-in ctx as parent instead of context.Background(), so scheduler cancellation propagates to in-flight RPCs. * extend job type runtime when proposals need more time The JobTypeMaxRuntime (default 30 min) wraps both detection and execution. Its context is the parent of all per-job execution contexts, so even with per-job estimated_runtime_seconds, jobCtx would cancel everything when it expires. After detection, scan proposals for the maximum estimated_runtime_seconds. If any proposal needs more time than the remaining JobTypeMaxRuntime, create a new execution context with enough headroom. This lets large vacuum jobs complete without being killed by the job type deadline while still respecting the configured limit for normal-sized jobs. * log missing volume size metric, remove dead minimum runtime guard Add a debug log in vacuumTimeout when t.volumeSize is 0 so operators can investigate why metrics are missing for a volume. Remove the unreachable estimatedRuntimeSeconds < 180 check in buildVacuumProposal — volumeSizeGB always >= 1 (due to +1 floor), so estimatedRuntimeSeconds is always >= 300. * cap estimated runtime and fix status check context - Cap maxEstimatedRuntime and per-job timeout overrides to 8 hours to prevent unbounded timeouts from bad metrics. - Check execCtx.Err() instead of jobCtx.Err() for status reporting, since dispatch runs under execCtx which may have a longer deadline. A successful dispatch under execCtx was misreported as "timeout" when jobCtx had expired. --- weed/admin/plugin/plugin_scheduler.go | 55 +++++++++++++++++++++++-- weed/plugin/worker/vacuum_handler.go | 7 ++++ weed/worker/tasks/vacuum/vacuum_task.go | 51 ++++++++++++++++------- 3 files changed, 96 insertions(+), 17 deletions(-) diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 5b6eba7ab..00c43283e 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -32,6 +32,7 @@ const ( defaultClusterContextTimeout = 10 * time.Second defaultWaitingBacklogFloor = 8 defaultWaitingBacklogMultiplier = 4 + maxEstimatedRuntimeCap = 8 * time.Hour ) type schedulerPolicy struct { @@ -293,6 +294,26 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo r.setSchedulerLoopState(jobType, "executing") + // Scan proposals for the maximum estimated_runtime_seconds so the + // execution phase gets enough time for large jobs (e.g. vacuum on + // big volumes). If any proposal needs more time than the remaining + // JobTypeMaxRuntime, extend the execution context accordingly. + var maxEstimatedRuntime time.Duration + for _, p := range filtered { + if p.Parameters != nil { + if est, ok := p.Parameters["estimated_runtime_seconds"]; ok { + if v := est.GetInt64Value(); v > 0 { + if d := time.Duration(v) * time.Second; d > maxEstimatedRuntime { + maxEstimatedRuntime = d + } + } + } + } + } + if maxEstimatedRuntime > maxEstimatedRuntimeCap { + maxEstimatedRuntime = maxEstimatedRuntimeCap + } + remaining = time.Until(start.Add(maxRuntime)) if remaining <= 0 { r.appendActivity(JobActivity{ @@ -306,6 +327,17 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo return detected } + // If the longest estimated job exceeds the remaining JobTypeMaxRuntime, + // create a new execution context with enough headroom instead of using + // jobCtx which would cancel too early. + execCtx := jobCtx + execCancel := context.CancelFunc(func() {}) + if maxEstimatedRuntime > 0 && maxEstimatedRuntime > remaining { + execCtx, execCancel = context.WithTimeout(context.Background(), maxEstimatedRuntime) + remaining = maxEstimatedRuntime + } + defer execCancel() + execPolicy := policy if execPolicy.ExecutionTimeout <= 0 { execPolicy.ExecutionTimeout = defaultScheduledExecutionTimeout @@ -314,10 +346,10 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo execPolicy.ExecutionTimeout = remaining } - successCount, errorCount, canceledCount := r.dispatchScheduledProposals(jobCtx, jobType, filtered, clusterContext, execPolicy) + successCount, errorCount, canceledCount := r.dispatchScheduledProposals(execCtx, jobType, filtered, clusterContext, execPolicy) status := "success" - if jobCtx.Err() != nil { + if execCtx.Err() != nil { status = "timeout" } else if errorCount > 0 || canceledCount > 0 { status = "error" @@ -937,7 +969,24 @@ func (r *Plugin) executeScheduledJobWithExecutor( if parent == nil { parent = context.Background() } - execCtx, cancel := context.WithTimeout(parent, policy.ExecutionTimeout) + // Use the job's estimated runtime if provided and larger than the + // default execution timeout. This lets handlers like vacuum scale + // the timeout based on volume size so large volumes are not killed. + timeout := policy.ExecutionTimeout + if job.Parameters != nil { + if est, ok := job.Parameters["estimated_runtime_seconds"]; ok { + if v := est.GetInt64Value(); v > 0 { + estimated := time.Duration(v) * time.Second + if estimated > maxEstimatedRuntimeCap { + estimated = maxEstimatedRuntimeCap + } + if estimated > timeout { + timeout = estimated + } + } + } + } + execCtx, cancel := context.WithTimeout(parent, timeout) _, err := r.executeJobWithExecutor(execCtx, executor, job, clusterContext, int32(attempt)) cancel() if err == nil { diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index d1a93d299..1ad58b30d 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -544,6 +544,10 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo summary = summary + " on " + result.Server } + // Estimate runtime: 5 min/GB (compact + commit + overhead) + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 5 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -564,6 +568,9 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo "collection": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "vacuum", diff --git a/weed/worker/tasks/vacuum/vacuum_task.go b/weed/worker/tasks/vacuum/vacuum_task.go index 4b890fada..3b226334b 100644 --- a/weed/worker/tasks/vacuum/vacuum_task.go +++ b/weed/worker/tasks/vacuum/vacuum_task.go @@ -25,6 +25,7 @@ type VacuumTask struct { garbageThreshold float64 progress float64 grpcDialOption grpc.DialOption + volumeSize uint64 } // NewVacuumTask creates a new unified vacuum task instance @@ -51,6 +52,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) } t.garbageThreshold = vacuumParams.GarbageThreshold + t.volumeSize = params.VolumeSize t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, @@ -62,7 +64,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) // Step 1: Check volume status and garbage ratio t.ReportProgress(10.0) t.GetLogger().Info("Checking volume status") - eligible, currentGarbageRatio, err := t.checkVacuumEligibility() + eligible, currentGarbageRatio, err := t.checkVacuumEligibility(ctx) if err != nil { return fmt.Errorf("failed to check vacuum eligibility: %v", err) } @@ -83,14 +85,14 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) "threshold": t.garbageThreshold, }).Info("Performing vacuum operation") - if err := t.performVacuum(); err != nil { + if err := t.performVacuum(ctx); err != nil { return fmt.Errorf("failed to perform vacuum: %v", err) } // Step 3: Verify vacuum results t.ReportProgress(90.0) t.GetLogger().Info("Verifying vacuum results") - if err := t.verifyVacuumResults(); err != nil { + if err := t.verifyVacuumResults(ctx); err != nil { glog.Warningf("Vacuum verification failed: %v", err) // Don't fail the task - vacuum operation itself succeeded } @@ -146,15 +148,28 @@ func (t *VacuumTask) GetProgress() float64 { return t.progress } +// vacuumTimeout returns a dynamic timeout scaled by volume size, matching the +// topology vacuum approach. base is the per-GB multiplier (e.g. 1 minute for +// check, 3 minutes for compact). +func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration { + if t.volumeSize == 0 { + glog.V(1).Infof("volume %d has no size metric, using minimum timeout", t.volumeID) + } + sizeGB := int64(t.volumeSize/1024/1024/1024) + 1 + return base * time.Duration(sizeGB) +} + // Helper methods for real vacuum operations // checkVacuumEligibility checks if the volume meets vacuum criteria -func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { +func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, error) { var garbageRatio float64 err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + checkCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(checkCtx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -178,12 +193,14 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { } // performVacuum executes the actual vacuum operation -func (t *VacuumTask) performVacuum() error { +func (t *VacuumTask) performVacuum(ctx context.Context) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // Step 1: Compact the volume + // Step 1: Compact the volume (3 min per GB, matching topology vacuum) t.GetLogger().Info("Compacting volume") - stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ + compactCtx, compactCancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute)) + defer compactCancel() + stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -202,18 +219,22 @@ func (t *VacuumTask) performVacuum() error { glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes) } - // Step 2: Commit the vacuum + // Step 2: Commit the vacuum (1 min per GB) t.GetLogger().Info("Committing vacuum operation") - _, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ + commitCtx, commitCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer commitCancel() + _, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: t.volumeID, }) if err != nil { return fmt.Errorf("vacuum commit failed: %v", err) } - // Step 3: Cleanup old files + // Step 3: Cleanup old files (1 min per GB) t.GetLogger().Info("Cleaning up vacuum files") - _, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ + cleanupCtx, cleanupCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cleanupCancel() + _, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -226,10 +247,12 @@ func (t *VacuumTask) performVacuum() error { } // verifyVacuumResults checks the volume status after vacuum -func (t *VacuumTask) verifyVacuumResults() error { +func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, }) if err != nil {