Browse Source

fix: accurate truncation flag, deterministic server order, indexed volume lookup

- Track balanced flag to distinguish "hit maxResults cap" from "cluster
  balanced at exactly maxResults" — truncated is only true when there's
  genuinely more work to do
- Sort servers for deterministic iteration and tie-breaking when
  multiple servers have equal volume counts
- Pre-index volumes by server with per-server cursors to avoid
  O(maxResults * volumes) rescanning on each iteration
- Add truncation flag assertions to RespectsMaxResults test: true when
  capped, false when detection finishes naturally
pull/8559/head
Chris Lu 22 hours ago
parent
commit
56bd5e3110
  1. 45
      weed/worker/tasks/balance/detection.go
  2. 19
      weed/worker/tasks/balance/detection_test.go

45
weed/worker/tasks/balance/detection.go

@@ -111,7 +111,22 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
// Servers where we can no longer find eligible volumes or plan destinations
exhaustedServers := make(map[string]bool)
// Sort servers for deterministic iteration and tie-breaking
sortedServers := make([]string, 0, len(serverVolumeCounts))
for server := range serverVolumeCounts {
sortedServers = append(sortedServers, server)
}
sort.Strings(sortedServers)
// Pre-index volumes by server with cursors to avoid O(maxResults * volumes) scanning
volumesByServer := make(map[string][]*types.VolumeHealthMetrics, len(serverVolumeCounts))
for _, metric := range diskMetrics {
volumesByServer[metric.Server] = append(volumesByServer[metric.Server], metric)
}
serverCursors := make(map[string]int, len(serverVolumeCounts))
var results []*types.TaskDetectionResult
balanced := false
for len(results) < maxResults {
// Compute effective volume counts with adjustments from planned moves
@@ -132,7 +147,8 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
maxServer := ""
minServer := ""
for server, count := range effectiveCounts {
for _, server := range sortedServers {
count := effectiveCounts[server]
// Min is calculated across all servers for an accurate imbalance ratio
if count < minVolumes {
minVolumes = count
@@ -164,21 +180,25 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
glog.Infof("BALANCE [%s]: Created %d task(s), cluster now balanced. Imbalance=%.1f%% (threshold=%.1f%%)",
diskType, len(results), imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100)
}
balanced = true
break
}
// Select a volume from the overloaded server for balance
// Select a volume from the overloaded server using per-server cursor
var selectedVolume *types.VolumeHealthMetrics
for _, metric := range diskMetrics {
if metric.Server == maxServer {
// Skip volumes that already have a task in ActiveTopology
if clusterInfo.ActiveTopology != nil && clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) {
continue
}
selectedVolume = metric
break
serverVols := volumesByServer[maxServer]
cursor := serverCursors[maxServer]
for cursor < len(serverVols) {
metric := serverVols[cursor]
cursor++
// Skip volumes that already have a task in ActiveTopology
if clusterInfo.ActiveTopology != nil && clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) {
continue
}
selectedVolume = metric
break
}
serverCursors[maxServer] = cursor
if selectedVolume == nil {
glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s, trying other servers", diskType, maxServer)
@@ -203,8 +223,9 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
}
}
// Truncated if the loop exited because we hit the maxResults cap
return results, len(results) >= maxResults
// Truncated only if we hit maxResults and detection didn't naturally finish
truncated := len(results) >= maxResults && !balanced
return results, truncated
}
// createBalanceTask creates a single balance task for the selected volume.

19
weed/worker/tasks/balance/detection_test.go

@@ -444,8 +444,9 @@ func TestDetection_RespectsMaxResults(t *testing.T) {
ActiveTopology: at,
}
// Request only 3 results
tasks, _, err := Detection(metrics, clusterInfo, conf, 3)
// Request only 3 results — there are enough volumes to produce more,
// so truncated should be true.
tasks, truncated, err := Detection(metrics, clusterInfo, conf, 3)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -453,6 +454,20 @@ func TestDetection_RespectsMaxResults(t *testing.T) {
if len(tasks) != 3 {
t.Errorf("Expected exactly 3 tasks (maxResults=3), got %d", len(tasks))
}
if !truncated {
t.Errorf("Expected truncated=true when maxResults caps results")
}
// Verify truncated=false when detection finishes naturally (no cap)
at2 := createMockTopology(metrics...)
clusterInfo2 := &types.ClusterInfo{ActiveTopology: at2}
tasks2, truncated2, err := Detection(metrics, clusterInfo2, conf, 500)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
if truncated2 {
t.Errorf("Expected truncated=false when detection finishes naturally, got true (len=%d)", len(tasks2))
}
}
// --- Complicated scenario tests ---

Loading…
Cancel
Save