diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 92aaa5366..8a25a09cf 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -225,16 +225,11 @@ func (h *VolumeBalanceHandler) Detect( clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} maxResults := int(request.MaxResults) - results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) + results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) if err != nil { return err } - // Detection is stateful (registers planned moves in ActiveTopology), so we - // cannot over-fetch to probe for truncation. Instead, hitting the exact - // limit signals that more work may exist. - hasMore := maxResults > 0 && len(results) >= maxResults - if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil { glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 3af8d4b23..76f81fb56 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -17,12 +17,14 @@ import ( // Detection implements the detection logic for balance tasks. // maxResults limits how many balance operations are returned per invocation. // A non-positive maxResults means no explicit limit (uses a large default). -func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, error) { +// The returned truncated flag is true when detection stopped because it hit +// maxResults rather than running out of work. +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, bool, error) { if !config.IsEnabled() { - return nil, nil + return nil, false, nil } if clusterInfo == nil { - return nil, nil + return nil, false, nil } balanceConfig := config.(*Config) @@ -46,28 +48,33 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI sort.Strings(diskTypes) var allParams []*types.TaskDetectionResult + truncated := false for _, diskType := range diskTypes { remaining := maxResults - len(allParams) if remaining <= 0 { + truncated = true break } - tasks := detectForDiskType(diskType, volumesByDiskType[diskType], balanceConfig, clusterInfo, remaining) + tasks, diskTruncated := detectForDiskType(diskType, volumesByDiskType[diskType], balanceConfig, clusterInfo, remaining) allParams = append(allParams, tasks...) + if diskTruncated { + truncated = true + } } - return allParams, nil + return allParams, truncated, nil } // detectForDiskType performs balance detection for a specific disk type, -// returning up to maxResults balance tasks. -func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) []*types.TaskDetectionResult { +// returning up to maxResults balance tasks and whether it was truncated by the limit. +func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) ([]*types.TaskDetectionResult, bool) { // Skip if cluster segment is too small minVolumeCount := 2 // More reasonable for small clusters if len(diskMetrics) < minVolumeCount { // Only log at verbose level to avoid spamming for small/empty disk types glog.V(1).Infof("BALANCE [%s]: No tasks created - cluster too small (%d volumes, need ≥%d)", diskType, len(diskMetrics), minVolumeCount) - return nil + return nil, false } // Analyze volume distribution across servers. @@ -96,7 +103,7 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics if len(serverVolumeCounts) < balanceConfig.MinServerCount { glog.V(1).Infof("BALANCE [%s]: No tasks created - too few servers (%d servers, need ≥%d)", diskType, len(serverVolumeCounts), balanceConfig.MinServerCount) - return nil + return nil, false } // Track effective adjustments as we plan moves in this detection run @@ -209,7 +216,8 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics } } - return results + // Truncated if the loop exited because we hit the maxResults cap + return results, len(results) >= maxResults } // createBalanceTask creates a single balance task for the selected volume. diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 1fba288c9..f75d87b31 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -311,7 +311,7 @@ func TestDetection_MixedDiskTypes(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf, 100) + tasks, _, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -371,7 +371,7 @@ func TestDetection_ImbalancedDiskType(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf, 100) + tasks, _, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -445,7 +445,7 @@ func TestDetection_RespectsMaxResults(t *testing.T) { } // Request only 3 results - tasks, err := Detection(metrics, clusterInfo, conf, 3) + tasks, _, err := Detection(metrics, clusterInfo, conf, 3) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -475,7 +475,7 @@ func TestDetection_ThreeServers_ConvergesToBalance(t *testing.T) { at := buildTopology(servers, metrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -548,7 +548,7 @@ func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) { } clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -579,7 +579,7 @@ func TestDetection_NoDuplicateVolumesAcrossIterations(t *testing.T) { at := buildTopology(servers, metrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 200) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 200) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -612,7 +612,7 @@ func TestDetection_ThreeServers_MaxServerShifts(t *testing.T) { at := buildTopology(servers, metrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -661,7 +661,7 @@ func TestDetection_FourServers_DestinationSpreading(t *testing.T) { at := buildTopology(servers, metrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -723,7 +723,7 @@ func TestDetection_ConvergenceVerification(t *testing.T) { conf := defaultConf() conf.ImbalanceThreshold = tt.threshold - tasks, err := Detection(metrics, clusterInfo, conf, 500) + tasks, _, err := Detection(metrics, clusterInfo, conf, 500) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -795,7 +795,7 @@ func TestDetection_ExhaustedServerFallsThrough(t *testing.T) { } clusterInfo := &types.ClusterInfo{ActiveTopology: at} - tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) if err != nil { t.Fatalf("Detection failed: %v", err) } diff --git a/weed/worker/tasks/balance/register.go b/weed/worker/tasks/balance/register.go index cf9c7e1b8..ecd0216eb 100644 --- a/weed/worker/tasks/balance/register.go +++ b/weed/worker/tasks/balance/register.go @@ -59,7 +59,8 @@ func RegisterBalanceTask() { ), nil }, DetectionFunc: func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { - return Detection(metrics, info, config, 0) + results, _, err := Detection(metrics, info, config, 0) + return results, err }, ScanInterval: 30 * time.Minute, SchedulingFunc: Scheduling,