From be374f8fab92e864bf8158400bf37fd17e868e4f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 21:18:40 -0700 Subject: [PATCH] fix: seed adjustments from existing pending/assigned tasks to prevent over-scheduling Detection now calls ActiveTopology.GetTaskServerAdjustments() to initialize the adjustments map with source/destination deltas from existing pending and assigned balance tasks. This ensures effectiveCounts reflects in-flight moves, preventing the algorithm from planning additional moves in the same direction when prior moves already address the imbalance. Added GetTaskServerAdjustments(taskType) to ActiveTopology which iterates pending and assigned tasks, decrementing source servers and incrementing destination servers for the given task type. --- weed/admin/topology/task_management.go | 34 +++++++++++++++++++++ weed/worker/tasks/balance/detection.go | 11 +++++-- weed/worker/tasks/balance/detection_test.go | 9 +++--- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go index 3710b06ab..5fad4eda6 100644 --- a/weed/admin/topology/task_management.go +++ b/weed/admin/topology/task_management.go @@ -272,6 +272,40 @@ func (at *ActiveTopology) HasAnyTask(volumeID uint32) bool { return at.HasTask(volumeID, TaskTypeNone) } +// GetTaskServerAdjustments returns per-server volume count adjustments for +// pending and assigned tasks of the given type. For each task, source servers +// are decremented and destination servers are incremented, reflecting the +// projected volume distribution once in-flight tasks complete. +func (at *ActiveTopology) GetTaskServerAdjustments(taskType TaskType) map[string]int { + at.mutex.RLock() + defer at.mutex.RUnlock() + + adjustments := make(map[string]int) + for _, task := range at.pendingTasks { + if task.TaskType != taskType { + continue + } + for _, src := range task.Sources { + adjustments[src.SourceServer]-- + } + for _, dst := range task.Destinations { + adjustments[dst.TargetServer]++ + } + } + for _, task := range at.assignedTasks { + if task.TaskType != taskType { + continue + } + for _, src := range task.Sources { + adjustments[src.SourceServer]-- + } + for _, dst := range task.Destinations { + adjustments[dst.TargetServer]++ + } + } + return adjustments +} + // calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange { switch taskType { diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 71643d5e6..313ff1ca9 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -106,8 +106,15 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics return nil, false } - // Track effective adjustments as we plan moves in this detection run - adjustments := make(map[string]int) + // Seed adjustments from existing pending/assigned balance tasks so that + // effectiveCounts reflects in-flight moves and prevents over-scheduling. + var adjustments map[string]int + if clusterInfo.ActiveTopology != nil { + adjustments = clusterInfo.ActiveTopology.GetTaskServerAdjustments(topology.TaskTypeBalance) + } + if adjustments == nil { + adjustments = make(map[string]int) + } // Servers where we can no longer find eligible volumes or plan destinations exhaustedServers := make(map[string]bool) diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 57b3e9340..344c63f04 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -576,13 +576,14 @@ func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) { } } - // With 15 of 20 volumes on node-a already having pending tasks, only - // volumes 16-20 are eligible. Detection should produce at most 5 new tasks. + // With 15 pending A→B moves, effective counts are A=5, B=20. + // Detection sees B as overloaded and may plan moves from B (5 volumes). + // Should produce a reasonable number of tasks without over-scheduling. if len(tasks) > 5 { - t.Errorf("Expected at most 5 new tasks (only 5 eligible volumes remain), got %d", len(tasks)) + t.Errorf("Expected at most 5 new tasks, got %d", len(tasks)) } if len(tasks) == 0 { - t.Errorf("Expected at least 1 new task since imbalance still exists with actual volume counts") + t.Errorf("Expected at least 1 new task since projected imbalance still exists") } assertNoDuplicateVolumes(t, tasks)