diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index bdfea0a98..3a39cd16f 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -236,7 +236,7 @@ func (h *VolumeBalanceHandler) Detect( glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } - hasMore := false + hasMore := len(results) >= maxResults proposals := make([]*plugin_pb.JobProposal, 0, len(results)) for _, result := range results { diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 1480c13ca..120741cb6 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -2,6 +2,7 @@ package balance import ( "fmt" + "math" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -69,6 +70,8 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics // Track effective adjustments as we plan moves in this detection run adjustments := make(map[string]int) + // Servers where we can no longer find eligible volumes or plan destinations + exhaustedServers := make(map[string]bool) var results []*types.TaskDetectionResult @@ -100,6 +103,9 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics minServer := "" for server, count := range effectiveCounts { + if exhaustedServers[server] { + continue + } if count > maxVolumes { maxVolumes = count maxServer = server @@ -109,6 +115,19 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics minServer = server } } + // Also consider exhausted servers for minVolumes (they still exist) + for server, count := range effectiveCounts { + if count < minVolumes { + minVolumes = count + minServer = server + } + } + + if maxServer == "" { + // All servers exhausted + glog.V(1).Infof("BALANCE [%s]: All overloaded servers exhausted after %d task(s)", diskType, len(results)) + break + } // Check if imbalance exceeds threshold imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer @@ -137,62 +156,35 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics } if selectedVolume == nil { - glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s", diskType, maxServer) - break + glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s, trying other servers", diskType, maxServer) + exhaustedServers[maxServer] = true + continue } // Plan destination and create task - task := createBalanceTask(diskType, selectedVolume, clusterInfo) + task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo) if task == nil { - break + glog.V(1).Infof("BALANCE [%s]: Cannot plan destination for server %s, trying other servers", diskType, maxServer) + exhaustedServers[maxServer] = true + continue } results = append(results, task) // Adjust effective counts for the next iteration adjustments[maxServer]-- - if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 { - // Find the destination server ID from the planned task - destAddress := task.TypedParams.Targets[0].Node - destServer := findServerIDByAddress(diskMetrics, destAddress, clusterInfo) - if destServer != "" { - adjustments[destServer]++ - } + if destServerID != "" { + adjustments[destServerID]++ } } return results } -// findServerIDByAddress resolves a server address back to its server ID. -func findServerIDByAddress(diskMetrics []*types.VolumeHealthMetrics, address string, clusterInfo *types.ClusterInfo) string { - // Check metrics first for a direct match - for _, m := range diskMetrics { - if m.ServerAddress == address || m.Server == address { - return m.Server - } - } - // Fall back to topology lookup - if clusterInfo.ActiveTopology != nil { - topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo() - if topologyInfo != nil { - for _, dc := range topologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, dn := range rack.DataNodeInfos { - if dn.Address == address || dn.Id == address { - return dn.Id - } - } - } - } - } - } - return "" -} - // createBalanceTask creates a single balance task for the selected volume. -// Returns nil if destination planning fails. -func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult { +// Returns (nil, "") if destination planning fails. +// On success, returns the task result and the canonical destination server ID. +func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) (*types.TaskDetectionResult, string) { taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().UnixNano()) task := &types.TaskDetectionResult{ @@ -210,19 +202,19 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric // Plan destination if ActiveTopology is available if clusterInfo.ActiveTopology == nil { glog.Warningf("No ActiveTopology available for destination planning in balance detection") - return nil + return nil, "" } // Check if ANY task already exists in ActiveTopology for this volume if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) { glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID) - return nil + return nil, "" } destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) if err != nil { glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) - return nil + return nil, "" } // Find the actual disk containing the volume on the source server @@ -230,7 +222,7 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric if !found { glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - return nil + return nil, "" } // Update reason with full details now that we have destination info @@ -293,13 +285,13 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric }) if err != nil { glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) - return nil + return nil, "" } glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) - return task + return task, destinationPlan.TargetNode } // planBalanceDestination plans the destination for a balance operation @@ -338,7 +330,7 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol // Find the best destination disk based on balance criteria var bestDisk *topology.DiskInfo - bestScore := -1.0 + bestScore := math.Inf(-1) for _, disk := range availableDisks { // Ensure disk type matches diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 670dc5758..bbdc00ac5 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -751,3 +751,72 @@ func TestDetection_ConvergenceVerification(t *testing.T) { }) } } + +// TestDetection_ExhaustedServerFallsThrough verifies that when the most +// overloaded server has all its volumes blocked by pre-existing tasks, +// the algorithm falls through to the next overloaded server instead of stopping. +func TestDetection_ExhaustedServerFallsThrough(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + + // node-a: 50 volumes, node-b: 40 volumes, node-c: 10 volumes + // avg = 33.3, imbalance = (50-10)/33.3 = 1.2 > 0.2 + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 50)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 40)...) + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...) + + at := buildTopology(servers, metrics) + + // Block ALL of node-a's volumes with pre-existing tasks + for i := 0; i < 50; i++ { + volID := uint32(1 + i) + err := at.AddPendingTask(topology.TaskSpec{ + TaskID: fmt.Sprintf("existing-%d", volID), + TaskType: topology.TaskTypeBalance, + VolumeID: volID, + VolumeSize: 1024, + Sources: []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}}, + Destinations: []topology.TaskDestinationSpec{{ServerID: "node-c", DiskID: 3}}, + }) + if err != nil { + t.Fatalf("AddPendingTask failed: %v", err) + } + } + + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // node-a is exhausted, but node-b (40 vols) vs node-c (10 vols) is still + // imbalanced. The algorithm should fall through and move from node-b. + if len(tasks) == 0 { + t.Fatal("Expected tasks from node-b after node-a was exhausted, got 0") + } + + for i, task := range tasks { + if task.Server == "node-a" { + t.Errorf("Task %d: should not move FROM node-a (all volumes blocked)", i) + } + } + + // Verify node-b is the source + hasNodeBSource := false + for _, task := range tasks { + if task.Server == "node-b" { + hasNodeBSource = true + break + } + } + if !hasNodeBSource { + t.Error("Expected node-b to be a source after node-a was exhausted") + } + + assertNoDuplicateVolumes(t, tasks) + t.Logf("Created %d tasks from node-b after node-a exhausted", len(tasks)) +}