From 1e0003e28355f01c13db2112fedb1c8f7be45bb0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 2 Mar 2026 13:29:16 -0800 Subject: [PATCH] admin: add plugin lock coordination --- weed/admin/dash/admin_lock_manager.go | 87 +++++++++++++++++++++++++++ weed/admin/dash/admin_server.go | 38 ++++++++++++ weed/admin/dash/plugin_api.go | 8 +++ weed/admin/plugin/lock_manager.go | 7 +++ weed/admin/plugin/plugin.go | 10 +++ weed/admin/plugin/plugin_scheduler.go | 15 +++++ 6 files changed, 165 insertions(+) create mode 100644 weed/admin/dash/admin_lock_manager.go create mode 100644 weed/admin/plugin/lock_manager.go diff --git a/weed/admin/dash/admin_lock_manager.go b/weed/admin/dash/admin_lock_manager.go new file mode 100644 index 000000000..f84a72810 --- /dev/null +++ b/weed/admin/dash/admin_lock_manager.go @@ -0,0 +1,87 @@ +package dash + +import ( + "sync" + + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks" +) + +const ( + adminLockName = "shell" + adminLockClientName = "admin-plugin" +) + +// AdminLockManager coordinates exclusive admin locks with reference counting. +// It is safe for concurrent use. +type AdminLockManager struct { + locker *exclusive_locks.ExclusiveLocker + clientName string + + mu sync.Mutex + cond *sync.Cond + acquiring bool + holdCount int +} + +func NewAdminLockManager(masterClient *wdclient.MasterClient, clientName string) *AdminLockManager { + if masterClient == nil { + return nil + } + if clientName == "" { + clientName = adminLockClientName + } + manager := &AdminLockManager{ + locker: exclusive_locks.NewExclusiveLocker(masterClient, adminLockName), + clientName: clientName, + } + manager.cond = sync.NewCond(&manager.mu) + return manager +} + +func (m *AdminLockManager) Acquire(reason string) (func(), error) { + if m == nil || m.locker == nil { + return func() {}, nil + } + + m.mu.Lock() + if reason != "" { + m.locker.SetMessage(reason) + } + for m.acquiring { + m.cond.Wait() + } + if m.holdCount == 0 { + m.acquiring = true + m.mu.Unlock() + m.locker.RequestLock(m.clientName) + m.mu.Lock() + m.acquiring = false + m.holdCount = 1 + m.cond.Broadcast() + m.mu.Unlock() + return m.Release, nil + } + m.holdCount++ + m.mu.Unlock() + return m.Release, nil +} + +func (m *AdminLockManager) Release() { + if m == nil || m.locker == nil { + return + } + + m.mu.Lock() + if m.holdCount <= 0 { + m.mu.Unlock() + return + } + m.holdCount-- + shouldRelease := m.holdCount == 0 + m.mu.Unlock() + + if shouldRelease { + m.locker.ReleaseLock() + } +} diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 250bad1d4..23d13a8e7 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -98,6 +98,7 @@ type AdminServer struct { // Maintenance system maintenanceManager *maintenance.MaintenanceManager plugin *adminplugin.Plugin + pluginLock *AdminLockManager // Topic retention purger topicRetentionPurger *TopicRetentionPurger @@ -134,6 +135,8 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, ctx := context.Background() go masterClient.KeepConnectedToMaster(ctx) + lockManager := NewAdminLockManager(masterClient, adminLockClientName) + server := &AdminServer{ masterClient: masterClient, templateFS: templateFS, @@ -145,6 +148,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, collectionStatsCacheThreshold: defaultStatsCacheTimeout, s3TablesManager: newS3TablesManager(), icebergPort: icebergPort, + pluginLock: lockManager, } // Initialize topic retention purger @@ -228,6 +232,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) { return server.buildDefaultPluginClusterContext(), nil }, + LockManager: lockManager, }) if err != nil && dataDir != "" { glog.Warningf("Failed to initialize plugin with dataDir=%q: %v. Falling back to in-memory plugin state.", dataDir, err) @@ -236,6 +241,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) { return server.buildDefaultPluginClusterContext(), nil }, + LockManager: lockManager, }) } if err != nil { @@ -889,6 +895,13 @@ func (s *AdminServer) GetPlugin() *adminplugin.Plugin { return s.plugin } +func (s *AdminServer) acquirePluginLock(reason string) (func(), error) { + if s == nil || s.pluginLock == nil { + return func() {}, nil + } + return s.pluginLock.Acquire(reason) +} + // RequestPluginJobTypeDescriptor asks one worker for job type schema and returns the descriptor. func (s *AdminServer) RequestPluginJobTypeDescriptor(ctx context.Context, jobType string, forceRefresh bool) (*plugin_pb.JobTypeDescriptor, error) { if s.plugin == nil { @@ -931,6 +944,13 @@ func (s *AdminServer) RunPluginDetection( if s.plugin == nil { return nil, fmt.Errorf("plugin is not enabled") } + releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType)) + if err != nil { + return nil, err + } + if releaseLock != nil { + defer releaseLock() + } return s.plugin.RunDetection(ctx, jobType, clusterContext, maxResults) } @@ -956,6 +976,13 @@ func (s *AdminServer) RunPluginDetectionWithReport( if s.plugin == nil { return nil, fmt.Errorf("plugin is not enabled") } + releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType)) + if err != nil { + return nil, err + } + if releaseLock != nil { + defer releaseLock() + } return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults) } @@ -969,6 +996,17 @@ func (s *AdminServer) ExecutePluginJob( if s.plugin == nil { return nil, fmt.Errorf("plugin is not enabled") } + jobType := "" + if job != nil { + jobType = strings.TrimSpace(job.JobType) + } + releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin execution %s", jobType)) + if err != nil { + return nil, err + } + if releaseLock != nil { + defer releaseLock() + } return s.plugin.ExecuteJob(ctx, job, clusterContext, attempt) } diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 65197a3fd..e3b64fa02 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -413,6 +413,14 @@ func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request writeJSONError(w, http.StatusBadRequest, "jobType is required") return } + releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detect+execute %s", jobType)) + if err != nil { + writeJSONError(w, http.StatusInternalServerError, err.Error()) + return + } + if releaseLock != nil { + defer releaseLock() + } var req struct { ClusterContext json.RawMessage `json:"cluster_context"` diff --git a/weed/admin/plugin/lock_manager.go b/weed/admin/plugin/lock_manager.go new file mode 100644 index 000000000..6ddb56d91 --- /dev/null +++ b/weed/admin/plugin/lock_manager.go @@ -0,0 +1,7 @@ +package plugin + +// LockManager provides a shared exclusive lock for admin-managed detection/execution. +// Acquire returns a release function that must be called when the protected work finishes. +type LockManager interface { + Acquire(reason string) (release func(), err error) +} diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index c1175b931..16a9ed3a2 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -32,6 +32,7 @@ type Options struct { SendTimeout time.Duration SchedulerTick time.Duration ClusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error) + LockManager LockManager } // JobTypeInfo contains metadata about a plugin job type. @@ -52,6 +53,7 @@ type Plugin struct { schedulerTick time.Duration clusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error) + lockManager LockManager schedulerMu sync.Mutex nextDetectionAt map[string]time.Time @@ -146,6 +148,7 @@ func New(options Options) (*Plugin, error) { sendTimeout: sendTimeout, schedulerTick: schedulerTick, clusterContextProvider: options.ClusterContextProvider, + lockManager: options.LockManager, sessions: make(map[string]*streamSession), pendingSchema: make(map[string]chan *plugin_pb.ConfigSchemaResponse), pendingDetection: make(map[string]*pendingDetectionState), @@ -380,6 +383,13 @@ func (r *Plugin) BaseDir() string { return r.store.BaseDir() } +func (r *Plugin) acquireAdminLock(reason string) (func(), error) { + if r == nil || r.lockManager == nil { + return func() {}, nil + } + return r.lockManager.Acquire(reason) +} + // RunDetectionWithReport requests one detector worker and returns proposals with request metadata. func (r *Plugin) RunDetectionWithReport( ctx context.Context, diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index e825e8069..77e6e213e 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -319,6 +319,21 @@ func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) { func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) { defer r.finishDetection(jobType) + releaseLock, lockErr := r.acquireAdminLock(fmt.Sprintf("plugin scheduled detection %s", jobType)) + if lockErr != nil { + r.appendActivity(JobActivity{ + JobType: jobType, + Source: "admin_scheduler", + Message: fmt.Sprintf("scheduled detection aborted: failed to acquire lock: %v", lockErr), + Stage: "failed", + OccurredAt: timeToPtr(time.Now().UTC()), + }) + return + } + if releaseLock != nil { + defer releaseLock() + } + start := time.Now().UTC() r.appendActivity(JobActivity{ JobType: jobType,