From 38765495c3c563013e60cc4821ae44f7c83be514 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 2 Mar 2026 16:49:08 -0800 Subject: [PATCH] plugin: gate other jobs during admin_script runs --- weed/admin/plugin/plugin.go | 29 ++++- weed/admin/plugin/plugin_cancel_test.go | 164 ++++++++++++++++++++++++ 2 files changed, 191 insertions(+), 2 deletions(-) diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index 16a9ed3a2..4a993f994 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -24,6 +24,7 @@ const ( defaultHeartbeatInterval = 30 defaultReconnectDelay = 5 defaultPendingSchemaBuffer = 1 + adminScriptJobType = "admin_script" ) type Options struct { @@ -64,6 +65,7 @@ type Plugin struct { schedulerExecMu sync.Mutex schedulerExecReservations map[string]int + adminScriptRunMu sync.RWMutex dedupeMu sync.Mutex recentDedupeByType map[string]map[string]time.Time @@ -397,6 +399,9 @@ func (r *Plugin) RunDetectionWithReport( clusterContext *plugin_pb.ClusterContext, maxResults int32, ) (*DetectionReport, error) { + releaseGate := r.acquireDetectionExecutionGate(jobType, false) + defer releaseGate() + detector, err := r.pickDetector(jobType) if err != nil { return nil, err @@ -539,11 +544,14 @@ func (r *Plugin) ExecuteJob( if job == nil { return nil, fmt.Errorf("job is nil") } - if strings.TrimSpace(job.JobType) == "" { + jobType := strings.TrimSpace(job.JobType) + if jobType == "" { return nil, fmt.Errorf("job_type is required") } + releaseGate := r.acquireDetectionExecutionGate(jobType, true) + defer releaseGate() - executor, err := r.registry.PickExecutor(job.JobType) + executor, err := r.registry.PickExecutor(jobType) if err != nil { return nil, err } @@ -551,6 +559,23 @@ func (r *Plugin) ExecuteJob( return r.executeJobWithExecutor(ctx, executor, job, clusterContext, attempt) } +func (r *Plugin) acquireDetectionExecutionGate(jobType string, execution bool) func() { + normalizedJobType := strings.ToLower(strings.TrimSpace(jobType)) + if execution && normalizedJobType == adminScriptJobType { + r.adminScriptRunMu.Lock() + return func() { + r.adminScriptRunMu.Unlock() + } + } + if normalizedJobType != adminScriptJobType { + r.adminScriptRunMu.RLock() + return func() { + r.adminScriptRunMu.RUnlock() + } + } + return func() {} +} + func (r *Plugin) executeJobWithExecutor( ctx context.Context, executor *WorkerSession, diff --git a/weed/admin/plugin/plugin_cancel_test.go b/weed/admin/plugin/plugin_cancel_test.go index bb597e3f7..2a966ae8c 100644 --- a/weed/admin/plugin/plugin_cancel_test.go +++ b/weed/admin/plugin/plugin_cancel_test.go @@ -4,8 +4,10 @@ import ( "context" "errors" "testing" + "time" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestRunDetectionSendsCancelOnContextDone(t *testing.T) { @@ -110,3 +112,165 @@ func TestExecuteJobSendsCancelOnContextDone(t *testing.T) { t.Fatalf("expected context canceled error, got %v", runErr) } } + +func TestAdminScriptExecutionBlocksOtherDetection(t *testing.T) { + pluginSvc, err := New(Options{}) + if err != nil { + t.Fatalf("New plugin error: %v", err) + } + defer pluginSvc.Shutdown() + + const adminWorkerID = "worker-admin-script" + const otherWorkerID = "worker-vacuum" + + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: adminWorkerID, + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1}, + }, + }) + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: otherWorkerID, + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanDetect: true, MaxDetectionConcurrency: 1}, + }, + }) + adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)} + otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)} + pluginSvc.putSession(adminSession) + pluginSvc.putSession(otherSession) + + adminErrCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{ + JobId: "job-admin-script-1", + JobType: "admin_script", + }, &plugin_pb.ClusterContext{}, 1) + adminErrCh <- runErr + }() + + adminExecMessage := <-adminSession.outgoing + if adminExecMessage.GetExecuteJobRequest() == nil { + t.Fatalf("expected admin_script execute request") + } + + detectErrCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.RunDetection(context.Background(), "vacuum", &plugin_pb.ClusterContext{}, 10) + detectErrCh <- runErr + }() + + select { + case unexpected := <-otherSession.outgoing: + t.Fatalf("expected vacuum detection to wait while admin_script runs, got message: %+v", unexpected) + case <-time.After(100 * time.Millisecond): + } + + pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{ + RequestId: adminExecMessage.RequestId, + JobId: "job-admin-script-1", + JobType: "admin_script", + Success: true, + CompletedAt: timestamppb.Now(), + }) + if runErr := <-adminErrCh; runErr != nil { + t.Fatalf("admin_script ExecuteJob error: %v", runErr) + } + + detectMessage := <-otherSession.outgoing + detectRequest := detectMessage.GetRunDetectionRequest() + if detectRequest == nil { + t.Fatalf("expected vacuum detection request after admin_script completion") + } + pluginSvc.handleDetectionComplete(otherWorkerID, &plugin_pb.DetectionComplete{ + RequestId: detectMessage.RequestId, + JobType: "vacuum", + Success: true, + }) + if runErr := <-detectErrCh; runErr != nil { + t.Fatalf("vacuum RunDetection error: %v", runErr) + } +} + +func TestAdminScriptExecutionBlocksOtherExecution(t *testing.T) { + pluginSvc, err := New(Options{}) + if err != nil { + t.Fatalf("New plugin error: %v", err) + } + defer pluginSvc.Shutdown() + + const adminWorkerID = "worker-admin-script" + const otherWorkerID = "worker-vacuum" + + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: adminWorkerID, + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1}, + }, + }) + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: otherWorkerID, + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanExecute: true, MaxExecutionConcurrency: 1}, + }, + }) + adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)} + otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)} + pluginSvc.putSession(adminSession) + pluginSvc.putSession(otherSession) + + adminErrCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{ + JobId: "job-admin-script-2", + JobType: "admin_script", + }, &plugin_pb.ClusterContext{}, 1) + adminErrCh <- runErr + }() + + adminExecMessage := <-adminSession.outgoing + if adminExecMessage.GetExecuteJobRequest() == nil { + t.Fatalf("expected admin_script execute request") + } + + otherErrCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{ + JobId: "job-vacuum-1", + JobType: "vacuum", + }, &plugin_pb.ClusterContext{}, 1) + otherErrCh <- runErr + }() + + select { + case unexpected := <-otherSession.outgoing: + t.Fatalf("expected vacuum execute to wait while admin_script runs, got message: %+v", unexpected) + case <-time.After(100 * time.Millisecond): + } + + pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{ + RequestId: adminExecMessage.RequestId, + JobId: "job-admin-script-2", + JobType: "admin_script", + Success: true, + CompletedAt: timestamppb.Now(), + }) + if runErr := <-adminErrCh; runErr != nil { + t.Fatalf("admin_script ExecuteJob error: %v", runErr) + } + + otherExecMessage := <-otherSession.outgoing + if otherExecMessage.GetExecuteJobRequest() == nil { + t.Fatalf("expected vacuum execute request after admin_script completion") + } + pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{ + RequestId: otherExecMessage.RequestId, + JobId: "job-vacuum-1", + JobType: "vacuum", + Success: true, + CompletedAt: timestamppb.Now(), + }) + if runErr := <-otherErrCh; runErr != nil { + t.Fatalf("vacuum ExecuteJob error: %v", runErr) + } +}