diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index 8649310eb..df1a61f58 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -111,7 +111,22 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
 
 	// Servers where we can no longer find eligible volumes or plan destinations
 	exhaustedServers := make(map[string]bool)
 
+	// Sort servers for deterministic iteration and tie-breaking
+	sortedServers := make([]string, 0, len(serverVolumeCounts))
+	for server := range serverVolumeCounts {
+		sortedServers = append(sortedServers, server)
+	}
+	sort.Strings(sortedServers)
+
+	// Pre-index volumes by server with cursors to avoid O(maxResults * volumes) scanning
+	volumesByServer := make(map[string][]*types.VolumeHealthMetrics, len(serverVolumeCounts))
+	for _, metric := range diskMetrics {
+		volumesByServer[metric.Server] = append(volumesByServer[metric.Server], metric)
+	}
+	serverCursors := make(map[string]int, len(serverVolumeCounts))
+
 	var results []*types.TaskDetectionResult
+	balanced := false
 	for len(results) < maxResults {
 		// Compute effective volume counts with adjustments from planned moves
@@ -132,7 +147,8 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
 		maxServer := ""
 		minServer := ""
 
-		for server, count := range effectiveCounts {
+		for _, server := range sortedServers {
+			count := effectiveCounts[server]
 			// Min is calculated across all servers for an accurate imbalance ratio
 			if count < minVolumes {
 				minVolumes = count
@@ -164,21 +180,25 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
 			if len(results) > 0 {
 				glog.Infof("BALANCE [%s]: Created %d task(s), cluster now balanced. Imbalance=%.1f%% (threshold=%.1f%%)", diskType, len(results), imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100)
 			}
+			balanced = true
 			break
 		}
 
-		// Select a volume from the overloaded server for balance
+		// Select a volume from the overloaded server using per-server cursor
 		var selectedVolume *types.VolumeHealthMetrics
-		for _, metric := range diskMetrics {
-			if metric.Server == maxServer {
-				// Skip volumes that already have a task in ActiveTopology
-				if clusterInfo.ActiveTopology != nil && clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) {
-					continue
-				}
-				selectedVolume = metric
-				break
+		serverVols := volumesByServer[maxServer]
+		cursor := serverCursors[maxServer]
+		for cursor < len(serverVols) {
+			metric := serverVols[cursor]
+			cursor++
+			// Skip volumes that already have a task in ActiveTopology
+			if clusterInfo.ActiveTopology != nil && clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) {
+				continue
 			}
+			selectedVolume = metric
+			break
 		}
+		serverCursors[maxServer] = cursor
 
 		if selectedVolume == nil {
 			glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s, trying other servers", diskType, maxServer)
@@ -203,8 +223,9 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
 		}
 	}
 
-	// Truncated if the loop exited because we hit the maxResults cap
-	return results, len(results) >= maxResults
+	// Truncated only if we hit maxResults and detection didn't naturally finish
+	truncated := len(results) >= maxResults && !balanced
+	return results, truncated
 }
 
 // createBalanceTask creates a single balance task for the selected volume.
diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go
index ac477853f..8d6e5f19c 100644
--- a/weed/worker/tasks/balance/detection_test.go
+++ b/weed/worker/tasks/balance/detection_test.go
@@ -444,8 +444,9 @@ func TestDetection_RespectsMaxResults(t *testing.T) {
 		ActiveTopology: at,
 	}
 
-	// Request only 3 results
-	tasks, _, err := Detection(metrics, clusterInfo, conf, 3)
+	// Request only 3 results — there are enough volumes to produce more,
+	// so truncated should be true.
+	tasks, truncated, err := Detection(metrics, clusterInfo, conf, 3)
 	if err != nil {
 		t.Fatalf("Detection failed: %v", err)
 	}
@@ -453,6 +454,20 @@ func TestDetection_RespectsMaxResults(t *testing.T) {
 	if len(tasks) != 3 {
 		t.Errorf("Expected exactly 3 tasks (maxResults=3), got %d", len(tasks))
 	}
+	if !truncated {
+		t.Errorf("Expected truncated=true when maxResults caps results")
+	}
+
+	// Verify truncated=false when detection finishes naturally (no cap)
+	at2 := createMockTopology(metrics...)
+	clusterInfo2 := &types.ClusterInfo{ActiveTopology: at2}
+	tasks2, truncated2, err := Detection(metrics, clusterInfo2, conf, 500)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+	if truncated2 {
+		t.Errorf("Expected truncated=false when detection finishes naturally, got true (len=%d)", len(tasks2))
+	}
 }
 
 // --- Complicated scenario tests ---