From d35dd2a3b5e9fbb39f86e6d68d22f218efa870d8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 20:10:27 -0700 Subject: [PATCH] fix: return explicit truncated flag from Detection instead of approximating Detection now returns (results, truncated, error) where truncated is true only when the loop stopped because it hit maxResults, not when it ran out of work naturally. This eliminates false hasMore signals when detection happens to produce exactly maxResults results by resolving the imbalance. --- weed/plugin/worker/volume_balance_handler.go | 7 +---- weed/worker/tasks/balance/detection.go | 28 +++++++++++++------- weed/worker/tasks/balance/detection_test.go | 20 +++++++------- weed/worker/tasks/balance/register.go | 3 ++- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 92aaa5366..8a25a09cf 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -225,16 +225,11 @@ func (h *VolumeBalanceHandler) Detect( clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} maxResults := int(request.MaxResults) - results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) + results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) if err != nil { return err } - // Detection is stateful (registers planned moves in ActiveTopology), so we - // cannot over-fetch to probe for truncation. Instead, hitting the exact - // limit signals that more work may exist. - hasMore := maxResults > 0 && len(results) >= maxResults - if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil { glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 3af8d4b23..76f81fb56 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -17,12 +17,14 @@ import ( // Detection implements the detection logic for balance tasks. // maxResults limits how many balance operations are returned per invocation. // A non-positive maxResults means no explicit limit (uses a large default). -func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, error) { +// The returned truncated flag is true when detection stopped because it hit +// maxResults rather than running out of work. +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, bool, error) { if !config.IsEnabled() { - return nil, nil + return nil, false, nil } if clusterInfo == nil { - return nil, nil + return nil, false, nil } balanceConfig := config.(*Config) @@ -46,28 +48,33 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI sort.Strings(diskTypes) var allParams []*types.TaskDetectionResult + truncated := false for _, diskType := range diskTypes { remaining := maxResults - len(allParams) if remaining <= 0 { + truncated = true break } - tasks := detectForDiskType(diskType, volumesByDiskType[diskType], balanceConfig, clusterInfo, remaining) + tasks, diskTruncated := detectForDiskType(diskType, volumesByDiskType[diskType], balanceConfig, clusterInfo, remaining) allParams = append(allParams, tasks...) + if diskTruncated { + truncated = true + } } - return allParams, nil + return allParams, truncated, nil } // detectForDiskType performs balance detection for a specific disk type, -// returning up to maxResults balance tasks. -func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) []*types.TaskDetectionResult { +// returning up to maxResults balance tasks and whether it was truncated by the limit. +func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) ([]*types.TaskDetectionResult, bool) { // Skip if cluster segment is too small minVolumeCount := 2 // More reasonable for small clusters if len(diskMetrics) < minVolumeCount { // Only log at verbose level to avoid spamming for small/empty disk types glog.V(1).Infof("BALANCE [%s]: No tasks created - cluster too small (%d volumes, need ≥%d)", diskType, len(diskMetrics), minVolumeCount) - return nil + return nil, false } // Analyze volume distribution across servers. @@ -96,7 +103,7 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics if len(serverVolumeCounts) < balanceConfig.MinServerCount { glog.V(1).Infof("BALANCE [%s]: No tasks created - too few servers (%d servers, need ≥%d)", diskType, len(serverVolumeCounts), balanceConfig.MinServerCount) - return nil + return nil, false } // Track effective adjustments as we plan moves in this detection run @@ -209,7 +216,8 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics } } - return results + // Truncated if the loop exited because we hit the maxResults cap + return results, len(results) >= maxResults } // createBalanceTask creates a single balance task for the selected volume. diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 1fba288c9..f75d87b31 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -311,7 +311,7 @@ func TestDetection_MixedDiskTypes(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf, 100) + tasks, _, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -371,7 +371,7 @@ func TestDetection_ImbalancedDiskType(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf, 100) + tasks, _, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -445,7 +445,7 @@ func TestDetection_RespectsMaxResults(t *testing.T) { } // Request only 3 results - tasks, err := Detection(metrics, clusterInfo, conf, 3) + tasks, _, err := Detection(metrics, clusterInfo, conf, 3) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -475,7 +475,7 @@ func TestDetection_ThreeServers_ConvergesToBalance(t *testing.T) { at := buildTopology(servers, metrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -548,7 +548,7 @@ func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) { } clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -579,7 +579,7 @@ func TestDetection_NoDuplicateVolumesAcrossIterations(t *testing.T) { at := buildTopology(servers, metrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 200) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 200) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -612,7 +612,7 @@ func TestDetection_ThreeServers_MaxServerShifts(t *testing.T) { at := buildTopology(servers, metrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -661,7 +661,7 @@ func TestDetection_FourServers_DestinationSpreading(t *testing.T) { at := buildTopology(servers, metrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -723,7 +723,7 @@ func TestDetection_ConvergenceVerification(t *testing.T) { conf := defaultConf() conf.ImbalanceThreshold = tt.threshold - tasks, err := Detection(metrics, clusterInfo, conf, 500) + tasks, _, err := Detection(metrics, clusterInfo, conf, 500) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -795,7 +795,7 @@ func TestDetection_ExhaustedServerFallsThrough(t *testing.T) { } clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } diff --git a/weed/worker/tasks/balance/register.go b/weed/worker/tasks/balance/register.go index cf9c7e1b8..ecd0216eb 100644 --- a/weed/worker/tasks/balance/register.go +++ b/weed/worker/tasks/balance/register.go @@ -59,7 +59,8 @@ func RegisterBalanceTask() { ), nil }, DetectionFunc: func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - return Detection(metrics, info, config, 0) + results, _, err := Detection(metrics, info, config, 0) + return results, err }, ScanInterval: 30 * time.Minute, SchedulingFunc: Scheduling,