Browse Source

balance handler: add batch config fields to Descriptor and worker config

Add max_concurrent_moves and batch_size fields to the worker config
form and deriveBalanceWorkerConfig(). These control how many volume
moves run concurrently within a batch job and the maximum batch size.
feat/batch-volume-balance
Chris Lu 23 hours ago
parent
commit
6f0a1cfd0e
  1. 57
      weed/plugin/worker/volume_balance_handler.go
  2. 55
      weed/plugin/worker/volume_balance_handler_test.go

57
weed/plugin/worker/volume_balance_handler.go

@ -36,6 +36,8 @@ func init() {
type volumeBalanceWorkerConfig struct {
TaskConfig *balancetask.Config
MinIntervalSeconds int
MaxConcurrentMoves int
BatchSize int
}
// VolumeBalanceHandler is the plugin job handler for volume balancing.
@ -134,6 +136,31 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
},
},
},
{
SectionId: "batch_execution",
Title: "Batch Execution",
Description: "Controls for running multiple volume moves per job. The worker coordinates moves via gRPC and is not on the data path.",
Fields: []*plugin_pb.ConfigField{
{
Name: "max_concurrent_moves",
Label: "Max Concurrent Moves",
Description: "Maximum number of volume moves to run concurrently within a single batch job.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}},
},
{
Name: "batch_size",
Label: "Batch Size",
Description: "Maximum number of volume moves to group into a single job. Set to 1 to disable batching.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 100}},
},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"imbalance_threshold": {
@ -145,6 +172,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
},
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)},
},
"batch_size": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20},
},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
@ -168,6 +201,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
},
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)},
},
"batch_size": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20},
},
},
}
}
@ -848,9 +887,27 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume
minIntervalSeconds = 0
}
maxConcurrentMoves := int(readInt64Config(values, "max_concurrent_moves", int64(defaultMaxConcurrentMoves)))
if maxConcurrentMoves < 1 {
maxConcurrentMoves = 1
}
if maxConcurrentMoves > 50 {
maxConcurrentMoves = 50
}
batchSize := int(readInt64Config(values, "batch_size", 20))
if batchSize < 1 {
batchSize = 1
}
if batchSize > 100 {
batchSize = 100
}
return &volumeBalanceWorkerConfig{
TaskConfig: taskConfig,
MinIntervalSeconds: minIntervalSeconds,
MaxConcurrentMoves: maxConcurrentMoves,
BatchSize: batchSize,
}
}

55
weed/plugin/worker/volume_balance_handler_test.go

@ -112,6 +112,61 @@ func TestDeriveBalanceWorkerConfig(t *testing.T) {
if cfg.MinIntervalSeconds != 33 {
t.Fatalf("expected min_interval_seconds 33, got %d", cfg.MinIntervalSeconds)
}
// Defaults for batch config when not specified
if cfg.MaxConcurrentMoves != defaultMaxConcurrentMoves {
t.Fatalf("expected default max_concurrent_moves %d, got %d", defaultMaxConcurrentMoves, cfg.MaxConcurrentMoves)
}
if cfg.BatchSize != 20 {
t.Fatalf("expected default batch_size 20, got %d", cfg.BatchSize)
}
}
func TestDeriveBalanceWorkerConfigBatchFields(t *testing.T) {
values := map[string]*plugin_pb.ConfigValue{
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 10},
},
"batch_size": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50},
},
}
cfg := deriveBalanceWorkerConfig(values)
if cfg.MaxConcurrentMoves != 10 {
t.Fatalf("expected max_concurrent_moves 10, got %d", cfg.MaxConcurrentMoves)
}
if cfg.BatchSize != 50 {
t.Fatalf("expected batch_size 50, got %d", cfg.BatchSize)
}
}
func TestDeriveBalanceWorkerConfigBatchClamping(t *testing.T) {
values := map[string]*plugin_pb.ConfigValue{
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 999},
},
"batch_size": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0},
},
}
cfg := deriveBalanceWorkerConfig(values)
if cfg.MaxConcurrentMoves != 50 {
t.Fatalf("expected max_concurrent_moves clamped to 50, got %d", cfg.MaxConcurrentMoves)
}
if cfg.BatchSize != 1 {
t.Fatalf("expected batch_size clamped to 1, got %d", cfg.BatchSize)
}
}
func TestVolumeBalanceDescriptorHasBatchFields(t *testing.T) {
descriptor := NewVolumeBalanceHandler(nil).Descriptor()
if !workerConfigFormHasField(descriptor.WorkerConfigForm, "max_concurrent_moves") {
t.Fatalf("expected max_concurrent_moves in worker config form")
}
if !workerConfigFormHasField(descriptor.WorkerConfigForm, "batch_size") {
t.Fatalf("expected batch_size in worker config form")
}
}
func TestBuildVolumeBalanceProposal(t *testing.T) {

Loading…
Cancel
Save