
fix: return explicit truncated flag from Detection instead of approximating

Detection now returns (results, truncated, error), where truncated is true
only when the loop stopped because it hit maxResults, not when it ran out
of work naturally. This eliminates false hasMore signals in the case where
detection fully resolves the imbalance but happens to produce exactly
maxResults results.
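
The boundary case is easiest to see in isolation. Below is a minimal, self-contained sketch (not SeaweedFS code; the names and numbers are illustrative) of how a count-based heuristic misfires when the work happens to finish in exactly maxResults steps:

package main

import "fmt"

// detect returns up to maxResults items of "pending" work plus an explicit
// truncated flag that is true only when the cap stopped it early.
func detect(pending, maxResults int) (results []int, truncated bool) {
    for i := 0; i < pending; i++ {
        if len(results) >= maxResults {
            return results, true // stopped by the cap, work remains
        }
        results = append(results, i)
    }
    return results, false // ran out of work naturally
}

func main() {
    // Exactly maxResults items of work exist, so detection finishes naturally.
    results, truncated := detect(3, 3)
    heuristic := len(results) >= 3    // old approximation: claims more work exists
    fmt.Println(heuristic, truncated) // prints "true false"
}

With the explicit flag, hasMore in the handler tracks what the detection loop actually did instead of being inferred from the result count.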
pull/8559/head
Chris Lu 22 hours ago
parent commit d35dd2a3b5
  1. weed/plugin/worker/volume_balance_handler.go (7)
  2. weed/worker/tasks/balance/detection.go (28)
  3. weed/worker/tasks/balance/detection_test.go (20)
  4. weed/worker/tasks/balance/register.go (3)

weed/plugin/worker/volume_balance_handler.go (7)

@@ -225,16 +225,11 @@ func (h *VolumeBalanceHandler) Detect(
clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
maxResults := int(request.MaxResults)
- results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
+ results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
if err != nil {
return err
}
- // Detection is stateful (registers planned moves in ActiveTopology), so we
- // cannot over-fetch to probe for truncation. Instead, hitting the exact
- // limit signals that more work may exist.
- hasMore := maxResults > 0 && len(results) >= maxResults
if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil {
glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr)
}
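
The comment removed above documented why the handler could not simply probe for truncation by over-fetching. Roughly, the pattern being ruled out looks like the sketch below (illustrative names, not SeaweedFS code); it is only safe when the detection call is a side-effect-free query, and balance Detection is not, since it registers planned moves in ActiveTopology:

// overFetchProbe sketches the "ask for one extra result" trick for detecting
// truncation. It assumes detect is a pure query; if detect reserves capacity
// or records planned work (as balance Detection does), the extra result we
// discard would leave stale state behind.
func overFetchProbe(detect func(max int) []string, maxResults int) (results []string, hasMore bool) {
    probe := detect(maxResults + 1)
    if len(probe) > maxResults {
        return probe[:maxResults], true
    }
    return probe, false
}

Returning the flag from Detection itself sidesteps the problem entirely.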

weed/worker/tasks/balance/detection.go (28)

@@ -17,12 +17,14 @@ import (
// Detection implements the detection logic for balance tasks.
// maxResults limits how many balance operations are returned per invocation.
// A non-positive maxResults means no explicit limit (uses a large default).
- func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, error) {
+ // The returned truncated flag is true when detection stopped because it hit
+ // maxResults rather than running out of work.
+ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, bool, error) {
if !config.IsEnabled() {
- return nil, nil
+ return nil, false, nil
}
if clusterInfo == nil {
- return nil, nil
+ return nil, false, nil
}
balanceConfig := config.(*Config)
@@ -46,28 +48,33 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
sort.Strings(diskTypes)
var allParams []*types.TaskDetectionResult
+ truncated := false
for _, diskType := range diskTypes {
remaining := maxResults - len(allParams)
if remaining <= 0 {
+ truncated = true
break
}
- tasks := detectForDiskType(diskType, volumesByDiskType[diskType], balanceConfig, clusterInfo, remaining)
+ tasks, diskTruncated := detectForDiskType(diskType, volumesByDiskType[diskType], balanceConfig, clusterInfo, remaining)
allParams = append(allParams, tasks...)
+ if diskTruncated {
+ truncated = true
+ }
}
- return allParams, nil
+ return allParams, truncated, nil
}
// detectForDiskType performs balance detection for a specific disk type,
- // returning up to maxResults balance tasks.
- func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) []*types.TaskDetectionResult {
+ // returning up to maxResults balance tasks and whether it was truncated by the limit.
+ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) ([]*types.TaskDetectionResult, bool) {
// Skip if cluster segment is too small
minVolumeCount := 2 // More reasonable for small clusters
if len(diskMetrics) < minVolumeCount {
// Only log at verbose level to avoid spamming for small/empty disk types
glog.V(1).Infof("BALANCE [%s]: No tasks created - cluster too small (%d volumes, need ≥%d)", diskType, len(diskMetrics), minVolumeCount)
- return nil
+ return nil, false
}
// Analyze volume distribution across servers.
@@ -96,7 +103,7 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
if len(serverVolumeCounts) < balanceConfig.MinServerCount {
glog.V(1).Infof("BALANCE [%s]: No tasks created - too few servers (%d servers, need ≥%d)", diskType, len(serverVolumeCounts), balanceConfig.MinServerCount)
- return nil
+ return nil, false
}
// Track effective adjustments as we plan moves in this detection run
@@ -209,7 +216,8 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
}
}
- return results
+ // Truncated if the loop exited because we hit the maxResults cap
+ return results, len(results) >= maxResults
}
// createBalanceTask creates a single balance task for the selected volume.
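
Stripped of the SeaweedFS types, the bookkeeping in the new Detection reduces to the pattern below. This is a simplified sketch with generic names (buckets standing in for disk types), and it assumes a positive maxResults, whereas the real function substitutes a large default for non-positive values:

// collectWithCap gathers results bucket by bucket, stopping at maxResults,
// and reports truncated only when the cap ended collection, not when the
// buckets simply ran out of work.
func collectWithCap(buckets [][]string, maxResults int) (all []string, truncated bool) {
    for _, bucket := range buckets {
        remaining := maxResults - len(all)
        if remaining <= 0 {
            truncated = true // cap already reached with buckets left to visit
            break
        }
        take := bucket
        if len(take) > remaining {
            take = take[:remaining] // cap cuts this bucket short
            truncated = true
        }
        all = append(all, take...)
    }
    return all, truncated
}

In the real code each disk type reports its own truncation through detectForDiskType, and Detection ORs those signals together.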

weed/worker/tasks/balance/detection_test.go (20)

@@ -311,7 +311,7 @@ func TestDetection_MixedDiskTypes(t *testing.T) {
ActiveTopology: at,
}
- tasks, err := Detection(metrics, clusterInfo, conf, 100)
+ tasks, _, err := Detection(metrics, clusterInfo, conf, 100)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -371,7 +371,7 @@ func TestDetection_ImbalancedDiskType(t *testing.T) {
ActiveTopology: at,
}
- tasks, err := Detection(metrics, clusterInfo, conf, 100)
+ tasks, _, err := Detection(metrics, clusterInfo, conf, 100)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -445,7 +445,7 @@ func TestDetection_RespectsMaxResults(t *testing.T) {
}
// Request only 3 results
- tasks, err := Detection(metrics, clusterInfo, conf, 3)
+ tasks, _, err := Detection(metrics, clusterInfo, conf, 3)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -475,7 +475,7 @@ func TestDetection_ThreeServers_ConvergesToBalance(t *testing.T) {
at := buildTopology(servers, metrics)
clusterInfo := &types.ClusterInfo{ActiveTopology: at}
- tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+ tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -548,7 +548,7 @@ func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) {
}
clusterInfo := &types.ClusterInfo{ActiveTopology: at}
- tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+ tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -579,7 +579,7 @@ func TestDetection_NoDuplicateVolumesAcrossIterations(t *testing.T) {
at := buildTopology(servers, metrics)
clusterInfo := &types.ClusterInfo{ActiveTopology: at}
- tasks, err := Detection(metrics, clusterInfo, defaultConf(), 200)
+ tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 200)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -612,7 +612,7 @@ func TestDetection_ThreeServers_MaxServerShifts(t *testing.T) {
at := buildTopology(servers, metrics)
clusterInfo := &types.ClusterInfo{ActiveTopology: at}
- tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+ tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -661,7 +661,7 @@ func TestDetection_FourServers_DestinationSpreading(t *testing.T) {
at := buildTopology(servers, metrics)
clusterInfo := &types.ClusterInfo{ActiveTopology: at}
- tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+ tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -723,7 +723,7 @@ func TestDetection_ConvergenceVerification(t *testing.T) {
conf := defaultConf()
conf.ImbalanceThreshold = tt.threshold
- tasks, err := Detection(metrics, clusterInfo, conf, 500)
+ tasks, _, err := Detection(metrics, clusterInfo, conf, 500)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
@@ -795,7 +795,7 @@ func TestDetection_ExhaustedServerFallsThrough(t *testing.T) {
}
clusterInfo := &types.ClusterInfo{ActiveTopology: at}
- tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+ tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100)
if err != nil {
t.Fatalf("Detection failed: %v", err)
}
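
The updated tests simply discard the new flag. A hypothetical assertion on it (not part of this commit), written against a fixture like the one in TestDetection_RespectsMaxResults, i.e. an imbalanced cluster needing more than the 3 requested moves, could look like the fragment below; metrics, clusterInfo, and conf are assumed to come from that kind of setup:

// Hypothetical sketch: with a limit smaller than the remaining imbalance,
// Detection should report that it was cut short by maxResults.
_, truncated, err := Detection(metrics, clusterInfo, conf, 3)
if err != nil {
    t.Fatalf("Detection failed: %v", err)
}
if !truncated {
    t.Errorf("expected truncated=true when maxResults limits detection")
}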

weed/worker/tasks/balance/register.go (3)

@@ -59,7 +59,8 @@ func RegisterBalanceTask() {
), nil
},
DetectionFunc: func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
- return Detection(metrics, info, config, 0)
+ results, _, err := Detection(metrics, info, config, 0)
+ return results, err
},
ScanInterval: 30 * time.Minute,
SchedulingFunc: Scheduling,
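
The registry path keeps the legacy two-value DetectionFunc signature, so the flag is dropped at this call site; a caller that needs truncation information calls Detection directly, as the plugin handler above does. In generic terms (illustrative names, not the registry types), the adapter amounts to:

// adaptDetection wraps a three-value detection function for an interface that
// only understands (results, error), deliberately discarding the truncated flag.
func adaptDetection(detect func(max int) ([]string, bool, error)) func() ([]string, error) {
    return func() ([]string, error) {
        results, _, err := detect(0) // 0 means no explicit limit, per Detection's contract
        return results, err
    }
}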
