Browse Source

admin: add plugin lock coordination

pull/8491/head
Chris Lu 2 days ago
parent
commit
1e0003e283
  1. 87
      weed/admin/dash/admin_lock_manager.go
  2. 38
      weed/admin/dash/admin_server.go
  3. 8
      weed/admin/dash/plugin_api.go
  4. 7
      weed/admin/plugin/lock_manager.go
  5. 10
      weed/admin/plugin/plugin.go
  6. 15
      weed/admin/plugin/plugin_scheduler.go

87
weed/admin/dash/admin_lock_manager.go

@@ -0,0 +1,87 @@
package dash
import (
"sync"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks"
)
// Lock identity constants. adminLockName is "shell" — presumably the same
// cluster lock name used by the weed shell, so admin plugin work and shell
// maintenance exclude each other; confirm against the shell lock code.
const (
	adminLockName       = "shell"
	adminLockClientName = "admin-plugin"
)

// AdminLockManager coordinates exclusive admin locks with reference counting.
// It is safe for concurrent use.
type AdminLockManager struct {
	locker     *exclusive_locks.ExclusiveLocker // underlying cluster-wide exclusive lock
	clientName string                           // client identity passed to RequestLock

	mu        sync.Mutex // guards the fields below
	cond      *sync.Cond // signaled when an in-flight RequestLock completes
	acquiring bool       // true while one goroutine runs RequestLock outside mu
	holdCount int        // number of outstanding Acquire calls sharing the lock
}
// NewAdminLockManager builds a lock manager backed by the given master client.
// A nil masterClient yields a nil manager (all methods are nil-safe no-ops).
// An empty clientName falls back to adminLockClientName.
func NewAdminLockManager(masterClient *wdclient.MasterClient, clientName string) *AdminLockManager {
	if masterClient == nil {
		return nil
	}
	name := clientName
	if name == "" {
		name = adminLockClientName
	}
	m := &AdminLockManager{
		locker:     exclusive_locks.NewExclusiveLocker(masterClient, adminLockName),
		clientName: name,
	}
	m.cond = sync.NewCond(&m.mu)
	return m
}
// Acquire takes (or joins) the shared exclusive admin lock and returns a
// release function. The first caller performs the actual RequestLock against
// the master; concurrent callers piggyback on the already-held lock via
// reference counting. The returned release is idempotent: invoking it more
// than once decrements the hold count only a single time, so a double call
// (e.g. defer plus explicit call) cannot release the lock on behalf of
// other holders.
//
// A nil manager (or one without a locker) is a no-op and returns a stub
// release. The error result is currently always nil; it is kept so the
// signature satisfies the plugin.LockManager interface.
func (m *AdminLockManager) Acquire(reason string) (func(), error) {
	if m == nil || m.locker == nil {
		return func() {}, nil
	}
	m.mu.Lock()
	if reason != "" {
		// Best-effort annotation describing why the lock is held.
		m.locker.SetMessage(reason)
	}
	// Wait out any in-flight RequestLock by another goroutine so we observe
	// a settled holdCount.
	for m.acquiring {
		m.cond.Wait()
	}
	if m.holdCount == 0 {
		// First holder: run the (potentially blocking) lock request outside
		// the mutex, flagged via acquiring so other callers wait instead of
		// racing a second RequestLock.
		m.acquiring = true
		m.mu.Unlock()
		m.locker.RequestLock(m.clientName)
		m.mu.Lock()
		m.acquiring = false
		m.holdCount = 1
		m.cond.Broadcast()
		m.mu.Unlock()
	} else {
		// Lock already held by this process; just join it.
		m.holdCount++
		m.mu.Unlock()
	}
	var once sync.Once
	return func() { once.Do(m.Release) }, nil
}
// Release drops one reference on the shared admin lock; when the last
// reference is dropped the underlying cluster lock is given up. Calls on an
// unheld (or nil) manager are ignored.
//
// The cluster-level ReleaseLock is performed while still holding mu. Doing
// it after unlocking (as a naive implementation would) races a concurrent
// Acquire: the fresh RequestLock could complete first and then be clobbered
// by this late release. Holding mu is safe here because Acquire only blocks
// on mu.Lock (it runs its own RequestLock outside the mutex).
func (m *AdminLockManager) Release() {
	if m == nil || m.locker == nil {
		return
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.holdCount <= 0 {
		// Tolerate spurious releases.
		return
	}
	m.holdCount--
	if m.holdCount == 0 {
		m.locker.ReleaseLock()
	}
}

38
weed/admin/dash/admin_server.go

@@ -98,6 +98,7 @@ type AdminServer struct {
// Maintenance system
maintenanceManager *maintenance.MaintenanceManager
plugin *adminplugin.Plugin
pluginLock *AdminLockManager
// Topic retention purger
topicRetentionPurger *TopicRetentionPurger
@ -134,6 +135,8 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
ctx := context.Background()
go masterClient.KeepConnectedToMaster(ctx)
lockManager := NewAdminLockManager(masterClient, adminLockClientName)
server := &AdminServer{
masterClient: masterClient,
templateFS: templateFS,
@ -145,6 +148,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
collectionStatsCacheThreshold: defaultStatsCacheTimeout,
s3TablesManager: newS3TablesManager(),
icebergPort: icebergPort,
pluginLock: lockManager,
}
// Initialize topic retention purger
@ -228,6 +232,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) {
return server.buildDefaultPluginClusterContext(), nil
},
LockManager: lockManager,
})
if err != nil && dataDir != "" {
glog.Warningf("Failed to initialize plugin with dataDir=%q: %v. Falling back to in-memory plugin state.", dataDir, err)
@ -236,6 +241,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) {
return server.buildDefaultPluginClusterContext(), nil
},
LockManager: lockManager,
})
}
if err != nil {
@ -889,6 +895,13 @@ func (s *AdminServer) GetPlugin() *adminplugin.Plugin {
return s.plugin
}
// acquirePluginLock obtains the shared admin plugin lock (when configured)
// for the given human-readable reason. If the server or its lock manager is
// nil it degrades to a no-op and returns a stub release function, so callers
// can always safely defer the returned func.
func (s *AdminServer) acquirePluginLock(reason string) (func(), error) {
	if s == nil || s.pluginLock == nil {
		return func() {}, nil
	}
	return s.pluginLock.Acquire(reason)
}
// RequestPluginJobTypeDescriptor asks one worker for job type schema and returns the descriptor.
func (s *AdminServer) RequestPluginJobTypeDescriptor(ctx context.Context, jobType string, forceRefresh bool) (*plugin_pb.JobTypeDescriptor, error) {
if s.plugin == nil {
@ -931,6 +944,13 @@ func (s *AdminServer) RunPluginDetection(
if s.plugin == nil {
return nil, fmt.Errorf("plugin is not enabled")
}
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType))
if err != nil {
return nil, err
}
if releaseLock != nil {
defer releaseLock()
}
return s.plugin.RunDetection(ctx, jobType, clusterContext, maxResults)
}
@ -956,6 +976,13 @@ func (s *AdminServer) RunPluginDetectionWithReport(
if s.plugin == nil {
return nil, fmt.Errorf("plugin is not enabled")
}
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType))
if err != nil {
return nil, err
}
if releaseLock != nil {
defer releaseLock()
}
return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults)
}
@ -969,6 +996,17 @@ func (s *AdminServer) ExecutePluginJob(
if s.plugin == nil {
return nil, fmt.Errorf("plugin is not enabled")
}
jobType := ""
if job != nil {
jobType = strings.TrimSpace(job.JobType)
}
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin execution %s", jobType))
if err != nil {
return nil, err
}
if releaseLock != nil {
defer releaseLock()
}
return s.plugin.ExecuteJob(ctx, job, clusterContext, attempt)
}

8
weed/admin/dash/plugin_api.go

@ -413,6 +413,14 @@ func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request
writeJSONError(w, http.StatusBadRequest, "jobType is required")
return
}
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detect+execute %s", jobType))
if err != nil {
writeJSONError(w, http.StatusInternalServerError, err.Error())
return
}
if releaseLock != nil {
defer releaseLock()
}
var req struct {
ClusterContext json.RawMessage `json:"cluster_context"`

7
weed/admin/plugin/lock_manager.go

@@ -0,0 +1,7 @@
package plugin
// LockManager provides a shared exclusive lock for admin-managed detection/execution.
// Acquire returns a release function that must be called when the protected work finishes.
// The reason string is a human-readable note describing why the lock is being
// taken (for display/diagnostics); implementations may ignore it.
type LockManager interface {
	Acquire(reason string) (release func(), err error)
}

10
weed/admin/plugin/plugin.go

@ -32,6 +32,7 @@ type Options struct {
SendTimeout time.Duration
SchedulerTick time.Duration
ClusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error)
LockManager LockManager
}
// JobTypeInfo contains metadata about a plugin job type.
@ -52,6 +53,7 @@ type Plugin struct {
schedulerTick time.Duration
clusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error)
lockManager LockManager
schedulerMu sync.Mutex
nextDetectionAt map[string]time.Time
@ -146,6 +148,7 @@ func New(options Options) (*Plugin, error) {
sendTimeout: sendTimeout,
schedulerTick: schedulerTick,
clusterContextProvider: options.ClusterContextProvider,
lockManager: options.LockManager,
sessions: make(map[string]*streamSession),
pendingSchema: make(map[string]chan *plugin_pb.ConfigSchemaResponse),
pendingDetection: make(map[string]*pendingDetectionState),
@ -380,6 +383,13 @@ func (r *Plugin) BaseDir() string {
return r.store.BaseDir()
}
// acquireAdminLock takes the shared admin lock through the configured
// LockManager, tagging the request with reason. When the plugin or its lock
// manager is nil it is a no-op that returns a stub release function, so the
// result can always be deferred safely.
func (r *Plugin) acquireAdminLock(reason string) (func(), error) {
	if r == nil || r.lockManager == nil {
		return func() {}, nil
	}
	return r.lockManager.Acquire(reason)
}
// RunDetectionWithReport requests one detector worker and returns proposals with request metadata.
func (r *Plugin) RunDetectionWithReport(
ctx context.Context,

15
weed/admin/plugin/plugin_scheduler.go

@ -319,6 +319,21 @@ func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) {
func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) {
defer r.finishDetection(jobType)
releaseLock, lockErr := r.acquireAdminLock(fmt.Sprintf("plugin scheduled detection %s", jobType))
if lockErr != nil {
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection aborted: failed to acquire lock: %v", lockErr),
Stage: "failed",
OccurredAt: timeToPtr(time.Now().UTC()),
})
return
}
if releaseLock != nil {
defer releaseLock()
}
start := time.Now().UTC()
r.appendActivity(JobActivity{
JobType: jobType,

Loading…
Cancel
Save