
fix: handle zero maxResults as no-cap, emit trace after trim, seed empty servers

- When MaxResults is 0 (omitted), treat it as no explicit cap instead of
  defaulting to 1; only apply the +1 over-fetch probe when the caller
  supplies a positive limit (sketched below)
- Move decision trace emission after hasMore/trim so the trace
  accurately reflects the returned proposals
- Seed serverVolumeCounts from ActiveTopology so servers that have a
  matching disk type but zero volumes are included in the imbalance
  calculation and MinServerCount check
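
A minimal, self-contained sketch of the limit handling described in the first bullet. detectWithProbe and fetch are hypothetical stand-ins for the handler's call into balancetask.Detection; only the limit arithmetic mirrors the change:

package main

import "fmt"

// detectWithProbe applies the commit's limit rules: a non-positive limit
// means no explicit cap, while a positive limit is over-fetched by one so
// truncation can be detected without a second query. fetch is a
// hypothetical stand-in for balancetask.Detection.
func detectWithProbe(limit int, fetch func(max int) []string) ([]string, bool) {
    detectionLimit := limit
    if limit > 0 {
        detectionLimit = limit + 1 // over-fetch by 1 to detect truncation
    }
    results := fetch(detectionLimit)
    hasMore := false
    if limit > 0 && len(results) > limit {
        hasMore = true
        results = results[:limit] // trim the probe entry back off
    }
    return results, hasMore
}

func main() {
    all := []string{"a", "b", "c", "d"}
    fetch := func(max int) []string {
        if max > 0 && max < len(all) {
            return all[:max]
        }
        return all
    }
    fmt.Println(detectWithProbe(2, fetch)) // [a b] true: capped, more remain
    fmt.Println(detectWithProbe(0, fetch)) // [a b c d] false: zero means no cap
}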
commit 33da75cfa8 (pull/8559/head)
Chris Lu, 2 days ago
 weed/plugin/worker/volume_balance_handler.go | 14 ++++++++------
 weed/worker/tasks/balance/detection.go       | 23 +++++++++++++++++++--

diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go

@@ -225,16 +225,14 @@ func (h *VolumeBalanceHandler) Detect(
 	clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
 	maxResults := int(request.MaxResults)
-	if maxResults <= 0 {
-		maxResults = 1
+	detectionLimit := maxResults
+	if maxResults > 0 {
+		detectionLimit = maxResults + 1 // over-fetch by 1 to detect truncation
 	}
-	results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults+1)
+	results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, detectionLimit)
 	if err != nil {
 		return err
 	}
-	if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil {
-		glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr)
-	}
 	hasMore := false
 	if maxResults > 0 && len(results) > maxResults {
@@ -242,6 +240,10 @@
 		results = results[:maxResults]
 	}
+	if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil {
+		glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr)
+	}
 	proposals := make([]*plugin_pb.JobProposal, 0, len(results))
 	for _, result := range results {
 		proposal, proposalErr := buildVolumeBalanceProposal(result)
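
Note the ordering in the rewritten handler: hasMore is computed and the slice is trimmed before the trace is emitted, so when the over-fetch probe returns maxResults+1 entries the extra entry is dropped before it can appear in the trace. With the old placement, the trace could report one more result than the RPC actually returned whenever truncation occurred.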

diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go

@@ -15,6 +15,7 @@ import (
 // Detection implements the detection logic for balance tasks.
 // maxResults limits how many balance operations are returned per invocation.
+// A non-positive maxResults means no explicit limit (uses a large default).
 func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, error) {
 	if !config.IsEnabled() {
 		return nil, nil
@@ -23,7 +24,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 	balanceConfig := config.(*Config)
 	if maxResults <= 0 {
-		maxResults = 1
+		maxResults = math.MaxInt32
 	}
 	// Group volumes by disk type to ensure we compare apples to apples
@@ -57,8 +58,26 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
 		return nil
 	}

-	// Analyze volume distribution across servers
+	// Analyze volume distribution across servers.
+	// Seed from ActiveTopology so servers with matching disk type but zero
+	// volumes are included in the count and imbalance calculation.
 	serverVolumeCounts := make(map[string]int)
+	if clusterInfo != nil && clusterInfo.ActiveTopology != nil {
+		topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo()
+		if topologyInfo != nil {
+			for _, dc := range topologyInfo.DataCenterInfos {
+				for _, rack := range dc.RackInfos {
+					for _, node := range rack.DataNodeInfos {
+						for diskTypeName := range node.DiskInfos {
+							if diskTypeName == diskType {
+								serverVolumeCounts[node.Id] = 0
+							}
+						}
+					}
+				}
+			}
+		}
+	}
 	for _, metric := range diskMetrics {
 		serverVolumeCounts[metric.Server]++
 	}
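
To see why the topology seeding matters, consider a hypothetical cluster where server-a holds ten volumes of a disk type and server-b has a matching disk but no volumes yet. A minimal sketch with illustrative numbers; the max-min spread below is an assumed simplification standing in for the real imbalance calculation, which lies outside this hunk:

package main

import "fmt"

// spread returns the max-min volume count across servers; an assumed,
// simplified proxy for detection.go's imbalance calculation.
func spread(counts map[string]int) int {
    first := true
    var lo, hi int
    for _, c := range counts {
        if first {
            lo, hi = c, c
            first = false
            continue
        }
        if c < lo {
            lo = c
        }
        if c > hi {
            hi = c
        }
    }
    return hi - lo
}

func main() {
    // Counts built only from volume metrics: the empty server never appears.
    unseeded := map[string]int{"server-a": 10}
    // Counts seeded from the topology first: the empty server is visible.
    seeded := map[string]int{"server-a": 10, "server-b": 0}

    fmt.Println(spread(unseeded), len(unseeded)) // 0 1: looks balanced, and one server fails a MinServerCount of 2
    fmt.Println(spread(seeded), len(seeded))     // 10 2: imbalance is visible and both servers are counted
}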
