diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 27a16f7b9..248ca8985 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -106,6 +107,14 @@ func (r *Plugin) schedulerLoop() { // runLaneSchedulerIteration runs one scheduling pass for a single lane, // processing only the job types assigned to that lane. +// +// For lanes that require a lock (e.g. LaneDefault), all job types are +// processed sequentially under one admin lock because their volume +// management operations share global state. +// +// For lanes that do not require a lock (e.g. LaneIceberg, LaneLifecycle), +// each job type runs independently in its own goroutine so they do not +// block each other. func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool { r.expireStaleJobs(time.Now().UTC()) @@ -122,21 +131,23 @@ func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool { return false } - r.setLaneLoopState(ls, "", "waiting_for_lock") - lockName := fmt.Sprintf("plugin scheduler:%s", ls.lane) - releaseLock, err := r.acquireAdminLock(lockName) - if err != nil { - glog.Warningf("Plugin scheduler [%s] failed to acquire lock: %v", ls.lane, err) - r.setLaneLoopState(ls, "", "idle") - return false - } - if releaseLock != nil { - defer releaseLock() + if LaneRequiresLock(ls.lane) { + return r.runLaneSchedulerIterationLocked(ls, jobTypes) } + return r.runLaneSchedulerIterationConcurrent(ls, jobTypes) +} - active := make(map[string]struct{}, len(jobTypes)) - hadJobs := false +// dueJobType pairs a job type with its resolved scheduling policy. +type dueJobType struct { + jobType string + policy schedulerPolicy +} +// collectDueJobTypes loads policies for all job types in the lane and +// returns those whose detection interval has elapsed. It also returns +// the full set of active job type names for later pruning. +func (r *Plugin) collectDueJobTypes(ls *schedulerLaneState, jobTypes []string) (active map[string]struct{}, due []dueJobType) { + active = make(map[string]struct{}, len(jobTypes)) for _, jobType := range jobTypes { active[jobType] = struct{}{} @@ -156,9 +167,31 @@ func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool { if !r.markDetectionDue(jobType, policy.DetectionInterval, initialDelay) { continue } + due = append(due, dueJobType{jobType: jobType, policy: policy}) + } + return active, due +} - detected := r.runJobTypeIteration(jobType, policy) - if detected { +// runLaneSchedulerIterationLocked processes job types sequentially under a +// single admin lock. Used by the default lane where volume management +// operations must be serialised. +func (r *Plugin) runLaneSchedulerIterationLocked(ls *schedulerLaneState, jobTypes []string) bool { + r.setLaneLoopState(ls, "", "waiting_for_lock") + lockName := fmt.Sprintf("plugin scheduler:%s", ls.lane) + releaseLock, err := r.acquireAdminLock(lockName) + if err != nil { + glog.Warningf("Plugin scheduler [%s] failed to acquire lock: %v", ls.lane, err) + r.setLaneLoopState(ls, "", "idle") + return false + } + if releaseLock != nil { + defer releaseLock() + } + + active, due := r.collectDueJobTypes(ls, jobTypes) + hadJobs := false + for _, w := range due { + if r.runJobTypeIteration(w.jobType, w.policy) { hadJobs = true } } @@ -169,6 +202,33 @@ func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool { return hadJobs } +// runLaneSchedulerIterationConcurrent processes each job type in its own +// goroutine so they run independently. Used by lanes (e.g. iceberg, +// lifecycle) whose job types do not share global state. +func (r *Plugin) runLaneSchedulerIterationConcurrent(ls *schedulerLaneState, jobTypes []string) bool { + active, due := r.collectDueJobTypes(ls, jobTypes) + + r.setLaneLoopState(ls, "", "busy") + + var hadJobs atomic.Bool + var wg sync.WaitGroup + for _, w := range due { + wg.Add(1) + go func(jobType string, policy schedulerPolicy) { + defer wg.Done() + if r.runJobTypeIteration(jobType, policy) { + hadJobs.Store(true) + } + }(w.jobType, w.policy) + } + wg.Wait() + + r.pruneSchedulerState(active) + r.pruneDetectorLeases(active) + r.setLaneLoopState(ls, "", "idle") + return hadJobs.Load() +} + // runSchedulerIteration is kept for backward compatibility. It runs a // single iteration across ALL job types (equivalent to the old single-loop // behavior). It is only used by the legacy schedulerLoop() fallback. @@ -267,7 +327,7 @@ func (r *Plugin) wakeScheduler() { func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) bool { r.recordSchedulerRunStart(jobType) r.clearWaitingJobQueue(jobType) - r.setSchedulerLoopState(jobType, "detecting") + r.setSchedulerLoopStateForJobType(jobType, "detecting") r.markJobTypeInFlight(jobType) defer r.finishDetection(jobType) @@ -399,7 +459,7 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo return detected } - r.setSchedulerLoopState(jobType, "executing") + r.setSchedulerLoopStateForJobType(jobType, "executing") // Scan proposals for the maximum estimated_runtime_seconds so the // execution phase gets enough time for large jobs (e.g. vacuum on diff --git a/weed/admin/plugin/plugin_scheduler_test.go b/weed/admin/plugin/plugin_scheduler_test.go index 171c5a33f..9ac853163 100644 --- a/weed/admin/plugin/plugin_scheduler_test.go +++ b/weed/admin/plugin/plugin_scheduler_test.go @@ -3,6 +3,7 @@ package plugin import ( "context" "fmt" + "sync" "testing" "time" @@ -594,3 +595,81 @@ func TestPickDetectorReassignsWhenLeaseIsStale(t *testing.T) { t.Fatalf("expected detector lease to be updated to worker-a, got=%s", lease) } } + +// trackingLockManager records whether Acquire was called and how many times. +type trackingLockManager struct { + mu sync.Mutex + acquired int +} + +func (m *trackingLockManager) Acquire(reason string) (func(), error) { + m.mu.Lock() + m.acquired++ + m.mu.Unlock() + return func() {}, nil +} + +func (m *trackingLockManager) count() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.acquired +} + +func TestRunLaneSchedulerIterationLockBehavior(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + lane SchedulerLane + jobType string + wantLock bool + }{ + {"Default", LaneDefault, "vacuum", true}, + {"Iceberg", LaneIceberg, "iceberg_maintenance", false}, + {"Lifecycle", LaneLifecycle, "s3_lifecycle", false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + lm := &trackingLockManager{} + pluginSvc, err := New(Options{ + LockManager: lm, + ClusterContextProvider: func(context.Context) (*plugin_pb.ClusterContext, error) { + return &plugin_pb.ClusterContext{}, nil + }, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer pluginSvc.Shutdown() + + // Register a detectable worker for the job type. + pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: "worker-a", + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: tt.jobType, CanDetect: true}, + }, + }) + + // Enable the job type so the scheduler picks it up. + err = pluginSvc.SaveJobTypeConfig(&plugin_pb.PersistedJobTypeConfig{ + JobType: tt.jobType, + AdminRuntime: &plugin_pb.AdminRuntimeConfig{ + Enabled: true, + DetectionIntervalSeconds: 1, + }, + }) + if err != nil { + t.Fatalf("SaveJobTypeConfig: %v", err) + } + + ls := pluginSvc.lanes[tt.lane] + pluginSvc.runLaneSchedulerIteration(ls) + + if got := lm.count(); (got > 0) != tt.wantLock { + t.Errorf("lock acquired %d times, wantLock=%v", got, tt.wantLock) + } + }) + } +} diff --git a/weed/admin/plugin/scheduler_lane.go b/weed/admin/plugin/scheduler_lane.go index efa808e73..d5ca4a208 100644 --- a/weed/admin/plugin/scheduler_lane.go +++ b/weed/admin/plugin/scheduler_lane.go @@ -38,6 +38,25 @@ var laneIdleSleep = map[SchedulerLane]time.Duration{ LaneLifecycle: 5 * time.Minute, } +// laneRequiresLock maps each lane to whether its job types must be +// serialised under a single admin lock. The default lane needs this +// because volume-management operations share global state. Other +// lanes run each job type independently. +var laneRequiresLock = map[SchedulerLane]bool{ + LaneDefault: true, + LaneIceberg: false, + LaneLifecycle: false, +} + +// LaneRequiresLock returns true if the given lane needs a single admin +// lock to serialise its job types. Unknown lanes default to true. +func LaneRequiresLock(lane SchedulerLane) bool { + if v, ok := laneRequiresLock[lane]; ok { + return v + } + return true +} + // LaneIdleSleep returns the idle sleep duration for the given lane, // falling back to defaultSchedulerIdleSleep if the lane is unknown. func LaneIdleSleep(lane SchedulerLane) time.Duration { diff --git a/weed/admin/plugin/scheduler_lane_test.go b/weed/admin/plugin/scheduler_lane_test.go index 6217e7f22..1bdd19436 100644 --- a/weed/admin/plugin/scheduler_lane_test.go +++ b/weed/admin/plugin/scheduler_lane_test.go @@ -27,6 +27,26 @@ func TestAllLanesHaveIdleSleep(t *testing.T) { } } +func TestLaneRequiresLock(t *testing.T) { + tests := []struct { + name string + lane SchedulerLane + want bool + }{ + {"Default", LaneDefault, true}, + {"Iceberg", LaneIceberg, false}, + {"Lifecycle", LaneLifecycle, false}, + {"Unknown", "unknown_lane", true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := LaneRequiresLock(tt.lane); got != tt.want { + t.Errorf("LaneRequiresLock(%q) = %v, want %v", tt.lane, got, tt.want) + } + }) + } +} + func TestKnownJobTypesInMap(t *testing.T) { // Ensure the well-known job types are mapped. This catches drift // if a handler's job type string changes without updating the map. diff --git a/weed/admin/plugin/scheduler_status.go b/weed/admin/plugin/scheduler_status.go index 1406a2f7f..19de4ea2e 100644 --- a/weed/admin/plugin/scheduler_status.go +++ b/weed/admin/plugin/scheduler_status.go @@ -195,6 +195,21 @@ func (r *Plugin) setSchedulerLoopState(jobType, phase string) { r.schedulerLoopMu.Unlock() } +// setSchedulerLoopStateForJobType keeps the aggregate scheduler state and the +// owning lane state in sync while a specific job type is active. +func (r *Plugin) setSchedulerLoopStateForJobType(jobType, phase string) { + if r == nil { + return + } + r.setSchedulerLoopState(jobType, phase) + if jobType == "" { + return + } + if ls := r.lanes[JobTypeLane(jobType)]; ls != nil { + r.setLaneLoopState(ls, jobType, phase) + } +} + func (r *Plugin) recordSchedulerIterationComplete(hadJobs bool) { if r == nil { return @@ -251,7 +266,6 @@ func (r *Plugin) aggregateLaneLoopStates() schedulerLoopState { return agg } - // --- Per-lane loop state helpers --- func (r *Plugin) setLaneLoopState(ls *schedulerLaneState, jobType, phase string) { diff --git a/weed/admin/plugin/scheduler_status_test.go b/weed/admin/plugin/scheduler_status_test.go index b1cb7de70..c2f175087 100644 --- a/weed/admin/plugin/scheduler_status_test.go +++ b/weed/admin/plugin/scheduler_status_test.go @@ -1,7 +1,9 @@ package plugin import ( + "context" "testing" + "time" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" ) @@ -62,3 +64,84 @@ func TestGetSchedulerStatusIncludesLastDetectionCount(t *testing.T) { t.Fatalf("expected job type status for %s", jobType) } } + +func TestGetLaneSchedulerStatusShowsActiveConcurrentLaneWork(t *testing.T) { + clusterContextStarted := make(chan struct{}) + releaseClusterContext := make(chan struct{}) + + // Create the Plugin without a ClusterContextProvider so no background + // scheduler goroutines are started; they would race with the direct + // runJobTypeIteration call below. + pluginSvc, err := New(Options{}) + if err != nil { + t.Fatalf("New: %v", err) + } + defer pluginSvc.Shutdown() + + // Set the provider after construction so runJobTypeIteration can use it. + pluginSvc.clusterContextProvider = func(context.Context) (*plugin_pb.ClusterContext, error) { + close(clusterContextStarted) + <-releaseClusterContext + return nil, context.Canceled + } + + const jobType = "s3_lifecycle" + err = pluginSvc.SaveJobTypeConfig(&plugin_pb.PersistedJobTypeConfig{ + JobType: jobType, + AdminRuntime: &plugin_pb.AdminRuntimeConfig{ + Enabled: true, + DetectionIntervalSeconds: 30, + DetectionTimeoutSeconds: 15, + }, + }) + if err != nil { + t.Fatalf("SaveJobTypeConfig: %v", err) + } + + policy, enabled, err := pluginSvc.loadSchedulerPolicy(jobType) + if err != nil { + t.Fatalf("loadSchedulerPolicy: %v", err) + } + if !enabled { + t.Fatalf("expected enabled policy") + } + + done := make(chan struct{}) + go func() { + defer close(done) + pluginSvc.runJobTypeIteration(jobType, policy) + }() + + select { + case <-clusterContextStarted: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for job type iteration to start") + } + + var laneStatus SchedulerStatus + var aggregateStatus SchedulerStatus + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + laneStatus = pluginSvc.GetLaneSchedulerStatus(LaneLifecycle) + aggregateStatus = pluginSvc.GetSchedulerStatus() + if laneStatus.CurrentJobType == jobType && laneStatus.CurrentPhase == "detecting" && + aggregateStatus.CurrentJobType == jobType && aggregateStatus.CurrentPhase == "detecting" { + break + } + time.Sleep(10 * time.Millisecond) + } + + if laneStatus.CurrentJobType != jobType || laneStatus.CurrentPhase != "detecting" { + t.Fatalf("unexpected lane status while work is active: job=%q phase=%q", laneStatus.CurrentJobType, laneStatus.CurrentPhase) + } + if aggregateStatus.CurrentJobType != jobType || aggregateStatus.CurrentPhase != "detecting" { + t.Fatalf("unexpected aggregate status while work is active: job=%q phase=%q", aggregateStatus.CurrentJobType, aggregateStatus.CurrentPhase) + } + + close(releaseClusterContext) + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for job type iteration to finish") + } +}