diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 1a25c72af..ae1b51267 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -36,6 +36,8 @@ func init() { type volumeBalanceWorkerConfig struct { TaskConfig *balancetask.Config MinIntervalSeconds int + MaxConcurrentMoves int + BatchSize int } // VolumeBalanceHandler is the plugin job handler for volume balancing. @@ -134,6 +136,31 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { }, }, }, + { + SectionId: "batch_execution", + Title: "Batch Execution", + Description: "Controls for running multiple volume moves per job. The worker coordinates moves via gRPC and is not on the data path.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "max_concurrent_moves", + Label: "Max Concurrent Moves", + Description: "Maximum number of volume moves to run concurrently within a single batch job.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}}, + }, + { + Name: "batch_size", + Label: "Batch Size", + Description: "Maximum number of volume moves to group into a single job. Set to 1 to disable batching.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 100}}, + }, + }, + }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ "imbalance_threshold": { @@ -145,6 +172,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "min_interval_seconds": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60}, }, + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20}, + }, }, }, AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ @@ -168,6 +201,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "min_interval_seconds": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60}, }, + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20}, + }, }, } } @@ -848,9 +887,27 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume minIntervalSeconds = 0 } + maxConcurrentMoves := int(readInt64Config(values, "max_concurrent_moves", int64(defaultMaxConcurrentMoves))) + if maxConcurrentMoves < 1 { + maxConcurrentMoves = 1 + } + if maxConcurrentMoves > 50 { + maxConcurrentMoves = 50 + } + + batchSize := int(readInt64Config(values, "batch_size", 20)) + if batchSize < 1 { + batchSize = 1 + } + if batchSize > 100 { + batchSize = 100 + } + return &volumeBalanceWorkerConfig{ TaskConfig: taskConfig, MinIntervalSeconds: minIntervalSeconds, + MaxConcurrentMoves: maxConcurrentMoves, + BatchSize: batchSize, } } diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index 7263ed790..190c8cb0b 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -112,6 +112,61 @@ func TestDeriveBalanceWorkerConfig(t *testing.T) { if cfg.MinIntervalSeconds != 33 { t.Fatalf("expected min_interval_seconds 33, got %d", cfg.MinIntervalSeconds) } + // Defaults for batch config when not specified + if cfg.MaxConcurrentMoves != defaultMaxConcurrentMoves { + t.Fatalf("expected default max_concurrent_moves %d, got %d", defaultMaxConcurrentMoves, cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 20 { + t.Fatalf("expected default batch_size 20, got %d", cfg.BatchSize) + } +} + +func TestDeriveBalanceWorkerConfigBatchFields(t *testing.T) { + values := map[string]*plugin_pb.ConfigValue{ + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 10}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}, + }, + } + + cfg := deriveBalanceWorkerConfig(values) + if cfg.MaxConcurrentMoves != 10 { + t.Fatalf("expected max_concurrent_moves 10, got %d", cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 50 { + t.Fatalf("expected batch_size 50, got %d", cfg.BatchSize) + } +} + +func TestDeriveBalanceWorkerConfigBatchClamping(t *testing.T) { + values := map[string]*plugin_pb.ConfigValue{ + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 999}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}, + }, + } + + cfg := deriveBalanceWorkerConfig(values) + if cfg.MaxConcurrentMoves != 50 { + t.Fatalf("expected max_concurrent_moves clamped to 50, got %d", cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 1 { + t.Fatalf("expected batch_size clamped to 1, got %d", cfg.BatchSize) + } +} + +func TestVolumeBalanceDescriptorHasBatchFields(t *testing.T) { + descriptor := NewVolumeBalanceHandler(nil).Descriptor() + if !workerConfigFormHasField(descriptor.WorkerConfigForm, "max_concurrent_moves") { + t.Fatalf("expected max_concurrent_moves in worker config form") + } + if !workerConfigFormHasField(descriptor.WorkerConfigForm, "batch_size") { + t.Fatalf("expected batch_size in worker config form") + } } func TestBuildVolumeBalanceProposal(t *testing.T) {