From 2a6828294a6303941a199342b1f930d8e78ac3a9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 15:06:47 -0700 Subject: [PATCH] fix: address PR review findings in balance detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - hasMore flag: compute from len(results) >= maxResults so the scheduler knows more pages may exist, matching vacuum/EC handler pattern - Exhausted server fallthrough: when no eligible volumes remain on the current maxServer (all have pending tasks) or destination planning fails, mark the server as exhausted and continue to the next overloaded server instead of stopping the entire detection loop - Return canonical destination server ID directly from createBalanceTask instead of resolving via findServerIDByAddress, eliminating the fragile address→ID lookup for adjustment tracking - Fix bestScore sentinel: use math.Inf(-1) instead of -1.0 so disks with negative scores (high pending load, same rack/DC) are still selected as the best available destination - Add TestDetection_ExhaustedServerFallsThrough covering the scenario where the top server's volumes are all blocked by pre-existing tasks --- weed/plugin/worker/volume_balance_handler.go | 2 +- weed/worker/tasks/balance/detection.go | 84 +++++++++----------- weed/worker/tasks/balance/detection_test.go | 69 ++++++++++++++++ 3 files changed, 108 insertions(+), 47 deletions(-) diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index bdfea0a98..3a39cd16f 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -236,7 +236,7 @@ func (h *VolumeBalanceHandler) Detect( glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } - hasMore := false + hasMore := len(results) >= maxResults proposals := make([]*plugin_pb.JobProposal, 0, len(results)) for _, result := range results { diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 1480c13ca..120741cb6 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -2,6 +2,7 @@ package balance import ( "fmt" + "math" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -69,6 +70,8 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics // Track effective adjustments as we plan moves in this detection run adjustments := make(map[string]int) + // Servers where we can no longer find eligible volumes or plan destinations + exhaustedServers := make(map[string]bool) var results []*types.TaskDetectionResult @@ -100,6 +103,9 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics minServer := "" for server, count := range effectiveCounts { + if exhaustedServers[server] { + continue + } if count > maxVolumes { maxVolumes = count maxServer = server @@ -109,6 +115,19 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics minServer = server } } + // Also consider exhausted servers for minVolumes (they still exist) + for server, count := range effectiveCounts { + if count < minVolumes { + minVolumes = count + minServer = server + } + } + + if maxServer == "" { + // All servers exhausted + glog.V(1).Infof("BALANCE [%s]: All overloaded servers exhausted after %d task(s)", diskType, len(results)) + break + } // Check if imbalance exceeds threshold imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer @@ -137,62 +156,35 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics } if selectedVolume == nil { - glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s", diskType, maxServer) - break + glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s, trying other servers", diskType, maxServer) + exhaustedServers[maxServer] = true + continue } // Plan destination and create task - task := createBalanceTask(diskType, selectedVolume, clusterInfo) + task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo) if task == nil { - break + glog.V(1).Infof("BALANCE [%s]: Cannot plan destination for server %s, trying other servers", diskType, maxServer) + exhaustedServers[maxServer] = true + continue } results = append(results, task) // Adjust effective counts for the next iteration adjustments[maxServer]-- - if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 { - // Find the destination server ID from the planned task - destAddress := task.TypedParams.Targets[0].Node - destServer := findServerIDByAddress(diskMetrics, destAddress, clusterInfo) - if destServer != "" { - adjustments[destServer]++ - } + if destServerID != "" { + adjustments[destServerID]++ } } return results } -// findServerIDByAddress resolves a server address back to its server ID. -func findServerIDByAddress(diskMetrics []*types.VolumeHealthMetrics, address string, clusterInfo *types.ClusterInfo) string { - // Check metrics first for a direct match - for _, m := range diskMetrics { - if m.ServerAddress == address || m.Server == address { - return m.Server - } - } - // Fall back to topology lookup - if clusterInfo.ActiveTopology != nil { - topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo() - if topologyInfo != nil { - for _, dc := range topologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, dn := range rack.DataNodeInfos { - if dn.Address == address || dn.Id == address { - return dn.Id - } - } - } - } - } - } - return "" -} - // createBalanceTask creates a single balance task for the selected volume. -// Returns nil if destination planning fails. -func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult { +// Returns (nil, "") if destination planning fails. +// On success, returns the task result and the canonical destination server ID. +func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) (*types.TaskDetectionResult, string) { taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().UnixNano()) task := &types.TaskDetectionResult{ @@ -210,19 +202,19 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric // Plan destination if ActiveTopology is available if clusterInfo.ActiveTopology == nil { glog.Warningf("No ActiveTopology available for destination planning in balance detection") - return nil + return nil, "" } // Check if ANY task already exists in ActiveTopology for this volume if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) { glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID) - return nil + return nil, "" } destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) if err != nil { glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) - return nil + return nil, "" } // Find the actual disk containing the volume on the source server @@ -230,7 +222,7 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric if !found { glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - return nil + return nil, "" } // Update reason with full details now that we have destination info @@ -293,13 +285,13 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric }) if err != nil { glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) - return nil + return nil, "" } glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) - return task + return task, destinationPlan.TargetNode } // planBalanceDestination plans the destination for a balance operation @@ -338,7 +330,7 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol // Find the best destination disk based on balance criteria var bestDisk *topology.DiskInfo - bestScore := -1.0 + bestScore := math.Inf(-1) for _, disk := range availableDisks { // Ensure disk type matches diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 670dc5758..bbdc00ac5 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -751,3 +751,72 @@ func TestDetection_ConvergenceVerification(t *testing.T) { }) } } + +// TestDetection_ExhaustedServerFallsThrough verifies that when the most +// overloaded server has all its volumes blocked by pre-existing tasks, +// the algorithm falls through to the next overloaded server instead of stopping. +func TestDetection_ExhaustedServerFallsThrough(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + + // node-a: 50 volumes, node-b: 40 volumes, node-c: 10 volumes + // avg = 33.3, imbalance = (50-10)/33.3 = 1.2 > 0.2 + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 50)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 40)...) + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...) + + at := buildTopology(servers, metrics) + + // Block ALL of node-a's volumes with pre-existing tasks + for i := 0; i < 50; i++ { + volID := uint32(1 + i) + err := at.AddPendingTask(topology.TaskSpec{ + TaskID: fmt.Sprintf("existing-%d", volID), + TaskType: topology.TaskTypeBalance, + VolumeID: volID, + VolumeSize: 1024, + Sources: []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}}, + Destinations: []topology.TaskDestinationSpec{{ServerID: "node-c", DiskID: 3}}, + }) + if err != nil { + t.Fatalf("AddPendingTask failed: %v", err) + } + } + + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // node-a is exhausted, but node-b (40 vols) vs node-c (10 vols) is still + // imbalanced. The algorithm should fall through and move from node-b. + if len(tasks) == 0 { + t.Fatal("Expected tasks from node-b after node-a was exhausted, got 0") + } + + for i, task := range tasks { + if task.Server == "node-a" { + t.Errorf("Task %d: should not move FROM node-a (all volumes blocked)", i) + } + } + + // Verify node-b is the source + hasNodeBSource := false + for _, task := range tasks { + if task.Server == "node-b" { + hasNodeBSource = true + break + } + } + if !hasNodeBSource { + t.Error("Expected node-b to be a source after node-a was exhausted") + } + + assertNoDuplicateVolumes(t, tasks) + t.Logf("Created %d tasks from node-b after node-a exhausted", len(tasks)) +}