Browse Source

balance handler: add batch execution with concurrent volume moves

Refactor Execute() into executeSingleMove() (backward compatible) and
executeBatchMoves() which runs multiple volume moves concurrently using
a semaphore-bounded goroutine pool. When BalanceTaskParams.Moves is
populated, the batch path is taken; otherwise the single-move path.

Includes aggregate progress reporting across concurrent moves,
per-move error collection, and partial failure support.
feat/batch-volume-balance
Chris Lu 19 hours ago
parent
commit
50761ff299
  1. 198
      weed/plugin/worker/volume_balance_handler.go
  2. 192
      weed/plugin/worker/volume_balance_handler_test.go

198
weed/plugin/worker/volume_balance_handler.go

@ -5,6 +5,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
@ -511,6 +512,8 @@ func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int {
return len(diskTypes)
}
// defaultMaxConcurrentMoves bounds batch concurrency when
// BalanceTaskParams.MaxConcurrentMoves is unset or non-positive.
const defaultMaxConcurrentMoves = 5
func (h *VolumeBalanceHandler) Execute(
ctx context.Context,
request *plugin_pb.ExecuteJobRequest,
@ -530,6 +533,24 @@ func (h *VolumeBalanceHandler) Execute(
if err != nil {
return err
}
applyBalanceExecutionDefaults(params)
// Batch path: if BalanceTaskParams has moves, execute them concurrently
if bp := params.GetBalanceParams(); bp != nil && len(bp.Moves) > 0 {
return h.executeBatchMoves(ctx, request, params, sender)
}
// Single-move path (backward compatible)
return h.executeSingleMove(ctx, request, params, sender)
}
func (h *VolumeBalanceHandler) executeSingleMove(
ctx context.Context,
request *plugin_pb.ExecuteJobRequest,
params *worker_pb.TaskParams,
sender ExecutionSender,
) error {
if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
return fmt.Errorf("volume balance source node is required")
}
@ -537,8 +558,6 @@ func (h *VolumeBalanceHandler) Execute(
return fmt.Errorf("volume balance target node is required")
}
applyBalanceExecutionDefaults(params)
task := balancetask.NewBalanceTask(
request.Job.JobId,
params.Sources[0].Node,
@ -621,6 +640,181 @@ func (h *VolumeBalanceHandler) Execute(
})
}
// executeBatchMoves runs multiple volume moves concurrently within a single job.
//
// Concurrency is bounded by a semaphore with bp.MaxConcurrentMoves slots
// (defaultMaxConcurrentMoves when unset or non-positive). Aggregate progress
// is reported as the mean of all per-move progress values. Per-move errors
// are collected so the job supports partial failure: the job completes with
// Success=false and a joined error message when any move fails.
//
// If ctx is cancelled mid-batch, no further moves are launched; already
// running moves observe cancellation through task.Execute(ctx, ...), and
// un-launched moves are reported as failed with the context error.
func (h *VolumeBalanceHandler) executeBatchMoves(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	params *worker_pb.TaskParams,
	sender ExecutionSender,
) error {
	bp := params.GetBalanceParams()
	moves := bp.Moves

	maxConcurrent := int(bp.MaxConcurrentMoves)
	if maxConcurrent <= 0 {
		maxConcurrent = defaultMaxConcurrentMoves
	}

	totalMoves := len(moves)
	glog.Infof("batch volume balance: %d moves, max concurrent %d", totalMoves, maxConcurrent)

	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
		JobId:           request.Job.JobId,
		JobType:         request.Job.JobType,
		State:           plugin_pb.JobState_JOB_STATE_ASSIGNED,
		ProgressPercent: 0,
		Stage:           "assigned",
		Message:         fmt.Sprintf("batch volume balance accepted: %d moves", totalMoves),
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("assigned", fmt.Sprintf("batch volume balance: %d moves, concurrency %d", totalMoves, maxConcurrent)),
		},
	}); err != nil {
		return err
	}

	// Per-move progress tracking; mu guards moveProgress, which is written
	// concurrently by the progress callbacks of the move goroutines.
	var mu sync.Mutex
	moveProgress := make([]float64, totalMoves)

	reportAggregate := func(moveIndex int, progress float64, stage string) {
		mu.Lock()
		moveProgress[moveIndex] = progress
		total := 0.0
		for _, p := range moveProgress {
			total += p
		}
		mu.Unlock()

		aggregate := total / float64(totalMoves)
		move := moves[moveIndex]
		message := fmt.Sprintf("[Move %d/%d vol:%d] %s", moveIndex+1, totalMoves, move.VolumeId, stage)
		// Progress delivery is best-effort; a send failure must not abort a move.
		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
			ProgressPercent: aggregate,
			Stage:           fmt.Sprintf("move %d/%d", moveIndex+1, totalMoves),
			Message:         message,
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity(fmt.Sprintf("move-%d", moveIndex+1), message),
			},
		})
	}

	type moveResult struct {
		index    int
		volumeID uint32
		source   string
		target   string
		err      error
	}

	sem := make(chan struct{}, maxConcurrent)
	results := make(chan moveResult, totalMoves)

	// Dispatch loop: acquiring a slot before spawning bounds concurrency to
	// maxConcurrent. Stop launching new moves once ctx is cancelled.
	launched := 0
dispatch:
	for i, move := range moves {
		select {
		case sem <- struct{}{}: // acquire slot
		case <-ctx.Done():
			break dispatch
		}
		launched++
		go func(idx int, m *worker_pb.BalanceMoveSpec) {
			defer func() { <-sem }() // release slot
			task := balancetask.NewBalanceTask(
				fmt.Sprintf("%s-move-%d", request.Job.JobId, idx),
				m.SourceNode,
				m.VolumeId,
				m.Collection,
				h.grpcDialOption,
			)
			task.SetProgressCallback(func(progress float64, stage string) {
				reportAggregate(idx, progress, stage)
			})
			moveParams := buildMoveTaskParams(m, bp.TimeoutSeconds)
			err := task.Execute(ctx, moveParams)
			results <- moveResult{
				index:    idx,
				volumeID: m.VolumeId,
				source:   m.SourceNode,
				target:   m.TargetNode,
				err:      err,
			}
		}(i, move)
	}

	// Collect the result of every launched move.
	var succeeded, failed int
	var errMessages []string
	for i := 0; i < launched; i++ {
		r := <-results
		if r.err != nil {
			failed++
			errMessages = append(errMessages, fmt.Sprintf("volume %d (%s→%s): %v", r.volumeID, r.source, r.target, r.err))
			glog.Warningf("batch balance move %d failed: volume %d %s→%s: %v", r.index, r.volumeID, r.source, r.target, r.err)
		} else {
			succeeded++
		}
	}
	// Moves never launched because of cancellation count as failures.
	for idx := launched; idx < totalMoves; idx++ {
		m := moves[idx]
		failed++
		errMessages = append(errMessages, fmt.Sprintf("volume %d (%s→%s): %v", m.VolumeId, m.SourceNode, m.TargetNode, ctx.Err()))
	}

	summary := fmt.Sprintf("%d/%d volumes moved successfully", succeeded, totalMoves)
	if failed > 0 {
		summary += fmt.Sprintf("; %d failed", failed)
	}

	success := failed == 0
	var errMsg string
	if !success {
		errMsg = strings.Join(errMessages, "; ")
	}

	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId:        request.Job.JobId,
		JobType:      request.Job.JobType,
		Success:      success,
		ErrorMessage: errMsg,
		Result: &plugin_pb.JobResult{
			Summary: summary,
			OutputValues: map[string]*plugin_pb.ConfigValue{
				"total_moves": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalMoves)},
				},
				"succeeded": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(succeeded)},
				},
				"failed": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(failed)},
				},
			},
		},
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("completed", summary),
		},
	})
}
// buildMoveTaskParams constructs a TaskParams for a single move within a batch.
// A non-positive timeoutSeconds falls back to defaultBalanceTimeoutSeconds.
func buildMoveTaskParams(move *worker_pb.BalanceMoveSpec, timeoutSeconds int32) *worker_pb.TaskParams {
	effectiveTimeout := timeoutSeconds
	if effectiveTimeout <= 0 {
		effectiveTimeout = defaultBalanceTimeoutSeconds
	}

	source := &worker_pb.TaskSource{Node: move.SourceNode, VolumeId: move.VolumeId}
	target := &worker_pb.TaskTarget{Node: move.TargetNode, VolumeId: move.VolumeId}
	balance := &worker_pb.BalanceTaskParams{TimeoutSeconds: effectiveTimeout}

	return &worker_pb.TaskParams{
		VolumeId:   move.VolumeId,
		Collection: move.Collection,
		VolumeSize: move.VolumeSize,
		Sources:    []*worker_pb.TaskSource{source},
		Targets:    []*worker_pb.TaskTarget{target},
		TaskParams: &worker_pb.TaskParams_BalanceParams{BalanceParams: balance},
	}
}
func (h *VolumeBalanceHandler) collectVolumeMetrics(
ctx context.Context,
masterAddresses []string,

192
weed/plugin/worker/volume_balance_handler_test.go

@ -3,6 +3,7 @@ package pluginworker
import (
"context"
"strings"
"sync"
"testing"
"time"
@ -265,6 +266,197 @@ func TestVolumeBalanceDescriptorOmitsExecutionTuningFields(t *testing.T) {
}
}
// recordingExecutionSender is a thread-safe ExecutionSender test double that
// records every progress update and the final completion message for later
// inspection by tests.
type recordingExecutionSender struct {
	mu sync.Mutex // guards progress and completed; senders may be called concurrently
	progress []*plugin_pb.JobProgressUpdate
	completed *plugin_pb.JobCompleted
}
// SendProgress records the update; it never fails.
func (r *recordingExecutionSender) SendProgress(p *plugin_pb.JobProgressUpdate) error {
	r.mu.Lock()
	r.progress = append(r.progress, p)
	r.mu.Unlock()
	return nil
}
// SendCompleted records the completion message; it never fails.
func (r *recordingExecutionSender) SendCompleted(c *plugin_pb.JobCompleted) error {
	r.mu.Lock()
	r.completed = c
	r.mu.Unlock()
	return nil
}
// TestBuildMoveTaskParams verifies that every field of a BalanceMoveSpec is
// carried into the resulting TaskParams, including the explicit timeout.
func TestBuildMoveTaskParams(t *testing.T) {
	spec := &worker_pb.BalanceMoveSpec{
		VolumeId:   42,
		SourceNode: "10.0.0.1:8080",
		TargetNode: "10.0.0.2:8080",
		Collection: "photos",
		VolumeSize: 1024 * 1024,
	}

	got := buildMoveTaskParams(spec, 300)

	if got.VolumeId != 42 {
		t.Fatalf("expected volume_id 42, got %d", got.VolumeId)
	}
	if got.Collection != "photos" {
		t.Fatalf("expected collection photos, got %s", got.Collection)
	}
	if got.VolumeSize != 1024*1024 {
		t.Fatalf("expected volume_size %d, got %d", 1024*1024, got.VolumeSize)
	}
	if n := len(got.Sources); n != 1 || got.Sources[0].Node != "10.0.0.1:8080" {
		t.Fatalf("unexpected sources: %+v", got.Sources)
	}
	if n := len(got.Targets); n != 1 || got.Targets[0].Node != "10.0.0.2:8080" {
		t.Fatalf("unexpected targets: %+v", got.Targets)
	}

	balance := got.GetBalanceParams()
	if balance == nil {
		t.Fatalf("expected balance params")
	}
	if balance.TimeoutSeconds != 300 {
		t.Fatalf("expected timeout 300, got %d", balance.TimeoutSeconds)
	}
}
// TestBuildMoveTaskParamsDefaultTimeout verifies the fallback to
// defaultBalanceTimeoutSeconds when no timeout is supplied.
func TestBuildMoveTaskParamsDefaultTimeout(t *testing.T) {
	spec := &worker_pb.BalanceMoveSpec{
		VolumeId:   1,
		SourceNode: "a:8080",
		TargetNode: "b:8080",
	}
	got := buildMoveTaskParams(spec, 0).GetBalanceParams().TimeoutSeconds
	if got != defaultBalanceTimeoutSeconds {
		t.Fatalf("expected default timeout %d, got %d", defaultBalanceTimeoutSeconds, got)
	}
}
// TestExecuteDispatchesBatchPath verifies that Execute takes the batch path
// when BalanceTaskParams.Moves is populated: it first sends an "assigned"
// progress update carrying batch info, then a completion reporting every
// move as failed (no real volume servers exist in this test).
func TestExecuteDispatchesBatchPath(t *testing.T) {
	// Build a job with batch moves in BalanceTaskParams.
	bp := &worker_pb.BalanceTaskParams{
		TimeoutSeconds:     60,
		MaxConcurrentMoves: 2,
		Moves: []*worker_pb.BalanceMoveSpec{
			{VolumeId: 1, SourceNode: "s1:8080", TargetNode: "t1:8080", Collection: "c1"},
			{VolumeId: 2, SourceNode: "s2:8080", TargetNode: "t2:8080", Collection: "c1"},
		},
	}
	taskParams := &worker_pb.TaskParams{
		TaskId: "batch-1",
		TaskParams: &worker_pb.TaskParams_BalanceParams{
			BalanceParams: bp,
		},
	}
	payload, err := proto.Marshal(taskParams)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}
	job := &plugin_pb.JobSpec{
		JobId:   "batch-job-1",
		JobType: "volume_balance",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}},
		},
	}
	handler := NewVolumeBalanceHandler(nil)
	sender := &recordingExecutionSender{}

	// The batch path reports per-move failures through SendCompleted and then
	// returns the sender's result (nil here), so Execute itself must not
	// error even though every move fails without a real gRPC server.
	if err := handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{
		Job: job,
	}, sender); err != nil {
		t.Fatalf("Execute: %v", err)
	}

	sender.mu.Lock()
	defer sender.mu.Unlock()
	if len(sender.progress) == 0 {
		t.Fatalf("expected progress messages from batch path")
	}
	// First progress should be "assigned" with batch info.
	first := sender.progress[0]
	if first.Stage != "assigned" {
		t.Fatalf("expected first stage 'assigned', got %q", first.Stage)
	}
	if !strings.Contains(first.Message, "batch") || !strings.Contains(first.Message, "2 moves") {
		t.Fatalf("expected batch assigned message, got %q", first.Message)
	}
	// Should have a completion with failure details (since no servers).
	if sender.completed == nil {
		t.Fatalf("expected completion message")
	}
	if sender.completed.Success {
		t.Fatalf("expected failure since no real gRPC servers")
	}
	// Should report 0 succeeded, 2 failed.
	if v, ok := sender.completed.Result.OutputValues["failed"]; !ok || v.GetInt64Value() != 2 {
		t.Fatalf("expected 2 failed moves, got %+v", sender.completed.Result.OutputValues)
	}
}
// TestExecuteSingleMovePathUnchanged checks that a job without batch moves
// goes through the backward-compatible single-move path.
func TestExecuteSingleMovePathUnchanged(t *testing.T) {
	// A single-move job: no Moves inside BalanceTaskParams.
	singleParams := &worker_pb.TaskParams{
		TaskId:     "single-1",
		VolumeId:   99,
		Collection: "videos",
		Sources:    []*worker_pb.TaskSource{{Node: "src:8080", VolumeId: 99}},
		Targets:    []*worker_pb.TaskTarget{{Node: "dst:8080", VolumeId: 99}},
		TaskParams: &worker_pb.TaskParams_BalanceParams{
			BalanceParams: &worker_pb.BalanceTaskParams{TimeoutSeconds: 60},
		},
	}
	payload, err := proto.Marshal(singleParams)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}
	spec := &plugin_pb.JobSpec{
		JobId:   "single-job-1",
		JobType: "volume_balance",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}},
		},
	}
	recorder := &recordingExecutionSender{}

	// The move itself fails without a gRPC server; only dispatch matters here.
	_ = NewVolumeBalanceHandler(nil).Execute(context.Background(), &plugin_pb.ExecuteJobRequest{
		Job: spec,
	}, recorder)

	recorder.mu.Lock()
	defer recorder.mu.Unlock()
	if len(recorder.progress) == 0 {
		t.Fatalf("expected progress messages")
	}
	// Single-move path sends "volume balance job accepted", not "batch volume balance".
	initial := recorder.progress[0]
	if initial.Stage != "assigned" {
		t.Fatalf("expected first stage 'assigned', got %q", initial.Stage)
	}
	if strings.Contains(initial.Message, "batch") {
		t.Fatalf("single-move path should not mention batch, got %q", initial.Message)
	}
}
func workerConfigFormHasField(form *plugin_pb.ConfigForm, fieldName string) bool {
if form == nil {
return false

Loading…
Cancel
Save