diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go
index 3710b06ab..5fad4eda6 100644
--- a/weed/admin/topology/task_management.go
+++ b/weed/admin/topology/task_management.go
@@ -272,6 +272,40 @@ func (at *ActiveTopology) HasAnyTask(volumeID uint32) bool {
 	return at.HasTask(volumeID, TaskTypeNone)
 }
 
+// GetTaskServerAdjustments returns per-server volume count adjustments for
+// pending and assigned tasks of the given type. For each task, source servers
+// are decremented and destination servers are incremented, reflecting the
+// projected volume distribution once in-flight tasks complete.
+func (at *ActiveTopology) GetTaskServerAdjustments(taskType TaskType) map[string]int {
+	at.mutex.RLock()
+	defer at.mutex.RUnlock()
+
+	adjustments := make(map[string]int)
+	for _, task := range at.pendingTasks {
+		if task.TaskType != taskType {
+			continue
+		}
+		for _, src := range task.Sources {
+			adjustments[src.SourceServer]--
+		}
+		for _, dst := range task.Destinations {
+			adjustments[dst.TargetServer]++
+		}
+	}
+	for _, task := range at.assignedTasks {
+		if task.TaskType != taskType {
+			continue
+		}
+		for _, src := range task.Sources {
+			adjustments[src.SourceServer]--
+		}
+		for _, dst := range task.Destinations {
+			adjustments[dst.TargetServer]++
+		}
+	}
+	return adjustments
+}
+
 // calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
 func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
 	switch taskType {
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index 71643d5e6..313ff1ca9 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -106,8 +106,15 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
 		return nil, false
 	}
 
-	// Track effective adjustments as we plan moves in this detection run
-	adjustments := make(map[string]int)
+	// Seed adjustments from existing pending/assigned balance tasks so that
+	// effectiveCounts reflects in-flight moves and prevents over-scheduling.
+	var adjustments map[string]int
+	if clusterInfo.ActiveTopology != nil {
+		adjustments = clusterInfo.ActiveTopology.GetTaskServerAdjustments(topology.TaskTypeBalance)
+	}
+	if adjustments == nil {
+		adjustments = make(map[string]int)
+	}
 
 	// Servers where we can no longer find eligible volumes or plan destinations
 	exhaustedServers := make(map[string]bool)
diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go
index 57b3e9340..344c63f04 100644
--- a/weed/worker/tasks/balance/detection_test.go
+++ b/weed/worker/tasks/balance/detection_test.go
@@ -576,13 +576,14 @@ func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) {
 		}
 	}
 
-	// With 15 of 20 volumes on node-a already having pending tasks, only
-	// volumes 16-20 are eligible. Detection should produce at most 5 new tasks.
+	// With 15 pending A→B moves, effective counts are A=5, B=20.
+	// Detection sees B as overloaded and may plan moves from B (5 volumes).
+	// Should produce a reasonable number of tasks without over-scheduling.
 	if len(tasks) > 5 {
-		t.Errorf("Expected at most 5 new tasks (only 5 eligible volumes remain), got %d", len(tasks))
+		t.Errorf("Expected at most 5 new tasks, got %d", len(tasks))
 	}
 	if len(tasks) == 0 {
-		t.Errorf("Expected at least 1 new task since imbalance still exists with actual volume counts")
+		t.Errorf("Expected at least 1 new task since projected imbalance still exists")
 	}
 
 	assertNoDuplicateVolumes(t, tasks)