Browse Source

fix: seed adjustments from existing pending/assigned tasks to prevent over-scheduling

Detection now calls ActiveTopology.GetTaskServerAdjustments() to
initialize the adjustments map with source/destination deltas from
existing pending and assigned balance tasks. This ensures
effectiveCounts reflects in-flight moves, preventing the algorithm
from planning additional moves in the same direction when prior
moves already address the imbalance.

Added GetTaskServerAdjustments(taskType) to ActiveTopology which
iterates pending and assigned tasks, decrementing source servers
and incrementing destination servers for the given task type.
pull/8559/head
Chris Lu 21 hours ago
parent
commit
be374f8fab
  1. 34
      weed/admin/topology/task_management.go
  2. 11
      weed/worker/tasks/balance/detection.go
  3. 9
      weed/worker/tasks/balance/detection_test.go

34
weed/admin/topology/task_management.go

@@ -272,6 +272,40 @@ func (at *ActiveTopology) HasAnyTask(volumeID uint32) bool {
return at.HasTask(volumeID, TaskTypeNone)
}
// GetTaskServerAdjustments returns the projected per-server volume count
// deltas implied by in-flight (pending and assigned) tasks of the given
// type: each matching task subtracts one from every source server and adds
// one to every destination server, modeling the distribution that will
// exist once those tasks complete.
func (at *ActiveTopology) GetTaskServerAdjustments(taskType TaskType) map[string]int {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	adjustments := make(map[string]int)

	// Pending tasks: not yet handed to a worker, but already planned.
	for _, task := range at.pendingTasks {
		if task.TaskType == taskType {
			for _, source := range task.Sources {
				adjustments[source.SourceServer]--
			}
			for _, destination := range task.Destinations {
				adjustments[destination.TargetServer]++
			}
		}
	}

	// Assigned tasks: currently being executed by a worker.
	for _, task := range at.assignedTasks {
		if task.TaskType == taskType {
			for _, source := range task.Sources {
				adjustments[source.SourceServer]--
			}
			for _, destination := range task.Destinations {
				adjustments[destination.TargetServer]++
			}
		}
	}

	return adjustments
}
// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
switch taskType {

11
weed/worker/tasks/balance/detection.go

@@ -106,8 +106,15 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
return nil, false
}
// Track effective adjustments as we plan moves in this detection run
adjustments := make(map[string]int)
// Seed adjustments from existing pending/assigned balance tasks so that
// effectiveCounts reflects in-flight moves and prevents over-scheduling.
var adjustments map[string]int
if clusterInfo.ActiveTopology != nil {
adjustments = clusterInfo.ActiveTopology.GetTaskServerAdjustments(topology.TaskTypeBalance)
}
if adjustments == nil {
adjustments = make(map[string]int)
}
// Servers where we can no longer find eligible volumes or plan destinations
exhaustedServers := make(map[string]bool)

9
weed/worker/tasks/balance/detection_test.go

@@ -576,13 +576,14 @@ func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) {
}
}
// With 15 of 20 volumes on node-a already having pending tasks, only
// volumes 16-20 are eligible. Detection should produce at most 5 new tasks.
// With 15 pending A→B moves, effective counts are A=5, B=20.
// Detection sees B as overloaded and may plan moves from B (5 volumes).
// Should produce a reasonable number of tasks without over-scheduling.
if len(tasks) > 5 {
t.Errorf("Expected at most 5 new tasks (only 5 eligible volumes remain), got %d", len(tasks))
t.Errorf("Expected at most 5 new tasks, got %d", len(tasks))
}
if len(tasks) == 0 {
t.Errorf("Expected at least 1 new task since imbalance still exists with actual volume counts")
t.Errorf("Expected at least 1 new task since projected imbalance still exists")
}
assertNoDuplicateVolumes(t, tasks)

Loading…
Cancel
Save