Browse Source

fix: address PR review findings in balance detection

- hasMore flag: compute from len(results) >= maxResults so the scheduler
  knows more pages may exist, matching vacuum/EC handler pattern
- Exhausted server fallthrough: when no eligible volumes remain on the
  current maxServer (all have pending tasks) or destination planning
  fails, mark the server as exhausted and continue to the next
  overloaded server instead of stopping the entire detection loop
- Return canonical destination server ID directly from createBalanceTask
  instead of resolving via findServerIDByAddress, eliminating the
  fragile address→ID lookup for adjustment tracking
- Fix bestScore sentinel: use math.Inf(-1) instead of -1.0 so disks
  with negative scores (high pending load, same rack/DC) are still
  selected as the best available destination
- Add TestDetection_ExhaustedServerFallsThrough covering the scenario
  where the top server's volumes are all blocked by pre-existing tasks
pull/8559/head
Chris Lu 2 days ago
parent
commit
2a6828294a
  1. 2
      weed/plugin/worker/volume_balance_handler.go
  2. 84
      weed/worker/tasks/balance/detection.go
  3. 69
      weed/worker/tasks/balance/detection_test.go

2
weed/plugin/worker/volume_balance_handler.go

@@ -236,7 +236,7 @@ func (h *VolumeBalanceHandler) Detect(
glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr)
}
hasMore := false
hasMore := len(results) >= maxResults
proposals := make([]*plugin_pb.JobProposal, 0, len(results))
for _, result := range results {

84
weed/worker/tasks/balance/detection.go

@@ -2,6 +2,7 @@ package balance
import (
"fmt"
"math"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
@@ -69,6 +70,8 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
// Track effective adjustments as we plan moves in this detection run
adjustments := make(map[string]int)
// Servers where we can no longer find eligible volumes or plan destinations
exhaustedServers := make(map[string]bool)
var results []*types.TaskDetectionResult
@@ -100,6 +103,9 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
minServer := ""
for server, count := range effectiveCounts {
if exhaustedServers[server] {
continue
}
if count > maxVolumes {
maxVolumes = count
maxServer = server
@@ -109,6 +115,19 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
minServer = server
}
}
// Also consider exhausted servers for minVolumes (they still exist)
for server, count := range effectiveCounts {
if count < minVolumes {
minVolumes = count
minServer = server
}
}
if maxServer == "" {
// All servers exhausted
glog.V(1).Infof("BALANCE [%s]: All overloaded servers exhausted after %d task(s)", diskType, len(results))
break
}
// Check if imbalance exceeds threshold
imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
@@ -137,62 +156,35 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
}
if selectedVolume == nil {
glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s", diskType, maxServer)
break
glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s, trying other servers", diskType, maxServer)
exhaustedServers[maxServer] = true
continue
}
// Plan destination and create task
task := createBalanceTask(diskType, selectedVolume, clusterInfo)
task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo)
if task == nil {
break
glog.V(1).Infof("BALANCE [%s]: Cannot plan destination for server %s, trying other servers", diskType, maxServer)
exhaustedServers[maxServer] = true
continue
}
results = append(results, task)
// Adjust effective counts for the next iteration
adjustments[maxServer]--
if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 {
// Find the destination server ID from the planned task
destAddress := task.TypedParams.Targets[0].Node
destServer := findServerIDByAddress(diskMetrics, destAddress, clusterInfo)
if destServer != "" {
adjustments[destServer]++
}
if destServerID != "" {
adjustments[destServerID]++
}
}
return results
}
// findServerIDByAddress resolves a server address back to its server ID.
// It first tries the collected disk metrics, then falls back to walking the
// active topology; an empty string means the address could not be resolved.
func findServerIDByAddress(diskMetrics []*types.VolumeHealthMetrics, address string, clusterInfo *types.ClusterInfo) string {
	// Prefer a direct match against the metrics we already have in hand.
	for _, metric := range diskMetrics {
		if metric.ServerAddress == address || metric.Server == address {
			return metric.Server
		}
	}
	// No metric matched; consult the topology snapshot if one is available.
	if clusterInfo.ActiveTopology == nil {
		return ""
	}
	info := clusterInfo.ActiveTopology.GetTopologyInfo()
	if info == nil {
		return ""
	}
	for _, dc := range info.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, node := range rack.DataNodeInfos {
				if node.Address == address || node.Id == address {
					return node.Id
				}
			}
		}
	}
	return ""
}
// createBalanceTask creates a single balance task for the selected volume.
// Returns nil if destination planning fails.
func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult {
// Returns (nil, "") if destination planning fails.
// On success, returns the task result and the canonical destination server ID.
func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) (*types.TaskDetectionResult, string) {
taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().UnixNano())
task := &types.TaskDetectionResult{
@@ -210,19 +202,19 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric
// Plan destination if ActiveTopology is available
if clusterInfo.ActiveTopology == nil {
glog.Warningf("No ActiveTopology available for destination planning in balance detection")
return nil
return nil, ""
}
// Check if ANY task already exists in ActiveTopology for this volume
if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) {
glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID)
return nil
return nil, ""
}
destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
if err != nil {
glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
return nil
return nil, ""
}
// Find the actual disk containing the volume on the source server
@@ -230,7 +222,7 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric
if !found {
glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
return nil
return nil, ""
}
// Update reason with full details now that we have destination info
@@ -293,13 +285,13 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric
})
if err != nil {
glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err)
return nil
return nil, ""
}
glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
return task
return task, destinationPlan.TargetNode
}
// planBalanceDestination plans the destination for a balance operation
@@ -338,7 +330,7 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol
// Find the best destination disk based on balance criteria
var bestDisk *topology.DiskInfo
bestScore := -1.0
bestScore := math.Inf(-1)
for _, disk := range availableDisks {
// Ensure disk type matches

69
weed/worker/tasks/balance/detection_test.go

@@ -751,3 +751,72 @@ func TestDetection_ConvergenceVerification(t *testing.T) {
})
}
}
// TestDetection_ExhaustedServerFallsThrough verifies that when the most
// overloaded server has all its volumes blocked by pre-existing tasks,
// the algorithm falls through to the next overloaded server instead of stopping.
func TestDetection_ExhaustedServerFallsThrough(t *testing.T) {
	servers := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"},
	}

	// node-a: 50 volumes, node-b: 40 volumes, node-c: 10 volumes
	// avg = 33.3, imbalance = (50-10)/33.3 = 1.2 > 0.2
	var metrics []*types.VolumeHealthMetrics
	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 50)...)
	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 40)...)
	metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...)

	at := buildTopology(servers, metrics)

	// Block ALL of node-a's volumes (IDs 1..50) with pre-existing tasks.
	for volID := uint32(1); volID <= 50; volID++ {
		spec := topology.TaskSpec{
			TaskID:       fmt.Sprintf("existing-%d", volID),
			TaskType:     topology.TaskTypeBalance,
			VolumeID:     volID,
			VolumeSize:   1024,
			Sources:      []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}},
			Destinations: []topology.TaskDestinationSpec{{ServerID: "node-c", DiskID: 3}},
		}
		if err := at.AddPendingTask(spec); err != nil {
			t.Fatalf("AddPendingTask failed: %v", err)
		}
	}

	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
	tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}

	// node-a is exhausted, but node-b (40 vols) vs node-c (10 vols) is still
	// imbalanced. The algorithm should fall through and move from node-b.
	if len(tasks) == 0 {
		t.Fatal("Expected tasks from node-b after node-a was exhausted, got 0")
	}

	// One pass checks both properties: no task may originate from the blocked
	// node-a, and at least one task must originate from node-b.
	sawNodeB := false
	for i, task := range tasks {
		switch task.Server {
		case "node-a":
			t.Errorf("Task %d: should not move FROM node-a (all volumes blocked)", i)
		case "node-b":
			sawNodeB = true
		}
	}
	if !sawNodeB {
		t.Error("Expected node-b to be a source after node-a was exhausted")
	}

	assertNoDuplicateVolumes(t, tasks)
	t.Logf("Created %d tasks from node-b after node-a exhausted", len(tasks))
}
Loading…
Cancel
Save