Browse Source

plugin: gate other jobs during admin_script runs

pull/8491/head
Chris Lu 2 days ago
parent
commit
38765495c3
  1. 29
      weed/admin/plugin/plugin.go
  2. 164
      weed/admin/plugin/plugin_cancel_test.go

29
weed/admin/plugin/plugin.go

@ -24,6 +24,7 @@ const (
defaultHeartbeatInterval = 30
defaultReconnectDelay = 5
defaultPendingSchemaBuffer = 1
adminScriptJobType = "admin_script"
)
type Options struct {
@ -64,6 +65,7 @@ type Plugin struct {
schedulerExecMu sync.Mutex
schedulerExecReservations map[string]int
adminScriptRunMu sync.RWMutex
dedupeMu sync.Mutex
recentDedupeByType map[string]map[string]time.Time
@ -397,6 +399,9 @@ func (r *Plugin) RunDetectionWithReport(
clusterContext *plugin_pb.ClusterContext,
maxResults int32,
) (*DetectionReport, error) {
releaseGate := r.acquireDetectionExecutionGate(jobType, false)
defer releaseGate()
detector, err := r.pickDetector(jobType)
if err != nil {
return nil, err
@ -539,11 +544,14 @@ func (r *Plugin) ExecuteJob(
if job == nil {
return nil, fmt.Errorf("job is nil")
}
if strings.TrimSpace(job.JobType) == "" {
jobType := strings.TrimSpace(job.JobType)
if jobType == "" {
return nil, fmt.Errorf("job_type is required")
}
releaseGate := r.acquireDetectionExecutionGate(jobType, true)
defer releaseGate()
executor, err := r.registry.PickExecutor(job.JobType)
executor, err := r.registry.PickExecutor(jobType)
if err != nil {
return nil, err
}
@ -551,6 +559,23 @@ func (r *Plugin) ExecuteJob(
return r.executeJobWithExecutor(ctx, executor, job, clusterContext, attempt)
}
func (r *Plugin) acquireDetectionExecutionGate(jobType string, execution bool) func() {
normalizedJobType := strings.ToLower(strings.TrimSpace(jobType))
if execution && normalizedJobType == adminScriptJobType {
r.adminScriptRunMu.Lock()
return func() {
r.adminScriptRunMu.Unlock()
}
}
if normalizedJobType != adminScriptJobType {
r.adminScriptRunMu.RLock()
return func() {
r.adminScriptRunMu.RUnlock()
}
}
return func() {}
}
func (r *Plugin) executeJobWithExecutor(
ctx context.Context,
executor *WorkerSession,

164
weed/admin/plugin/plugin_cancel_test.go

@ -4,8 +4,10 @@ import (
"context"
"errors"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"google.golang.org/protobuf/types/known/timestamppb"
)
func TestRunDetectionSendsCancelOnContextDone(t *testing.T) {
@ -110,3 +112,165 @@ func TestExecuteJobSendsCancelOnContextDone(t *testing.T) {
t.Fatalf("expected context canceled error, got %v", runErr)
}
}
func TestAdminScriptExecutionBlocksOtherDetection(t *testing.T) {
pluginSvc, err := New(Options{})
if err != nil {
t.Fatalf("New plugin error: %v", err)
}
defer pluginSvc.Shutdown()
const adminWorkerID = "worker-admin-script"
const otherWorkerID = "worker-vacuum"
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
WorkerId: adminWorkerID,
Capabilities: []*plugin_pb.JobTypeCapability{
{JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1},
},
})
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
WorkerId: otherWorkerID,
Capabilities: []*plugin_pb.JobTypeCapability{
{JobType: "vacuum", CanDetect: true, MaxDetectionConcurrency: 1},
},
})
adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
pluginSvc.putSession(adminSession)
pluginSvc.putSession(otherSession)
adminErrCh := make(chan error, 1)
go func() {
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
JobId: "job-admin-script-1",
JobType: "admin_script",
}, &plugin_pb.ClusterContext{}, 1)
adminErrCh <- runErr
}()
adminExecMessage := <-adminSession.outgoing
if adminExecMessage.GetExecuteJobRequest() == nil {
t.Fatalf("expected admin_script execute request")
}
detectErrCh := make(chan error, 1)
go func() {
_, runErr := pluginSvc.RunDetection(context.Background(), "vacuum", &plugin_pb.ClusterContext{}, 10)
detectErrCh <- runErr
}()
select {
case unexpected := <-otherSession.outgoing:
t.Fatalf("expected vacuum detection to wait while admin_script runs, got message: %+v", unexpected)
case <-time.After(100 * time.Millisecond):
}
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
RequestId: adminExecMessage.RequestId,
JobId: "job-admin-script-1",
JobType: "admin_script",
Success: true,
CompletedAt: timestamppb.Now(),
})
if runErr := <-adminErrCh; runErr != nil {
t.Fatalf("admin_script ExecuteJob error: %v", runErr)
}
detectMessage := <-otherSession.outgoing
detectRequest := detectMessage.GetRunDetectionRequest()
if detectRequest == nil {
t.Fatalf("expected vacuum detection request after admin_script completion")
}
pluginSvc.handleDetectionComplete(otherWorkerID, &plugin_pb.DetectionComplete{
RequestId: detectMessage.RequestId,
JobType: "vacuum",
Success: true,
})
if runErr := <-detectErrCh; runErr != nil {
t.Fatalf("vacuum RunDetection error: %v", runErr)
}
}
func TestAdminScriptExecutionBlocksOtherExecution(t *testing.T) {
pluginSvc, err := New(Options{})
if err != nil {
t.Fatalf("New plugin error: %v", err)
}
defer pluginSvc.Shutdown()
const adminWorkerID = "worker-admin-script"
const otherWorkerID = "worker-vacuum"
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
WorkerId: adminWorkerID,
Capabilities: []*plugin_pb.JobTypeCapability{
{JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1},
},
})
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
WorkerId: otherWorkerID,
Capabilities: []*plugin_pb.JobTypeCapability{
{JobType: "vacuum", CanExecute: true, MaxExecutionConcurrency: 1},
},
})
adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
pluginSvc.putSession(adminSession)
pluginSvc.putSession(otherSession)
adminErrCh := make(chan error, 1)
go func() {
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
JobId: "job-admin-script-2",
JobType: "admin_script",
}, &plugin_pb.ClusterContext{}, 1)
adminErrCh <- runErr
}()
adminExecMessage := <-adminSession.outgoing
if adminExecMessage.GetExecuteJobRequest() == nil {
t.Fatalf("expected admin_script execute request")
}
otherErrCh := make(chan error, 1)
go func() {
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
JobId: "job-vacuum-1",
JobType: "vacuum",
}, &plugin_pb.ClusterContext{}, 1)
otherErrCh <- runErr
}()
select {
case unexpected := <-otherSession.outgoing:
t.Fatalf("expected vacuum execute to wait while admin_script runs, got message: %+v", unexpected)
case <-time.After(100 * time.Millisecond):
}
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
RequestId: adminExecMessage.RequestId,
JobId: "job-admin-script-2",
JobType: "admin_script",
Success: true,
CompletedAt: timestamppb.Now(),
})
if runErr := <-adminErrCh; runErr != nil {
t.Fatalf("admin_script ExecuteJob error: %v", runErr)
}
otherExecMessage := <-otherSession.outgoing
if otherExecMessage.GetExecuteJobRequest() == nil {
t.Fatalf("expected vacuum execute request after admin_script completion")
}
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
RequestId: otherExecMessage.RequestId,
JobId: "job-vacuum-1",
JobType: "vacuum",
Success: true,
CompletedAt: timestamppb.Now(),
})
if runErr := <-otherErrCh; runErr != nil {
t.Fatalf("vacuum ExecuteJob error: %v", runErr)
}
}
Loading…
Cancel
Save