You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1243 lines
33 KiB
1243 lines
33 KiB
package plugin
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
const (
	// defaultOutgoingBuffer is the capacity of a session's outgoing message
	// channel when Options.OutgoingBufferSize is not positive.
	defaultOutgoingBuffer = 128

	// defaultSendTimeout is how long sendToWorker waits to enqueue a message
	// when Options.SendTimeout is not positive.
	defaultSendTimeout = 5 * time.Second

	// defaultHeartbeatInterval is sent to workers in the admin hello as
	// HeartbeatIntervalSeconds.
	defaultHeartbeatInterval = 30

	// defaultReconnectDelay is sent to workers in the admin hello as
	// ReconnectDelaySeconds.
	defaultReconnectDelay = 5

	// defaultPendingSchemaBuffer is the capacity of each per-request channel
	// that carries a worker's config schema response back to the requester.
	defaultPendingSchemaBuffer = 1
)
|
|
|
|
// Options configures a Plugin created by New.
//
// DataDir is passed to NewConfigStore. OutgoingBufferSize, SendTimeout and
// SchedulerTick fall back to package defaults when they are not positive.
// ClusterContextProvider is optional; New only starts the scheduler loop when
// it is non-nil.
type Options struct {
	DataDir                string
	OutgoingBufferSize     int
	SendTimeout            time.Duration
	SchedulerTick          time.Duration
	ClusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error)
}
|
|
|
|
// Plugin is the admin-side controller for plugin workers. It implements the
// PluginControlService stream endpoint, keeps one streamSession per connected
// worker, and correlates outstanding schema, detection and execution requests
// with the worker responses that arrive on those streams.
type Plugin struct {
	plugin_pb.UnimplementedPluginControlServiceServer

	// Persistent configuration store and the registry of connected workers.
	store    *ConfigStore
	registry *Registry

	// Per-session outgoing channel capacity and enqueue timeout.
	outgoingBuffer int
	sendTimeout    time.Duration

	// Scheduler settings; the scheduler loop runs only when
	// clusterContextProvider is non-nil.
	schedulerTick          time.Duration
	clusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error)

	// Scheduler detection bookkeeping, keyed by string.
	// NOTE(review): presumably keyed by job type and guarded by schedulerMu;
	// the code using these fields is outside this view.
	schedulerMu       sync.Mutex
	nextDetectionAt   map[string]time.Time
	detectionInFlight map[string]bool

	// detectorLeases maps a job type to the worker ID currently preferred for
	// detection; guarded by detectorLeaseMu.
	detectorLeaseMu sync.Mutex
	detectorLeases  map[string]string

	// NOTE(review): presumably guarded by schedulerExecMu; usage is outside
	// this view.
	schedulerExecMu           sync.Mutex
	schedulerExecReservations map[string]int

	// NOTE(review): presumably guarded by dedupeMu; usage is outside this view.
	dedupeMu           sync.Mutex
	recentDedupeByType map[string]map[string]time.Time

	// sessions maps worker ID to its live stream session; guarded by sessionsMu.
	sessionsMu sync.RWMutex
	sessions   map[string]*streamSession

	// pendingSchema maps a schema request ID to the channel its requester is
	// waiting on; guarded by pendingSchemaMu.
	pendingSchemaMu sync.Mutex
	pendingSchema   map[string]chan *plugin_pb.ConfigSchemaResponse

	// pendingDetection maps a detection request ID to its accumulated state;
	// guarded by pendingDetectionMu.
	pendingDetectionMu sync.Mutex
	pendingDetection   map[string]*pendingDetectionState

	// pendingExecution maps an execution request ID to the channel its
	// requester is waiting on; guarded by pendingExecutionMu.
	pendingExecutionMu sync.Mutex
	pendingExecution   map[string]chan *plugin_pb.JobCompleted

	// NOTE(review): presumably guarded by jobsMu; usage is outside this view.
	jobsMu sync.RWMutex
	jobs   map[string]*TrackedJob

	// NOTE(review): what jobDetailsMu protects is not visible in this view.
	jobDetailsMu sync.Mutex

	// NOTE(review): presumably guarded by activitiesMu; usage is outside this view.
	activitiesMu sync.RWMutex
	activities   []JobActivity

	// Persistence state; persistTicker is created in New and stopped in Shutdown.
	dirtyJobs       bool
	dirtyActivities bool
	persistTicker   *time.Ticker

	// ctx is created in New and canceled by Shutdown through ctxCancel.
	ctx       context.Context
	ctxCancel context.CancelFunc

	// shutdownCh is closed exactly once (via shutdownOnce) by Shutdown; wg
	// tracks the background goroutines Shutdown waits for.
	shutdownCh   chan struct{}
	shutdownOnce sync.Once
	wg           sync.WaitGroup
}
|
|
|
|
// streamSession is the admin-side state for one connected worker stream.
// Messages queued on outgoing are written to the gRPC stream by sendLoop.
// NOTE(review): closeOnce is presumably used by the close method, which is
// defined outside this view.
type streamSession struct {
	workerID  string
	outgoing  chan *plugin_pb.AdminToWorkerMessage
	closeOnce sync.Once
}
|
|
|
|
// pendingDetectionState tracks one in-flight detection request. Proposal
// batches are appended to proposals as they arrive, and the final
// DetectionComplete message is delivered on complete. jobType and workerID
// record what was requested and from whom, and serve as fallbacks when a
// worker message leaves those fields empty.
type pendingDetectionState struct {
	proposals []*plugin_pb.JobProposal
	complete  chan *plugin_pb.DetectionComplete
	jobType   string
	workerID  string
}
|
|
|
|
// DetectionReport captures one detection run including request metadata.
//
// RequestID, JobType and WorkerID identify the request and the detector that
// served it. Proposals holds the proposals collected for the request, and
// Complete is the worker's final completion message; both are left unset when
// the run was canceled or interrupted before completion.
type DetectionReport struct {
	RequestID string
	JobType   string
	WorkerID  string
	Proposals []*plugin_pb.JobProposal
	Complete  *plugin_pb.DetectionComplete
}
|
|
|
|
func New(options Options) (*Plugin, error) {
|
|
store, err := NewConfigStore(options.DataDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bufferSize := options.OutgoingBufferSize
|
|
if bufferSize <= 0 {
|
|
bufferSize = defaultOutgoingBuffer
|
|
}
|
|
sendTimeout := options.SendTimeout
|
|
if sendTimeout <= 0 {
|
|
sendTimeout = defaultSendTimeout
|
|
}
|
|
schedulerTick := options.SchedulerTick
|
|
if schedulerTick <= 0 {
|
|
schedulerTick = defaultSchedulerTick
|
|
}
|
|
|
|
plugin := &Plugin{
|
|
store: store,
|
|
registry: NewRegistry(),
|
|
outgoingBuffer: bufferSize,
|
|
sendTimeout: sendTimeout,
|
|
schedulerTick: schedulerTick,
|
|
clusterContextProvider: options.ClusterContextProvider,
|
|
sessions: make(map[string]*streamSession),
|
|
pendingSchema: make(map[string]chan *plugin_pb.ConfigSchemaResponse),
|
|
pendingDetection: make(map[string]*pendingDetectionState),
|
|
pendingExecution: make(map[string]chan *plugin_pb.JobCompleted),
|
|
nextDetectionAt: make(map[string]time.Time),
|
|
detectionInFlight: make(map[string]bool),
|
|
detectorLeases: make(map[string]string),
|
|
schedulerExecReservations: make(map[string]int),
|
|
recentDedupeByType: make(map[string]map[string]time.Time),
|
|
jobs: make(map[string]*TrackedJob),
|
|
activities: make([]JobActivity, 0, 256),
|
|
persistTicker: time.NewTicker(2 * time.Second),
|
|
shutdownCh: make(chan struct{}),
|
|
}
|
|
plugin.ctx, plugin.ctxCancel = context.WithCancel(context.Background())
|
|
|
|
if err := plugin.loadPersistedMonitorState(); err != nil {
|
|
glog.Warningf("Plugin failed to load persisted monitoring state: %v", err)
|
|
}
|
|
|
|
if plugin.clusterContextProvider != nil {
|
|
plugin.wg.Add(1)
|
|
go plugin.schedulerLoop()
|
|
}
|
|
plugin.wg.Add(1)
|
|
go plugin.persistenceLoop()
|
|
|
|
return plugin, nil
|
|
}
|
|
|
|
func (r *Plugin) Shutdown() {
|
|
if r.ctxCancel != nil {
|
|
r.ctxCancel()
|
|
}
|
|
if r.persistTicker != nil {
|
|
r.persistTicker.Stop()
|
|
}
|
|
|
|
r.shutdownOnce.Do(func() { close(r.shutdownCh) })
|
|
|
|
r.sessionsMu.Lock()
|
|
for workerID, session := range r.sessions {
|
|
session.close()
|
|
delete(r.sessions, workerID)
|
|
}
|
|
r.sessionsMu.Unlock()
|
|
|
|
r.pendingSchemaMu.Lock()
|
|
for requestID, ch := range r.pendingSchema {
|
|
close(ch)
|
|
delete(r.pendingSchema, requestID)
|
|
}
|
|
r.pendingSchemaMu.Unlock()
|
|
|
|
r.pendingDetectionMu.Lock()
|
|
for requestID, state := range r.pendingDetection {
|
|
close(state.complete)
|
|
delete(r.pendingDetection, requestID)
|
|
}
|
|
r.pendingDetectionMu.Unlock()
|
|
|
|
r.pendingExecutionMu.Lock()
|
|
for requestID, ch := range r.pendingExecution {
|
|
close(ch)
|
|
delete(r.pendingExecution, requestID)
|
|
}
|
|
r.pendingExecutionMu.Unlock()
|
|
|
|
r.wg.Wait()
|
|
}
|
|
|
|
func (r *Plugin) WorkerStream(stream plugin_pb.PluginControlService_WorkerStreamServer) error {
|
|
first, err := stream.Recv()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("receive worker hello: %w", err)
|
|
}
|
|
|
|
hello := first.GetHello()
|
|
if hello == nil {
|
|
return fmt.Errorf("first message must be hello")
|
|
}
|
|
if strings.TrimSpace(hello.WorkerId) == "" {
|
|
return fmt.Errorf("worker_id is required")
|
|
}
|
|
|
|
workerID := hello.WorkerId
|
|
r.registry.UpsertFromHello(hello)
|
|
|
|
session := &streamSession{
|
|
workerID: workerID,
|
|
outgoing: make(chan *plugin_pb.AdminToWorkerMessage, r.outgoingBuffer),
|
|
}
|
|
r.putSession(session)
|
|
defer r.cleanupSession(workerID)
|
|
|
|
glog.V(0).Infof("Plugin worker connected: %s (%s)", workerID, hello.Address)
|
|
|
|
sendErrCh := make(chan error, 1)
|
|
r.wg.Add(1)
|
|
go func() {
|
|
defer r.wg.Done()
|
|
sendErrCh <- r.sendLoop(stream.Context(), stream, session)
|
|
}()
|
|
|
|
if err := r.sendAdminHello(workerID); err != nil {
|
|
glog.Warningf("failed to send plugin admin hello to %s: %v", workerID, err)
|
|
}
|
|
go r.prefetchDescriptorsFromHello(hello)
|
|
|
|
recvErrCh := make(chan error, 1)
|
|
r.wg.Add(1)
|
|
go func() {
|
|
defer r.wg.Done()
|
|
for {
|
|
message, recvErr := stream.Recv()
|
|
if recvErr != nil {
|
|
recvErrCh <- recvErr
|
|
return
|
|
}
|
|
r.handleWorkerMessage(workerID, message)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-r.shutdownCh:
|
|
return nil
|
|
case err := <-sendErrCh:
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
return err
|
|
}
|
|
return nil
|
|
case recvErr := <-recvErrCh:
|
|
if errors.Is(recvErr, io.EOF) || errors.Is(recvErr, context.Canceled) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("receive plugin message from %s: %w", workerID, recvErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) RequestConfigSchema(ctx context.Context, jobType string, forceRefresh bool) (*plugin_pb.JobTypeDescriptor, error) {
|
|
if !forceRefresh {
|
|
descriptor, err := r.store.LoadDescriptor(jobType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if descriptor != nil {
|
|
return descriptor, nil
|
|
}
|
|
}
|
|
|
|
provider, err := r.registry.PickSchemaProvider(jobType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestID, err := newRequestID("schema")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
responseCh := make(chan *plugin_pb.ConfigSchemaResponse, defaultPendingSchemaBuffer)
|
|
r.pendingSchemaMu.Lock()
|
|
r.pendingSchema[requestID] = responseCh
|
|
r.pendingSchemaMu.Unlock()
|
|
defer func() {
|
|
r.pendingSchemaMu.Lock()
|
|
delete(r.pendingSchema, requestID)
|
|
r.pendingSchemaMu.Unlock()
|
|
}()
|
|
|
|
requestMessage := &plugin_pb.AdminToWorkerMessage{
|
|
RequestId: requestID,
|
|
SentAt: timestamppb.Now(),
|
|
Body: &plugin_pb.AdminToWorkerMessage_RequestConfigSchema{
|
|
RequestConfigSchema: &plugin_pb.RequestConfigSchema{
|
|
JobType: jobType,
|
|
ForceRefresh: forceRefresh,
|
|
},
|
|
},
|
|
}
|
|
|
|
if err := r.sendToWorker(provider.WorkerID, requestMessage); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case response, ok := <-responseCh:
|
|
if !ok {
|
|
return nil, fmt.Errorf("schema request %s interrupted", requestID)
|
|
}
|
|
if response == nil {
|
|
return nil, fmt.Errorf("schema request %s returned empty response", requestID)
|
|
}
|
|
if !response.Success {
|
|
return nil, fmt.Errorf("schema request failed for %s: %s", jobType, response.ErrorMessage)
|
|
}
|
|
if response.GetJobTypeDescriptor() == nil {
|
|
return nil, fmt.Errorf("schema request for %s returned no descriptor", jobType)
|
|
}
|
|
return response.GetJobTypeDescriptor(), nil
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) LoadJobTypeConfig(jobType string) (*plugin_pb.PersistedJobTypeConfig, error) {
|
|
return r.store.LoadJobTypeConfig(jobType)
|
|
}
|
|
|
|
func (r *Plugin) SaveJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig) error {
|
|
return r.store.SaveJobTypeConfig(config)
|
|
}
|
|
|
|
func (r *Plugin) LoadDescriptor(jobType string) (*plugin_pb.JobTypeDescriptor, error) {
|
|
return r.store.LoadDescriptor(jobType)
|
|
}
|
|
|
|
func (r *Plugin) LoadRunHistory(jobType string) (*JobTypeRunHistory, error) {
|
|
return r.store.LoadRunHistory(jobType)
|
|
}
|
|
|
|
func (r *Plugin) IsConfigured() bool {
|
|
return r.store.IsConfigured()
|
|
}
|
|
|
|
func (r *Plugin) BaseDir() string {
|
|
return r.store.BaseDir()
|
|
}
|
|
|
|
// RunDetectionWithReport requests one detector worker and returns proposals with request metadata.
|
|
func (r *Plugin) RunDetectionWithReport(
|
|
ctx context.Context,
|
|
jobType string,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
maxResults int32,
|
|
) (*DetectionReport, error) {
|
|
detector, err := r.pickDetector(jobType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestID, err := newRequestID("detect")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
adminRuntime, adminConfigValues, workerConfigValues, err := r.loadJobTypeConfigPayload(jobType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
lastSuccessfulRun := r.loadLastSuccessfulRun(jobType)
|
|
|
|
state := &pendingDetectionState{
|
|
complete: make(chan *plugin_pb.DetectionComplete, 1),
|
|
jobType: jobType,
|
|
workerID: detector.WorkerID,
|
|
}
|
|
r.pendingDetectionMu.Lock()
|
|
r.pendingDetection[requestID] = state
|
|
r.pendingDetectionMu.Unlock()
|
|
defer func() {
|
|
r.pendingDetectionMu.Lock()
|
|
delete(r.pendingDetection, requestID)
|
|
r.pendingDetectionMu.Unlock()
|
|
}()
|
|
|
|
r.appendActivity(JobActivity{
|
|
JobType: jobType,
|
|
RequestID: requestID,
|
|
WorkerID: detector.WorkerID,
|
|
Source: "detector",
|
|
Stage: "requested",
|
|
Message: "detection requested",
|
|
OccurredAt: timeToPtr(time.Now().UTC()),
|
|
Details: map[string]interface{}{
|
|
"max_results": maxResults,
|
|
},
|
|
})
|
|
|
|
message := &plugin_pb.AdminToWorkerMessage{
|
|
RequestId: requestID,
|
|
SentAt: timestamppb.Now(),
|
|
Body: &plugin_pb.AdminToWorkerMessage_RunDetectionRequest{
|
|
RunDetectionRequest: &plugin_pb.RunDetectionRequest{
|
|
RequestId: requestID,
|
|
JobType: jobType,
|
|
DetectionSequence: time.Now().UnixNano(),
|
|
AdminRuntime: adminRuntime,
|
|
AdminConfigValues: adminConfigValues,
|
|
WorkerConfigValues: workerConfigValues,
|
|
ClusterContext: clusterContext,
|
|
LastSuccessfulRun: lastSuccessfulRun,
|
|
MaxResults: maxResults,
|
|
},
|
|
},
|
|
}
|
|
|
|
if err := r.sendToWorker(detector.WorkerID, message); err != nil {
|
|
r.clearDetectorLease(jobType, detector.WorkerID)
|
|
r.appendActivity(JobActivity{
|
|
JobType: jobType,
|
|
RequestID: requestID,
|
|
WorkerID: detector.WorkerID,
|
|
Source: "detector",
|
|
Stage: "failed_to_send",
|
|
Message: err.Error(),
|
|
OccurredAt: timeToPtr(time.Now().UTC()),
|
|
})
|
|
return nil, err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
r.sendCancel(detector.WorkerID, requestID, plugin_pb.WorkKind_WORK_KIND_DETECTION, ctx.Err())
|
|
r.appendActivity(JobActivity{
|
|
JobType: jobType,
|
|
RequestID: requestID,
|
|
WorkerID: detector.WorkerID,
|
|
Source: "detector",
|
|
Stage: "canceled",
|
|
Message: "detection canceled",
|
|
OccurredAt: timeToPtr(time.Now().UTC()),
|
|
})
|
|
return &DetectionReport{
|
|
RequestID: requestID,
|
|
JobType: jobType,
|
|
WorkerID: detector.WorkerID,
|
|
}, ctx.Err()
|
|
case complete, ok := <-state.complete:
|
|
if !ok {
|
|
return &DetectionReport{
|
|
RequestID: requestID,
|
|
JobType: jobType,
|
|
WorkerID: detector.WorkerID,
|
|
}, fmt.Errorf("detection request %s interrupted", requestID)
|
|
}
|
|
proposals := cloneJobProposals(state.proposals)
|
|
report := &DetectionReport{
|
|
RequestID: requestID,
|
|
JobType: jobType,
|
|
WorkerID: detector.WorkerID,
|
|
Proposals: proposals,
|
|
Complete: complete,
|
|
}
|
|
if complete == nil {
|
|
return report, fmt.Errorf("detection request %s returned no completion state", requestID)
|
|
}
|
|
if !complete.Success {
|
|
return report, fmt.Errorf("detection failed for %s: %s", jobType, complete.ErrorMessage)
|
|
}
|
|
return report, nil
|
|
}
|
|
}
|
|
|
|
// RunDetection requests one detector worker to produce job proposals for a job type.
|
|
func (r *Plugin) RunDetection(
|
|
ctx context.Context,
|
|
jobType string,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
maxResults int32,
|
|
) ([]*plugin_pb.JobProposal, error) {
|
|
report, err := r.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults)
|
|
if report == nil {
|
|
return nil, err
|
|
}
|
|
return report.Proposals, err
|
|
}
|
|
|
|
// ExecuteJob sends one job to a capable executor worker and waits for completion.
|
|
func (r *Plugin) ExecuteJob(
|
|
ctx context.Context,
|
|
job *plugin_pb.JobSpec,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
attempt int32,
|
|
) (*plugin_pb.JobCompleted, error) {
|
|
if job == nil {
|
|
return nil, fmt.Errorf("job is nil")
|
|
}
|
|
if strings.TrimSpace(job.JobType) == "" {
|
|
return nil, fmt.Errorf("job_type is required")
|
|
}
|
|
|
|
executor, err := r.registry.PickExecutor(job.JobType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r.executeJobWithExecutor(ctx, executor, job, clusterContext, attempt)
|
|
}
|
|
|
|
func (r *Plugin) executeJobWithExecutor(
|
|
ctx context.Context,
|
|
executor *WorkerSession,
|
|
job *plugin_pb.JobSpec,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
attempt int32,
|
|
) (*plugin_pb.JobCompleted, error) {
|
|
if executor == nil {
|
|
return nil, fmt.Errorf("executor is nil")
|
|
}
|
|
if job == nil {
|
|
return nil, fmt.Errorf("job is nil")
|
|
}
|
|
if strings.TrimSpace(job.JobType) == "" {
|
|
return nil, fmt.Errorf("job_type is required")
|
|
}
|
|
|
|
if strings.TrimSpace(job.JobId) == "" {
|
|
var err error
|
|
job.JobId, err = newRequestID("job")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
requestID, err := newRequestID("exec")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
adminRuntime, adminConfigValues, workerConfigValues, err := r.loadJobTypeConfigPayload(job.JobType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
completedCh := make(chan *plugin_pb.JobCompleted, 1)
|
|
r.pendingExecutionMu.Lock()
|
|
r.pendingExecution[requestID] = completedCh
|
|
r.pendingExecutionMu.Unlock()
|
|
defer func() {
|
|
r.pendingExecutionMu.Lock()
|
|
delete(r.pendingExecution, requestID)
|
|
r.pendingExecutionMu.Unlock()
|
|
}()
|
|
|
|
r.trackExecutionStart(requestID, executor.WorkerID, job, attempt)
|
|
|
|
message := &plugin_pb.AdminToWorkerMessage{
|
|
RequestId: requestID,
|
|
SentAt: timestamppb.Now(),
|
|
Body: &plugin_pb.AdminToWorkerMessage_ExecuteJobRequest{
|
|
ExecuteJobRequest: &plugin_pb.ExecuteJobRequest{
|
|
RequestId: requestID,
|
|
Job: job,
|
|
AdminRuntime: adminRuntime,
|
|
AdminConfigValues: adminConfigValues,
|
|
WorkerConfigValues: workerConfigValues,
|
|
ClusterContext: clusterContext,
|
|
Attempt: attempt,
|
|
},
|
|
},
|
|
}
|
|
|
|
if err := r.sendToWorker(executor.WorkerID, message); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
r.sendCancel(executor.WorkerID, requestID, plugin_pb.WorkKind_WORK_KIND_EXECUTION, ctx.Err())
|
|
return nil, ctx.Err()
|
|
case completed, ok := <-completedCh:
|
|
if !ok {
|
|
return nil, fmt.Errorf("execution request %s interrupted", requestID)
|
|
}
|
|
if completed == nil {
|
|
return nil, fmt.Errorf("execution request %s returned empty completion", requestID)
|
|
}
|
|
if !completed.Success {
|
|
return completed, fmt.Errorf("job %s failed: %s", job.JobId, completed.ErrorMessage)
|
|
}
|
|
return completed, nil
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) ListWorkers() []*WorkerSession {
|
|
return r.registry.List()
|
|
}
|
|
|
|
func (r *Plugin) ListKnownJobTypes() ([]string, error) {
|
|
registryJobTypes := r.registry.JobTypes()
|
|
storedJobTypes, err := r.store.ListJobTypes()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
jobTypeSet := make(map[string]struct{}, len(registryJobTypes)+len(storedJobTypes))
|
|
for _, jobType := range registryJobTypes {
|
|
jobTypeSet[jobType] = struct{}{}
|
|
}
|
|
for _, jobType := range storedJobTypes {
|
|
jobTypeSet[jobType] = struct{}{}
|
|
}
|
|
|
|
out := make([]string, 0, len(jobTypeSet))
|
|
for jobType := range jobTypeSet {
|
|
out = append(out, jobType)
|
|
}
|
|
sort.Strings(out)
|
|
return out, nil
|
|
}
|
|
|
|
// FilterProposalsWithActiveJobs drops proposals that are already assigned/running.
|
|
func (r *Plugin) FilterProposalsWithActiveJobs(jobType string, proposals []*plugin_pb.JobProposal) ([]*plugin_pb.JobProposal, int) {
|
|
return r.filterProposalsWithActiveJobs(jobType, proposals)
|
|
}
|
|
|
|
func (r *Plugin) PickDetectorWorker(jobType string) (*WorkerSession, error) {
|
|
return r.pickDetector(jobType)
|
|
}
|
|
|
|
func (r *Plugin) PickExecutorWorker(jobType string) (*WorkerSession, error) {
|
|
return r.registry.PickExecutor(jobType)
|
|
}
|
|
|
|
func (r *Plugin) pickDetector(jobType string) (*WorkerSession, error) {
|
|
leasedWorkerID := r.getDetectorLease(jobType)
|
|
if leasedWorkerID != "" {
|
|
if worker, ok := r.registry.Get(leasedWorkerID); ok {
|
|
if capability := worker.Capabilities[jobType]; capability != nil && capability.CanDetect {
|
|
return worker, nil
|
|
}
|
|
}
|
|
r.clearDetectorLease(jobType, leasedWorkerID)
|
|
}
|
|
|
|
detector, err := r.registry.PickDetector(jobType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r.setDetectorLease(jobType, detector.WorkerID)
|
|
return detector, nil
|
|
}
|
|
|
|
func (r *Plugin) getDetectorLease(jobType string) string {
|
|
r.detectorLeaseMu.Lock()
|
|
defer r.detectorLeaseMu.Unlock()
|
|
return r.detectorLeases[jobType]
|
|
}
|
|
|
|
func (r *Plugin) setDetectorLease(jobType string, workerID string) {
|
|
r.detectorLeaseMu.Lock()
|
|
defer r.detectorLeaseMu.Unlock()
|
|
if jobType == "" || workerID == "" {
|
|
return
|
|
}
|
|
r.detectorLeases[jobType] = workerID
|
|
}
|
|
|
|
func (r *Plugin) clearDetectorLease(jobType string, workerID string) {
|
|
r.detectorLeaseMu.Lock()
|
|
defer r.detectorLeaseMu.Unlock()
|
|
|
|
current := r.detectorLeases[jobType]
|
|
if current == "" {
|
|
return
|
|
}
|
|
if workerID != "" && current != workerID {
|
|
return
|
|
}
|
|
delete(r.detectorLeases, jobType)
|
|
}
|
|
|
|
func (r *Plugin) sendCancel(workerID, targetID string, kind plugin_pb.WorkKind, cause error) {
|
|
if strings.TrimSpace(workerID) == "" || strings.TrimSpace(targetID) == "" {
|
|
return
|
|
}
|
|
|
|
requestID, err := newRequestID("cancel")
|
|
if err != nil {
|
|
requestID = ""
|
|
}
|
|
reason := "request canceled"
|
|
if cause != nil {
|
|
reason = cause.Error()
|
|
}
|
|
|
|
message := &plugin_pb.AdminToWorkerMessage{
|
|
RequestId: requestID,
|
|
SentAt: timestamppb.Now(),
|
|
Body: &plugin_pb.AdminToWorkerMessage_CancelRequest{
|
|
CancelRequest: &plugin_pb.CancelRequest{
|
|
TargetId: targetID,
|
|
TargetKind: kind,
|
|
Reason: reason,
|
|
},
|
|
},
|
|
}
|
|
if err := r.sendToWorker(workerID, message); err != nil {
|
|
glog.V(1).Infof("Plugin failed to send cancel request to worker=%s target=%s: %v", workerID, targetID, err)
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) sendAdminHello(workerID string) error {
|
|
msg := &plugin_pb.AdminToWorkerMessage{
|
|
RequestId: "",
|
|
SentAt: timestamppb.Now(),
|
|
Body: &plugin_pb.AdminToWorkerMessage_Hello{
|
|
Hello: &plugin_pb.AdminHello{
|
|
Accepted: true,
|
|
Message: "plugin connected",
|
|
HeartbeatIntervalSeconds: defaultHeartbeatInterval,
|
|
ReconnectDelaySeconds: defaultReconnectDelay,
|
|
},
|
|
},
|
|
}
|
|
return r.sendToWorker(workerID, msg)
|
|
}
|
|
|
|
func (r *Plugin) sendLoop(
|
|
ctx context.Context,
|
|
stream plugin_pb.PluginControlService_WorkerStreamServer,
|
|
session *streamSession,
|
|
) error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-r.shutdownCh:
|
|
return nil
|
|
case msg, ok := <-session.outgoing:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if err := stream.Send(msg); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) sendToWorker(workerID string, message *plugin_pb.AdminToWorkerMessage) error {
|
|
r.sessionsMu.RLock()
|
|
session, ok := r.sessions[workerID]
|
|
r.sessionsMu.RUnlock()
|
|
if !ok {
|
|
return fmt.Errorf("worker %s is not connected", workerID)
|
|
}
|
|
|
|
select {
|
|
case <-r.shutdownCh:
|
|
return fmt.Errorf("plugin is shutting down")
|
|
case session.outgoing <- message:
|
|
return nil
|
|
case <-time.After(r.sendTimeout):
|
|
return fmt.Errorf("timed out sending message to worker %s", workerID)
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) handleWorkerMessage(workerID string, message *plugin_pb.WorkerToAdminMessage) {
|
|
if message == nil {
|
|
return
|
|
}
|
|
|
|
switch body := message.Body.(type) {
|
|
case *plugin_pb.WorkerToAdminMessage_Hello:
|
|
r.registry.UpsertFromHello(body.Hello)
|
|
case *plugin_pb.WorkerToAdminMessage_Heartbeat:
|
|
r.registry.UpdateHeartbeat(workerID, body.Heartbeat)
|
|
case *plugin_pb.WorkerToAdminMessage_ConfigSchemaResponse:
|
|
r.handleConfigSchemaResponse(body.ConfigSchemaResponse)
|
|
case *plugin_pb.WorkerToAdminMessage_DetectionProposals:
|
|
r.handleDetectionProposals(workerID, body.DetectionProposals)
|
|
case *plugin_pb.WorkerToAdminMessage_DetectionComplete:
|
|
r.handleDetectionComplete(workerID, body.DetectionComplete)
|
|
case *plugin_pb.WorkerToAdminMessage_JobProgressUpdate:
|
|
r.handleJobProgressUpdate(workerID, body.JobProgressUpdate)
|
|
case *plugin_pb.WorkerToAdminMessage_JobCompleted:
|
|
r.handleJobCompleted(body.JobCompleted)
|
|
case *plugin_pb.WorkerToAdminMessage_Acknowledge:
|
|
if !body.Acknowledge.Accepted {
|
|
glog.Warningf("Plugin worker %s rejected request %s: %s", workerID, body.Acknowledge.RequestId, body.Acknowledge.Message)
|
|
}
|
|
default:
|
|
// Keep the transport open even if admin does not yet consume all message variants.
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) handleConfigSchemaResponse(response *plugin_pb.ConfigSchemaResponse) {
|
|
if response == nil {
|
|
return
|
|
}
|
|
|
|
if response.Success && response.GetJobTypeDescriptor() != nil {
|
|
jobType := response.JobType
|
|
if jobType == "" {
|
|
jobType = response.GetJobTypeDescriptor().JobType
|
|
}
|
|
if jobType != "" {
|
|
if err := r.store.SaveDescriptor(jobType, response.GetJobTypeDescriptor()); err != nil {
|
|
glog.Warningf("Plugin failed to persist descriptor for %s: %v", jobType, err)
|
|
}
|
|
if err := r.ensureJobTypeConfigFromDescriptor(jobType, response.GetJobTypeDescriptor()); err != nil {
|
|
glog.Warningf("Plugin failed to bootstrap config for %s: %v", jobType, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
r.safeSendSchemaResponse(response.RequestId, response)
|
|
}
|
|
|
|
func (r *Plugin) safeSendSchemaResponse(requestID string, response *plugin_pb.ConfigSchemaResponse) {
|
|
r.pendingSchemaMu.Lock()
|
|
ch := r.pendingSchema[requestID]
|
|
r.pendingSchemaMu.Unlock()
|
|
safeSendCh(ch, response, r.shutdownCh)
|
|
}
|
|
|
|
// safeSendCh sends val on ch without ever panicking. A nil ch is ignored. The
// send is abandoned once shutdownCh is closed, and a panic raised by the send
// (for example because ch was closed concurrently) is swallowed.
func safeSendCh[T any](ch chan T, val T, shutdownCh <-chan struct{}) {
	// Installed first so that any panic from the send below is absorbed.
	defer func() {
		_ = recover()
	}()

	if ch != nil {
		select {
		case <-shutdownCh:
		case ch <- val:
		}
	}
}
|
|
|
|
func (r *Plugin) ensureJobTypeConfigFromDescriptor(jobType string, descriptor *plugin_pb.JobTypeDescriptor) error {
|
|
if descriptor == nil || strings.TrimSpace(jobType) == "" {
|
|
return nil
|
|
}
|
|
|
|
existing, err := r.store.LoadJobTypeConfig(jobType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if existing != nil {
|
|
return nil
|
|
}
|
|
|
|
workerDefaults := CloneConfigValueMap(descriptor.WorkerDefaultValues)
|
|
if len(workerDefaults) == 0 && descriptor.WorkerConfigForm != nil {
|
|
workerDefaults = CloneConfigValueMap(descriptor.WorkerConfigForm.DefaultValues)
|
|
}
|
|
|
|
adminDefaults := map[string]*plugin_pb.ConfigValue{}
|
|
if descriptor.AdminConfigForm != nil {
|
|
adminDefaults = CloneConfigValueMap(descriptor.AdminConfigForm.DefaultValues)
|
|
}
|
|
|
|
adminRuntime := &plugin_pb.AdminRuntimeConfig{}
|
|
if descriptor.AdminRuntimeDefaults != nil {
|
|
defaults := descriptor.AdminRuntimeDefaults
|
|
adminRuntime = &plugin_pb.AdminRuntimeConfig{
|
|
Enabled: defaults.Enabled,
|
|
DetectionIntervalSeconds: defaults.DetectionIntervalSeconds,
|
|
DetectionTimeoutSeconds: defaults.DetectionTimeoutSeconds,
|
|
MaxJobsPerDetection: defaults.MaxJobsPerDetection,
|
|
GlobalExecutionConcurrency: defaults.GlobalExecutionConcurrency,
|
|
PerWorkerExecutionConcurrency: defaults.PerWorkerExecutionConcurrency,
|
|
RetryLimit: defaults.RetryLimit,
|
|
RetryBackoffSeconds: defaults.RetryBackoffSeconds,
|
|
}
|
|
}
|
|
|
|
cfg := &plugin_pb.PersistedJobTypeConfig{
|
|
JobType: jobType,
|
|
DescriptorVersion: descriptor.DescriptorVersion,
|
|
AdminConfigValues: adminDefaults,
|
|
WorkerConfigValues: workerDefaults,
|
|
AdminRuntime: adminRuntime,
|
|
UpdatedAt: timestamppb.Now(),
|
|
UpdatedBy: "plugin",
|
|
}
|
|
|
|
return r.store.SaveJobTypeConfig(cfg)
|
|
}
|
|
|
|
func (r *Plugin) handleDetectionProposals(workerID string, message *plugin_pb.DetectionProposals) {
|
|
if message == nil || message.RequestId == "" {
|
|
return
|
|
}
|
|
|
|
r.pendingDetectionMu.Lock()
|
|
state := r.pendingDetection[message.RequestId]
|
|
if state != nil {
|
|
state.proposals = append(state.proposals, cloneJobProposals(message.Proposals)...)
|
|
}
|
|
r.pendingDetectionMu.Unlock()
|
|
if state == nil {
|
|
return
|
|
}
|
|
|
|
resolvedWorkerID := strings.TrimSpace(workerID)
|
|
if resolvedWorkerID == "" {
|
|
resolvedWorkerID = state.workerID
|
|
}
|
|
resolvedJobType := strings.TrimSpace(message.JobType)
|
|
if resolvedJobType == "" {
|
|
resolvedJobType = state.jobType
|
|
}
|
|
if resolvedJobType == "" {
|
|
resolvedJobType = "unknown"
|
|
}
|
|
|
|
r.appendActivity(JobActivity{
|
|
JobType: resolvedJobType,
|
|
RequestID: message.RequestId,
|
|
WorkerID: resolvedWorkerID,
|
|
Source: "detector",
|
|
Stage: "proposals_batch",
|
|
Message: fmt.Sprintf("received %d proposal(s)", len(message.Proposals)),
|
|
OccurredAt: timeToPtr(time.Now().UTC()),
|
|
Details: map[string]interface{}{
|
|
"batch_size": len(message.Proposals),
|
|
"has_more": message.HasMore,
|
|
},
|
|
})
|
|
|
|
for _, proposal := range message.Proposals {
|
|
if proposal == nil {
|
|
continue
|
|
}
|
|
details := map[string]interface{}{
|
|
"proposal_id": proposal.ProposalId,
|
|
"dedupe_key": proposal.DedupeKey,
|
|
"priority": proposal.Priority.String(),
|
|
"summary": proposal.Summary,
|
|
"detail": proposal.Detail,
|
|
"labels": proposal.Labels,
|
|
}
|
|
if params := configValueMapToPlain(proposal.Parameters); len(params) > 0 {
|
|
details["parameters"] = params
|
|
}
|
|
|
|
messageText := strings.TrimSpace(proposal.Summary)
|
|
if messageText == "" {
|
|
messageText = fmt.Sprintf("proposal %s", strings.TrimSpace(proposal.ProposalId))
|
|
}
|
|
if messageText == "" {
|
|
messageText = "proposal generated"
|
|
}
|
|
|
|
r.appendActivity(JobActivity{
|
|
JobType: resolvedJobType,
|
|
RequestID: message.RequestId,
|
|
WorkerID: resolvedWorkerID,
|
|
Source: "detector",
|
|
Stage: "proposal",
|
|
Message: messageText,
|
|
OccurredAt: timeToPtr(time.Now().UTC()),
|
|
Details: details,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) handleDetectionComplete(workerID string, message *plugin_pb.DetectionComplete) {
|
|
if message == nil {
|
|
return
|
|
}
|
|
if !message.Success {
|
|
glog.Warningf("Plugin detection failed job_type=%s: %s", message.JobType, message.ErrorMessage)
|
|
}
|
|
if message.RequestId == "" {
|
|
return
|
|
}
|
|
|
|
r.pendingDetectionMu.Lock()
|
|
state := r.pendingDetection[message.RequestId]
|
|
r.pendingDetectionMu.Unlock()
|
|
if state == nil {
|
|
return
|
|
}
|
|
|
|
resolvedWorkerID := strings.TrimSpace(workerID)
|
|
if resolvedWorkerID == "" {
|
|
resolvedWorkerID = state.workerID
|
|
}
|
|
resolvedJobType := strings.TrimSpace(message.JobType)
|
|
if resolvedJobType == "" {
|
|
resolvedJobType = state.jobType
|
|
}
|
|
if resolvedJobType == "" {
|
|
resolvedJobType = "unknown"
|
|
}
|
|
|
|
stage := "completed"
|
|
messageText := "detection completed"
|
|
if !message.Success {
|
|
stage = "failed"
|
|
messageText = strings.TrimSpace(message.ErrorMessage)
|
|
if messageText == "" {
|
|
messageText = "detection failed"
|
|
}
|
|
}
|
|
r.appendActivity(JobActivity{
|
|
JobType: resolvedJobType,
|
|
RequestID: message.RequestId,
|
|
WorkerID: resolvedWorkerID,
|
|
Source: "detector",
|
|
Stage: stage,
|
|
Message: messageText,
|
|
OccurredAt: timeToPtr(time.Now().UTC()),
|
|
Details: map[string]interface{}{
|
|
"success": message.Success,
|
|
"total_proposals": message.TotalProposals,
|
|
},
|
|
})
|
|
|
|
r.safeSendDetectionComplete(message.RequestId, message)
|
|
}
|
|
|
|
func (r *Plugin) safeSendDetectionComplete(requestID string, message *plugin_pb.DetectionComplete) {
|
|
r.pendingDetectionMu.Lock()
|
|
state, found := r.pendingDetection[requestID]
|
|
var ch chan *plugin_pb.DetectionComplete
|
|
if found && state != nil {
|
|
ch = state.complete
|
|
}
|
|
r.pendingDetectionMu.Unlock()
|
|
|
|
if ch != nil {
|
|
safeSendCh(ch, message, r.shutdownCh)
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) handleJobCompleted(completed *plugin_pb.JobCompleted) {
|
|
if completed == nil || completed.JobType == "" {
|
|
return
|
|
}
|
|
|
|
if completed.RequestId != "" {
|
|
r.safeSendJobCompleted(completed.RequestId, completed)
|
|
}
|
|
|
|
tracked := r.trackExecutionCompletion(completed)
|
|
workerID := ""
|
|
if tracked != nil && tracked.WorkerID != "" {
|
|
workerID = tracked.WorkerID
|
|
}
|
|
|
|
r.trackWorkerActivities(completed.JobType, completed.JobId, completed.RequestId, workerID, completed.Activities)
|
|
|
|
record := &JobRunRecord{
|
|
RunID: completed.RequestId,
|
|
JobID: completed.JobId,
|
|
JobType: completed.JobType,
|
|
WorkerID: "",
|
|
Outcome: RunOutcomeError,
|
|
CompletedAt: timeToPtr(time.Now().UTC()),
|
|
}
|
|
if completed.CompletedAt != nil {
|
|
record.CompletedAt = timeToPtr(completed.CompletedAt.AsTime().UTC())
|
|
}
|
|
if completed.Success {
|
|
record.Outcome = RunOutcomeSuccess
|
|
record.Message = "completed"
|
|
if completed.Result != nil && completed.Result.Summary != "" {
|
|
record.Message = completed.Result.Summary
|
|
}
|
|
} else {
|
|
record.Outcome = RunOutcomeError
|
|
record.Message = completed.ErrorMessage
|
|
}
|
|
|
|
if tracked != nil {
|
|
if workerID != "" {
|
|
record.WorkerID = workerID
|
|
}
|
|
if tracked.CreatedAt != nil && record.CompletedAt != nil && record.CompletedAt.After(*tracked.CreatedAt) {
|
|
record.DurationMs = int64(record.CompletedAt.Sub(*tracked.CreatedAt) / time.Millisecond)
|
|
}
|
|
}
|
|
|
|
if err := r.store.AppendRunRecord(completed.JobType, record); err != nil {
|
|
glog.Warningf("Plugin failed to append run record for %s: %v", completed.JobType, err)
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) safeSendJobCompleted(requestID string, completed *plugin_pb.JobCompleted) {
|
|
r.pendingExecutionMu.Lock()
|
|
ch := r.pendingExecution[requestID]
|
|
r.pendingExecutionMu.Unlock()
|
|
safeSendCh(ch, completed, r.shutdownCh)
|
|
}
|
|
|
|
func (r *Plugin) putSession(session *streamSession) {
|
|
r.sessionsMu.Lock()
|
|
defer r.sessionsMu.Unlock()
|
|
|
|
if old, exists := r.sessions[session.workerID]; exists {
|
|
old.close()
|
|
}
|
|
r.sessions[session.workerID] = session
|
|
}
|
|
|
|
func (r *Plugin) cleanupSession(workerID string) {
|
|
r.registry.Remove(workerID)
|
|
|
|
r.sessionsMu.Lock()
|
|
session, exists := r.sessions[workerID]
|
|
if exists {
|
|
delete(r.sessions, workerID)
|
|
session.close()
|
|
}
|
|
r.sessionsMu.Unlock()
|
|
|
|
glog.V(0).Infof("Plugin worker disconnected: %s", workerID)
|
|
}
|
|
|
|
// newRequestID builds an identifier of the form
// "<prefix>-<unix-nanoseconds>-<16 hex chars>", where the hex suffix encodes
// 8 bytes read from crypto/rand. An empty prefix defaults to "req".
//
// It returns an error only when the random bytes cannot be read.
func newRequestID(prefix string) (string, error) {
	if prefix == "" {
		prefix = "req"
	}

	var entropy [8]byte
	if _, err := rand.Read(entropy[:]); err != nil {
		return "", fmt.Errorf("generate request id: %w", err)
	}

	suffix := hex.EncodeToString(entropy[:])
	id := fmt.Sprintf("%s-%d-%s", prefix, time.Now().UnixNano(), suffix)
	return id, nil
}
|
|
|
|
func (r *Plugin) loadJobTypeConfigPayload(jobType string) (
|
|
*plugin_pb.AdminRuntimeConfig,
|
|
map[string]*plugin_pb.ConfigValue,
|
|
map[string]*plugin_pb.ConfigValue,
|
|
error,
|
|
) {
|
|
config, err := r.store.LoadJobTypeConfig(jobType)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
if config == nil {
|
|
return &plugin_pb.AdminRuntimeConfig{}, map[string]*plugin_pb.ConfigValue{}, map[string]*plugin_pb.ConfigValue{}, nil
|
|
}
|
|
|
|
adminRuntime := config.AdminRuntime
|
|
if adminRuntime == nil {
|
|
adminRuntime = &plugin_pb.AdminRuntimeConfig{}
|
|
}
|
|
return adminRuntime, CloneConfigValueMap(config.AdminConfigValues), CloneConfigValueMap(config.WorkerConfigValues), nil
|
|
}
|
|
|
|
func cloneJobProposals(in []*plugin_pb.JobProposal) []*plugin_pb.JobProposal {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
out := make([]*plugin_pb.JobProposal, 0, len(in))
|
|
for _, proposal := range in {
|
|
if proposal == nil {
|
|
continue
|
|
}
|
|
out = append(out, proto.Clone(proposal).(*plugin_pb.JobProposal))
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (r *Plugin) loadLastSuccessfulRun(jobType string) *timestamppb.Timestamp {
|
|
history, err := r.store.LoadRunHistory(jobType)
|
|
if err != nil {
|
|
glog.Warningf("Plugin failed to load run history for %s: %v", jobType, err)
|
|
return nil
|
|
}
|
|
if history == nil || len(history.SuccessfulRuns) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var latest time.Time
|
|
for i := range history.SuccessfulRuns {
|
|
completedAt := history.SuccessfulRuns[i].CompletedAt
|
|
if completedAt == nil || completedAt.IsZero() {
|
|
continue
|
|
}
|
|
if latest.IsZero() || completedAt.After(latest) {
|
|
latest = *completedAt
|
|
}
|
|
}
|
|
if latest.IsZero() {
|
|
return nil
|
|
}
|
|
return timestamppb.New(latest.UTC())
|
|
}
|
|
|
|
func CloneConfigValueMap(in map[string]*plugin_pb.ConfigValue) map[string]*plugin_pb.ConfigValue {
|
|
if len(in) == 0 {
|
|
return map[string]*plugin_pb.ConfigValue{}
|
|
}
|
|
out := make(map[string]*plugin_pb.ConfigValue, len(in))
|
|
for key, value := range in {
|
|
if value == nil {
|
|
continue
|
|
}
|
|
out[key] = proto.Clone(value).(*plugin_pb.ConfigValue)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (s *streamSession) close() {
|
|
s.closeOnce.Do(func() {
|
|
close(s.outgoing)
|
|
})
|
|
}
|