From f0ba8b6e3bbe6c685feb133b21067f402225652c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 14:35:42 -0700 Subject: [PATCH] fix: volume balance detection now returns multiple tasks per run (#8551) Previously, detectForDiskType() returned at most 1 balance task per disk type, making the MaxJobsPerDetection setting ineffective. The detection loop now iterates within each disk type, planning multiple moves until the imbalance drops below threshold or maxResults is reached. Effective volume counts are adjusted after each planned move so the algorithm correctly re-evaluates which server is overloaded. --- .../volume_balance/detection_test.go | 25 +- weed/plugin/worker/volume_balance_handler.go | 11 +- weed/worker/tasks/balance/detection.go | 328 +++++++++++------- weed/worker/tasks/balance/detection_test.go | 79 ++++- weed/worker/tasks/balance/register.go | 4 +- 5 files changed, 302 insertions(+), 145 deletions(-) diff --git a/test/plugin_workers/volume_balance/detection_test.go b/test/plugin_workers/volume_balance/detection_test.go index 069c6c4c9..0aa7e3a37 100644 --- a/test/plugin_workers/volume_balance/detection_test.go +++ b/test/plugin_workers/volume_balance/detection_test.go @@ -37,17 +37,20 @@ func TestVolumeBalanceDetectionIntegration(t *testing.T) { MasterGrpcAddresses: []string{master.Address()}, }, 10) require.NoError(t, err) - require.Len(t, proposals, 1) - - proposal := proposals[0] - require.Equal(t, "volume_balance", proposal.JobType) - paramsValue := proposal.Parameters["task_params_pb"] - require.NotNil(t, paramsValue) - - params := &worker_pb.TaskParams{} - require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) - require.NotEmpty(t, params.Sources) - require.NotEmpty(t, params.Targets) + // With 10 volumes on one server and 1 on the other (avg=5.5), + // multiple balance moves should be detected until imbalance is within threshold. + require.Greater(t, len(proposals), 1, "expected multiple balance proposals") + + for _, proposal := range proposals { + require.Equal(t, "volume_balance", proposal.JobType) + paramsValue := proposal.Parameters["task_params_pb"] + require.NotNil(t, paramsValue) + + params := &worker_pb.TaskParams{} + require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) + require.NotEmpty(t, params.Sources) + require.NotEmpty(t, params.Targets) + } } func buildBalanceVolumeListResponse(t *testing.T) *master_pb.VolumeListResponse { diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index d15fc6de0..bdfea0a98 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -224,7 +224,11 @@ func (h *VolumeBalanceHandler) Detect( } clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} - results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig) + maxResults := int(request.MaxResults) + if maxResults <= 0 { + maxResults = 1 + } + results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) if err != nil { return err } @@ -232,12 +236,7 @@ func (h *VolumeBalanceHandler) Detect( glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } - maxResults := int(request.MaxResults) hasMore := false - if maxResults > 0 && len(results) > maxResults { - hasMore = true - results = results[:maxResults] - } proposals := make([]*plugin_pb.JobProposal, 0, len(results)) for _, result := range results { diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 4a52ea943..d71dca4b7 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -12,14 +12,19 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Detection implements the detection logic for balance tasks -func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { +// Detection implements the detection logic for balance tasks. +// maxResults limits how many balance operations are returned per invocation. +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, error) { if !config.IsEnabled() { return nil, nil } balanceConfig := config.(*Config) + if maxResults <= 0 { + maxResults = 1 + } + // Group volumes by disk type to ensure we compare apples to apples volumesByDiskType := make(map[string][]*types.VolumeHealthMetrics) for _, metric := range metrics { @@ -29,16 +34,20 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI var allParams []*types.TaskDetectionResult for diskType, diskMetrics := range volumesByDiskType { - if task := detectForDiskType(diskType, diskMetrics, balanceConfig, clusterInfo); task != nil { - allParams = append(allParams, task) + remaining := maxResults - len(allParams) + if remaining <= 0 { + break } + tasks := detectForDiskType(diskType, diskMetrics, balanceConfig, clusterInfo, remaining) + allParams = append(allParams, tasks...) } return allParams, nil } -// detectForDiskType performs balance detection for a specific disk type -func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult { +// detectForDiskType performs balance detection for a specific disk type, +// returning up to maxResults balance tasks. +func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) []*types.TaskDetectionResult { // Skip if cluster segment is too small minVolumeCount := 2 // More reasonable for small clusters if len(diskMetrics) < minVolumeCount { @@ -58,155 +67,238 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics return nil } - // Calculate balance metrics - totalVolumes := len(diskMetrics) - avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) + // Track effective adjustments as we plan moves in this detection run + adjustments := make(map[string]int) - maxVolumes := 0 - minVolumes := totalVolumes - maxServer := "" - minServer := "" + var results []*types.TaskDetectionResult - for server, count := range serverVolumeCounts { - if count > maxVolumes { - maxVolumes = count - maxServer = server + for len(results) < maxResults { + // Compute effective volume counts with adjustments from planned moves + effectiveCounts := make(map[string]int, len(serverVolumeCounts)) + totalVolumes := 0 + for server, count := range serverVolumeCounts { + effective := count + adjustments[server] + if effective < 0 { + effective = 0 + } + effectiveCounts[server] = effective + totalVolumes += effective } - if count < minVolumes { - minVolumes = count - minServer = server + // Include any destination servers not in the original disk metrics + for server, adj := range adjustments { + if _, exists := serverVolumeCounts[server]; !exists && adj > 0 { + effectiveCounts[server] = adj + totalVolumes += adj + } } - } - // Check if imbalance exceeds threshold - imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer - if imbalanceRatio <= balanceConfig.ImbalanceThreshold { - glog.Infof("BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f", - diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - return nil - } + avgVolumesPerServer := float64(totalVolumes) / float64(len(effectiveCounts)) - // Select a volume from the overloaded server for balance - var selectedVolume *types.VolumeHealthMetrics - for _, metric := range diskMetrics { - if metric.Server == maxServer { - selectedVolume = metric + maxVolumes := 0 + minVolumes := totalVolumes + maxServer := "" + minServer := "" + + for server, count := range effectiveCounts { + if count > maxVolumes { + maxVolumes = count + maxServer = server + } + if count < minVolumes { + minVolumes = count + minServer = server + } + } + + // Check if imbalance exceeds threshold + imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer + if imbalanceRatio <= balanceConfig.ImbalanceThreshold { + if len(results) == 0 { + glog.Infof("BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f", + diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + } else { + glog.Infof("BALANCE [%s]: Created %d task(s), cluster now balanced. Imbalance=%.1f%% (threshold=%.1f%%)", + diskType, len(results), imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100) + } break } - } - if selectedVolume == nil { - glog.Warningf("BALANCE [%s]: Could not find volume on overloaded server %s", diskType, maxServer) - return nil + // Select a volume from the overloaded server for balance + var selectedVolume *types.VolumeHealthMetrics + for _, metric := range diskMetrics { + if metric.Server == maxServer { + // Skip volumes that already have a task in ActiveTopology + if clusterInfo.ActiveTopology != nil && clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) { + continue + } + selectedVolume = metric + break + } + } + + if selectedVolume == nil { + glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s", diskType, maxServer) + break + } + + // Plan destination and create task + task := createBalanceTask(diskType, selectedVolume, clusterInfo) + if task == nil { + break + } + + results = append(results, task) + + // Adjust effective counts for the next iteration + adjustments[maxServer]-- + if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 { + // Find the destination server ID from the planned task + destAddress := task.TypedParams.Targets[0].Node + destServer := findServerIDByAddress(diskMetrics, destAddress, clusterInfo) + if destServer != "" { + adjustments[destServer]++ + } + } } - // Create balance task with volume and destination planning info - reason := fmt.Sprintf("Cluster imbalance detected for %s: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", - diskType, imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + return results +} - // Generate task ID for ActiveTopology integration +// findServerIDByAddress resolves a server address back to its server ID. +func findServerIDByAddress(diskMetrics []*types.VolumeHealthMetrics, address string, clusterInfo *types.ClusterInfo) string { + // Check metrics first for a direct match + for _, m := range diskMetrics { + if m.ServerAddress == address || m.Server == address { + return m.Server + } + } + // Fall back to topology lookup + if clusterInfo.ActiveTopology != nil { + topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo() + if topologyInfo != nil { + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, dn := range rack.DataNodeInfos { + if dn.Address == address || dn.Id == address { + return dn.Id + } + } + } + } + } + } + return "" +} + +// createBalanceTask creates a single balance task for the selected volume. +// Returns nil if destination planning fails. +func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult { taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix()) task := &types.TaskDetectionResult{ - TaskID: taskID, // Link to ActiveTopology pending task + TaskID: taskID, TaskType: types.TaskTypeBalance, VolumeID: selectedVolume.VolumeID, Server: selectedVolume.Server, Collection: selectedVolume.Collection, Priority: types.TaskPriorityNormal, - Reason: reason, + Reason: fmt.Sprintf("Cluster imbalance detected for %s disk type", + diskType), ScheduleAt: time.Now(), } // Plan destination if ActiveTopology is available - if clusterInfo.ActiveTopology != nil { - // Check if ANY task already exists in ActiveTopology for this volume - if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) { - glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID) - return nil - } + if clusterInfo.ActiveTopology == nil { + glog.Warningf("No ActiveTopology available for destination planning in balance detection") + return nil + } - destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) - if err != nil { - glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) - return nil - } + // Check if ANY task already exists in ActiveTopology for this volume + if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) { + glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID) + return nil + } - // Find the actual disk containing the volume on the source server - sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - if !found { - glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", - diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - return nil - } + destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) + if err != nil { + glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) + return nil + } + + // Find the actual disk containing the volume on the source server + sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + if !found { + glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", + diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + return nil + } - // Create typed parameters with unified source and target information - task.TypedParams = &worker_pb.TaskParams{ - TaskId: taskID, // Link to ActiveTopology pending task - VolumeId: selectedVolume.VolumeID, - Collection: selectedVolume.Collection, - VolumeSize: selectedVolume.Size, // Store original volume size for tracking changes - - // Unified sources and targets - the only way to specify locations - Sources: []*worker_pb.TaskSource{ - { - Node: selectedVolume.ServerAddress, - DiskId: sourceDisk, - VolumeId: selectedVolume.VolumeID, - EstimatedSize: selectedVolume.Size, - DataCenter: selectedVolume.DataCenter, - Rack: selectedVolume.Rack, - }, + // Update reason with full details now that we have destination info + task.Reason = fmt.Sprintf("Cluster imbalance detected for %s: move volume %d from %s to %s", + diskType, selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) + + // Create typed parameters with unified source and target information + task.TypedParams = &worker_pb.TaskParams{ + TaskId: taskID, + VolumeId: selectedVolume.VolumeID, + Collection: selectedVolume.Collection, + VolumeSize: selectedVolume.Size, + + Sources: []*worker_pb.TaskSource{ + { + Node: selectedVolume.ServerAddress, + DiskId: sourceDisk, + VolumeId: selectedVolume.VolumeID, + EstimatedSize: selectedVolume.Size, + DataCenter: selectedVolume.DataCenter, + Rack: selectedVolume.Rack, }, - Targets: []*worker_pb.TaskTarget{ - { - Node: destinationPlan.TargetAddress, - DiskId: destinationPlan.TargetDisk, - VolumeId: selectedVolume.VolumeID, - EstimatedSize: destinationPlan.ExpectedSize, - DataCenter: destinationPlan.TargetDC, - Rack: destinationPlan.TargetRack, - }, + }, + Targets: []*worker_pb.TaskTarget{ + { + Node: destinationPlan.TargetAddress, + DiskId: destinationPlan.TargetDisk, + VolumeId: selectedVolume.VolumeID, + EstimatedSize: destinationPlan.ExpectedSize, + DataCenter: destinationPlan.TargetDC, + Rack: destinationPlan.TargetRack, }, + }, - TaskParams: &worker_pb.TaskParams_BalanceParams{ - BalanceParams: &worker_pb.BalanceTaskParams{ - ForceMove: false, - TimeoutSeconds: 600, // 10 minutes default - }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + ForceMove: false, + TimeoutSeconds: 600, // 10 minutes default }, - } - - glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s", - selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) + }, + } - // Add pending balance task to ActiveTopology for capacity management - targetDisk := destinationPlan.TargetDisk + glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s", + selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) - err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ - TaskID: taskID, - TaskType: topology.TaskTypeBalance, - VolumeID: selectedVolume.VolumeID, - VolumeSize: int64(selectedVolume.Size), - Sources: []topology.TaskSourceSpec{ - {ServerID: selectedVolume.Server, DiskID: sourceDisk}, - }, - Destinations: []topology.TaskDestinationSpec{ - {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, - }, - }) - if err != nil { - glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) - return nil - } + // Add pending balance task to ActiveTopology for capacity management + targetDisk := destinationPlan.TargetDisk - glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", - taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) - } else { - glog.Warningf("No ActiveTopology available for destination planning in balance detection") + err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ + TaskID: taskID, + TaskType: topology.TaskTypeBalance, + VolumeID: selectedVolume.VolumeID, + VolumeSize: int64(selectedVolume.Size), + Sources: []topology.TaskSourceSpec{ + {ServerID: selectedVolume.Server, DiskID: sourceDisk}, + }, + Destinations: []topology.TaskDestinationSpec{ + {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, + }, + }) + if err != nil { + glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) return nil } + glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", + taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) + return task } diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 3fff3720b..0c60625bb 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -171,7 +171,7 @@ func TestDetection_MixedDiskTypes(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf) + tasks, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -231,25 +231,86 @@ func TestDetection_ImbalancedDiskType(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf) + tasks, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } if len(tasks) == 0 { t.Error("Expected tasks for imbalanced SSD cluster, got 0") - } else { - // Verify task details - task := tasks[0] + } + + // With 100 volumes on server-1 and 10 on server-2, avg=55, detection should + // propose multiple moves until imbalance drops below 20% threshold. + // All tasks should move volumes from ssd-server-1 to ssd-server-2. + if len(tasks) < 2 { + t.Errorf("Expected multiple balance tasks, got %d", len(tasks)) + } + + for i, task := range tasks { if task.VolumeID == 0 { - t.Error("Task has invalid VolumeID") + t.Errorf("Task %d has invalid VolumeID", i) } - // Expect volume to be moving from ssd-server-1 to ssd-server-2 if task.TypedParams.Sources[0].Node != "ssd-server-1:8080" { - t.Errorf("Expected source ssd-server-1:8080, got %s", task.TypedParams.Sources[0].Node) + t.Errorf("Task %d: expected source ssd-server-1:8080, got %s", i, task.TypedParams.Sources[0].Node) } if task.TypedParams.Targets[0].Node != "ssd-server-2:8080" { - t.Errorf("Expected target ssd-server-2:8080, got %s", task.TypedParams.Targets[0].Node) + t.Errorf("Task %d: expected target ssd-server-2:8080, got %s", i, task.TypedParams.Targets[0].Node) } } } + +func TestDetection_RespectsMaxResults(t *testing.T) { + // Setup: 2 SSD servers with big imbalance (100 vs 10) + metrics := []*types.VolumeHealthMetrics{} + + for i := 0; i < 100; i++ { + metrics = append(metrics, &types.VolumeHealthMetrics{ + VolumeID: uint32(i + 1), + Server: "ssd-server-1", + ServerAddress: "ssd-server-1:8080", + DiskType: "ssd", + Collection: "c1", + Size: 1024, + DataCenter: "dc1", + Rack: "rack1", + }) + } + for i := 0; i < 10; i++ { + metrics = append(metrics, &types.VolumeHealthMetrics{ + VolumeID: uint32(100 + i + 1), + Server: "ssd-server-2", + ServerAddress: "ssd-server-2:8080", + DiskType: "ssd", + Collection: "c1", + Size: 1024, + DataCenter: "dc1", + Rack: "rack1", + }) + } + + conf := &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 30, + MaxConcurrent: 1, + }, + MinServerCount: 2, + ImbalanceThreshold: 0.2, + } + + at := createMockTopology(metrics...) + clusterInfo := &types.ClusterInfo{ + ActiveTopology: at, + } + + // Request only 3 results + tasks, err := Detection(metrics, clusterInfo, conf, 3) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) != 3 { + t.Errorf("Expected exactly 3 tasks (maxResults=3), got %d", len(tasks)) + } +} diff --git a/weed/worker/tasks/balance/register.go b/weed/worker/tasks/balance/register.go index 68e3458e9..c7a79c1ca 100644 --- a/weed/worker/tasks/balance/register.go +++ b/weed/worker/tasks/balance/register.go @@ -58,7 +58,9 @@ func RegisterBalanceTask() { dialOpt, ), nil }, - DetectionFunc: Detection, + DetectionFunc: func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + return Detection(metrics, info, config, 1) + }, ScanInterval: 30 * time.Minute, SchedulingFunc: Scheduling, MaxConcurrent: 1,