From 33da75cfa8a50f2e1d38734c4d62097348958f69 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 15:57:34 -0700 Subject: [PATCH] fix: handle zero maxResults as no-cap, emit trace after trim, seed empty servers - When MaxResults is 0 (omitted), treat as no explicit cap instead of defaulting to 1; only apply the +1 over-fetch probe when caller supplies a positive limit - Move decision trace emission after hasMore/trim so the trace accurately reflects the returned proposals - Seed serverVolumeCounts from ActiveTopology so servers that have a matching disk type but zero volumes are included in the imbalance calculation and MinServerCount check --- weed/plugin/worker/volume_balance_handler.go | 14 +++++++----- weed/worker/tasks/balance/detection.go | 23 ++++++++++++++++++-- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 5fd7962d9..55e9972f2 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -225,16 +225,14 @@ func (h *VolumeBalanceHandler) Detect( clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} maxResults := int(request.MaxResults) - if maxResults <= 0 { - maxResults = 1 + detectionLimit := maxResults + if maxResults > 0 { + detectionLimit = maxResults + 1 // over-fetch by 1 to detect truncation } - results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults+1) + results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, detectionLimit) if err != nil { return err } - if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil { - glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) - } hasMore := false if maxResults > 0 && len(results) > maxResults { @@ -242,6 +240,10 @@ func (h *VolumeBalanceHandler) Detect( results = results[:maxResults] } + if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil { + glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) + } + proposals := make([]*plugin_pb.JobProposal, 0, len(results)) for _, result := range results { proposal, proposalErr := buildVolumeBalanceProposal(result) diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 9ed9ddf45..4e8c878fa 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -15,6 +15,7 @@ import ( // Detection implements the detection logic for balance tasks. // maxResults limits how many balance operations are returned per invocation. +// A non-positive maxResults means no explicit limit (uses a large default). func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, error) { if !config.IsEnabled() { return nil, nil @@ -23,7 +24,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI balanceConfig := config.(*Config) if maxResults <= 0 { - maxResults = 1 + maxResults = math.MaxInt32 } // Group volumes by disk type to ensure we compare apples to apples @@ -57,8 +58,26 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics return nil } - // Analyze volume distribution across servers + // Analyze volume distribution across servers. + // Seed from ActiveTopology so servers with matching disk type but zero + // volumes are included in the count and imbalance calculation. serverVolumeCounts := make(map[string]int) + if clusterInfo != nil && clusterInfo.ActiveTopology != nil { + topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo() + if topologyInfo != nil { + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for diskTypeName := range node.DiskInfos { + if diskTypeName == diskType { + serverVolumeCounts[node.Id] = 0 + } + } + } + } + } + } + } for _, metric := range diskMetrics { serverVolumeCounts[metric.Server]++ }