diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 1a3af2b0d..1a25c72af 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -5,6 +5,7 @@ import ( "fmt" "sort" "strings" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -511,6 +512,8 @@ func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int { return len(diskTypes) } +const defaultMaxConcurrentMoves = 5 + func (h *VolumeBalanceHandler) Execute( ctx context.Context, request *plugin_pb.ExecuteJobRequest, @@ -530,6 +533,24 @@ func (h *VolumeBalanceHandler) Execute( if err != nil { return err } + + applyBalanceExecutionDefaults(params) + + // Batch path: if BalanceTaskParams has moves, execute them concurrently + if bp := params.GetBalanceParams(); bp != nil && len(bp.Moves) > 0 { + return h.executeBatchMoves(ctx, request, params, sender) + } + + // Single-move path (backward compatible) + return h.executeSingleMove(ctx, request, params, sender) +} + +func (h *VolumeBalanceHandler) executeSingleMove( + ctx context.Context, + request *plugin_pb.ExecuteJobRequest, + params *worker_pb.TaskParams, + sender ExecutionSender, +) error { if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" { return fmt.Errorf("volume balance source node is required") } @@ -537,8 +558,6 @@ func (h *VolumeBalanceHandler) Execute( return fmt.Errorf("volume balance target node is required") } - applyBalanceExecutionDefaults(params) - task := balancetask.NewBalanceTask( request.Job.JobId, params.Sources[0].Node, @@ -621,6 +640,181 @@ func (h *VolumeBalanceHandler) Execute( }) } +// executeBatchMoves runs multiple volume moves concurrently within a single job. +func (h *VolumeBalanceHandler) executeBatchMoves( + ctx context.Context, + request *plugin_pb.ExecuteJobRequest, + params *worker_pb.TaskParams, + sender ExecutionSender, +) error { + bp := params.GetBalanceParams() + moves := bp.Moves + maxConcurrent := int(bp.MaxConcurrentMoves) + if maxConcurrent <= 0 { + maxConcurrent = defaultMaxConcurrentMoves + } + + totalMoves := len(moves) + glog.Infof("batch volume balance: %d moves, max concurrent %d", totalMoves, maxConcurrent) + + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_ASSIGNED, + ProgressPercent: 0, + Stage: "assigned", + Message: fmt.Sprintf("batch volume balance accepted: %d moves", totalMoves), + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("assigned", fmt.Sprintf("batch volume balance: %d moves, concurrency %d", totalMoves, maxConcurrent)), + }, + }); err != nil { + return err + } + + // Per-move progress tracking + var mu sync.Mutex + moveProgress := make([]float64, totalMoves) + + reportAggregate := func(moveIndex int, progress float64, stage string) { + mu.Lock() + moveProgress[moveIndex] = progress + total := 0.0 + for _, p := range moveProgress { + total += p + } + mu.Unlock() + + aggregate := total / float64(totalMoves) + move := moves[moveIndex] + message := fmt.Sprintf("[Move %d/%d vol:%d] %s", moveIndex+1, totalMoves, move.VolumeId, stage) + + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_RUNNING, + ProgressPercent: aggregate, + Stage: fmt.Sprintf("move %d/%d", moveIndex+1, totalMoves), + Message: message, + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity(fmt.Sprintf("move-%d", moveIndex+1), message), + }, + }) + } + + type moveResult struct { + index int + volumeID uint32 + source string + target string + err error + } + + sem := make(chan struct{}, maxConcurrent) + results := make(chan moveResult, totalMoves) + + for i, move := range moves { + sem <- struct{}{} // acquire slot + go func(idx int, m *worker_pb.BalanceMoveSpec) { + defer func() { <-sem }() // release slot + + task := balancetask.NewBalanceTask( + fmt.Sprintf("%s-move-%d", request.Job.JobId, idx), + m.SourceNode, + m.VolumeId, + m.Collection, + h.grpcDialOption, + ) + task.SetProgressCallback(func(progress float64, stage string) { + reportAggregate(idx, progress, stage) + }) + + moveParams := buildMoveTaskParams(m, bp.TimeoutSeconds) + err := task.Execute(ctx, moveParams) + results <- moveResult{ + index: idx, + volumeID: m.VolumeId, + source: m.SourceNode, + target: m.TargetNode, + err: err, + } + }(i, move) + } + + // Collect all results + var succeeded, failed int + var errMessages []string + var successDetails []string + for range moves { + r := <-results + if r.err != nil { + failed++ + errMessages = append(errMessages, fmt.Sprintf("volume %d (%s→%s): %v", r.volumeID, r.source, r.target, r.err)) + glog.Warningf("batch balance move %d failed: volume %d %s→%s: %v", r.index, r.volumeID, r.source, r.target, r.err) + } else { + succeeded++ + successDetails = append(successDetails, fmt.Sprintf("volume %d (%s→%s)", r.volumeID, r.source, r.target)) + } + } + + summary := fmt.Sprintf("%d/%d volumes moved successfully", succeeded, totalMoves) + if failed > 0 { + summary += fmt.Sprintf("; %d failed", failed) + } + + success := failed == 0 + var errMsg string + if !success { + errMsg = strings.Join(errMessages, "; ") + } + + return sender.SendCompleted(&plugin_pb.JobCompleted{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + Success: success, + ErrorMessage: errMsg, + Result: &plugin_pb.JobResult{ + Summary: summary, + OutputValues: map[string]*plugin_pb.ConfigValue{ + "total_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalMoves)}, + }, + "succeeded": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(succeeded)}, + }, + "failed": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(failed)}, + }, + }, + }, + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("completed", summary), + }, + }) +} + +// buildMoveTaskParams constructs a TaskParams for a single move within a batch. +func buildMoveTaskParams(move *worker_pb.BalanceMoveSpec, timeoutSeconds int32) *worker_pb.TaskParams { + if timeoutSeconds <= 0 { + timeoutSeconds = defaultBalanceTimeoutSeconds + } + return &worker_pb.TaskParams{ + VolumeId: move.VolumeId, + Collection: move.Collection, + VolumeSize: move.VolumeSize, + Sources: []*worker_pb.TaskSource{ + {Node: move.SourceNode, VolumeId: move.VolumeId}, + }, + Targets: []*worker_pb.TaskTarget{ + {Node: move.TargetNode, VolumeId: move.VolumeId}, + }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + TimeoutSeconds: timeoutSeconds, + }, + }, + } +} + func (h *VolumeBalanceHandler) collectVolumeMetrics( ctx context.Context, masterAddresses []string, diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index ace71ae3a..7263ed790 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -3,6 +3,7 @@ package pluginworker import ( "context" "strings" + "sync" "testing" "time" @@ -265,6 +266,197 @@ func TestVolumeBalanceDescriptorOmitsExecutionTuningFields(t *testing.T) { } } +type recordingExecutionSender struct { + mu sync.Mutex + progress []*plugin_pb.JobProgressUpdate + completed *plugin_pb.JobCompleted +} + +func (r *recordingExecutionSender) SendProgress(p *plugin_pb.JobProgressUpdate) error { + r.mu.Lock() + defer r.mu.Unlock() + r.progress = append(r.progress, p) + return nil +} + +func (r *recordingExecutionSender) SendCompleted(c *plugin_pb.JobCompleted) error { + r.mu.Lock() + defer r.mu.Unlock() + r.completed = c + return nil +} + +func TestBuildMoveTaskParams(t *testing.T) { + move := &worker_pb.BalanceMoveSpec{ + VolumeId: 42, + SourceNode: "10.0.0.1:8080", + TargetNode: "10.0.0.2:8080", + Collection: "photos", + VolumeSize: 1024 * 1024, + } + + params := buildMoveTaskParams(move, 300) + if params.VolumeId != 42 { + t.Fatalf("expected volume_id 42, got %d", params.VolumeId) + } + if params.Collection != "photos" { + t.Fatalf("expected collection photos, got %s", params.Collection) + } + if params.VolumeSize != 1024*1024 { + t.Fatalf("expected volume_size %d, got %d", 1024*1024, params.VolumeSize) + } + if len(params.Sources) != 1 || params.Sources[0].Node != "10.0.0.1:8080" { + t.Fatalf("unexpected sources: %+v", params.Sources) + } + if len(params.Targets) != 1 || params.Targets[0].Node != "10.0.0.2:8080" { + t.Fatalf("unexpected targets: %+v", params.Targets) + } + bp := params.GetBalanceParams() + if bp == nil { + t.Fatalf("expected balance params") + } + if bp.TimeoutSeconds != 300 { + t.Fatalf("expected timeout 300, got %d", bp.TimeoutSeconds) + } +} + +func TestBuildMoveTaskParamsDefaultTimeout(t *testing.T) { + move := &worker_pb.BalanceMoveSpec{ + VolumeId: 1, + SourceNode: "a:8080", + TargetNode: "b:8080", + } + params := buildMoveTaskParams(move, 0) + if params.GetBalanceParams().TimeoutSeconds != defaultBalanceTimeoutSeconds { + t.Fatalf("expected default timeout %d, got %d", defaultBalanceTimeoutSeconds, params.GetBalanceParams().TimeoutSeconds) + } +} + +func TestExecuteDispatchesBatchPath(t *testing.T) { + // Build a job with batch moves in BalanceTaskParams + bp := &worker_pb.BalanceTaskParams{ + TimeoutSeconds: 60, + MaxConcurrentMoves: 2, + Moves: []*worker_pb.BalanceMoveSpec{ + {VolumeId: 1, SourceNode: "s1:8080", TargetNode: "t1:8080", Collection: "c1"}, + {VolumeId: 2, SourceNode: "s2:8080", TargetNode: "t2:8080", Collection: "c1"}, + }, + } + taskParams := &worker_pb.TaskParams{ + TaskId: "batch-1", + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: bp, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + job := &plugin_pb.JobSpec{ + JobId: "batch-job-1", + JobType: "volume_balance", + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}}, + }, + } + + handler := NewVolumeBalanceHandler(nil) + sender := &recordingExecutionSender{} + + // Execute will enter the batch path. It will fail because there's no real gRPC server, + // but we verify it sends the assigned progress and eventually a completion. + err = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{ + Job: job, + }, sender) + + // Expect an error since no real volume servers exist + // But verify the batch path was taken by checking the assigned message + sender.mu.Lock() + defer sender.mu.Unlock() + + if len(sender.progress) == 0 { + t.Fatalf("expected progress messages from batch path") + } + + // First progress should be "assigned" with batch info + first := sender.progress[0] + if first.Stage != "assigned" { + t.Fatalf("expected first stage 'assigned', got %q", first.Stage) + } + if !strings.Contains(first.Message, "batch") || !strings.Contains(first.Message, "2 moves") { + t.Fatalf("expected batch assigned message, got %q", first.Message) + } + + // Should have a completion with failure details (since no servers) + if sender.completed == nil { + t.Fatalf("expected completion message") + } + if sender.completed.Success { + t.Fatalf("expected failure since no real gRPC servers") + } + // Should report 0 succeeded, 2 failed + if v, ok := sender.completed.Result.OutputValues["failed"]; !ok || v.GetInt64Value() != 2 { + t.Fatalf("expected 2 failed moves, got %+v", sender.completed.Result.OutputValues) + } +} + +func TestExecuteSingleMovePathUnchanged(t *testing.T) { + // Build a single-move job (no batch moves) + taskParams := &worker_pb.TaskParams{ + TaskId: "single-1", + VolumeId: 99, + Collection: "videos", + Sources: []*worker_pb.TaskSource{ + {Node: "src:8080", VolumeId: 99}, + }, + Targets: []*worker_pb.TaskTarget{ + {Node: "dst:8080", VolumeId: 99}, + }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + TimeoutSeconds: 60, + }, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + job := &plugin_pb.JobSpec{ + JobId: "single-job-1", + JobType: "volume_balance", + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}}, + }, + } + + handler := NewVolumeBalanceHandler(nil) + sender := &recordingExecutionSender{} + + // Execute single-move path. Will fail on gRPC but verify it takes the single-move path. + _ = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{ + Job: job, + }, sender) + + sender.mu.Lock() + defer sender.mu.Unlock() + + if len(sender.progress) == 0 { + t.Fatalf("expected progress messages") + } + + // Single-move path sends "volume balance job accepted" not "batch volume balance" + first := sender.progress[0] + if first.Stage != "assigned" { + t.Fatalf("expected first stage 'assigned', got %q", first.Stage) + } + if strings.Contains(first.Message, "batch") { + t.Fatalf("single-move path should not mention batch, got %q", first.Message) + } +} + func workerConfigFormHasField(form *plugin_pb.ConfigForm, fieldName string) bool { if form == nil { return false