You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
276 lines
8.4 KiB
276 lines
8.4 KiB
package plugin
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
func TestRunDetectionSendsCancelOnContextDone(t *testing.T) {
|
|
t.Parallel()
|
|
pluginSvc, err := New(Options{})
|
|
if err != nil {
|
|
t.Fatalf("New plugin error: %v", err)
|
|
}
|
|
defer pluginSvc.Shutdown()
|
|
|
|
const workerID = "worker-detect"
|
|
const jobType = "vacuum"
|
|
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
|
WorkerId: workerID,
|
|
Capabilities: []*plugin_pb.JobTypeCapability{
|
|
{JobType: jobType, CanDetect: true, MaxDetectionConcurrency: 1},
|
|
},
|
|
})
|
|
session := &streamSession{workerID: workerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 4)}
|
|
pluginSvc.putSession(session)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
_, runErr := pluginSvc.RunDetection(ctx, jobType, &plugin_pb.ClusterContext{}, 10)
|
|
errCh <- runErr
|
|
}()
|
|
|
|
first := <-session.outgoing
|
|
if first.GetRunDetectionRequest() == nil {
|
|
t.Fatalf("expected first message to be run_detection_request")
|
|
}
|
|
|
|
cancel()
|
|
|
|
second := <-session.outgoing
|
|
cancelReq := second.GetCancelRequest()
|
|
if cancelReq == nil {
|
|
t.Fatalf("expected second message to be cancel_request")
|
|
}
|
|
if cancelReq.TargetId != first.RequestId {
|
|
t.Fatalf("unexpected cancel target id: got=%s want=%s", cancelReq.TargetId, first.RequestId)
|
|
}
|
|
if cancelReq.TargetKind != plugin_pb.WorkKind_WORK_KIND_DETECTION {
|
|
t.Fatalf("unexpected cancel target kind: %v", cancelReq.TargetKind)
|
|
}
|
|
|
|
runErr := <-errCh
|
|
if !errors.Is(runErr, context.Canceled) {
|
|
t.Fatalf("expected context canceled error, got %v", runErr)
|
|
}
|
|
}
|
|
|
|
func TestExecuteJobSendsCancelOnContextDone(t *testing.T) {
|
|
t.Parallel()
|
|
pluginSvc, err := New(Options{})
|
|
if err != nil {
|
|
t.Fatalf("New plugin error: %v", err)
|
|
}
|
|
defer pluginSvc.Shutdown()
|
|
|
|
const workerID = "worker-exec"
|
|
const jobType = "vacuum"
|
|
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
|
WorkerId: workerID,
|
|
Capabilities: []*plugin_pb.JobTypeCapability{
|
|
{JobType: jobType, CanExecute: true, MaxExecutionConcurrency: 1},
|
|
},
|
|
})
|
|
session := &streamSession{workerID: workerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 4)}
|
|
pluginSvc.putSession(session)
|
|
|
|
job := &plugin_pb.JobSpec{JobId: "job-1", JobType: jobType}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
_, runErr := pluginSvc.ExecuteJob(ctx, job, &plugin_pb.ClusterContext{}, 1)
|
|
errCh <- runErr
|
|
}()
|
|
|
|
first := <-session.outgoing
|
|
if first.GetExecuteJobRequest() == nil {
|
|
t.Fatalf("expected first message to be execute_job_request")
|
|
}
|
|
|
|
cancel()
|
|
|
|
second := <-session.outgoing
|
|
cancelReq := second.GetCancelRequest()
|
|
if cancelReq == nil {
|
|
t.Fatalf("expected second message to be cancel_request")
|
|
}
|
|
if cancelReq.TargetId != first.RequestId {
|
|
t.Fatalf("unexpected cancel target id: got=%s want=%s", cancelReq.TargetId, first.RequestId)
|
|
}
|
|
if cancelReq.TargetKind != plugin_pb.WorkKind_WORK_KIND_EXECUTION {
|
|
t.Fatalf("unexpected cancel target kind: %v", cancelReq.TargetKind)
|
|
}
|
|
|
|
runErr := <-errCh
|
|
if !errors.Is(runErr, context.Canceled) {
|
|
t.Fatalf("expected context canceled error, got %v", runErr)
|
|
}
|
|
}
|
|
|
|
func TestAdminScriptExecutionBlocksOtherDetection(t *testing.T) {
|
|
pluginSvc, err := New(Options{})
|
|
if err != nil {
|
|
t.Fatalf("New plugin error: %v", err)
|
|
}
|
|
defer pluginSvc.Shutdown()
|
|
|
|
const adminWorkerID = "worker-admin-script"
|
|
const otherWorkerID = "worker-vacuum"
|
|
|
|
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
|
WorkerId: adminWorkerID,
|
|
Capabilities: []*plugin_pb.JobTypeCapability{
|
|
{JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1},
|
|
},
|
|
})
|
|
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
|
WorkerId: otherWorkerID,
|
|
Capabilities: []*plugin_pb.JobTypeCapability{
|
|
{JobType: "vacuum", CanDetect: true, MaxDetectionConcurrency: 1},
|
|
},
|
|
})
|
|
adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
|
|
otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
|
|
pluginSvc.putSession(adminSession)
|
|
pluginSvc.putSession(otherSession)
|
|
|
|
adminErrCh := make(chan error, 1)
|
|
go func() {
|
|
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
|
|
JobId: "job-admin-script-1",
|
|
JobType: "admin_script",
|
|
}, &plugin_pb.ClusterContext{}, 1)
|
|
adminErrCh <- runErr
|
|
}()
|
|
|
|
adminExecMessage := <-adminSession.outgoing
|
|
if adminExecMessage.GetExecuteJobRequest() == nil {
|
|
t.Fatalf("expected admin_script execute request")
|
|
}
|
|
|
|
detectErrCh := make(chan error, 1)
|
|
go func() {
|
|
_, runErr := pluginSvc.RunDetection(context.Background(), "vacuum", &plugin_pb.ClusterContext{}, 10)
|
|
detectErrCh <- runErr
|
|
}()
|
|
|
|
select {
|
|
case unexpected := <-otherSession.outgoing:
|
|
t.Fatalf("expected vacuum detection to wait while admin_script runs, got message: %+v", unexpected)
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
|
|
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
|
|
RequestId: adminExecMessage.RequestId,
|
|
JobId: "job-admin-script-1",
|
|
JobType: "admin_script",
|
|
Success: true,
|
|
CompletedAt: timestamppb.Now(),
|
|
})
|
|
if runErr := <-adminErrCh; runErr != nil {
|
|
t.Fatalf("admin_script ExecuteJob error: %v", runErr)
|
|
}
|
|
|
|
detectMessage := <-otherSession.outgoing
|
|
detectRequest := detectMessage.GetRunDetectionRequest()
|
|
if detectRequest == nil {
|
|
t.Fatalf("expected vacuum detection request after admin_script completion")
|
|
}
|
|
pluginSvc.handleDetectionComplete(otherWorkerID, &plugin_pb.DetectionComplete{
|
|
RequestId: detectMessage.RequestId,
|
|
JobType: "vacuum",
|
|
Success: true,
|
|
})
|
|
if runErr := <-detectErrCh; runErr != nil {
|
|
t.Fatalf("vacuum RunDetection error: %v", runErr)
|
|
}
|
|
}
|
|
|
|
func TestAdminScriptExecutionBlocksOtherExecution(t *testing.T) {
|
|
pluginSvc, err := New(Options{})
|
|
if err != nil {
|
|
t.Fatalf("New plugin error: %v", err)
|
|
}
|
|
defer pluginSvc.Shutdown()
|
|
|
|
const adminWorkerID = "worker-admin-script"
|
|
const otherWorkerID = "worker-vacuum"
|
|
|
|
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
|
WorkerId: adminWorkerID,
|
|
Capabilities: []*plugin_pb.JobTypeCapability{
|
|
{JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1},
|
|
},
|
|
})
|
|
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
|
WorkerId: otherWorkerID,
|
|
Capabilities: []*plugin_pb.JobTypeCapability{
|
|
{JobType: "vacuum", CanExecute: true, MaxExecutionConcurrency: 1},
|
|
},
|
|
})
|
|
adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
|
|
otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
|
|
pluginSvc.putSession(adminSession)
|
|
pluginSvc.putSession(otherSession)
|
|
|
|
adminErrCh := make(chan error, 1)
|
|
go func() {
|
|
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
|
|
JobId: "job-admin-script-2",
|
|
JobType: "admin_script",
|
|
}, &plugin_pb.ClusterContext{}, 1)
|
|
adminErrCh <- runErr
|
|
}()
|
|
|
|
adminExecMessage := <-adminSession.outgoing
|
|
if adminExecMessage.GetExecuteJobRequest() == nil {
|
|
t.Fatalf("expected admin_script execute request")
|
|
}
|
|
|
|
otherErrCh := make(chan error, 1)
|
|
go func() {
|
|
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
|
|
JobId: "job-vacuum-1",
|
|
JobType: "vacuum",
|
|
}, &plugin_pb.ClusterContext{}, 1)
|
|
otherErrCh <- runErr
|
|
}()
|
|
|
|
select {
|
|
case unexpected := <-otherSession.outgoing:
|
|
t.Fatalf("expected vacuum execute to wait while admin_script runs, got message: %+v", unexpected)
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
|
|
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
|
|
RequestId: adminExecMessage.RequestId,
|
|
JobId: "job-admin-script-2",
|
|
JobType: "admin_script",
|
|
Success: true,
|
|
CompletedAt: timestamppb.Now(),
|
|
})
|
|
if runErr := <-adminErrCh; runErr != nil {
|
|
t.Fatalf("admin_script ExecuteJob error: %v", runErr)
|
|
}
|
|
|
|
otherExecMessage := <-otherSession.outgoing
|
|
if otherExecMessage.GetExecuteJobRequest() == nil {
|
|
t.Fatalf("expected vacuum execute request after admin_script completion")
|
|
}
|
|
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
|
|
RequestId: otherExecMessage.RequestId,
|
|
JobId: "job-vacuum-1",
|
|
JobType: "vacuum",
|
|
Success: true,
|
|
CompletedAt: timestamppb.Now(),
|
|
})
|
|
if runErr := <-otherErrCh; runErr != nil {
|
|
t.Fatalf("vacuum ExecuteJob error: %v", runErr)
|
|
}
|
|
}
|