From 765d507fb687d98c78d4932cf109ec2bce2fe619 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 22:56:00 -0700 Subject: [PATCH] balance handler: group detection proposals into batch jobs When batch_size > 1, the Detect method groups detection results into batch proposals where each proposal encodes multiple BalanceMoveSpec entries in BalanceTaskParams.Moves. Single-result batches fall back to the existing single-move proposal format for backward compatibility. --- weed/plugin/worker/volume_balance_handler.go | 149 +++++++++++++++++- .../worker/volume_balance_handler_test.go | 107 +++++++++++++ 2 files changed, 249 insertions(+), 7 deletions(-) diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index ae1b51267..327742b7a 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -274,14 +274,19 @@ func (h *VolumeBalanceHandler) Detect( glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } - proposals := make([]*plugin_pb.JobProposal, 0, len(results)) - for _, result := range results { - proposal, proposalErr := buildVolumeBalanceProposal(result) - if proposalErr != nil { - glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr) - continue + var proposals []*plugin_pb.JobProposal + if workerConfig.BatchSize > 1 && len(results) > 1 { + proposals = buildBatchVolumeBalanceProposals(results, workerConfig.BatchSize, workerConfig.MaxConcurrentMoves) + } else { + proposals = make([]*plugin_pb.JobProposal, 0, len(results)) + for _, result := range results { + proposal, proposalErr := buildVolumeBalanceProposal(result) + if proposalErr != nil { + glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr) + continue + } + proposals = append(proposals, proposal) } - proposals = append(proposals, proposal) } if err := sender.SendProposals(&plugin_pb.DetectionProposals{ @@ -989,6 +994,136 @@ func buildVolumeBalanceProposal( }, nil } +// buildBatchVolumeBalanceProposals groups detection results into batch proposals. +// Each batch proposal encodes multiple moves in BalanceTaskParams.Moves. +func buildBatchVolumeBalanceProposals( + results []*workertypes.TaskDetectionResult, + batchSize int, + maxConcurrentMoves int, +) []*plugin_pb.JobProposal { + if batchSize <= 0 { + batchSize = 1 + } + if maxConcurrentMoves <= 0 { + maxConcurrentMoves = defaultMaxConcurrentMoves + } + + var proposals []*plugin_pb.JobProposal + + for batchStart := 0; batchStart < len(results); batchStart += batchSize { + batchEnd := batchStart + batchSize + if batchEnd > len(results) { + batchEnd = len(results) + } + batch := results[batchStart:batchEnd] + + // If only one result in this batch, emit a single-move proposal + if len(batch) == 1 { + proposal, err := buildVolumeBalanceProposal(batch[0]) + if err != nil { + glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", err) + continue + } + proposals = append(proposals, proposal) + continue + } + + // Build batch proposal with BalanceMoveSpec entries + moves := make([]*worker_pb.BalanceMoveSpec, 0, len(batch)) + var volumeIDs []string + var dedupeKeys []string + highestPriority := workertypes.TaskPriorityLow + + for _, result := range batch { + if result == nil || result.TypedParams == nil { + continue + } + sourceNode := "" + targetNode := "" + if len(result.TypedParams.Sources) > 0 { + sourceNode = result.TypedParams.Sources[0].Node + } + if len(result.TypedParams.Targets) > 0 { + targetNode = result.TypedParams.Targets[0].Node + } + moves = append(moves, &worker_pb.BalanceMoveSpec{ + VolumeId: uint32(result.VolumeID), + SourceNode: sourceNode, + TargetNode: targetNode, + Collection: result.Collection, + VolumeSize: result.TypedParams.VolumeSize, + }) + volumeIDs = append(volumeIDs, fmt.Sprintf("%d", result.VolumeID)) + + dedupeKey := fmt.Sprintf("volume_balance:%d", result.VolumeID) + if result.Collection != "" { + dedupeKey += ":" + result.Collection + } + dedupeKeys = append(dedupeKeys, dedupeKey) + + if result.Priority > highestPriority { + highestPriority = result.Priority + } + } + + if len(moves) == 0 { + continue + } + + // Serialize batch params + taskParams := &worker_pb.TaskParams{ + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + TimeoutSeconds: defaultBalanceTimeoutSeconds, + MaxConcurrentMoves: int32(maxConcurrentMoves), + Moves: moves, + }, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + glog.Warningf("Plugin worker failed to marshal batch balance proposal: %v", err) + continue + } + + proposalID := fmt.Sprintf("volume-balance-batch-%d-%d", batchStart, time.Now().UnixNano()) + summary := fmt.Sprintf("Batch balance %d volumes (%s)", len(moves), strings.Join(volumeIDs, ",")) + if len(summary) > 200 { + summary = fmt.Sprintf("Batch balance %d volumes", len(moves)) + } + + // Use composite dedupe key for the batch + compositeDedupeKey := fmt.Sprintf("volume_balance_batch:%s", strings.Join(dedupeKeys, "+")) + if len(compositeDedupeKey) > 200 { + compositeDedupeKey = fmt.Sprintf("volume_balance_batch:%d-%d", batchStart, time.Now().UnixNano()) + } + + proposals = append(proposals, &plugin_pb.JobProposal{ + ProposalId: proposalID, + DedupeKey: compositeDedupeKey, + JobType: "volume_balance", + Priority: mapTaskPriority(highestPriority), + Summary: summary, + Detail: fmt.Sprintf("Batch of %d volume moves with concurrency %d", len(moves), maxConcurrentMoves), + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": { + Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(moves))}, + }, + }, + Labels: map[string]string{ + "task_type": "balance", + "batch": "true", + "batch_size": fmt.Sprintf("%d", len(moves)), + }, + }) + } + + return proposals +} + func decodeVolumeBalanceTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) { if job == nil { return nil, fmt.Errorf("job spec is nil") diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index 190c8cb0b..c08902c7b 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -2,6 +2,7 @@ package pluginworker import ( "context" + "fmt" "strings" "sync" "testing" @@ -159,6 +160,112 @@ func TestDeriveBalanceWorkerConfigBatchClamping(t *testing.T) { } } +func makeDetectionResult(volumeID uint32, source, target, collection string) *workertypes.TaskDetectionResult { + return &workertypes.TaskDetectionResult{ + TaskID: fmt.Sprintf("balance-%d", volumeID), + TaskType: workertypes.TaskTypeBalance, + VolumeID: volumeID, + Server: source, + Collection: collection, + Priority: workertypes.TaskPriorityNormal, + Reason: "imbalanced", + TypedParams: &worker_pb.TaskParams{ + VolumeId: volumeID, + Collection: collection, + VolumeSize: 1024, + Sources: []*worker_pb.TaskSource{ + {Node: source, VolumeId: volumeID}, + }, + Targets: []*worker_pb.TaskTarget{ + {Node: target, VolumeId: volumeID}, + }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{TimeoutSeconds: 600}, + }, + }, + } +} + +func TestBuildBatchVolumeBalanceProposals_SingleBatch(t *testing.T) { + results := []*workertypes.TaskDetectionResult{ + makeDetectionResult(1, "s1:8080", "t1:8080", "c1"), + makeDetectionResult(2, "s2:8080", "t2:8080", "c1"), + makeDetectionResult(3, "s1:8080", "t2:8080", "c1"), + } + + proposals := buildBatchVolumeBalanceProposals(results, 10, 5) + if len(proposals) != 1 { + t.Fatalf("expected 1 batch proposal, got %d", len(proposals)) + } + + p := proposals[0] + if p.Labels["batch"] != "true" { + t.Fatalf("expected batch label") + } + if p.Labels["batch_size"] != "3" { + t.Fatalf("expected batch_size label '3', got %q", p.Labels["batch_size"]) + } + + // Decode and verify moves + payload := p.Parameters["task_params_pb"].GetBytesValue() + if len(payload) == 0 { + t.Fatalf("expected task_params_pb payload") + } + decoded := &worker_pb.TaskParams{} + if err := proto.Unmarshal(payload, decoded); err != nil { + t.Fatalf("unmarshal: %v", err) + } + moves := decoded.GetBalanceParams().GetMoves() + if len(moves) != 3 { + t.Fatalf("expected 3 moves, got %d", len(moves)) + } + if moves[0].VolumeId != 1 || moves[1].VolumeId != 2 || moves[2].VolumeId != 3 { + t.Fatalf("unexpected volume IDs: %v", moves) + } + if decoded.GetBalanceParams().MaxConcurrentMoves != 5 { + t.Fatalf("expected MaxConcurrentMoves 5, got %d", decoded.GetBalanceParams().MaxConcurrentMoves) + } +} + +func TestBuildBatchVolumeBalanceProposals_MultipleBatches(t *testing.T) { + results := make([]*workertypes.TaskDetectionResult, 5) + for i := range results { + results[i] = makeDetectionResult(uint32(i+1), "s1:8080", "t1:8080", "c1") + } + + proposals := buildBatchVolumeBalanceProposals(results, 2, 3) + // 5 results / batch_size 2 = 3 proposals (2, 2, 1) + if len(proposals) != 3 { + t.Fatalf("expected 3 proposals, got %d", len(proposals)) + } + + // First two should be batch proposals + if proposals[0].Labels["batch"] != "true" { + t.Fatalf("first proposal should be batch") + } + if proposals[1].Labels["batch"] != "true" { + t.Fatalf("second proposal should be batch") + } + // Last one has only 1 result, should fall back to single-move proposal + if proposals[2].Labels["batch"] == "true" { + t.Fatalf("last proposal with 1 result should be single-move, not batch") + } +} + +func TestBuildBatchVolumeBalanceProposals_BatchSizeOne(t *testing.T) { + results := []*workertypes.TaskDetectionResult{ + makeDetectionResult(1, "s1:8080", "t1:8080", "c1"), + makeDetectionResult(2, "s2:8080", "t2:8080", "c1"), + } + + // batch_size=1 should not be called (Detect guards this), but test the function directly + proposals := buildBatchVolumeBalanceProposals(results, 1, 5) + // Each result becomes its own single-move proposal + if len(proposals) != 2 { + t.Fatalf("expected 2 proposals, got %d", len(proposals)) + } +} + func TestVolumeBalanceDescriptorHasBatchFields(t *testing.T) { descriptor := NewVolumeBalanceHandler(nil).Descriptor() if !workerConfigFormHasField(descriptor.WorkerConfigForm, "max_concurrent_moves") {