diff --git a/test/plugin_workers/volume_balance/detection_test.go b/test/plugin_workers/volume_balance/detection_test.go index 069c6c4c9..0aa7e3a37 100644 --- a/test/plugin_workers/volume_balance/detection_test.go +++ b/test/plugin_workers/volume_balance/detection_test.go @@ -37,17 +37,20 @@ func TestVolumeBalanceDetectionIntegration(t *testing.T) { MasterGrpcAddresses: []string{master.Address()}, }, 10) require.NoError(t, err) - require.Len(t, proposals, 1) - - proposal := proposals[0] - require.Equal(t, "volume_balance", proposal.JobType) - paramsValue := proposal.Parameters["task_params_pb"] - require.NotNil(t, paramsValue) - - params := &worker_pb.TaskParams{} - require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) - require.NotEmpty(t, params.Sources) - require.NotEmpty(t, params.Targets) + // With 10 volumes on one server and 1 on the other (avg=5.5), + // multiple balance moves should be detected until imbalance is within threshold. + require.Greater(t, len(proposals), 1, "expected multiple balance proposals") + + for _, proposal := range proposals { + require.Equal(t, "volume_balance", proposal.JobType) + paramsValue := proposal.Parameters["task_params_pb"] + require.NotNil(t, paramsValue) + + params := &worker_pb.TaskParams{} + require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) + require.NotEmpty(t, params.Sources) + require.NotEmpty(t, params.Targets) + } } func buildBalanceVolumeListResponse(t *testing.T) *master_pb.VolumeListResponse { diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index d15fc6de0..bdfea0a98 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -224,7 +224,11 @@ func (h *VolumeBalanceHandler) Detect( } clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} - results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig) + maxResults := int(request.MaxResults) + if maxResults <= 0 { + maxResults = 1 + } + results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) if err != nil { return err } @@ -232,12 +236,7 @@ func (h *VolumeBalanceHandler) Detect( glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } - maxResults := int(request.MaxResults) hasMore := false - if maxResults > 0 && len(results) > maxResults { - hasMore = true - results = results[:maxResults] - } proposals := make([]*plugin_pb.JobProposal, 0, len(results)) for _, result := range results { diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 4a52ea943..d71dca4b7 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -12,14 +12,19 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Detection implements the detection logic for balance tasks -func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { +// Detection implements the detection logic for balance tasks. +// maxResults limits how many balance operations are returned per invocation. +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, error) { if !config.IsEnabled() { return nil, nil } balanceConfig := config.(*Config) + if maxResults <= 0 { + maxResults = 1 + } + // Group volumes by disk type to ensure we compare apples to apples volumesByDiskType := make(map[string][]*types.VolumeHealthMetrics) for _, metric := range metrics { @@ -29,16 +34,20 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI var allParams []*types.TaskDetectionResult for diskType, diskMetrics := range volumesByDiskType { - if task := detectForDiskType(diskType, diskMetrics, balanceConfig, clusterInfo); task != nil { - allParams = append(allParams, task) + remaining := maxResults - len(allParams) + if remaining <= 0 { + break } + tasks := detectForDiskType(diskType, diskMetrics, balanceConfig, clusterInfo, remaining) + allParams = append(allParams, tasks...) } return allParams, nil } -// detectForDiskType performs balance detection for a specific disk type -func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult { +// detectForDiskType performs balance detection for a specific disk type, +// returning up to maxResults balance tasks. +func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) []*types.TaskDetectionResult { // Skip if cluster segment is too small minVolumeCount := 2 // More reasonable for small clusters if len(diskMetrics) < minVolumeCount { @@ -58,155 +67,238 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics return nil } - // Calculate balance metrics - totalVolumes := len(diskMetrics) - avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) + // Track effective adjustments as we plan moves in this detection run + adjustments := make(map[string]int) - maxVolumes := 0 - minVolumes := totalVolumes - maxServer := "" - minServer := "" + var results []*types.TaskDetectionResult - for server, count := range serverVolumeCounts { - if count > maxVolumes { - maxVolumes = count - maxServer = server + for len(results) < maxResults { + // Compute effective volume counts with adjustments from planned moves + effectiveCounts := make(map[string]int, len(serverVolumeCounts)) + totalVolumes := 0 + for server, count := range serverVolumeCounts { + effective := count + adjustments[server] + if effective < 0 { + effective = 0 + } + effectiveCounts[server] = effective + totalVolumes += effective } - if count < minVolumes { - minVolumes = count - minServer = server + // Include any destination servers not in the original disk metrics + for server, adj := range adjustments { + if _, exists := serverVolumeCounts[server]; !exists && adj > 0 { + effectiveCounts[server] = adj + totalVolumes += adj + } } - } - // Check if imbalance exceeds threshold - imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer - if imbalanceRatio <= balanceConfig.ImbalanceThreshold { - glog.Infof("BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f", - diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - return nil - } + avgVolumesPerServer := float64(totalVolumes) / float64(len(effectiveCounts)) - // Select a volume from the overloaded server for balance - var selectedVolume *types.VolumeHealthMetrics - for _, metric := range diskMetrics { - if metric.Server == maxServer { - selectedVolume = metric + maxVolumes := 0 + minVolumes := totalVolumes + maxServer := "" + minServer := "" + + for server, count := range effectiveCounts { + if count > maxVolumes { + maxVolumes = count + maxServer = server + } + if count < minVolumes { + minVolumes = count + minServer = server + } + } + + // Check if imbalance exceeds threshold + imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer + if imbalanceRatio <= balanceConfig.ImbalanceThreshold { + if len(results) == 0 { + glog.Infof("BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f", + diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + } else { + glog.Infof("BALANCE [%s]: Created %d task(s), cluster now balanced. Imbalance=%.1f%% (threshold=%.1f%%)", + diskType, len(results), imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100) + } break } - } - if selectedVolume == nil { - glog.Warningf("BALANCE [%s]: Could not find volume on overloaded server %s", diskType, maxServer) - return nil + // Select a volume from the overloaded server for balance + var selectedVolume *types.VolumeHealthMetrics + for _, metric := range diskMetrics { + if metric.Server == maxServer { + // Skip volumes that already have a task in ActiveTopology + if clusterInfo.ActiveTopology != nil && clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) { + continue + } + selectedVolume = metric + break + } + } + + if selectedVolume == nil { + glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s", diskType, maxServer) + break + } + + // Plan destination and create task + task := createBalanceTask(diskType, selectedVolume, clusterInfo) + if task == nil { + break + } + + results = append(results, task) + + // Adjust effective counts for the next iteration + adjustments[maxServer]-- + if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 { + // Find the destination server ID from the planned task + destAddress := task.TypedParams.Targets[0].Node + destServer := findServerIDByAddress(diskMetrics, destAddress, clusterInfo) + if destServer != "" { + adjustments[destServer]++ + } + } } - // Create balance task with volume and destination planning info - reason := fmt.Sprintf("Cluster imbalance detected for %s: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", - diskType, imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + return results +} - // Generate task ID for ActiveTopology integration +// findServerIDByAddress resolves a server address back to its server ID. +func findServerIDByAddress(diskMetrics []*types.VolumeHealthMetrics, address string, clusterInfo *types.ClusterInfo) string { + // Check metrics first for a direct match + for _, m := range diskMetrics { + if m.ServerAddress == address || m.Server == address { + return m.Server + } + } + // Fall back to topology lookup + if clusterInfo.ActiveTopology != nil { + topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo() + if topologyInfo != nil { + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, dn := range rack.DataNodeInfos { + if dn.Address == address || dn.Id == address { + return dn.Id + } + } + } + } + } + } + return "" +} + +// createBalanceTask creates a single balance task for the selected volume. +// Returns nil if destination planning fails. +func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult { taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix()) task := &types.TaskDetectionResult{ - TaskID: taskID, // Link to ActiveTopology pending task + TaskID: taskID, TaskType: types.TaskTypeBalance, VolumeID: selectedVolume.VolumeID, Server: selectedVolume.Server, Collection: selectedVolume.Collection, Priority: types.TaskPriorityNormal, - Reason: reason, + Reason: fmt.Sprintf("Cluster imbalance detected for %s disk type", + diskType), ScheduleAt: time.Now(), } // Plan destination if ActiveTopology is available - if clusterInfo.ActiveTopology != nil { - // Check if ANY task already exists in ActiveTopology for this volume - if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) { - glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID) - return nil - } + if clusterInfo.ActiveTopology == nil { + glog.Warningf("No ActiveTopology available for destination planning in balance detection") + return nil + } - destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) - if err != nil { - glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) - return nil - } + // Check if ANY task already exists in ActiveTopology for this volume + if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) { + glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID) + return nil + } - // Find the actual disk containing the volume on the source server - sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - if !found { - glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", - diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - return nil - } + destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) + if err != nil { + glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) + return nil + } + + // Find the actual disk containing the volume on the source server + sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + if !found { + glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", + diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + return nil + } - // Create typed parameters with unified source and target information - task.TypedParams = &worker_pb.TaskParams{ - TaskId: taskID, // Link to ActiveTopology pending task - VolumeId: selectedVolume.VolumeID, - Collection: selectedVolume.Collection, - VolumeSize: selectedVolume.Size, // Store original volume size for tracking changes - - // Unified sources and targets - the only way to specify locations - Sources: []*worker_pb.TaskSource{ - { - Node: selectedVolume.ServerAddress, - DiskId: sourceDisk, - VolumeId: selectedVolume.VolumeID, - EstimatedSize: selectedVolume.Size, - DataCenter: selectedVolume.DataCenter, - Rack: selectedVolume.Rack, - }, + // Update reason with full details now that we have destination info + task.Reason = fmt.Sprintf("Cluster imbalance detected for %s: move volume %d from %s to %s", + diskType, selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) + + // Create typed parameters with unified source and target information + task.TypedParams = &worker_pb.TaskParams{ + TaskId: taskID, + VolumeId: selectedVolume.VolumeID, + Collection: selectedVolume.Collection, + VolumeSize: selectedVolume.Size, + + Sources: []*worker_pb.TaskSource{ + { + Node: selectedVolume.ServerAddress, + DiskId: sourceDisk, + VolumeId: selectedVolume.VolumeID, + EstimatedSize: selectedVolume.Size, + DataCenter: selectedVolume.DataCenter, + Rack: selectedVolume.Rack, }, - Targets: []*worker_pb.TaskTarget{ - { - Node: destinationPlan.TargetAddress, - DiskId: destinationPlan.TargetDisk, - VolumeId: selectedVolume.VolumeID, - EstimatedSize: destinationPlan.ExpectedSize, - DataCenter: destinationPlan.TargetDC, - Rack: destinationPlan.TargetRack, - }, + }, + Targets: []*worker_pb.TaskTarget{ + { + Node: destinationPlan.TargetAddress, + DiskId: destinationPlan.TargetDisk, + VolumeId: selectedVolume.VolumeID, + EstimatedSize: destinationPlan.ExpectedSize, + DataCenter: destinationPlan.TargetDC, + Rack: destinationPlan.TargetRack, }, + }, - TaskParams: &worker_pb.TaskParams_BalanceParams{ - BalanceParams: &worker_pb.BalanceTaskParams{ - ForceMove: false, - TimeoutSeconds: 600, // 10 minutes default - }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + ForceMove: false, + TimeoutSeconds: 600, // 10 minutes default }, - } - - glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s", - selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) + }, + } - // Add pending balance task to ActiveTopology for capacity management - targetDisk := destinationPlan.TargetDisk + glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s", + selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) - err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ - TaskID: taskID, - TaskType: topology.TaskTypeBalance, - VolumeID: selectedVolume.VolumeID, - VolumeSize: int64(selectedVolume.Size), - Sources: []topology.TaskSourceSpec{ - {ServerID: selectedVolume.Server, DiskID: sourceDisk}, - }, - Destinations: []topology.TaskDestinationSpec{ - {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, - }, - }) - if err != nil { - glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) - return nil - } + // Add pending balance task to ActiveTopology for capacity management + targetDisk := destinationPlan.TargetDisk - glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", - taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) - } else { - glog.Warningf("No ActiveTopology available for destination planning in balance detection") + err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ + TaskID: taskID, + TaskType: topology.TaskTypeBalance, + VolumeID: selectedVolume.VolumeID, + VolumeSize: int64(selectedVolume.Size), + Sources: []topology.TaskSourceSpec{ + {ServerID: selectedVolume.Server, DiskID: sourceDisk}, + }, + Destinations: []topology.TaskDestinationSpec{ + {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, + }, + }) + if err != nil { + glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) return nil } + glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", + taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) + return task } diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 3fff3720b..0c60625bb 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -171,7 +171,7 @@ func TestDetection_MixedDiskTypes(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf) + tasks, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -231,25 +231,86 @@ func TestDetection_ImbalancedDiskType(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf) + tasks, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } if len(tasks) == 0 { t.Error("Expected tasks for imbalanced SSD cluster, got 0") - } else { - // Verify task details - task := tasks[0] + } + + // With 100 volumes on server-1 and 10 on server-2, avg=55, detection should + // propose multiple moves until imbalance drops below 20% threshold. + // All tasks should move volumes from ssd-server-1 to ssd-server-2. + if len(tasks) < 2 { + t.Errorf("Expected multiple balance tasks, got %d", len(tasks)) + } + + for i, task := range tasks { if task.VolumeID == 0 { - t.Error("Task has invalid VolumeID") + t.Errorf("Task %d has invalid VolumeID", i) } - // Expect volume to be moving from ssd-server-1 to ssd-server-2 if task.TypedParams.Sources[0].Node != "ssd-server-1:8080" { - t.Errorf("Expected source ssd-server-1:8080, got %s", task.TypedParams.Sources[0].Node) + t.Errorf("Task %d: expected source ssd-server-1:8080, got %s", i, task.TypedParams.Sources[0].Node) } if task.TypedParams.Targets[0].Node != "ssd-server-2:8080" { - t.Errorf("Expected target ssd-server-2:8080, got %s", task.TypedParams.Targets[0].Node) + t.Errorf("Task %d: expected target ssd-server-2:8080, got %s", i, task.TypedParams.Targets[0].Node) } } } + +func TestDetection_RespectsMaxResults(t *testing.T) { + // Setup: 2 SSD servers with big imbalance (100 vs 10) + metrics := []*types.VolumeHealthMetrics{} + + for i := 0; i < 100; i++ { + metrics = append(metrics, &types.VolumeHealthMetrics{ + VolumeID: uint32(i + 1), + Server: "ssd-server-1", + ServerAddress: "ssd-server-1:8080", + DiskType: "ssd", + Collection: "c1", + Size: 1024, + DataCenter: "dc1", + Rack: "rack1", + }) + } + for i := 0; i < 10; i++ { + metrics = append(metrics, &types.VolumeHealthMetrics{ + VolumeID: uint32(100 + i + 1), + Server: "ssd-server-2", + ServerAddress: "ssd-server-2:8080", + DiskType: "ssd", + Collection: "c1", + Size: 1024, + DataCenter: "dc1", + Rack: "rack1", + }) + } + + conf := &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 30, + MaxConcurrent: 1, + }, + MinServerCount: 2, + ImbalanceThreshold: 0.2, + } + + at := createMockTopology(metrics...) + clusterInfo := &types.ClusterInfo{ + ActiveTopology: at, + } + + // Request only 3 results + tasks, err := Detection(metrics, clusterInfo, conf, 3) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) != 3 { + t.Errorf("Expected exactly 3 tasks (maxResults=3), got %d", len(tasks)) + } +} diff --git a/weed/worker/tasks/balance/register.go b/weed/worker/tasks/balance/register.go index 68e3458e9..c7a79c1ca 100644 --- a/weed/worker/tasks/balance/register.go +++ b/weed/worker/tasks/balance/register.go @@ -58,7 +58,9 @@ func RegisterBalanceTask() { dialOpt, ), nil }, - DetectionFunc: Detection, + DetectionFunc: func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + return Detection(metrics, info, config, 1) + }, ScanInterval: 30 * time.Minute, SchedulingFunc: Scheduling, MaxConcurrent: 1,