From e1e5b4a8a681991455abcafb6064b4c195c2c13c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 3 Mar 2026 15:10:40 -0800 Subject: [PATCH] add admin script worker (#8491) * admin: add plugin lock coordination * shell: allow bypassing lock checks * plugin worker: add admin script handler * mini: include admin_script in plugin defaults * admin script UI: drop name and enlarge text * admin script: add default script * admin_script: make run interval configurable * plugin: gate other jobs during admin_script runs * plugin: use last completed admin_script run * admin: backfill plugin config defaults * templ Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * comparable to default version Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * default to run Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * format Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * shell: respect pre-set noLock for fix.replication * shell: add force no-lock mode for admin scripts * volume balance worker already exists Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * admin: expose scheduler status JSON * shell: add sleep command * shell: restrict sleep syntax * Revert "shell: respect pre-set noLock for fix.replication" This reverts commit 2b14e8b82602a740d3a473c085e3b3a14f1ddbb3. * templ Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * fix import Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * less logs Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * Reduce master client logs on canceled contexts * Update mini default job type count --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- weed/admin/dash/admin_lock_manager.go | 148 ++++ weed/admin/dash/admin_server.go | 38 ++ weed/admin/dash/plugin_api.go | 116 ++++ weed/admin/dash/plugin_api_test.go | 80 +++ weed/admin/handlers/admin_handlers.go | 1 + weed/admin/plugin/lock_manager.go | 7 + weed/admin/plugin/plugin.go | 84 ++- weed/admin/plugin/plugin_cancel_test.go | 164 +++++ weed/admin/plugin/plugin_detection_test.go | 61 ++ weed/admin/plugin/plugin_scheduler.go | 20 + weed/admin/plugin/scheduler_status.go | 234 +++++++ weed/admin/plugin/scheduler_status_test.go | 64 ++ weed/admin/view/app/plugin.templ | 3 + weed/admin/view/app/plugin_templ.go | 2 +- weed/command/mini.go | 2 +- weed/command/mini_plugin_test.go | 4 +- weed/command/plugin_worker_test.go | 12 +- weed/command/worker.go | 5 +- weed/command/worker_runtime.go | 6 +- weed/command/worker_test.go | 4 +- weed/plugin/worker/admin_script_handler.go | 635 ++++++++++++++++++ .../worker/admin_script_handler_test.go | 100 +++ weed/shell/command_ec_balance.go | 4 +- weed/shell/command_ec_common.go | 6 +- weed/shell/command_sleep.go | 43 ++ weed/shell/commands.go | 20 +- weed/wdclient/masterclient.go | 52 +- 27 files changed, 1888 insertions(+), 27 deletions(-) create mode 100644 weed/admin/dash/admin_lock_manager.go create mode 100644 weed/admin/plugin/lock_manager.go create mode 100644 weed/admin/plugin/scheduler_status.go create mode 100644 weed/admin/plugin/scheduler_status_test.go create mode 100644 weed/plugin/worker/admin_script_handler.go create mode 100644 weed/plugin/worker/admin_script_handler_test.go create mode 100644 weed/shell/command_sleep.go diff --git a/weed/admin/dash/admin_lock_manager.go b/weed/admin/dash/admin_lock_manager.go new file mode 100644 index 000000000..42ebd9c85 --- /dev/null +++ b/weed/admin/dash/admin_lock_manager.go @@ -0,0 +1,148 @@ +package dash + +import ( + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks" +) + +const ( + adminLockName = "shell" + adminLockClientName = "admin-plugin" +) + +// AdminLockManager coordinates exclusive admin locks with reference counting. +// It is safe for concurrent use. +type AdminLockManager struct { + locker *exclusive_locks.ExclusiveLocker + clientName string + + mu sync.Mutex + cond *sync.Cond + acquiring bool + holdCount int + + lastAcquiredAt time.Time + lastReleasedAt time.Time + waitingSince time.Time + waitingReason string + currentReason string +} + +func NewAdminLockManager(masterClient *wdclient.MasterClient, clientName string) *AdminLockManager { + if masterClient == nil { + return nil + } + if clientName == "" { + clientName = adminLockClientName + } + manager := &AdminLockManager{ + locker: exclusive_locks.NewExclusiveLocker(masterClient, adminLockName), + clientName: clientName, + } + manager.cond = sync.NewCond(&manager.mu) + return manager +} + +func (m *AdminLockManager) Acquire(reason string) (func(), error) { + if m == nil || m.locker == nil { + return func() {}, nil + } + + m.mu.Lock() + if reason != "" { + m.locker.SetMessage(reason) + m.currentReason = reason + } + for m.acquiring { + m.cond.Wait() + } + if m.holdCount == 0 { + m.acquiring = true + m.waitingSince = time.Now().UTC() + m.waitingReason = reason + m.mu.Unlock() + m.locker.RequestLock(m.clientName) + m.mu.Lock() + m.acquiring = false + m.holdCount = 1 + m.lastAcquiredAt = time.Now().UTC() + m.waitingSince = time.Time{} + m.waitingReason = "" + m.cond.Broadcast() + m.mu.Unlock() + return m.Release, nil + } + m.holdCount++ + if reason != "" { + m.currentReason = reason + } + m.mu.Unlock() + return m.Release, nil +} + +func (m *AdminLockManager) Release() { + if m == nil || m.locker == nil { + return + } + + m.mu.Lock() + if m.holdCount <= 0 { + m.mu.Unlock() + return + } + m.holdCount-- + shouldRelease := m.holdCount == 0 + m.mu.Unlock() + + if shouldRelease { + m.mu.Lock() + m.lastReleasedAt = time.Now().UTC() + m.currentReason = "" + m.mu.Unlock() + m.locker.ReleaseLock() + } +} + +type LockStatus struct { + Held bool `json:"held"` + HoldCount int `json:"hold_count"` + Acquiring bool `json:"acquiring"` + Message string `json:"message,omitempty"` + WaitingReason string `json:"waiting_reason,omitempty"` + LastAcquiredAt *time.Time `json:"last_acquired_at,omitempty"` + LastReleasedAt *time.Time `json:"last_released_at,omitempty"` + WaitingSince *time.Time `json:"waiting_since,omitempty"` +} + +func (m *AdminLockManager) Status() LockStatus { + if m == nil { + return LockStatus{} + } + + m.mu.Lock() + defer m.mu.Unlock() + + status := LockStatus{ + Held: m.holdCount > 0, + HoldCount: m.holdCount, + Acquiring: m.acquiring, + Message: m.currentReason, + WaitingReason: m.waitingReason, + } + if !m.lastAcquiredAt.IsZero() { + at := m.lastAcquiredAt + status.LastAcquiredAt = &at + } + if !m.lastReleasedAt.IsZero() { + at := m.lastReleasedAt + status.LastReleasedAt = &at + } + if !m.waitingSince.IsZero() { + at := m.waitingSince + status.WaitingSince = &at + } + return status +} diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 1d39442e9..2bade9dec 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -98,6 +98,7 @@ type AdminServer struct { // Maintenance system maintenanceManager *maintenance.MaintenanceManager plugin *adminplugin.Plugin + pluginLock *AdminLockManager expireJobHandler func(jobID string, reason string) (*adminplugin.TrackedJob, bool, error) // Topic retention purger @@ -135,6 +136,8 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, ctx := context.Background() go masterClient.KeepConnectedToMaster(ctx) + lockManager := NewAdminLockManager(masterClient, adminLockClientName) + server := &AdminServer{ masterClient: masterClient, templateFS: templateFS, @@ -146,6 +149,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, collectionStatsCacheThreshold: defaultStatsCacheTimeout, s3TablesManager: newS3TablesManager(), icebergPort: icebergPort, + pluginLock: lockManager, } // Initialize topic retention purger @@ -229,6 +233,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) { return server.buildDefaultPluginClusterContext(), nil }, + LockManager: lockManager, }) if err != nil && dataDir != "" { glog.Warningf("Failed to initialize plugin with dataDir=%q: %v. Falling back to in-memory plugin state.", dataDir, err) @@ -237,6 +242,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) { return server.buildDefaultPluginClusterContext(), nil }, + LockManager: lockManager, }) } if err != nil { @@ -890,6 +896,13 @@ func (s *AdminServer) GetPlugin() *adminplugin.Plugin { return s.plugin } +func (s *AdminServer) acquirePluginLock(reason string) (func(), error) { + if s == nil || s.pluginLock == nil { + return func() {}, nil + } + return s.pluginLock.Acquire(reason) +} + // RequestPluginJobTypeDescriptor asks one worker for job type schema and returns the descriptor. func (s *AdminServer) RequestPluginJobTypeDescriptor(ctx context.Context, jobType string, forceRefresh bool) (*plugin_pb.JobTypeDescriptor, error) { if s.plugin == nil { @@ -932,6 +945,13 @@ func (s *AdminServer) RunPluginDetection( if s.plugin == nil { return nil, fmt.Errorf("plugin is not enabled") } + releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType)) + if err != nil { + return nil, err + } + if releaseLock != nil { + defer releaseLock() + } return s.plugin.RunDetection(ctx, jobType, clusterContext, maxResults) } @@ -957,6 +977,13 @@ func (s *AdminServer) RunPluginDetectionWithReport( if s.plugin == nil { return nil, fmt.Errorf("plugin is not enabled") } + releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType)) + if err != nil { + return nil, err + } + if releaseLock != nil { + defer releaseLock() + } return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults) } @@ -970,6 +997,17 @@ func (s *AdminServer) ExecutePluginJob( if s.plugin == nil { return nil, fmt.Errorf("plugin is not enabled") } + jobType := "" + if job != nil { + jobType = strings.TrimSpace(job.JobType) + } + releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin execution %s", jobType)) + if err != nil { + return nil, err + } + if releaseLock != nil { + defer releaseLock() + } return s.plugin.ExecuteJob(ctx, job, clusterContext, attempt) } diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index e97325647..79d78e254 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -214,6 +214,27 @@ func (s *AdminServer) GetPluginSchedulerStatesAPI(w http.ResponseWriter, r *http writeJSON(w, http.StatusOK, states) } +// GetPluginSchedulerStatusAPI returns scheduler status including in-process jobs and lock state. +func (s *AdminServer) GetPluginSchedulerStatusAPI(w http.ResponseWriter, r *http.Request) { + pluginSvc := s.GetPlugin() + if pluginSvc == nil { + writeJSON(w, http.StatusOK, map[string]interface{}{ + "enabled": false, + }) + return + } + + response := map[string]interface{}{ + "enabled": true, + "scheduler": pluginSvc.GetSchedulerStatus(), + } + if s.pluginLock != nil { + response["lock"] = s.pluginLock.Status() + } + + writeJSON(w, http.StatusOK, response) +} + // RequestPluginJobTypeSchemaAPI asks a worker for one job type schema. func (s *AdminServer) RequestPluginJobTypeSchemaAPI(w http.ResponseWriter, r *http.Request) { jobType := strings.TrimSpace(mux.Vars(r)["jobType"]) @@ -277,6 +298,9 @@ func (s *AdminServer) GetPluginJobTypeConfigAPI(w http.ResponseWriter, r *http.R AdminRuntime: &plugin_pb.AdminRuntimeConfig{}, } } + if descriptor, err := s.LoadPluginJobTypeDescriptor(jobType); err == nil && descriptor != nil { + applyDescriptorDefaultsToPersistedConfig(config, descriptor) + } renderProtoJSON(w, http.StatusOK, config) } @@ -455,6 +479,14 @@ func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request writeJSONError(w, http.StatusBadRequest, "jobType is required") return } + releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detect+execute %s", jobType)) + if err != nil { + writeJSONError(w, http.StatusInternalServerError, err.Error()) + return + } + if releaseLock != nil { + defer releaseLock() + } var req struct { ClusterContext json.RawMessage `json:"cluster_context"` @@ -771,6 +803,90 @@ func buildJobSpecFromProposal(jobType string, proposal *plugin_pb.JobProposal, i return jobSpec } +func applyDescriptorDefaultsToPersistedConfig( + config *plugin_pb.PersistedJobTypeConfig, + descriptor *plugin_pb.JobTypeDescriptor, +) { + if config == nil || descriptor == nil { + return + } + + if config.AdminConfigValues == nil { + config.AdminConfigValues = map[string]*plugin_pb.ConfigValue{} + } + if config.WorkerConfigValues == nil { + config.WorkerConfigValues = map[string]*plugin_pb.ConfigValue{} + } + if config.AdminRuntime == nil { + config.AdminRuntime = &plugin_pb.AdminRuntimeConfig{} + } + + if descriptor.AdminConfigForm != nil { + for key, value := range descriptor.AdminConfigForm.DefaultValues { + if value == nil { + continue + } + current := config.AdminConfigValues[key] + if current == nil { + config.AdminConfigValues[key] = proto.Clone(value).(*plugin_pb.ConfigValue) + continue + } + if strings.EqualFold(descriptor.JobType, "admin_script") && + key == "script" && + isBlankStringConfigValue(current) { + config.AdminConfigValues[key] = proto.Clone(value).(*plugin_pb.ConfigValue) + } + } + } + if descriptor.WorkerConfigForm != nil { + for key, value := range descriptor.WorkerConfigForm.DefaultValues { + if value == nil { + continue + } + if config.WorkerConfigValues[key] != nil { + continue + } + config.WorkerConfigValues[key] = proto.Clone(value).(*plugin_pb.ConfigValue) + } + } + if descriptor.AdminRuntimeDefaults != nil { + runtime := config.AdminRuntime + defaults := descriptor.AdminRuntimeDefaults + if runtime.DetectionIntervalSeconds <= 0 { + runtime.DetectionIntervalSeconds = defaults.DetectionIntervalSeconds + } + if runtime.DetectionTimeoutSeconds <= 0 { + runtime.DetectionTimeoutSeconds = defaults.DetectionTimeoutSeconds + } + if runtime.MaxJobsPerDetection <= 0 { + runtime.MaxJobsPerDetection = defaults.MaxJobsPerDetection + } + if runtime.GlobalExecutionConcurrency <= 0 { + runtime.GlobalExecutionConcurrency = defaults.GlobalExecutionConcurrency + } + if runtime.PerWorkerExecutionConcurrency <= 0 { + runtime.PerWorkerExecutionConcurrency = defaults.PerWorkerExecutionConcurrency + } + if runtime.RetryBackoffSeconds <= 0 { + runtime.RetryBackoffSeconds = defaults.RetryBackoffSeconds + } + if runtime.RetryLimit < 0 { + runtime.RetryLimit = defaults.RetryLimit + } + } +} + +func isBlankStringConfigValue(value *plugin_pb.ConfigValue) bool { + if value == nil { + return true + } + kind, ok := value.Kind.(*plugin_pb.ConfigValue_StringValue) + if !ok { + return false + } + return strings.TrimSpace(kind.StringValue) == "" +} + func parsePositiveInt(raw string, defaultValue int) int { value, err := strconv.Atoi(strings.TrimSpace(raw)) if err != nil || value <= 0 { diff --git a/weed/admin/dash/plugin_api_test.go b/weed/admin/dash/plugin_api_test.go index c6fcbb028..5e535382a 100644 --- a/weed/admin/dash/plugin_api_test.go +++ b/weed/admin/dash/plugin_api_test.go @@ -140,3 +140,83 @@ func TestBuildJobSpecFromProposalDoesNotReuseProposalID(t *testing.T) { t.Fatalf("dedupe key must be preserved: got=%s want=%s", jobA.DedupeKey, proposal.DedupeKey) } } + +func TestApplyDescriptorDefaultsToPersistedConfigBackfillsAdminDefaults(t *testing.T) { + t.Parallel() + + config := &plugin_pb.PersistedJobTypeConfig{ + JobType: "admin_script", + AdminConfigValues: map[string]*plugin_pb.ConfigValue{}, + WorkerConfigValues: map[string]*plugin_pb.ConfigValue{}, + AdminRuntime: &plugin_pb.AdminRuntimeConfig{}, + } + descriptor := &plugin_pb.JobTypeDescriptor{ + JobType: "admin_script", + AdminConfigForm: &plugin_pb.ConfigForm{ + DefaultValues: map[string]*plugin_pb.ConfigValue{ + "script": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "volume.balance -apply"}, + }, + "run_interval_minutes": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 17}, + }, + }, + }, + AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ + DetectionIntervalSeconds: 60, + DetectionTimeoutSeconds: 300, + }, + } + + applyDescriptorDefaultsToPersistedConfig(config, descriptor) + + script := config.AdminConfigValues["script"] + if script == nil { + t.Fatalf("expected script default to be backfilled") + } + scriptKind, ok := script.Kind.(*plugin_pb.ConfigValue_StringValue) + if !ok || scriptKind.StringValue == "" { + t.Fatalf("expected non-empty script default, got=%+v", script) + } + if config.AdminRuntime.DetectionIntervalSeconds != 60 { + t.Fatalf("expected runtime detection interval default to be backfilled") + } +} + +func TestApplyDescriptorDefaultsToPersistedConfigReplacesBlankAdminScript(t *testing.T) { + t.Parallel() + + config := &plugin_pb.PersistedJobTypeConfig{ + JobType: "admin_script", + AdminConfigValues: map[string]*plugin_pb.ConfigValue{ + "script": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: " "}, + }, + }, + AdminRuntime: &plugin_pb.AdminRuntimeConfig{}, + } + descriptor := &plugin_pb.JobTypeDescriptor{ + JobType: "admin_script", + AdminConfigForm: &plugin_pb.ConfigForm{ + DefaultValues: map[string]*plugin_pb.ConfigValue{ + "script": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "volume.fix.replication -apply"}, + }, + }, + }, + } + + applyDescriptorDefaultsToPersistedConfig(config, descriptor) + + script := config.AdminConfigValues["script"] + if script == nil { + t.Fatalf("expected script config value") + } + scriptKind, ok := script.Kind.(*plugin_pb.ConfigValue_StringValue) + if !ok { + t.Fatalf("expected string script config value, got=%T", script.Kind) + } + if scriptKind.StringValue != "volume.fix.replication -apply" { + t.Fatalf("expected blank script to be replaced by default, got=%q", scriptKind.StringValue) + } +} diff --git a/weed/admin/handlers/admin_handlers.go b/weed/admin/handlers/admin_handlers.go index b548d329f..3ce607150 100644 --- a/weed/admin/handlers/admin_handlers.go +++ b/weed/admin/handlers/admin_handlers.go @@ -234,6 +234,7 @@ func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) { pluginApi.HandleFunc("/jobs/{jobId}/detail", h.adminServer.GetPluginJobDetailAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/activities", h.adminServer.GetPluginActivitiesAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/scheduler-states", h.adminServer.GetPluginSchedulerStatesAPI).Methods(http.MethodGet) + pluginApi.HandleFunc("/scheduler-status", h.adminServer.GetPluginSchedulerStatusAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/job-types/{jobType}/descriptor", h.adminServer.GetPluginJobTypeDescriptorAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/job-types/{jobType}/schema", h.adminServer.RequestPluginJobTypeSchemaAPI).Methods(http.MethodPost) pluginApi.HandleFunc("/job-types/{jobType}/config", h.adminServer.GetPluginJobTypeConfigAPI).Methods(http.MethodGet) diff --git a/weed/admin/plugin/lock_manager.go b/weed/admin/plugin/lock_manager.go new file mode 100644 index 000000000..6ddb56d91 --- /dev/null +++ b/weed/admin/plugin/lock_manager.go @@ -0,0 +1,7 @@ +package plugin + +// LockManager provides a shared exclusive lock for admin-managed detection/execution. +// Acquire returns a release function that must be called when the protected work finishes. +type LockManager interface { + Acquire(reason string) (release func(), err error) +} diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index 690777af3..6ec49612c 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -24,6 +24,7 @@ const ( defaultHeartbeatInterval = 30 defaultReconnectDelay = 5 defaultPendingSchemaBuffer = 1 + adminScriptJobType = "admin_script" ) type Options struct { @@ -32,6 +33,7 @@ type Options struct { SendTimeout time.Duration SchedulerTick time.Duration ClusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error) + LockManager LockManager } // JobTypeInfo contains metadata about a plugin job type. @@ -52,6 +54,7 @@ type Plugin struct { schedulerTick time.Duration clusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error) + lockManager LockManager schedulerMu sync.Mutex nextDetectionAt map[string]time.Time @@ -62,6 +65,9 @@ type Plugin struct { schedulerExecMu sync.Mutex schedulerExecReservations map[string]int + adminScriptRunMu sync.RWMutex + schedulerDetectionMu sync.Mutex + schedulerDetection map[string]*schedulerDetectionInfo dedupeMu sync.Mutex recentDedupeByType map[string]map[string]time.Time @@ -148,6 +154,7 @@ func New(options Options) (*Plugin, error) { sendTimeout: sendTimeout, schedulerTick: schedulerTick, clusterContextProvider: options.ClusterContextProvider, + lockManager: options.LockManager, sessions: make(map[string]*streamSession), pendingSchema: make(map[string]chan *plugin_pb.ConfigSchemaResponse), pendingDetection: make(map[string]*pendingDetectionState), @@ -156,6 +163,7 @@ func New(options Options) (*Plugin, error) { detectionInFlight: make(map[string]bool), detectorLeases: make(map[string]string), schedulerExecReservations: make(map[string]int), + schedulerDetection: make(map[string]*schedulerDetectionInfo), recentDedupeByType: make(map[string]map[string]time.Time), jobs: make(map[string]*TrackedJob), activities: make([]JobActivity, 0, 256), @@ -382,6 +390,13 @@ func (r *Plugin) BaseDir() string { return r.store.BaseDir() } +func (r *Plugin) acquireAdminLock(reason string) (func(), error) { + if r == nil || r.lockManager == nil { + return func() {}, nil + } + return r.lockManager.Acquire(reason) +} + // RunDetectionWithReport requests one detector worker and returns proposals with request metadata. func (r *Plugin) RunDetectionWithReport( ctx context.Context, @@ -389,6 +404,9 @@ func (r *Plugin) RunDetectionWithReport( clusterContext *plugin_pb.ClusterContext, maxResults int32, ) (*DetectionReport, error) { + releaseGate := r.acquireDetectionExecutionGate(jobType, false) + defer releaseGate() + detector, err := r.pickDetector(jobType) if err != nil { return nil, err @@ -403,7 +421,10 @@ func (r *Plugin) RunDetectionWithReport( if err != nil { return nil, err } - lastSuccessfulRun := r.loadLastSuccessfulRun(jobType) + lastCompletedRun := r.loadLastSuccessfulRun(jobType) + if strings.EqualFold(strings.TrimSpace(jobType), adminScriptJobType) { + lastCompletedRun = r.loadLastCompletedRun(jobType) + } state := &pendingDetectionState{ complete: make(chan *plugin_pb.DetectionComplete, 1), @@ -444,7 +465,7 @@ func (r *Plugin) RunDetectionWithReport( AdminConfigValues: adminConfigValues, WorkerConfigValues: workerConfigValues, ClusterContext: clusterContext, - LastSuccessfulRun: lastSuccessfulRun, + LastSuccessfulRun: lastCompletedRun, MaxResults: maxResults, }, }, @@ -531,11 +552,14 @@ func (r *Plugin) ExecuteJob( if job == nil { return nil, fmt.Errorf("job is nil") } - if strings.TrimSpace(job.JobType) == "" { + jobType := strings.TrimSpace(job.JobType) + if jobType == "" { return nil, fmt.Errorf("job_type is required") } + releaseGate := r.acquireDetectionExecutionGate(jobType, true) + defer releaseGate() - executor, err := r.registry.PickExecutor(job.JobType) + executor, err := r.registry.PickExecutor(jobType) if err != nil { return nil, err } @@ -543,6 +567,23 @@ func (r *Plugin) ExecuteJob( return r.executeJobWithExecutor(ctx, executor, job, clusterContext, attempt) } +func (r *Plugin) acquireDetectionExecutionGate(jobType string, execution bool) func() { + normalizedJobType := strings.ToLower(strings.TrimSpace(jobType)) + if execution && normalizedJobType == adminScriptJobType { + r.adminScriptRunMu.Lock() + return func() { + r.adminScriptRunMu.Unlock() + } + } + if normalizedJobType != adminScriptJobType { + r.adminScriptRunMu.RLock() + return func() { + r.adminScriptRunMu.RUnlock() + } + } + return func() {} +} + func (r *Plugin) executeJobWithExecutor( ctx context.Context, executor *WorkerSession, @@ -1291,6 +1332,41 @@ func (r *Plugin) loadLastSuccessfulRun(jobType string) *timestamppb.Timestamp { return timestamppb.New(latest.UTC()) } +func (r *Plugin) loadLastCompletedRun(jobType string) *timestamppb.Timestamp { + history, err := r.store.LoadRunHistory(jobType) + if err != nil { + glog.Warningf("Plugin failed to load run history for %s: %v", jobType, err) + return nil + } + if history == nil { + return nil + } + + var latest time.Time + for i := range history.SuccessfulRuns { + completedAt := history.SuccessfulRuns[i].CompletedAt + if completedAt == nil || completedAt.IsZero() { + continue + } + if latest.IsZero() || completedAt.After(latest) { + latest = *completedAt + } + } + for i := range history.ErrorRuns { + completedAt := history.ErrorRuns[i].CompletedAt + if completedAt == nil || completedAt.IsZero() { + continue + } + if latest.IsZero() || completedAt.After(latest) { + latest = *completedAt + } + } + if latest.IsZero() { + return nil + } + return timestamppb.New(latest.UTC()) +} + func CloneConfigValueMap(in map[string]*plugin_pb.ConfigValue) map[string]*plugin_pb.ConfigValue { if len(in) == 0 { return map[string]*plugin_pb.ConfigValue{} diff --git a/weed/admin/plugin/plugin_cancel_test.go b/weed/admin/plugin/plugin_cancel_test.go index bb597e3f7..2a966ae8c 100644 --- a/weed/admin/plugin/plugin_cancel_test.go +++ b/weed/admin/plugin/plugin_cancel_test.go @@ -4,8 +4,10 @@ import ( "context" "errors" "testing" + "time" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestRunDetectionSendsCancelOnContextDone(t *testing.T) { @@ -110,3 +112,165 @@ func TestExecuteJobSendsCancelOnContextDone(t *testing.T) { t.Fatalf("expected context canceled error, got %v", runErr) } } + +func TestAdminScriptExecutionBlocksOtherDetection(t *testing.T) { + pluginSvc, err := New(Options{}) + if err != nil { + t.Fatalf("New plugin error: %v", err) + } + defer pluginSvc.Shutdown() + + const adminWorkerID = "worker-admin-script" + const otherWorkerID = "worker-vacuum" + + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: adminWorkerID, + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1}, + }, + }) + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: otherWorkerID, + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanDetect: true, MaxDetectionConcurrency: 1}, + }, + }) + adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)} + otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)} + pluginSvc.putSession(adminSession) + pluginSvc.putSession(otherSession) + + adminErrCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{ + JobId: "job-admin-script-1", + JobType: "admin_script", + }, &plugin_pb.ClusterContext{}, 1) + adminErrCh <- runErr + }() + + adminExecMessage := <-adminSession.outgoing + if adminExecMessage.GetExecuteJobRequest() == nil { + t.Fatalf("expected admin_script execute request") + } + + detectErrCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.RunDetection(context.Background(), "vacuum", &plugin_pb.ClusterContext{}, 10) + detectErrCh <- runErr + }() + + select { + case unexpected := <-otherSession.outgoing: + t.Fatalf("expected vacuum detection to wait while admin_script runs, got message: %+v", unexpected) + case <-time.After(100 * time.Millisecond): + } + + pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{ + RequestId: adminExecMessage.RequestId, + JobId: "job-admin-script-1", + JobType: "admin_script", + Success: true, + CompletedAt: timestamppb.Now(), + }) + if runErr := <-adminErrCh; runErr != nil { + t.Fatalf("admin_script ExecuteJob error: %v", runErr) + } + + detectMessage := <-otherSession.outgoing + detectRequest := detectMessage.GetRunDetectionRequest() + if detectRequest == nil { + t.Fatalf("expected vacuum detection request after admin_script completion") + } + pluginSvc.handleDetectionComplete(otherWorkerID, &plugin_pb.DetectionComplete{ + RequestId: detectMessage.RequestId, + JobType: "vacuum", + Success: true, + }) + if runErr := <-detectErrCh; runErr != nil { + t.Fatalf("vacuum RunDetection error: %v", runErr) + } +} + +func TestAdminScriptExecutionBlocksOtherExecution(t *testing.T) { + pluginSvc, err := New(Options{}) + if err != nil { + t.Fatalf("New plugin error: %v", err) + } + defer pluginSvc.Shutdown() + + const adminWorkerID = "worker-admin-script" + const otherWorkerID = "worker-vacuum" + + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: adminWorkerID, + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1}, + }, + }) + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: otherWorkerID, + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanExecute: true, MaxExecutionConcurrency: 1}, + }, + }) + adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)} + otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)} + pluginSvc.putSession(adminSession) + pluginSvc.putSession(otherSession) + + adminErrCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{ + JobId: "job-admin-script-2", + JobType: "admin_script", + }, &plugin_pb.ClusterContext{}, 1) + adminErrCh <- runErr + }() + + adminExecMessage := <-adminSession.outgoing + if adminExecMessage.GetExecuteJobRequest() == nil { + t.Fatalf("expected admin_script execute request") + } + + otherErrCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{ + JobId: "job-vacuum-1", + JobType: "vacuum", + }, &plugin_pb.ClusterContext{}, 1) + otherErrCh <- runErr + }() + + select { + case unexpected := <-otherSession.outgoing: + t.Fatalf("expected vacuum execute to wait while admin_script runs, got message: %+v", unexpected) + case <-time.After(100 * time.Millisecond): + } + + pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{ + RequestId: adminExecMessage.RequestId, + JobId: "job-admin-script-2", + JobType: "admin_script", + Success: true, + CompletedAt: timestamppb.Now(), + }) + if runErr := <-adminErrCh; runErr != nil { + t.Fatalf("admin_script ExecuteJob error: %v", runErr) + } + + otherExecMessage := <-otherSession.outgoing + if otherExecMessage.GetExecuteJobRequest() == nil { + t.Fatalf("expected vacuum execute request after admin_script completion") + } + pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{ + RequestId: otherExecMessage.RequestId, + JobId: "job-vacuum-1", + JobType: "vacuum", + Success: true, + CompletedAt: timestamppb.Now(), + }) + if runErr := <-otherErrCh; runErr != nil { + t.Fatalf("vacuum ExecuteJob error: %v", runErr) + } +} diff --git a/weed/admin/plugin/plugin_detection_test.go b/weed/admin/plugin/plugin_detection_test.go index 755ade4cd..be2aac50c 100644 --- a/weed/admin/plugin/plugin_detection_test.go +++ b/weed/admin/plugin/plugin_detection_test.go @@ -195,3 +195,64 @@ func TestRunDetectionWithReportCapturesDetectionActivities(t *testing.T) { t.Fatalf("expected requested/proposal/completed activities, got stages=%v", stages) } } + +func TestRunDetectionAdminScriptUsesLastCompletedRun(t *testing.T) { + pluginSvc, err := New(Options{}) + if err != nil { + t.Fatalf("New plugin error: %v", err) + } + defer pluginSvc.Shutdown() + + jobType := "admin_script" + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: "worker-admin-script", + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: jobType, CanDetect: true, MaxDetectionConcurrency: 1}, + }, + }) + session := &streamSession{workerID: "worker-admin-script", outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 1)} + pluginSvc.putSession(session) + + successCompleted := time.Date(2026, 2, 1, 10, 0, 0, 0, time.UTC) + errorCompleted := successCompleted.Add(45 * time.Minute) + if err := pluginSvc.store.AppendRunRecord(jobType, &JobRunRecord{ + Outcome: RunOutcomeSuccess, + CompletedAt: timeToPtr(successCompleted), + }); err != nil { + t.Fatalf("AppendRunRecord success run: %v", err) + } + if err := pluginSvc.store.AppendRunRecord(jobType, &JobRunRecord{ + Outcome: RunOutcomeError, + CompletedAt: timeToPtr(errorCompleted), + }); err != nil { + t.Fatalf("AppendRunRecord error run: %v", err) + } + + errCh := make(chan error, 1) + go func() { + _, runErr := pluginSvc.RunDetection(context.Background(), jobType, &plugin_pb.ClusterContext{}, 10) + errCh <- runErr + }() + + message := <-session.outgoing + detectRequest := message.GetRunDetectionRequest() + if detectRequest == nil { + t.Fatalf("expected run detection request message") + } + if detectRequest.LastSuccessfulRun == nil { + t.Fatalf("expected last_successful_run to be set") + } + if got := detectRequest.LastSuccessfulRun.AsTime().UTC(); !got.Equal(errorCompleted) { + t.Fatalf("unexpected last_successful_run, got=%s want=%s", got, errorCompleted) + } + + pluginSvc.handleDetectionComplete("worker-admin-script", &plugin_pb.DetectionComplete{ + RequestId: message.RequestId, + JobType: jobType, + Success: true, + }) + + if runErr := <-errCh; runErr != nil { + t.Fatalf("RunDetection error: %v", runErr) + } +} diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 040140413..67e463c56 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -321,6 +321,22 @@ func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) { func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) { defer r.finishDetection(jobType) + releaseLock, lockErr := r.acquireAdminLock(fmt.Sprintf("plugin scheduled detection %s", jobType)) + if lockErr != nil { + r.recordSchedulerDetectionError(jobType, lockErr) + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled detection aborted: failed to acquire lock: %v", lockErr), + Stage: "failed", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + return + } + if releaseLock != nil { + defer releaseLock() + } + start := time.Now().UTC() r.appendActivity(JobActivity{ JobType: jobType, @@ -331,6 +347,7 @@ func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) { }) if skip, waitingCount, waitingThreshold := r.shouldSkipDetectionForWaitingJobs(jobType, policy); skip { + r.recordSchedulerDetectionSkip(jobType, fmt.Sprintf("waiting backlog %d reached threshold %d", waitingCount, waitingThreshold)) r.appendActivity(JobActivity{ JobType: jobType, Source: "admin_scheduler", @@ -343,6 +360,7 @@ func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) { clusterContext, err := r.loadSchedulerClusterContext() if err != nil { + r.recordSchedulerDetectionError(jobType, err) r.appendActivity(JobActivity{ JobType: jobType, Source: "admin_scheduler", @@ -357,6 +375,7 @@ func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) { proposals, err := r.RunDetection(ctx, jobType, clusterContext, policy.MaxResults) cancel() if err != nil { + r.recordSchedulerDetectionError(jobType, err) r.appendActivity(JobActivity{ JobType: jobType, Source: "admin_scheduler", @@ -374,6 +393,7 @@ func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) { Stage: "detected", OccurredAt: timeToPtr(time.Now().UTC()), }) + r.recordSchedulerDetectionSuccess(jobType, len(proposals)) filteredByActive, skippedActive := r.filterProposalsWithActiveJobs(jobType, proposals) if skippedActive > 0 { diff --git a/weed/admin/plugin/scheduler_status.go b/weed/admin/plugin/scheduler_status.go new file mode 100644 index 000000000..673afa232 --- /dev/null +++ b/weed/admin/plugin/scheduler_status.go @@ -0,0 +1,234 @@ +package plugin + +import ( + "sort" + "strings" + "time" +) + +type SchedulerStatus struct { + Now time.Time `json:"now"` + SchedulerTickSeconds int `json:"scheduler_tick_seconds"` + Waiting []SchedulerWaitingStatus `json:"waiting,omitempty"` + InProcessJobs []SchedulerJobStatus `json:"in_process_jobs,omitempty"` + JobTypes []SchedulerJobTypeStatus `json:"job_types,omitempty"` +} + +type SchedulerWaitingStatus struct { + Reason string `json:"reason"` + JobType string `json:"job_type,omitempty"` + Since *time.Time `json:"since,omitempty"` + Until *time.Time `json:"until,omitempty"` + Details map[string]interface{} `json:"details,omitempty"` +} + +type SchedulerJobStatus struct { + JobID string `json:"job_id"` + JobType string `json:"job_type"` + State string `json:"state"` + Stage string `json:"stage,omitempty"` + WorkerID string `json:"worker_id,omitempty"` + Message string `json:"message,omitempty"` + Progress float64 `json:"progress,omitempty"` + CreatedAt *time.Time `json:"created_at,omitempty"` + UpdatedAt *time.Time `json:"updated_at,omitempty"` + DurationSeconds float64 `json:"duration_seconds,omitempty"` +} + +type SchedulerJobTypeStatus struct { + JobType string `json:"job_type"` + Enabled bool `json:"enabled"` + DetectionInFlight bool `json:"detection_in_flight"` + NextDetectionAt *time.Time `json:"next_detection_at,omitempty"` + DetectionIntervalSeconds int32 `json:"detection_interval_seconds,omitempty"` + LastDetectedAt *time.Time `json:"last_detected_at,omitempty"` + LastDetectedCount int `json:"last_detected_count,omitempty"` + LastDetectionError string `json:"last_detection_error,omitempty"` + LastDetectionSkipped string `json:"last_detection_skipped,omitempty"` +} + +type schedulerDetectionInfo struct { + lastDetectedAt time.Time + lastDetectedCount int + lastErrorAt time.Time + lastError string + lastSkippedAt time.Time + lastSkippedReason string +} + +func (r *Plugin) recordSchedulerDetectionSuccess(jobType string, count int) { + if r == nil { + return + } + r.schedulerDetectionMu.Lock() + defer r.schedulerDetectionMu.Unlock() + info := r.schedulerDetection[jobType] + if info == nil { + info = &schedulerDetectionInfo{} + r.schedulerDetection[jobType] = info + } + info.lastDetectedAt = time.Now().UTC() + info.lastDetectedCount = count + info.lastError = "" + info.lastSkippedReason = "" +} + +func (r *Plugin) recordSchedulerDetectionError(jobType string, err error) { + if r == nil { + return + } + if err == nil { + return + } + r.schedulerDetectionMu.Lock() + defer r.schedulerDetectionMu.Unlock() + info := r.schedulerDetection[jobType] + if info == nil { + info = &schedulerDetectionInfo{} + r.schedulerDetection[jobType] = info + } + info.lastErrorAt = time.Now().UTC() + info.lastError = err.Error() +} + +func (r *Plugin) recordSchedulerDetectionSkip(jobType string, reason string) { + if r == nil { + return + } + if strings.TrimSpace(reason) == "" { + return + } + r.schedulerDetectionMu.Lock() + defer r.schedulerDetectionMu.Unlock() + info := r.schedulerDetection[jobType] + if info == nil { + info = &schedulerDetectionInfo{} + r.schedulerDetection[jobType] = info + } + info.lastSkippedAt = time.Now().UTC() + info.lastSkippedReason = reason +} + +func (r *Plugin) snapshotSchedulerDetection(jobType string) schedulerDetectionInfo { + if r == nil { + return schedulerDetectionInfo{} + } + r.schedulerDetectionMu.Lock() + defer r.schedulerDetectionMu.Unlock() + info := r.schedulerDetection[jobType] + if info == nil { + return schedulerDetectionInfo{} + } + return *info +} + +func (r *Plugin) GetSchedulerStatus() SchedulerStatus { + now := time.Now().UTC() + status := SchedulerStatus{ + Now: now, + SchedulerTickSeconds: int(secondsFromDuration(r.schedulerTick)), + InProcessJobs: r.listInProcessJobs(now), + } + + states, err := r.ListSchedulerStates() + if err != nil { + return status + } + + waiting := make([]SchedulerWaitingStatus, 0) + jobTypes := make([]SchedulerJobTypeStatus, 0, len(states)) + + for _, state := range states { + jobType := state.JobType + info := r.snapshotSchedulerDetection(jobType) + + jobStatus := SchedulerJobTypeStatus{ + JobType: jobType, + Enabled: state.Enabled, + DetectionInFlight: state.DetectionInFlight, + NextDetectionAt: state.NextDetectionAt, + DetectionIntervalSeconds: state.DetectionIntervalSeconds, + } + if !info.lastDetectedAt.IsZero() { + jobStatus.LastDetectedAt = timeToPtr(info.lastDetectedAt) + jobStatus.LastDetectedCount = info.lastDetectedCount + } + if info.lastError != "" { + jobStatus.LastDetectionError = info.lastError + } + if info.lastSkippedReason != "" { + jobStatus.LastDetectionSkipped = info.lastSkippedReason + } + jobTypes = append(jobTypes, jobStatus) + + if state.DetectionInFlight { + waiting = append(waiting, SchedulerWaitingStatus{ + Reason: "detection_in_flight", + JobType: jobType, + }) + } else if state.Enabled && state.NextDetectionAt != nil && now.Before(*state.NextDetectionAt) { + waiting = append(waiting, SchedulerWaitingStatus{ + Reason: "next_detection_at", + JobType: jobType, + Until: state.NextDetectionAt, + }) + } + } + + sort.Slice(jobTypes, func(i, j int) bool { + return jobTypes[i].JobType < jobTypes[j].JobType + }) + + status.Waiting = waiting + status.JobTypes = jobTypes + return status +} + +func (r *Plugin) listInProcessJobs(now time.Time) []SchedulerJobStatus { + active := make([]SchedulerJobStatus, 0) + if r == nil { + return active + } + + r.jobsMu.RLock() + for _, job := range r.jobs { + if job == nil { + continue + } + if !isActiveTrackedJobState(job.State) { + continue + } + start := timeToPtr(now) + if job.CreatedAt != nil && !job.CreatedAt.IsZero() { + start = job.CreatedAt + } else if job.UpdatedAt != nil && !job.UpdatedAt.IsZero() { + start = job.UpdatedAt + } + durationSeconds := 0.0 + if start != nil { + durationSeconds = now.Sub(*start).Seconds() + } + active = append(active, SchedulerJobStatus{ + JobID: job.JobID, + JobType: job.JobType, + State: strings.ToLower(job.State), + Stage: job.Stage, + WorkerID: job.WorkerID, + Message: job.Message, + Progress: job.Progress, + CreatedAt: job.CreatedAt, + UpdatedAt: job.UpdatedAt, + DurationSeconds: durationSeconds, + }) + } + r.jobsMu.RUnlock() + + sort.Slice(active, func(i, j int) bool { + if active[i].DurationSeconds != active[j].DurationSeconds { + return active[i].DurationSeconds > active[j].DurationSeconds + } + return active[i].JobID < active[j].JobID + }) + + return active +} diff --git a/weed/admin/plugin/scheduler_status_test.go b/weed/admin/plugin/scheduler_status_test.go new file mode 100644 index 000000000..b1cb7de70 --- /dev/null +++ b/weed/admin/plugin/scheduler_status_test.go @@ -0,0 +1,64 @@ +package plugin + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" +) + +func TestGetSchedulerStatusIncludesInProcessJobs(t *testing.T) { + pluginSvc, err := New(Options{}) + if err != nil { + t.Fatalf("New: %v", err) + } + defer pluginSvc.Shutdown() + + pluginSvc.trackExecutionStart("req-1", "worker-a", &plugin_pb.JobSpec{ + JobId: "job-1", + JobType: "vacuum", + }, 1) + + status := pluginSvc.GetSchedulerStatus() + if len(status.InProcessJobs) != 1 { + t.Fatalf("expected one in-process job, got %d", len(status.InProcessJobs)) + } + if status.InProcessJobs[0].JobID != "job-1" { + t.Fatalf("unexpected job id: %s", status.InProcessJobs[0].JobID) + } +} + +func TestGetSchedulerStatusIncludesLastDetectionCount(t *testing.T) { + pluginSvc, err := New(Options{}) + if err != nil { + t.Fatalf("New: %v", err) + } + defer pluginSvc.Shutdown() + + const jobType = "vacuum" + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: "worker-a", + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: jobType, CanDetect: true}, + }, + }) + + pluginSvc.recordSchedulerDetectionSuccess(jobType, 3) + + status := pluginSvc.GetSchedulerStatus() + found := false + for _, jt := range status.JobTypes { + if jt.JobType != jobType { + continue + } + found = true + if jt.LastDetectedCount != 3 { + t.Fatalf("unexpected last detected count: got=%d want=3", jt.LastDetectedCount) + } + if jt.LastDetectedAt == nil { + t.Fatalf("expected last detected at to be set") + } + } + if !found { + t.Fatalf("expected job type status for %s", jobType) + } +} diff --git a/weed/admin/view/app/plugin.templ b/weed/admin/view/app/plugin.templ index 5794f452c..63dc16f05 100644 --- a/weed/admin/view/app/plugin.templ +++ b/weed/admin/view/app/plugin.templ @@ -511,6 +511,9 @@ templ Plugin(page string) { .plugin-form-root .card { border: 1px solid #dee2e6; } + .plugin-form-root textarea { + min-height: 12rem; + } .plugin-field-hidden { display: none; diff --git a/weed/admin/view/app/plugin_templ.go b/weed/admin/view/app/plugin_templ.go index c54890eda..5f3c5c2b6 100644 --- a/weed/admin/view/app/plugin_templ.go +++ b/weed/admin/view/app/plugin_templ.go @@ -46,7 +46,7 @@ func Plugin(page string) templ.Component { if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "\">

Workers

Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.

Workers

0

Active Jobs

0

Activities (recent)

0

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Per job type detection schedule and execution limits
Job TypeEnabledDetectorIn FlightNext DetectionIntervalExec GlobalExec/WorkerExecutor WorkersEffective Exec
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "\">

Workers

Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.

Workers

0

Active Jobs

0

Activities (recent)

0

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Per job type detection schedule and execution limits
Job TypeEnabledDetectorIn FlightNext DetectionIntervalExec GlobalExec/WorkerExecutor WorkersEffective Exec
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } diff --git a/weed/command/mini.go b/weed/command/mini.go index 0ff0b9b51..ac8592725 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -45,7 +45,7 @@ const ( defaultMiniVolumeSizeMB = 128 // Default volume size for mini mode maxVolumeSizeMB = 1024 // Maximum volume size in MB (1GB) GrpcPortOffset = 10000 // Offset used to calculate gRPC port from HTTP port - defaultMiniPluginJobTypes = "vacuum,volume_balance,erasure_coding" + defaultMiniPluginJobTypes = "vacuum,volume_balance,erasure_coding,admin_script" ) var ( diff --git a/weed/command/mini_plugin_test.go b/weed/command/mini_plugin_test.go index 30aafcaf3..45f5e9482 100644 --- a/weed/command/mini_plugin_test.go +++ b/weed/command/mini_plugin_test.go @@ -7,7 +7,7 @@ func TestMiniDefaultPluginJobTypes(t *testing.T) { if err != nil { t.Fatalf("parsePluginWorkerJobTypes(mini default) err = %v", err) } - if len(jobTypes) != 3 { - t.Fatalf("expected mini default job types to include 3 handlers, got %v", jobTypes) + if len(jobTypes) != 4 { + t.Fatalf("expected mini default job types to include 4 handlers, got %v", jobTypes) } } diff --git a/weed/command/plugin_worker_test.go b/weed/command/plugin_worker_test.go index 7442b4eb8..9e13142cd 100644 --- a/weed/command/plugin_worker_test.go +++ b/weed/command/plugin_worker_test.go @@ -123,6 +123,14 @@ func TestParsePluginWorkerJobTypes(t *testing.T) { if _, err = parsePluginWorkerJobTypes(" , "); err != nil { t.Fatalf("expected empty list to resolve to default vacuum: %v", err) } + + jobTypes, err = parsePluginWorkerJobTypes("admin-script,script,admin_script") + if err != nil { + t.Fatalf("parsePluginWorkerJobTypes(admin script aliases) err = %v", err) + } + if len(jobTypes) != 1 || jobTypes[0] != "admin_script" { + t.Fatalf("expected admin_script alias to resolve, got %v", jobTypes) + } } func TestPluginWorkerDefaultJobTypes(t *testing.T) { @@ -130,8 +138,8 @@ func TestPluginWorkerDefaultJobTypes(t *testing.T) { if err != nil { t.Fatalf("parsePluginWorkerJobTypes(default setting) err = %v", err) } - if len(jobTypes) != 3 { - t.Fatalf("expected default job types to include 3 handlers, got %v", jobTypes) + if len(jobTypes) != 4 { + t.Fatalf("expected default job types to include 4 handlers, got %v", jobTypes) } } diff --git a/weed/command/worker.go b/weed/command/worker.go index 0e02578ef..962fc87f3 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -7,11 +7,11 @@ import ( ) var cmdWorker = &Command{ - UsageLine: "worker -admin= [-id=] [-jobType=vacuum,volume_balance,erasure_coding] [-workingDir=] [-heartbeat=15s] [-reconnect=5s] [-maxDetect=1] [-maxExecute=4] [-metricsPort=] [-metricsIp=] [-debug]", + UsageLine: "worker -admin= [-id=] [-jobType=vacuum,volume_balance,erasure_coding,admin_script] [-workingDir=] [-heartbeat=15s] [-reconnect=5s] [-maxDetect=1] [-maxExecute=4] [-metricsPort=] [-metricsIp=] [-debug]", Short: "start a plugin.proto worker process", Long: `Start an external plugin worker using weed/pb/plugin.proto over gRPC. -This command provides vacuum, volume_balance, and erasure_coding job type +This command provides vacuum, volume_balance, erasure_coding, and admin_script job type contracts with the plugin stream runtime, including descriptor delivery, heartbeat/load reporting, detection, and execution. @@ -25,6 +25,7 @@ Examples: weed worker -admin=localhost:23646 -jobType=volume_balance weed worker -admin=localhost:23646 -jobType=vacuum,volume_balance weed worker -admin=localhost:23646 -jobType=erasure_coding + weed worker -admin=localhost:23646 -jobType=admin_script weed worker -admin=admin.example.com:23646 -id=plugin-vacuum-a -heartbeat=10s weed worker -admin=localhost:23646 -workingDir=/var/lib/seaweedfs-plugin weed worker -admin=localhost:23646 -metricsPort=9327 -metricsIp=0.0.0.0 diff --git a/weed/command/worker_runtime.go b/weed/command/worker_runtime.go index a7affc3ae..170989503 100644 --- a/weed/command/worker_runtime.go +++ b/weed/command/worker_runtime.go @@ -23,7 +23,7 @@ import ( "google.golang.org/grpc" ) -const defaultPluginWorkerJobTypes = "vacuum,volume_balance,erasure_coding" +const defaultPluginWorkerJobTypes = "vacuum,volume_balance,erasure_coding,admin_script" type pluginWorkerRunOptions struct { AdminServer string @@ -156,6 +156,8 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExe return pluginworker.NewVolumeBalanceHandler(dialOption), nil case "erasure_coding": return pluginworker.NewErasureCodingHandler(dialOption, workingDir), nil + case "admin_script": + return pluginworker.NewAdminScriptHandler(dialOption), nil default: return nil, fmt.Errorf("unsupported plugin job type %q", canonicalJobType) } @@ -220,6 +222,8 @@ func canonicalPluginWorkerJobType(jobType string) (string, error) { return "volume_balance", nil case "erasure_coding", "erasure-coding", "erasure.coding", "ec": return "erasure_coding", nil + case "admin_script", "admin-script", "admin.script", "script", "admin": + return "admin_script", nil default: return "", fmt.Errorf("unsupported plugin job type %q", jobType) } diff --git a/weed/command/worker_test.go b/weed/command/worker_test.go index 54867893e..0bde51cc9 100644 --- a/weed/command/worker_test.go +++ b/weed/command/worker_test.go @@ -7,7 +7,7 @@ func TestWorkerDefaultJobTypes(t *testing.T) { if err != nil { t.Fatalf("parsePluginWorkerJobTypes(default worker flag) err = %v", err) } - if len(jobTypes) != 3 { - t.Fatalf("expected default worker job types to include 3 handlers, got %v", jobTypes) + if len(jobTypes) != 4 { + t.Fatalf("expected default worker job types to include 4 handlers, got %v", jobTypes) } } diff --git a/weed/plugin/worker/admin_script_handler.go b/weed/plugin/worker/admin_script_handler.go new file mode 100644 index 000000000..a653b7e6b --- /dev/null +++ b/weed/plugin/worker/admin_script_handler.go @@ -0,0 +1,635 @@ +package pluginworker + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "regexp" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "github.com/seaweedfs/seaweedfs/weed/shell" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + adminScriptJobType = "admin_script" + maxAdminScriptOutputBytes = 16 * 1024 + defaultAdminScriptRunMins = 17 + adminScriptDetectTickSecs = 60 +) + +const defaultAdminScript = `ec.balance -apply +fs.log.purge -daysAgo=7 +volume.deleteEmpty -quietFor=24h -apply +volume.fix.replication -apply +s3.clean.uploads -timeAgo=24h` + +var adminScriptTokenRegex = regexp.MustCompile(`'.*?'|".*?"|\S+`) + +type AdminScriptHandler struct { + grpcDialOption grpc.DialOption +} + +func NewAdminScriptHandler(grpcDialOption grpc.DialOption) *AdminScriptHandler { + return &AdminScriptHandler{grpcDialOption: grpcDialOption} +} + +func (h *AdminScriptHandler) Capability() *plugin_pb.JobTypeCapability { + return &plugin_pb.JobTypeCapability{ + JobType: adminScriptJobType, + CanDetect: true, + CanExecute: true, + MaxDetectionConcurrency: 1, + MaxExecutionConcurrency: 1, + DisplayName: "Admin Script", + Description: "Execute custom admin shell scripts", + Weight: 20, + } +} + +func (h *AdminScriptHandler) Descriptor() *plugin_pb.JobTypeDescriptor { + return &plugin_pb.JobTypeDescriptor{ + JobType: adminScriptJobType, + DisplayName: "Admin Script", + Description: "Run custom admin shell scripts not covered by built-in job types", + Icon: "fas fa-terminal", + DescriptorVersion: 1, + AdminConfigForm: &plugin_pb.ConfigForm{ + FormId: "admin-script-admin", + Title: "Admin Script Configuration", + Description: "Define the admin shell script to execute.", + Sections: []*plugin_pb.ConfigSection{ + { + SectionId: "script", + Title: "Script", + Description: "Commands run sequentially by the admin script worker.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "script", + Label: "Script", + Description: "Admin shell commands to execute (one per line).", + HelpText: "Lock/unlock are handled by the admin server; omit explicit lock/unlock commands.", + Placeholder: "volume.balance -apply\nvolume.fix.replication -apply", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXTAREA, + Required: true, + }, + { + Name: "run_interval_minutes", + Label: "Run Interval (minutes)", + Description: "Minimum interval between successful admin script runs.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + Required: true, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }, + }, + }, + }, + DefaultValues: map[string]*plugin_pb.ConfigValue{ + "script": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultAdminScript}, + }, + "run_interval_minutes": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultAdminScriptRunMins}, + }, + }, + }, + AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ + Enabled: true, + DetectionIntervalSeconds: adminScriptDetectTickSecs, + DetectionTimeoutSeconds: 300, + MaxJobsPerDetection: 1, + GlobalExecutionConcurrency: 1, + PerWorkerExecutionConcurrency: 1, + RetryLimit: 0, + RetryBackoffSeconds: 30, + }, + WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{}, + } +} + +func (h *AdminScriptHandler) Detect(ctx context.Context, request *plugin_pb.RunDetectionRequest, sender DetectionSender) error { + if request == nil { + return fmt.Errorf("run detection request is nil") + } + if sender == nil { + return fmt.Errorf("detection sender is nil") + } + if request.JobType != "" && request.JobType != adminScriptJobType { + return fmt.Errorf("job type %q is not handled by admin_script worker", request.JobType) + } + + script := normalizeAdminScript(readStringConfig(request.GetAdminConfigValues(), "script", "")) + scriptName := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "script_name", "")) + runIntervalMinutes := readAdminScriptRunIntervalMinutes(request.GetAdminConfigValues()) + if shouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), runIntervalMinutes*60) { + _ = sender.SendActivity(buildDetectorActivity( + "skipped_by_interval", + fmt.Sprintf("ADMIN SCRIPT: Detection skipped due to run interval (%dm)", runIntervalMinutes), + map[string]*plugin_pb.ConfigValue{ + "run_interval_minutes": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(runIntervalMinutes)}, + }, + }, + )) + if err := sender.SendProposals(&plugin_pb.DetectionProposals{ + JobType: adminScriptJobType, + Proposals: []*plugin_pb.JobProposal{}, + HasMore: false, + }); err != nil { + return err + } + return sender.SendComplete(&plugin_pb.DetectionComplete{ + JobType: adminScriptJobType, + Success: true, + TotalProposals: 0, + }) + } + + commands := parseAdminScriptCommands(script) + execCount := countExecutableCommands(commands) + if execCount == 0 { + _ = sender.SendActivity(buildDetectorActivity( + "no_script", + "ADMIN SCRIPT: No executable commands configured", + map[string]*plugin_pb.ConfigValue{ + "command_count": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(execCount)}, + }, + }, + )) + if err := sender.SendProposals(&plugin_pb.DetectionProposals{ + JobType: adminScriptJobType, + Proposals: []*plugin_pb.JobProposal{}, + HasMore: false, + }); err != nil { + return err + } + return sender.SendComplete(&plugin_pb.DetectionComplete{ + JobType: adminScriptJobType, + Success: true, + TotalProposals: 0, + }) + } + + proposal := buildAdminScriptProposal(script, scriptName, execCount) + proposals := []*plugin_pb.JobProposal{proposal} + hasMore := false + maxResults := int(request.MaxResults) + if maxResults > 0 && len(proposals) > maxResults { + proposals = proposals[:maxResults] + hasMore = true + } + + if err := sender.SendProposals(&plugin_pb.DetectionProposals{ + JobType: adminScriptJobType, + Proposals: proposals, + HasMore: hasMore, + }); err != nil { + return err + } + + return sender.SendComplete(&plugin_pb.DetectionComplete{ + JobType: adminScriptJobType, + Success: true, + TotalProposals: 1, + }) +} + +func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequest, sender ExecutionSender) error { + if request == nil || request.Job == nil { + return fmt.Errorf("execute job request is nil") + } + if sender == nil { + return fmt.Errorf("execution sender is nil") + } + if request.Job.JobType != "" && request.Job.JobType != adminScriptJobType { + return fmt.Errorf("job type %q is not handled by admin_script worker", request.Job.JobType) + } + + script := normalizeAdminScript(readStringConfig(request.Job.Parameters, "script", "")) + scriptName := strings.TrimSpace(readStringConfig(request.Job.Parameters, "script_name", "")) + if script == "" { + script = normalizeAdminScript(readStringConfig(request.GetAdminConfigValues(), "script", "")) + } + if scriptName == "" { + scriptName = strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "script_name", "")) + } + + commands := parseAdminScriptCommands(script) + execCommands := filterExecutableCommands(commands) + if len(execCommands) == 0 { + return sender.SendCompleted(&plugin_pb.JobCompleted{ + Success: false, + ErrorMessage: "no executable admin script commands configured", + }) + } + + commandEnv, cancel, err := h.buildAdminScriptCommandEnv(ctx, request.ClusterContext) + if err != nil { + return sender.SendCompleted(&plugin_pb.JobCompleted{ + Success: false, + ErrorMessage: err.Error(), + }) + } + defer cancel() + + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_ASSIGNED, + ProgressPercent: 0, + Stage: "assigned", + Message: "admin script job accepted", + Activities: []*plugin_pb.ActivityEvent{ + buildExecutorActivity("assigned", "admin script job accepted"), + }, + }); err != nil { + return err + } + + output := &limitedBuffer{maxBytes: maxAdminScriptOutputBytes} + executed := 0 + errorMessages := make([]string, 0) + executedCommands := make([]string, 0, len(execCommands)) + + for _, cmd := range execCommands { + if ctx.Err() != nil { + errorMessages = append(errorMessages, ctx.Err().Error()) + break + } + + commandLine := formatAdminScriptCommand(cmd) + executedCommands = append(executedCommands, commandLine) + _, _ = fmt.Fprintf(output, "$ %s\n", commandLine) + + found := false + for _, command := range shell.Commands { + if command.Name() != cmd.Name { + continue + } + found = true + if err := command.Do(cmd.Args, commandEnv, output); err != nil { + msg := fmt.Sprintf("%s: %v", cmd.Name, err) + errorMessages = append(errorMessages, msg) + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + State: plugin_pb.JobState_JOB_STATE_RUNNING, + ProgressPercent: percentProgress(executed+1, len(execCommands)), + Stage: "error", + Message: msg, + Activities: []*plugin_pb.ActivityEvent{ + buildExecutorActivity("error", msg), + }, + }) + } + break + } + + if !found { + msg := fmt.Sprintf("unknown admin command: %s", cmd.Name) + errorMessages = append(errorMessages, msg) + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + State: plugin_pb.JobState_JOB_STATE_RUNNING, + ProgressPercent: percentProgress(executed+1, len(execCommands)), + Stage: "error", + Message: msg, + Activities: []*plugin_pb.ActivityEvent{ + buildExecutorActivity("error", msg), + }, + }) + } + + executed++ + progress := percentProgress(executed, len(execCommands)) + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + State: plugin_pb.JobState_JOB_STATE_RUNNING, + ProgressPercent: progress, + Stage: "running", + Message: fmt.Sprintf("executed %d/%d command(s)", executed, len(execCommands)), + Activities: []*plugin_pb.ActivityEvent{ + buildExecutorActivity("running", commandLine), + }, + }) + } + + scriptHash := hashAdminScript(script) + resultSummary := fmt.Sprintf("admin script executed (%d command(s))", executed) + if scriptName != "" { + resultSummary = fmt.Sprintf("admin script %q executed (%d command(s))", scriptName, executed) + } + + outputValues := map[string]*plugin_pb.ConfigValue{ + "command_count": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(executed)}, + }, + "error_count": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(errorMessages))}, + }, + "script_hash": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptHash}, + }, + } + if scriptName != "" { + outputValues["script_name"] = &plugin_pb.ConfigValue{ + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptName}, + } + } + if len(executedCommands) > 0 { + outputValues["commands"] = &plugin_pb.ConfigValue{ + Kind: &plugin_pb.ConfigValue_StringList{ + StringList: &plugin_pb.StringList{Values: executedCommands}, + }, + } + } + if out := strings.TrimSpace(output.String()); out != "" { + outputValues["output"] = &plugin_pb.ConfigValue{ + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: out}, + } + } + if output.truncated { + outputValues["output_truncated"] = &plugin_pb.ConfigValue{ + Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}, + } + } + + success := len(errorMessages) == 0 && ctx.Err() == nil + errorMessage := "" + if !success { + errorMessage = strings.Join(errorMessages, "; ") + if ctx.Err() != nil { + if errorMessage == "" { + errorMessage = ctx.Err().Error() + } else { + errorMessage = fmt.Sprintf("%s; %s", errorMessage, ctx.Err().Error()) + } + } + } + + return sender.SendCompleted(&plugin_pb.JobCompleted{ + Success: success, + ErrorMessage: errorMessage, + Result: &plugin_pb.JobResult{ + Summary: resultSummary, + OutputValues: outputValues, + }, + Activities: []*plugin_pb.ActivityEvent{ + buildExecutorActivity("completed", resultSummary), + }, + CompletedAt: timestamppb.Now(), + }) +} + +func readAdminScriptRunIntervalMinutes(values map[string]*plugin_pb.ConfigValue) int { + runIntervalMinutes := int(readInt64Config(values, "run_interval_minutes", defaultAdminScriptRunMins)) + if runIntervalMinutes <= 0 { + return defaultAdminScriptRunMins + } + return runIntervalMinutes +} + +type adminScriptCommand struct { + Name string + Args []string + Raw string +} + +func normalizeAdminScript(script string) string { + script = strings.ReplaceAll(script, "\r\n", "\n") + return strings.TrimSpace(script) +} + +func parseAdminScriptCommands(script string) []adminScriptCommand { + script = normalizeAdminScript(script) + if script == "" { + return nil + } + lines := strings.Split(script, "\n") + commands := make([]adminScriptCommand, 0) + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + for _, chunk := range strings.Split(line, ";") { + chunk = strings.TrimSpace(chunk) + if chunk == "" { + continue + } + parts := adminScriptTokenRegex.FindAllString(chunk, -1) + if len(parts) == 0 { + continue + } + args := make([]string, 0, len(parts)-1) + for _, arg := range parts[1:] { + args = append(args, strings.Trim(arg, "\"'")) + } + commands = append(commands, adminScriptCommand{ + Name: strings.TrimSpace(parts[0]), + Args: args, + Raw: chunk, + }) + } + } + return commands +} + +func filterExecutableCommands(commands []adminScriptCommand) []adminScriptCommand { + exec := make([]adminScriptCommand, 0, len(commands)) + for _, cmd := range commands { + if cmd.Name == "" { + continue + } + if isAdminScriptLockCommand(cmd.Name) { + continue + } + exec = append(exec, cmd) + } + return exec +} + +func countExecutableCommands(commands []adminScriptCommand) int { + count := 0 + for _, cmd := range commands { + if cmd.Name == "" { + continue + } + if isAdminScriptLockCommand(cmd.Name) { + continue + } + count++ + } + return count +} + +func isAdminScriptLockCommand(name string) bool { + switch strings.ToLower(strings.TrimSpace(name)) { + case "lock", "unlock": + return true + default: + return false + } +} + +func buildAdminScriptProposal(script, scriptName string, commandCount int) *plugin_pb.JobProposal { + scriptHash := hashAdminScript(script) + summary := "Run admin script" + if scriptName != "" { + summary = fmt.Sprintf("Run admin script: %s", scriptName) + } + detail := fmt.Sprintf("Admin script with %d command(s)", commandCount) + proposalID := fmt.Sprintf("admin-script-%s-%d", scriptHash[:8], time.Now().UnixNano()) + + labels := map[string]string{ + "script_hash": scriptHash, + } + if scriptName != "" { + labels["script_name"] = scriptName + } + + return &plugin_pb.JobProposal{ + ProposalId: proposalID, + DedupeKey: "admin-script:" + scriptHash, + JobType: adminScriptJobType, + Priority: plugin_pb.JobPriority_JOB_PRIORITY_NORMAL, + Summary: summary, + Detail: detail, + Parameters: map[string]*plugin_pb.ConfigValue{ + "script": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: script}, + }, + "script_name": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptName}, + }, + "script_hash": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptHash}, + }, + "command_count": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(commandCount)}, + }, + }, + Labels: labels, + } +} + +func (h *AdminScriptHandler) buildAdminScriptCommandEnv( + ctx context.Context, + clusterContext *plugin_pb.ClusterContext, +) (*shell.CommandEnv, context.CancelFunc, error) { + if clusterContext == nil { + return nil, nil, fmt.Errorf("cluster context is required") + } + + masters := normalizeAddressList(clusterContext.MasterGrpcAddresses) + if len(masters) == 0 { + return nil, nil, fmt.Errorf("missing master addresses for admin script") + } + + filerGroup := "" + mastersValue := strings.Join(masters, ",") + options := shell.ShellOptions{ + Masters: &mastersValue, + GrpcDialOption: h.grpcDialOption, + FilerGroup: &filerGroup, + Directory: "/", + } + + filers := normalizeAddressList(clusterContext.FilerGrpcAddresses) + if len(filers) > 0 { + options.FilerAddress = pb.ServerAddress(filers[0]) + } else { + glog.V(1).Infof("admin script worker missing filer address; filer-dependent commands may fail") + } + + commandEnv := shell.NewCommandEnv(&options) + commandEnv.ForceNoLock() + + ctx, cancel := context.WithCancel(ctx) + go commandEnv.MasterClient.KeepConnectedToMaster(ctx) + + return commandEnv, cancel, nil +} + +func normalizeAddressList(addresses []string) []string { + normalized := make([]string, 0, len(addresses)) + seen := make(map[string]struct{}, len(addresses)) + for _, address := range addresses { + address = strings.TrimSpace(address) + if address == "" { + continue + } + if _, exists := seen[address]; exists { + continue + } + seen[address] = struct{}{} + normalized = append(normalized, address) + } + return normalized +} + +func hashAdminScript(script string) string { + sum := sha256.Sum256([]byte(script)) + return hex.EncodeToString(sum[:]) +} + +func formatAdminScriptCommand(cmd adminScriptCommand) string { + if len(cmd.Args) == 0 { + return cmd.Name + } + return fmt.Sprintf("%s %s", cmd.Name, strings.Join(cmd.Args, " ")) +} + +func percentProgress(done, total int) float64 { + if total <= 0 { + return 0 + } + if done < 0 { + done = 0 + } + if done > total { + done = total + } + return float64(done) / float64(total) * 100 +} + +type limitedBuffer struct { + buf bytes.Buffer + maxBytes int + truncated bool +} + +func (b *limitedBuffer) Write(p []byte) (int, error) { + if b == nil { + return len(p), nil + } + if b.maxBytes <= 0 { + b.truncated = true + return len(p), nil + } + remaining := b.maxBytes - b.buf.Len() + if remaining <= 0 { + b.truncated = true + return len(p), nil + } + if len(p) > remaining { + _, _ = b.buf.Write(p[:remaining]) + b.truncated = true + return len(p), nil + } + _, _ = b.buf.Write(p) + return len(p), nil +} + +func (b *limitedBuffer) String() string { + if b == nil { + return "" + } + return b.buf.String() +} diff --git a/weed/plugin/worker/admin_script_handler_test.go b/weed/plugin/worker/admin_script_handler_test.go new file mode 100644 index 000000000..7f2ab2236 --- /dev/null +++ b/weed/plugin/worker/admin_script_handler_test.go @@ -0,0 +1,100 @@ +package pluginworker + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestAdminScriptDescriptorDefaults(t *testing.T) { + descriptor := NewAdminScriptHandler(nil).Descriptor() + if descriptor == nil { + t.Fatalf("expected descriptor") + } + if descriptor.AdminRuntimeDefaults == nil { + t.Fatalf("expected admin runtime defaults") + } + if descriptor.AdminRuntimeDefaults.DetectionIntervalSeconds != adminScriptDetectTickSecs { + t.Fatalf("unexpected detection interval seconds: got=%d want=%d", + descriptor.AdminRuntimeDefaults.DetectionIntervalSeconds, adminScriptDetectTickSecs) + } + if descriptor.AdminConfigForm == nil { + t.Fatalf("expected admin config form") + } + runInterval := readInt64Config(descriptor.AdminConfigForm.DefaultValues, "run_interval_minutes", 0) + if runInterval != defaultAdminScriptRunMins { + t.Fatalf("unexpected run_interval_minutes default: got=%d want=%d", runInterval, defaultAdminScriptRunMins) + } + script := readStringConfig(descriptor.AdminConfigForm.DefaultValues, "script", "") + if strings.TrimSpace(script) == "" { + t.Fatalf("expected non-empty default script") + } +} + +func TestAdminScriptDetectSkipsByRunInterval(t *testing.T) { + handler := NewAdminScriptHandler(nil) + sender := &recordingDetectionSender{} + err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ + JobType: adminScriptJobType, + LastSuccessfulRun: timestamppb.New(time.Now().Add(-2 * time.Minute)), + AdminConfigValues: map[string]*plugin_pb.ConfigValue{ + "script": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultAdminScript}, + }, + "run_interval_minutes": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 17}, + }, + }, + }, sender) + if err != nil { + t.Fatalf("detect returned err = %v", err) + } + if sender.proposals == nil { + t.Fatalf("expected proposals message") + } + if len(sender.proposals.Proposals) != 0 { + t.Fatalf("expected zero proposals, got %d", len(sender.proposals.Proposals)) + } + if sender.complete == nil || !sender.complete.Success { + t.Fatalf("expected successful completion message") + } + if len(sender.events) == 0 { + t.Fatalf("expected detector activity events") + } + if !strings.Contains(sender.events[0].Message, "run interval") { + t.Fatalf("unexpected skip message: %q", sender.events[0].Message) + } +} + +func TestAdminScriptDetectCreatesProposalWhenIntervalElapsed(t *testing.T) { + handler := NewAdminScriptHandler(nil) + sender := &recordingDetectionSender{} + err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ + JobType: adminScriptJobType, + LastSuccessfulRun: timestamppb.New(time.Now().Add(-20 * time.Minute)), + AdminConfigValues: map[string]*plugin_pb.ConfigValue{ + "script": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultAdminScript}, + }, + "run_interval_minutes": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 17}, + }, + }, + }, sender) + if err != nil { + t.Fatalf("detect returned err = %v", err) + } + if sender.proposals == nil { + t.Fatalf("expected proposals message") + } + if len(sender.proposals.Proposals) != 1 { + t.Fatalf("expected one proposal, got %d", len(sender.proposals.Proposals)) + } + if sender.complete == nil || !sender.complete.Success || sender.complete.TotalProposals != 1 { + t.Fatalf("unexpected completion message: %+v", sender.complete) + } +} diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 681cf317b..0fdc2fa0b 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -2,9 +2,9 @@ package shell import ( "flag" - "fmt" "io" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/types" ) @@ -66,7 +66,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W } else { collections = append(collections, *collection) } - fmt.Printf("balanceEcVolumes collections %+v\n", len(collections)) + glog.V(1).Infof("balanceEcVolumes collections %+v\n", len(collections)) rp, err := parseReplicaPlacementArg(commandEnv, *shardReplicaPlacement) if err != nil { diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 6b575d41a..5af5b1c17 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -150,14 +150,14 @@ func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super if err != nil { return rp, err } - fmt.Printf("using replica placement %q for EC volumes\n", rp.String()) + glog.V(1).Infof("using replica placement %q for EC volumes\n", rp.String()) } else { // No replica placement argument provided, resolve from master default settings. rp, err = getDefaultReplicaPlacement(commandEnv) if err != nil { return rp, err } - fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String()) + glog.V(1).Infof("using master default replica placement %q for EC volumes\n", rp.String()) } return rp, nil @@ -1628,7 +1628,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic } if len(collections) == 0 { - fmt.Printf("WARNING: No collections to balance EC volumes across.\n") + glog.V(1).Infof("WARNING: No collections to balance EC volumes across.\n") } for _, c := range collections { if err = ecb.balanceEcVolumes(c); err != nil { diff --git a/weed/shell/command_sleep.go b/weed/shell/command_sleep.go new file mode 100644 index 000000000..2751f854c --- /dev/null +++ b/weed/shell/command_sleep.go @@ -0,0 +1,43 @@ +package shell + +import ( + "fmt" + "io" + "strconv" + "time" +) + +func init() { + Commands = append(Commands, &commandSleep{}) +} + +// =========== Sleep ============== +type commandSleep struct { +} + +func (c *commandSleep) Name() string { + return "sleep" +} + +func (c *commandSleep) Help() string { + return `sleep for N seconds (useful to simulate long running jobs) + + sleep 5 +` +} + +func (c *commandSleep) HasTag(CommandTag) bool { + return false +} + +func (c *commandSleep) Do(args []string, _ *CommandEnv, _ io.Writer) error { + if len(args) == 0 { + return fmt.Errorf("sleep requires a seconds argument") + } + seconds, err := strconv.Atoi(args[0]) + if err != nil || seconds <= 0 { + return fmt.Errorf("sleep duration must be a positive integer, got %q", args[0]) + } + time.Sleep(time.Duration(seconds) * time.Second) + return nil +} diff --git a/weed/shell/commands.go b/weed/shell/commands.go index d34204878..741dff6b0 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -38,6 +38,7 @@ type CommandEnv struct { option *ShellOptions locker *exclusive_locks.ExclusiveLocker noLock bool + forceNoLock bool verbose bool } @@ -71,6 +72,9 @@ func (ce *CommandEnv) isDirectory(path string) bool { func (ce *CommandEnv) confirmIsLocked(args []string) error { + if ce.noLock || ce.forceNoLock { + return nil + } if ce.locker.IsLocked() { return nil } @@ -80,11 +84,25 @@ func (ce *CommandEnv) confirmIsLocked(args []string) error { } +func (ce *CommandEnv) SetNoLock(noLock bool) { + if ce == nil { + return + } + ce.noLock = noLock +} + +func (ce *CommandEnv) ForceNoLock() { + if ce == nil { + return + } + ce.forceNoLock = true +} + func (ce *CommandEnv) isLocked() bool { if ce == nil { return true } - if ce.noLock { + if ce.noLock || ce.forceNoLock { return true } return ce.locker.IsLocked() diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 9e5cb0cec..724931f54 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -11,6 +11,8 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -26,6 +28,22 @@ type masterVolumeProvider struct { masterClient *MasterClient } +func isCanceledErr(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return true + } + if statusErr, ok := status.FromError(err); ok { + switch statusErr.Code() { + case codes.Canceled, codes.DeadlineExceeded: + return true + } + } + return false +} + // LookupVolumeIds queries the master for volume locations (fallback when cache misses) // Returns partial results with aggregated errors for volumes that failed func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) { @@ -194,8 +212,13 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server resp, err := stream.Recv() if err != nil { - glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err) - stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc() + canceled := isCanceledErr(err) || ctx.Err() != nil + if canceled { + glog.V(1).Infof("%s.%s masterClient stream closed from %s: %v", mc.FilerGroup, mc.clientType, master, err) + } else { + glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err) + stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc() + } return err } @@ -219,8 +242,13 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server for { resp, err := stream.Recv() if err != nil { - glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err) - stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc() + canceled := isCanceledErr(err) || ctx.Err() != nil + if canceled { + glog.V(1).Infof("%s.%s masterClient stream closed from %s: %v", mc.FilerGroup, mc.clientType, master, err) + } else { + glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err) + stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc() + } return err } @@ -252,12 +280,20 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server mc.OnPeerUpdateLock.RUnlock() } if err := ctx.Err(); err != nil { - glog.V(0).Infof("Connection attempt to master stopped: %v", err) + if isCanceledErr(err) { + glog.V(1).Infof("Connection attempt to master stopped: %v", err) + } else { + glog.V(0).Infof("Connection attempt to master stopped: %v", err) + } return err } } }) if gprcErr != nil { + if isCanceledErr(gprcErr) || ctx.Err() != nil { + glog.V(1).Infof("%s.%s masterClient connection closed to %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr) + return nextHintedLeader + } stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc() glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr) } @@ -387,7 +423,11 @@ func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) { for { select { case <-ctx.Done(): - glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err()) + if isCanceledErr(ctx.Err()) { + glog.V(1).Infof("Connection to masters stopped: %v", ctx.Err()) + } else { + glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err()) + } return default: reconnectStart := time.Now()