diff --git a/weed/admin/view/app/plugin.templ b/weed/admin/view/app/plugin.templ index 2b9e8fe27..a75297416 100644 --- a/weed/admin/view/app/plugin.templ +++ b/weed/admin/view/app/plugin.templ @@ -1026,6 +1026,9 @@ templ Plugin(page string) { } var jobType = String(plan.job_type || '').trim().toLowerCase(); + if (jobType === 'volume_balance') { + return renderBalanceExecutionPlan(plan); + } if (jobType !== 'erasure_coding') { var fallbackText = toPrettyJson(plan); if (!fallbackText) { @@ -1107,6 +1110,44 @@ templ Plugin(page string) { return html; } + function renderBalanceExecutionPlan(plan) { + var html = '
Execution Plan
'; + var moves = Array.isArray(plan.moves) ? plan.moves : []; + + if (moves.length === 0) { + // Single-move balance job + var src = textOrDash(plan.source_node || plan.source_server); + var dst = textOrDash(plan.target_node || plan.target_server); + var vid = textOrDash(plan.volume_id); + var col = textOrDash(plan.collection); + html += '
' + + '
Volume: ' + escapeHtml(vid) + '
' + + '
Collection: ' + escapeHtml(col) + '
' + + '
Source: ' + escapeHtml(src) + '
' + + '
Target: ' + escapeHtml(dst) + '
' + + '
'; + } else { + // Batch balance job + html += '
' + escapeHtml(String(moves.length)) + ' moves
'; + html += '
' + + ''; + for (var i = 0; i < moves.length; i++) { + var move = moves[i] || {}; + html += '' + + '' + + '' + + '' + + '' + + '' + + ''; + } + html += '
#VolumeSourceTargetCollection
' + escapeHtml(String(i + 1)) + '' + escapeHtml(textOrDash(move.volume_id)) + '' + escapeHtml(textOrDash(move.source_node)) + '' + escapeHtml(textOrDash(move.target_node)) + '' + escapeHtml(textOrDash(move.collection)) + '
'; + } + + html += '
'; + return html; + } + function isActiveJobState(candidateState) { var jobState = candidateState; if (candidateState && typeof candidateState === 'object' && candidateState.state !== undefined) { @@ -1676,9 +1717,15 @@ templ Plugin(page string) { barClass = 'bg-warning'; } + var jobTypeCell = escapeHtml(textOrDash(executionJob.job_type)); + var execLabels = executionJob.labels || {}; + if (execLabels.batch === 'true' && execLabels.batch_size) { + jobTypeCell += ' ' + escapeHtml(execLabels.batch_size) + ' moves'; + } + rows += '' + '' + renderJobLink(executionJob.job_id) + '' + - '' + escapeHtml(textOrDash(executionJob.job_type)) + '' + + '' + jobTypeCell + '' + '' + escapeHtml(textOrDash(executionJob.state)) + '' + '
' + Math.round(progress) + '%
' + '' + escapeHtml(textOrDash(executionJob.worker_id)) + '' + diff --git a/weed/admin/view/app/plugin_templ.go b/weed/admin/view/app/plugin_templ.go index cd9ae58d2..77d7636cc 100644 --- a/weed/admin/view/app/plugin_templ.go +++ b/weed/admin/view/app/plugin_templ.go @@ -46,7 +46,7 @@ func Plugin(page string) templ.Component { if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "\">

Workers

Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.

Workers

0

Active Jobs

0

Activities (recent)

0

Next Run

-

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
How often to check for new work.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "\">

Workers

Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.

Workers

0

Active Jobs

0

Activities (recent)

0

Next Run

-

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
How often to check for new work.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } diff --git a/weed/pb/worker.proto b/weed/pb/worker.proto index bfb775151..9f9684473 100644 --- a/weed/pb/worker.proto +++ b/weed/pb/worker.proto @@ -157,10 +157,21 @@ message TaskTarget { +// BalanceMoveSpec describes a single volume move within a batch balance job +message BalanceMoveSpec { + uint32 volume_id = 1; // Volume to move + string source_node = 2; // Source server address (host:port) + string target_node = 3; // Destination server address (host:port) + string collection = 4; // Collection name + uint64 volume_size = 5; // Volume size in bytes (informational) +} + // BalanceTaskParams for volume balancing operations message BalanceTaskParams { bool force_move = 1; // Force move even with conflicts int32 timeout_seconds = 2; // Operation timeout + int32 max_concurrent_moves = 3; // Max concurrent moves in a batch job (0 = default 5) + repeated BalanceMoveSpec moves = 4; // Batch: multiple volume moves in one job } // ReplicationTaskParams for adding replicas diff --git a/weed/pb/worker_pb/worker.pb.go b/weed/pb/worker_pb/worker.pb.go index 1502f7867..7677b0837 100644 --- a/weed/pb/worker_pb/worker.pb.go +++ b/weed/pb/worker_pb/worker.pb.go @@ -1331,18 +1331,97 @@ func (x *TaskTarget) GetEstimatedSize() uint64 { return 0 } +// BalanceMoveSpec describes a single volume move within a batch balance job +type BalanceMoveSpec struct { + state protoimpl.MessageState `protogen:"open.v1"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"` // Volume to move + SourceNode string `protobuf:"bytes,2,opt,name=source_node,json=sourceNode,proto3" json:"source_node,omitempty"` // Source server address (host:port) + TargetNode string `protobuf:"bytes,3,opt,name=target_node,json=targetNode,proto3" json:"target_node,omitempty"` // Destination server address (host:port) + Collection string `protobuf:"bytes,4,opt,name=collection,proto3" json:"collection,omitempty"` // Collection name + VolumeSize uint64 `protobuf:"varint,5,opt,name=volume_size,json=volumeSize,proto3" json:"volume_size,omitempty"` // Volume size in bytes (informational) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BalanceMoveSpec) Reset() { + *x = BalanceMoveSpec{} + mi := &file_worker_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BalanceMoveSpec) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BalanceMoveSpec) ProtoMessage() {} + +func (x *BalanceMoveSpec) ProtoReflect() protoreflect.Message { + mi := &file_worker_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BalanceMoveSpec.ProtoReflect.Descriptor instead. +func (*BalanceMoveSpec) Descriptor() ([]byte, []int) { + return file_worker_proto_rawDescGZIP(), []int{13} +} + +func (x *BalanceMoveSpec) GetVolumeId() uint32 { + if x != nil { + return x.VolumeId + } + return 0 +} + +func (x *BalanceMoveSpec) GetSourceNode() string { + if x != nil { + return x.SourceNode + } + return "" +} + +func (x *BalanceMoveSpec) GetTargetNode() string { + if x != nil { + return x.TargetNode + } + return "" +} + +func (x *BalanceMoveSpec) GetCollection() string { + if x != nil { + return x.Collection + } + return "" +} + +func (x *BalanceMoveSpec) GetVolumeSize() uint64 { + if x != nil { + return x.VolumeSize + } + return 0 +} + // BalanceTaskParams for volume balancing operations type BalanceTaskParams struct { - state protoimpl.MessageState `protogen:"open.v1"` - ForceMove bool `protobuf:"varint,1,opt,name=force_move,json=forceMove,proto3" json:"force_move,omitempty"` // Force move even with conflicts - TimeoutSeconds int32 `protobuf:"varint,2,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` // Operation timeout - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + ForceMove bool `protobuf:"varint,1,opt,name=force_move,json=forceMove,proto3" json:"force_move,omitempty"` // Force move even with conflicts + TimeoutSeconds int32 `protobuf:"varint,2,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` // Operation timeout + MaxConcurrentMoves int32 `protobuf:"varint,3,opt,name=max_concurrent_moves,json=maxConcurrentMoves,proto3" json:"max_concurrent_moves,omitempty"` // Max concurrent moves in a batch job (0 = default 5) + Moves []*BalanceMoveSpec `protobuf:"bytes,4,rep,name=moves,proto3" json:"moves,omitempty"` // Batch: multiple volume moves in one job + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *BalanceTaskParams) Reset() { *x = BalanceTaskParams{} - mi := &file_worker_proto_msgTypes[13] + mi := &file_worker_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1354,7 +1433,7 @@ func (x *BalanceTaskParams) String() string { func (*BalanceTaskParams) ProtoMessage() {} func (x *BalanceTaskParams) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[13] + mi := &file_worker_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1367,7 +1446,7 @@ func (x *BalanceTaskParams) ProtoReflect() protoreflect.Message { // Deprecated: Use BalanceTaskParams.ProtoReflect.Descriptor instead. func (*BalanceTaskParams) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{13} + return file_worker_proto_rawDescGZIP(), []int{14} } func (x *BalanceTaskParams) GetForceMove() bool { @@ -1384,6 +1463,20 @@ func (x *BalanceTaskParams) GetTimeoutSeconds() int32 { return 0 } +func (x *BalanceTaskParams) GetMaxConcurrentMoves() int32 { + if x != nil { + return x.MaxConcurrentMoves + } + return 0 +} + +func (x *BalanceTaskParams) GetMoves() []*BalanceMoveSpec { + if x != nil { + return x.Moves + } + return nil +} + // ReplicationTaskParams for adding replicas type ReplicationTaskParams struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1395,7 +1488,7 @@ type ReplicationTaskParams struct { func (x *ReplicationTaskParams) Reset() { *x = ReplicationTaskParams{} - mi := &file_worker_proto_msgTypes[14] + mi := &file_worker_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1407,7 +1500,7 @@ func (x *ReplicationTaskParams) String() string { func (*ReplicationTaskParams) ProtoMessage() {} func (x *ReplicationTaskParams) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[14] + mi := &file_worker_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1420,7 +1513,7 @@ func (x *ReplicationTaskParams) ProtoReflect() protoreflect.Message { // Deprecated: Use ReplicationTaskParams.ProtoReflect.Descriptor instead. func (*ReplicationTaskParams) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{14} + return file_worker_proto_rawDescGZIP(), []int{15} } func (x *ReplicationTaskParams) GetReplicaCount() int32 { @@ -1452,7 +1545,7 @@ type TaskUpdate struct { func (x *TaskUpdate) Reset() { *x = TaskUpdate{} - mi := &file_worker_proto_msgTypes[15] + mi := &file_worker_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1464,7 +1557,7 @@ func (x *TaskUpdate) String() string { func (*TaskUpdate) ProtoMessage() {} func (x *TaskUpdate) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[15] + mi := &file_worker_proto_msgTypes[16] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1477,7 +1570,7 @@ func (x *TaskUpdate) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskUpdate.ProtoReflect.Descriptor instead. func (*TaskUpdate) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{15} + return file_worker_proto_rawDescGZIP(), []int{16} } func (x *TaskUpdate) GetTaskId() string { @@ -1537,7 +1630,7 @@ type TaskComplete struct { func (x *TaskComplete) Reset() { *x = TaskComplete{} - mi := &file_worker_proto_msgTypes[16] + mi := &file_worker_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1549,7 +1642,7 @@ func (x *TaskComplete) String() string { func (*TaskComplete) ProtoMessage() {} func (x *TaskComplete) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[16] + mi := &file_worker_proto_msgTypes[17] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1562,7 +1655,7 @@ func (x *TaskComplete) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskComplete.ProtoReflect.Descriptor instead. func (*TaskComplete) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{16} + return file_worker_proto_rawDescGZIP(), []int{17} } func (x *TaskComplete) GetTaskId() string { @@ -1619,7 +1712,7 @@ type TaskCancellation struct { func (x *TaskCancellation) Reset() { *x = TaskCancellation{} - mi := &file_worker_proto_msgTypes[17] + mi := &file_worker_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1631,7 +1724,7 @@ func (x *TaskCancellation) String() string { func (*TaskCancellation) ProtoMessage() {} func (x *TaskCancellation) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[17] + mi := &file_worker_proto_msgTypes[18] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1644,7 +1737,7 @@ func (x *TaskCancellation) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskCancellation.ProtoReflect.Descriptor instead. func (*TaskCancellation) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{17} + return file_worker_proto_rawDescGZIP(), []int{18} } func (x *TaskCancellation) GetTaskId() string { @@ -1680,7 +1773,7 @@ type WorkerShutdown struct { func (x *WorkerShutdown) Reset() { *x = WorkerShutdown{} - mi := &file_worker_proto_msgTypes[18] + mi := &file_worker_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1692,7 +1785,7 @@ func (x *WorkerShutdown) String() string { func (*WorkerShutdown) ProtoMessage() {} func (x *WorkerShutdown) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[18] + mi := &file_worker_proto_msgTypes[19] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1705,7 +1798,7 @@ func (x *WorkerShutdown) ProtoReflect() protoreflect.Message { // Deprecated: Use WorkerShutdown.ProtoReflect.Descriptor instead. func (*WorkerShutdown) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{18} + return file_worker_proto_rawDescGZIP(), []int{19} } func (x *WorkerShutdown) GetWorkerId() string { @@ -1740,7 +1833,7 @@ type AdminShutdown struct { func (x *AdminShutdown) Reset() { *x = AdminShutdown{} - mi := &file_worker_proto_msgTypes[19] + mi := &file_worker_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1752,7 +1845,7 @@ func (x *AdminShutdown) String() string { func (*AdminShutdown) ProtoMessage() {} func (x *AdminShutdown) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[19] + mi := &file_worker_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1765,7 +1858,7 @@ func (x *AdminShutdown) ProtoReflect() protoreflect.Message { // Deprecated: Use AdminShutdown.ProtoReflect.Descriptor instead. func (*AdminShutdown) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{19} + return file_worker_proto_rawDescGZIP(), []int{20} } func (x *AdminShutdown) GetReason() string { @@ -1798,7 +1891,7 @@ type TaskLogRequest struct { func (x *TaskLogRequest) Reset() { *x = TaskLogRequest{} - mi := &file_worker_proto_msgTypes[20] + mi := &file_worker_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1810,7 +1903,7 @@ func (x *TaskLogRequest) String() string { func (*TaskLogRequest) ProtoMessage() {} func (x *TaskLogRequest) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[20] + mi := &file_worker_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1823,7 +1916,7 @@ func (x *TaskLogRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskLogRequest.ProtoReflect.Descriptor instead. func (*TaskLogRequest) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{20} + return file_worker_proto_rawDescGZIP(), []int{21} } func (x *TaskLogRequest) GetTaskId() string { @@ -1890,7 +1983,7 @@ type TaskLogResponse struct { func (x *TaskLogResponse) Reset() { *x = TaskLogResponse{} - mi := &file_worker_proto_msgTypes[21] + mi := &file_worker_proto_msgTypes[22] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1902,7 +1995,7 @@ func (x *TaskLogResponse) String() string { func (*TaskLogResponse) ProtoMessage() {} func (x *TaskLogResponse) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[21] + mi := &file_worker_proto_msgTypes[22] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1915,7 +2008,7 @@ func (x *TaskLogResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskLogResponse.ProtoReflect.Descriptor instead. func (*TaskLogResponse) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{21} + return file_worker_proto_rawDescGZIP(), []int{22} } func (x *TaskLogResponse) GetTaskId() string { @@ -1983,7 +2076,7 @@ type TaskLogMetadata struct { func (x *TaskLogMetadata) Reset() { *x = TaskLogMetadata{} - mi := &file_worker_proto_msgTypes[22] + mi := &file_worker_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1995,7 +2088,7 @@ func (x *TaskLogMetadata) String() string { func (*TaskLogMetadata) ProtoMessage() {} func (x *TaskLogMetadata) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[22] + mi := &file_worker_proto_msgTypes[23] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2008,7 +2101,7 @@ func (x *TaskLogMetadata) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskLogMetadata.ProtoReflect.Descriptor instead. func (*TaskLogMetadata) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{22} + return file_worker_proto_rawDescGZIP(), []int{23} } func (x *TaskLogMetadata) GetTaskId() string { @@ -2124,7 +2217,7 @@ type TaskLogEntry struct { func (x *TaskLogEntry) Reset() { *x = TaskLogEntry{} - mi := &file_worker_proto_msgTypes[23] + mi := &file_worker_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2136,7 +2229,7 @@ func (x *TaskLogEntry) String() string { func (*TaskLogEntry) ProtoMessage() {} func (x *TaskLogEntry) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[23] + mi := &file_worker_proto_msgTypes[24] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2149,7 +2242,7 @@ func (x *TaskLogEntry) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskLogEntry.ProtoReflect.Descriptor instead. func (*TaskLogEntry) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{23} + return file_worker_proto_rawDescGZIP(), []int{24} } func (x *TaskLogEntry) GetTimestamp() int64 { @@ -2212,7 +2305,7 @@ type MaintenanceConfig struct { func (x *MaintenanceConfig) Reset() { *x = MaintenanceConfig{} - mi := &file_worker_proto_msgTypes[24] + mi := &file_worker_proto_msgTypes[25] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2224,7 +2317,7 @@ func (x *MaintenanceConfig) String() string { func (*MaintenanceConfig) ProtoMessage() {} func (x *MaintenanceConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[24] + mi := &file_worker_proto_msgTypes[25] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2237,7 +2330,7 @@ func (x *MaintenanceConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use MaintenanceConfig.ProtoReflect.Descriptor instead. func (*MaintenanceConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{24} + return file_worker_proto_rawDescGZIP(), []int{25} } func (x *MaintenanceConfig) GetEnabled() bool { @@ -2316,7 +2409,7 @@ type MaintenancePolicy struct { func (x *MaintenancePolicy) Reset() { *x = MaintenancePolicy{} - mi := &file_worker_proto_msgTypes[25] + mi := &file_worker_proto_msgTypes[26] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2328,7 +2421,7 @@ func (x *MaintenancePolicy) String() string { func (*MaintenancePolicy) ProtoMessage() {} func (x *MaintenancePolicy) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[25] + mi := &file_worker_proto_msgTypes[26] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2341,7 +2434,7 @@ func (x *MaintenancePolicy) ProtoReflect() protoreflect.Message { // Deprecated: Use MaintenancePolicy.ProtoReflect.Descriptor instead. func (*MaintenancePolicy) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{25} + return file_worker_proto_rawDescGZIP(), []int{26} } func (x *MaintenancePolicy) GetTaskPolicies() map[string]*TaskPolicy { @@ -2394,7 +2487,7 @@ type TaskPolicy struct { func (x *TaskPolicy) Reset() { *x = TaskPolicy{} - mi := &file_worker_proto_msgTypes[26] + mi := &file_worker_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2406,7 +2499,7 @@ func (x *TaskPolicy) String() string { func (*TaskPolicy) ProtoMessage() {} func (x *TaskPolicy) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[26] + mi := &file_worker_proto_msgTypes[27] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2419,7 +2512,7 @@ func (x *TaskPolicy) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskPolicy.ProtoReflect.Descriptor instead. func (*TaskPolicy) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{26} + return file_worker_proto_rawDescGZIP(), []int{27} } func (x *TaskPolicy) GetEnabled() bool { @@ -2533,7 +2626,7 @@ type VacuumTaskConfig struct { func (x *VacuumTaskConfig) Reset() { *x = VacuumTaskConfig{} - mi := &file_worker_proto_msgTypes[27] + mi := &file_worker_proto_msgTypes[28] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2545,7 +2638,7 @@ func (x *VacuumTaskConfig) String() string { func (*VacuumTaskConfig) ProtoMessage() {} func (x *VacuumTaskConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[27] + mi := &file_worker_proto_msgTypes[28] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2558,7 +2651,7 @@ func (x *VacuumTaskConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use VacuumTaskConfig.ProtoReflect.Descriptor instead. func (*VacuumTaskConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{27} + return file_worker_proto_rawDescGZIP(), []int{28} } func (x *VacuumTaskConfig) GetGarbageThreshold() float64 { @@ -2596,7 +2689,7 @@ type ErasureCodingTaskConfig struct { func (x *ErasureCodingTaskConfig) Reset() { *x = ErasureCodingTaskConfig{} - mi := &file_worker_proto_msgTypes[28] + mi := &file_worker_proto_msgTypes[29] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2608,7 +2701,7 @@ func (x *ErasureCodingTaskConfig) String() string { func (*ErasureCodingTaskConfig) ProtoMessage() {} func (x *ErasureCodingTaskConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[28] + mi := &file_worker_proto_msgTypes[29] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2621,7 +2714,7 @@ func (x *ErasureCodingTaskConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use ErasureCodingTaskConfig.ProtoReflect.Descriptor instead. func (*ErasureCodingTaskConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{28} + return file_worker_proto_rawDescGZIP(), []int{29} } func (x *ErasureCodingTaskConfig) GetFullnessRatio() float64 { @@ -2670,7 +2763,7 @@ type BalanceTaskConfig struct { func (x *BalanceTaskConfig) Reset() { *x = BalanceTaskConfig{} - mi := &file_worker_proto_msgTypes[29] + mi := &file_worker_proto_msgTypes[30] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2682,7 +2775,7 @@ func (x *BalanceTaskConfig) String() string { func (*BalanceTaskConfig) ProtoMessage() {} func (x *BalanceTaskConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[29] + mi := &file_worker_proto_msgTypes[30] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2695,7 +2788,7 @@ func (x *BalanceTaskConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use BalanceTaskConfig.ProtoReflect.Descriptor instead. func (*BalanceTaskConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{29} + return file_worker_proto_rawDescGZIP(), []int{30} } func (x *BalanceTaskConfig) GetImbalanceThreshold() float64 { @@ -2722,7 +2815,7 @@ type ReplicationTaskConfig struct { func (x *ReplicationTaskConfig) Reset() { *x = ReplicationTaskConfig{} - mi := &file_worker_proto_msgTypes[30] + mi := &file_worker_proto_msgTypes[31] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2734,7 +2827,7 @@ func (x *ReplicationTaskConfig) String() string { func (*ReplicationTaskConfig) ProtoMessage() {} func (x *ReplicationTaskConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[30] + mi := &file_worker_proto_msgTypes[31] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2747,7 +2840,7 @@ func (x *ReplicationTaskConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use ReplicationTaskConfig.ProtoReflect.Descriptor instead. func (*ReplicationTaskConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{30} + return file_worker_proto_rawDescGZIP(), []int{31} } func (x *ReplicationTaskConfig) GetTargetReplicaCount() int32 { @@ -2791,7 +2884,7 @@ type MaintenanceTaskData struct { func (x *MaintenanceTaskData) Reset() { *x = MaintenanceTaskData{} - mi := &file_worker_proto_msgTypes[31] + mi := &file_worker_proto_msgTypes[32] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2803,7 +2896,7 @@ func (x *MaintenanceTaskData) String() string { func (*MaintenanceTaskData) ProtoMessage() {} func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[31] + mi := &file_worker_proto_msgTypes[32] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2816,7 +2909,7 @@ func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message { // Deprecated: Use MaintenanceTaskData.ProtoReflect.Descriptor instead. func (*MaintenanceTaskData) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{31} + return file_worker_proto_rawDescGZIP(), []int{32} } func (x *MaintenanceTaskData) GetId() string { @@ -3001,7 +3094,7 @@ type TaskAssignmentRecord struct { func (x *TaskAssignmentRecord) Reset() { *x = TaskAssignmentRecord{} - mi := &file_worker_proto_msgTypes[32] + mi := &file_worker_proto_msgTypes[33] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3013,7 +3106,7 @@ func (x *TaskAssignmentRecord) String() string { func (*TaskAssignmentRecord) ProtoMessage() {} func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[32] + mi := &file_worker_proto_msgTypes[33] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3026,7 +3119,7 @@ func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskAssignmentRecord.ProtoReflect.Descriptor instead. func (*TaskAssignmentRecord) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{32} + return file_worker_proto_rawDescGZIP(), []int{33} } func (x *TaskAssignmentRecord) GetWorkerId() string { @@ -3078,7 +3171,7 @@ type TaskCreationMetrics struct { func (x *TaskCreationMetrics) Reset() { *x = TaskCreationMetrics{} - mi := &file_worker_proto_msgTypes[33] + mi := &file_worker_proto_msgTypes[34] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3090,7 +3183,7 @@ func (x *TaskCreationMetrics) String() string { func (*TaskCreationMetrics) ProtoMessage() {} func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[33] + mi := &file_worker_proto_msgTypes[34] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3103,7 +3196,7 @@ func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskCreationMetrics.ProtoReflect.Descriptor instead. func (*TaskCreationMetrics) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{33} + return file_worker_proto_rawDescGZIP(), []int{34} } func (x *TaskCreationMetrics) GetTriggerMetric() string { @@ -3160,7 +3253,7 @@ type VolumeHealthMetrics struct { func (x *VolumeHealthMetrics) Reset() { *x = VolumeHealthMetrics{} - mi := &file_worker_proto_msgTypes[34] + mi := &file_worker_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3172,7 +3265,7 @@ func (x *VolumeHealthMetrics) String() string { func (*VolumeHealthMetrics) ProtoMessage() {} func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[34] + mi := &file_worker_proto_msgTypes[35] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3185,7 +3278,7 @@ func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use VolumeHealthMetrics.ProtoReflect.Descriptor instead. func (*VolumeHealthMetrics) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{34} + return file_worker_proto_rawDescGZIP(), []int{35} } func (x *VolumeHealthMetrics) GetTotalSize() uint64 { @@ -3270,7 +3363,7 @@ type TaskStateFile struct { func (x *TaskStateFile) Reset() { *x = TaskStateFile{} - mi := &file_worker_proto_msgTypes[35] + mi := &file_worker_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3282,7 +3375,7 @@ func (x *TaskStateFile) String() string { func (*TaskStateFile) ProtoMessage() {} func (x *TaskStateFile) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[35] + mi := &file_worker_proto_msgTypes[36] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3295,7 +3388,7 @@ func (x *TaskStateFile) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskStateFile.ProtoReflect.Descriptor instead. func (*TaskStateFile) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{35} + return file_worker_proto_rawDescGZIP(), []int{36} } func (x *TaskStateFile) GetTask() *MaintenanceTaskData { @@ -3441,11 +3534,24 @@ const file_worker_proto_rawDesc = "" + "dataCenter\x12\x1b\n" + "\tvolume_id\x18\x05 \x01(\rR\bvolumeId\x12\x1b\n" + "\tshard_ids\x18\x06 \x03(\rR\bshardIds\x12%\n" + - "\x0eestimated_size\x18\a \x01(\x04R\restimatedSize\"[\n" + + "\x0eestimated_size\x18\a \x01(\x04R\restimatedSize\"\xb1\x01\n" + + "\x0fBalanceMoveSpec\x12\x1b\n" + + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x1f\n" + + "\vsource_node\x18\x02 \x01(\tR\n" + + "sourceNode\x12\x1f\n" + + "\vtarget_node\x18\x03 \x01(\tR\n" + + "targetNode\x12\x1e\n" + + "\n" + + "collection\x18\x04 \x01(\tR\n" + + "collection\x12\x1f\n" + + "\vvolume_size\x18\x05 \x01(\x04R\n" + + "volumeSize\"\xbf\x01\n" + "\x11BalanceTaskParams\x12\x1d\n" + "\n" + "force_move\x18\x01 \x01(\bR\tforceMove\x12'\n" + - "\x0ftimeout_seconds\x18\x02 \x01(\x05R\x0etimeoutSeconds\"k\n" + + "\x0ftimeout_seconds\x18\x02 \x01(\x05R\x0etimeoutSeconds\x120\n" + + "\x14max_concurrent_moves\x18\x03 \x01(\x05R\x12maxConcurrentMoves\x120\n" + + "\x05moves\x18\x04 \x03(\v2\x1a.worker_pb.BalanceMoveSpecR\x05moves\"k\n" + "\x15ReplicationTaskParams\x12#\n" + "\rreplica_count\x18\x01 \x01(\x05R\freplicaCount\x12-\n" + "\x12verify_consistency\x18\x02 \x01(\bR\x11verifyConsistency\"\x8e\x02\n" + @@ -3667,7 +3773,7 @@ func file_worker_proto_rawDescGZIP() []byte { return file_worker_proto_rawDescData } -var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 45) +var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 46) var file_worker_proto_goTypes = []any{ (*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage (*AdminMessage)(nil), // 1: worker_pb.AdminMessage @@ -3682,89 +3788,91 @@ var file_worker_proto_goTypes = []any{ (*ErasureCodingTaskParams)(nil), // 10: worker_pb.ErasureCodingTaskParams (*TaskSource)(nil), // 11: worker_pb.TaskSource (*TaskTarget)(nil), // 12: worker_pb.TaskTarget - (*BalanceTaskParams)(nil), // 13: worker_pb.BalanceTaskParams - (*ReplicationTaskParams)(nil), // 14: worker_pb.ReplicationTaskParams - (*TaskUpdate)(nil), // 15: worker_pb.TaskUpdate - (*TaskComplete)(nil), // 16: worker_pb.TaskComplete - (*TaskCancellation)(nil), // 17: worker_pb.TaskCancellation - (*WorkerShutdown)(nil), // 18: worker_pb.WorkerShutdown - (*AdminShutdown)(nil), // 19: worker_pb.AdminShutdown - (*TaskLogRequest)(nil), // 20: worker_pb.TaskLogRequest - (*TaskLogResponse)(nil), // 21: worker_pb.TaskLogResponse - (*TaskLogMetadata)(nil), // 22: worker_pb.TaskLogMetadata - (*TaskLogEntry)(nil), // 23: worker_pb.TaskLogEntry - (*MaintenanceConfig)(nil), // 24: worker_pb.MaintenanceConfig - (*MaintenancePolicy)(nil), // 25: worker_pb.MaintenancePolicy - (*TaskPolicy)(nil), // 26: worker_pb.TaskPolicy - (*VacuumTaskConfig)(nil), // 27: worker_pb.VacuumTaskConfig - (*ErasureCodingTaskConfig)(nil), // 28: worker_pb.ErasureCodingTaskConfig - (*BalanceTaskConfig)(nil), // 29: worker_pb.BalanceTaskConfig - (*ReplicationTaskConfig)(nil), // 30: worker_pb.ReplicationTaskConfig - (*MaintenanceTaskData)(nil), // 31: worker_pb.MaintenanceTaskData - (*TaskAssignmentRecord)(nil), // 32: worker_pb.TaskAssignmentRecord - (*TaskCreationMetrics)(nil), // 33: worker_pb.TaskCreationMetrics - (*VolumeHealthMetrics)(nil), // 34: worker_pb.VolumeHealthMetrics - (*TaskStateFile)(nil), // 35: worker_pb.TaskStateFile - nil, // 36: worker_pb.WorkerRegistration.MetadataEntry - nil, // 37: worker_pb.TaskAssignment.MetadataEntry - nil, // 38: worker_pb.TaskUpdate.MetadataEntry - nil, // 39: worker_pb.TaskComplete.ResultMetadataEntry - nil, // 40: worker_pb.TaskLogMetadata.CustomDataEntry - nil, // 41: worker_pb.TaskLogEntry.FieldsEntry - nil, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry - nil, // 43: worker_pb.MaintenanceTaskData.TagsEntry - nil, // 44: worker_pb.TaskCreationMetrics.AdditionalDataEntry + (*BalanceMoveSpec)(nil), // 13: worker_pb.BalanceMoveSpec + (*BalanceTaskParams)(nil), // 14: worker_pb.BalanceTaskParams + (*ReplicationTaskParams)(nil), // 15: worker_pb.ReplicationTaskParams + (*TaskUpdate)(nil), // 16: worker_pb.TaskUpdate + (*TaskComplete)(nil), // 17: worker_pb.TaskComplete + (*TaskCancellation)(nil), // 18: worker_pb.TaskCancellation + (*WorkerShutdown)(nil), // 19: worker_pb.WorkerShutdown + (*AdminShutdown)(nil), // 20: worker_pb.AdminShutdown + (*TaskLogRequest)(nil), // 21: worker_pb.TaskLogRequest + (*TaskLogResponse)(nil), // 22: worker_pb.TaskLogResponse + (*TaskLogMetadata)(nil), // 23: worker_pb.TaskLogMetadata + (*TaskLogEntry)(nil), // 24: worker_pb.TaskLogEntry + (*MaintenanceConfig)(nil), // 25: worker_pb.MaintenanceConfig + (*MaintenancePolicy)(nil), // 26: worker_pb.MaintenancePolicy + (*TaskPolicy)(nil), // 27: worker_pb.TaskPolicy + (*VacuumTaskConfig)(nil), // 28: worker_pb.VacuumTaskConfig + (*ErasureCodingTaskConfig)(nil), // 29: worker_pb.ErasureCodingTaskConfig + (*BalanceTaskConfig)(nil), // 30: worker_pb.BalanceTaskConfig + (*ReplicationTaskConfig)(nil), // 31: worker_pb.ReplicationTaskConfig + (*MaintenanceTaskData)(nil), // 32: worker_pb.MaintenanceTaskData + (*TaskAssignmentRecord)(nil), // 33: worker_pb.TaskAssignmentRecord + (*TaskCreationMetrics)(nil), // 34: worker_pb.TaskCreationMetrics + (*VolumeHealthMetrics)(nil), // 35: worker_pb.VolumeHealthMetrics + (*TaskStateFile)(nil), // 36: worker_pb.TaskStateFile + nil, // 37: worker_pb.WorkerRegistration.MetadataEntry + nil, // 38: worker_pb.TaskAssignment.MetadataEntry + nil, // 39: worker_pb.TaskUpdate.MetadataEntry + nil, // 40: worker_pb.TaskComplete.ResultMetadataEntry + nil, // 41: worker_pb.TaskLogMetadata.CustomDataEntry + nil, // 42: worker_pb.TaskLogEntry.FieldsEntry + nil, // 43: worker_pb.MaintenancePolicy.TaskPoliciesEntry + nil, // 44: worker_pb.MaintenanceTaskData.TagsEntry + nil, // 45: worker_pb.TaskCreationMetrics.AdditionalDataEntry } var file_worker_proto_depIdxs = []int32{ 2, // 0: worker_pb.WorkerMessage.registration:type_name -> worker_pb.WorkerRegistration 4, // 1: worker_pb.WorkerMessage.heartbeat:type_name -> worker_pb.WorkerHeartbeat 6, // 2: worker_pb.WorkerMessage.task_request:type_name -> worker_pb.TaskRequest - 15, // 3: worker_pb.WorkerMessage.task_update:type_name -> worker_pb.TaskUpdate - 16, // 4: worker_pb.WorkerMessage.task_complete:type_name -> worker_pb.TaskComplete - 18, // 5: worker_pb.WorkerMessage.shutdown:type_name -> worker_pb.WorkerShutdown - 21, // 6: worker_pb.WorkerMessage.task_log_response:type_name -> worker_pb.TaskLogResponse + 16, // 3: worker_pb.WorkerMessage.task_update:type_name -> worker_pb.TaskUpdate + 17, // 4: worker_pb.WorkerMessage.task_complete:type_name -> worker_pb.TaskComplete + 19, // 5: worker_pb.WorkerMessage.shutdown:type_name -> worker_pb.WorkerShutdown + 22, // 6: worker_pb.WorkerMessage.task_log_response:type_name -> worker_pb.TaskLogResponse 3, // 7: worker_pb.AdminMessage.registration_response:type_name -> worker_pb.RegistrationResponse 5, // 8: worker_pb.AdminMessage.heartbeat_response:type_name -> worker_pb.HeartbeatResponse 7, // 9: worker_pb.AdminMessage.task_assignment:type_name -> worker_pb.TaskAssignment - 17, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation - 19, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown - 20, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest - 36, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry + 18, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation + 20, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown + 21, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest + 37, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry 8, // 14: worker_pb.TaskAssignment.params:type_name -> worker_pb.TaskParams - 37, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry + 38, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry 11, // 16: worker_pb.TaskParams.sources:type_name -> worker_pb.TaskSource 12, // 17: worker_pb.TaskParams.targets:type_name -> worker_pb.TaskTarget 9, // 18: worker_pb.TaskParams.vacuum_params:type_name -> worker_pb.VacuumTaskParams 10, // 19: worker_pb.TaskParams.erasure_coding_params:type_name -> worker_pb.ErasureCodingTaskParams - 13, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams - 14, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams - 38, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry - 39, // 23: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry - 22, // 24: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata - 23, // 25: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry - 40, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry - 41, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry - 25, // 28: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy - 42, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry - 27, // 30: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig - 28, // 31: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig - 29, // 32: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig - 30, // 33: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig - 8, // 34: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams - 32, // 35: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord - 43, // 36: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry - 33, // 37: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics - 34, // 38: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics - 44, // 39: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry - 31, // 40: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData - 26, // 41: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy - 0, // 42: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage - 1, // 43: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage - 43, // [43:44] is the sub-list for method output_type - 42, // [42:43] is the sub-list for method input_type - 42, // [42:42] is the sub-list for extension type_name - 42, // [42:42] is the sub-list for extension extendee - 0, // [0:42] is the sub-list for field type_name + 14, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams + 15, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams + 13, // 22: worker_pb.BalanceTaskParams.moves:type_name -> worker_pb.BalanceMoveSpec + 39, // 23: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry + 40, // 24: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry + 23, // 25: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata + 24, // 26: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry + 41, // 27: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry + 42, // 28: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry + 26, // 29: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy + 43, // 30: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry + 28, // 31: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig + 29, // 32: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig + 30, // 33: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig + 31, // 34: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig + 8, // 35: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams + 33, // 36: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord + 44, // 37: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry + 34, // 38: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics + 35, // 39: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics + 45, // 40: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry + 32, // 41: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData + 27, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy + 0, // 43: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage + 1, // 44: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage + 44, // [44:45] is the sub-list for method output_type + 43, // [43:44] is the sub-list for method input_type + 43, // [43:43] is the sub-list for extension type_name + 43, // [43:43] is the sub-list for extension extendee + 0, // [0:43] is the sub-list for field type_name } func init() { file_worker_proto_init() } @@ -3795,7 +3903,7 @@ func file_worker_proto_init() { (*TaskParams_BalanceParams)(nil), (*TaskParams_ReplicationParams)(nil), } - file_worker_proto_msgTypes[26].OneofWrappers = []any{ + file_worker_proto_msgTypes[27].OneofWrappers = []any{ (*TaskPolicy_VacuumConfig)(nil), (*TaskPolicy_ErasureCodingConfig)(nil), (*TaskPolicy_BalanceConfig)(nil), @@ -3807,7 +3915,7 @@ func file_worker_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_worker_proto_rawDesc), len(file_worker_proto_rawDesc)), NumEnums: 0, - NumMessages: 45, + NumMessages: 46, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 1a3af2b0d..327742b7a 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -5,6 +5,7 @@ import ( "fmt" "sort" "strings" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -35,6 +36,8 @@ func init() { type volumeBalanceWorkerConfig struct { TaskConfig *balancetask.Config MinIntervalSeconds int + MaxConcurrentMoves int + BatchSize int } // VolumeBalanceHandler is the plugin job handler for volume balancing. @@ -133,6 +136,31 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { }, }, }, + { + SectionId: "batch_execution", + Title: "Batch Execution", + Description: "Controls for running multiple volume moves per job. The worker coordinates moves via gRPC and is not on the data path.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "max_concurrent_moves", + Label: "Max Concurrent Moves", + Description: "Maximum number of volume moves to run concurrently within a single batch job.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}}, + }, + { + Name: "batch_size", + Label: "Batch Size", + Description: "Maximum number of volume moves to group into a single job. Set to 1 to disable batching.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 100}}, + }, + }, + }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ "imbalance_threshold": { @@ -144,6 +172,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "min_interval_seconds": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60}, }, + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20}, + }, }, }, AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ @@ -167,6 +201,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "min_interval_seconds": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60}, }, + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20}, + }, }, } } @@ -234,14 +274,19 @@ func (h *VolumeBalanceHandler) Detect( glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } - proposals := make([]*plugin_pb.JobProposal, 0, len(results)) - for _, result := range results { - proposal, proposalErr := buildVolumeBalanceProposal(result) - if proposalErr != nil { - glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr) - continue + var proposals []*plugin_pb.JobProposal + if workerConfig.BatchSize > 1 && len(results) > 1 { + proposals = buildBatchVolumeBalanceProposals(results, workerConfig.BatchSize, workerConfig.MaxConcurrentMoves) + } else { + proposals = make([]*plugin_pb.JobProposal, 0, len(results)) + for _, result := range results { + proposal, proposalErr := buildVolumeBalanceProposal(result) + if proposalErr != nil { + glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr) + continue + } + proposals = append(proposals, proposal) } - proposals = append(proposals, proposal) } if err := sender.SendProposals(&plugin_pb.DetectionProposals{ @@ -511,6 +556,8 @@ func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int { return len(diskTypes) } +const defaultMaxConcurrentMoves = 5 + func (h *VolumeBalanceHandler) Execute( ctx context.Context, request *plugin_pb.ExecuteJobRequest, @@ -530,6 +577,24 @@ func (h *VolumeBalanceHandler) Execute( if err != nil { return err } + + applyBalanceExecutionDefaults(params) + + // Batch path: if BalanceTaskParams has moves, execute them concurrently + if bp := params.GetBalanceParams(); bp != nil && len(bp.Moves) > 0 { + return h.executeBatchMoves(ctx, request, params, sender) + } + + // Single-move path (backward compatible) + return h.executeSingleMove(ctx, request, params, sender) +} + +func (h *VolumeBalanceHandler) executeSingleMove( + ctx context.Context, + request *plugin_pb.ExecuteJobRequest, + params *worker_pb.TaskParams, + sender ExecutionSender, +) error { if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" { return fmt.Errorf("volume balance source node is required") } @@ -537,8 +602,6 @@ func (h *VolumeBalanceHandler) Execute( return fmt.Errorf("volume balance target node is required") } - applyBalanceExecutionDefaults(params) - task := balancetask.NewBalanceTask( request.Job.JobId, params.Sources[0].Node, @@ -621,6 +684,181 @@ func (h *VolumeBalanceHandler) Execute( }) } +// executeBatchMoves runs multiple volume moves concurrently within a single job. +func (h *VolumeBalanceHandler) executeBatchMoves( + ctx context.Context, + request *plugin_pb.ExecuteJobRequest, + params *worker_pb.TaskParams, + sender ExecutionSender, +) error { + bp := params.GetBalanceParams() + moves := bp.Moves + maxConcurrent := int(bp.MaxConcurrentMoves) + if maxConcurrent <= 0 { + maxConcurrent = defaultMaxConcurrentMoves + } + + totalMoves := len(moves) + glog.Infof("batch volume balance: %d moves, max concurrent %d", totalMoves, maxConcurrent) + + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_ASSIGNED, + ProgressPercent: 0, + Stage: "assigned", + Message: fmt.Sprintf("batch volume balance accepted: %d moves", totalMoves), + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("assigned", fmt.Sprintf("batch volume balance: %d moves, concurrency %d", totalMoves, maxConcurrent)), + }, + }); err != nil { + return err + } + + // Per-move progress tracking + var mu sync.Mutex + moveProgress := make([]float64, totalMoves) + + reportAggregate := func(moveIndex int, progress float64, stage string) { + mu.Lock() + moveProgress[moveIndex] = progress + total := 0.0 + for _, p := range moveProgress { + total += p + } + mu.Unlock() + + aggregate := total / float64(totalMoves) + move := moves[moveIndex] + message := fmt.Sprintf("[Move %d/%d vol:%d] %s", moveIndex+1, totalMoves, move.VolumeId, stage) + + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_RUNNING, + ProgressPercent: aggregate, + Stage: fmt.Sprintf("move %d/%d", moveIndex+1, totalMoves), + Message: message, + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity(fmt.Sprintf("move-%d", moveIndex+1), message), + }, + }) + } + + type moveResult struct { + index int + volumeID uint32 + source string + target string + err error + } + + sem := make(chan struct{}, maxConcurrent) + results := make(chan moveResult, totalMoves) + + for i, move := range moves { + sem <- struct{}{} // acquire slot + go func(idx int, m *worker_pb.BalanceMoveSpec) { + defer func() { <-sem }() // release slot + + task := balancetask.NewBalanceTask( + fmt.Sprintf("%s-move-%d", request.Job.JobId, idx), + m.SourceNode, + m.VolumeId, + m.Collection, + h.grpcDialOption, + ) + task.SetProgressCallback(func(progress float64, stage string) { + reportAggregate(idx, progress, stage) + }) + + moveParams := buildMoveTaskParams(m, bp.TimeoutSeconds) + err := task.Execute(ctx, moveParams) + results <- moveResult{ + index: idx, + volumeID: m.VolumeId, + source: m.SourceNode, + target: m.TargetNode, + err: err, + } + }(i, move) + } + + // Collect all results + var succeeded, failed int + var errMessages []string + var successDetails []string + for range moves { + r := <-results + if r.err != nil { + failed++ + errMessages = append(errMessages, fmt.Sprintf("volume %d (%s→%s): %v", r.volumeID, r.source, r.target, r.err)) + glog.Warningf("batch balance move %d failed: volume %d %s→%s: %v", r.index, r.volumeID, r.source, r.target, r.err) + } else { + succeeded++ + successDetails = append(successDetails, fmt.Sprintf("volume %d (%s→%s)", r.volumeID, r.source, r.target)) + } + } + + summary := fmt.Sprintf("%d/%d volumes moved successfully", succeeded, totalMoves) + if failed > 0 { + summary += fmt.Sprintf("; %d failed", failed) + } + + success := failed == 0 + var errMsg string + if !success { + errMsg = strings.Join(errMessages, "; ") + } + + return sender.SendCompleted(&plugin_pb.JobCompleted{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + Success: success, + ErrorMessage: errMsg, + Result: &plugin_pb.JobResult{ + Summary: summary, + OutputValues: map[string]*plugin_pb.ConfigValue{ + "total_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalMoves)}, + }, + "succeeded": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(succeeded)}, + }, + "failed": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(failed)}, + }, + }, + }, + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("completed", summary), + }, + }) +} + +// buildMoveTaskParams constructs a TaskParams for a single move within a batch. +func buildMoveTaskParams(move *worker_pb.BalanceMoveSpec, timeoutSeconds int32) *worker_pb.TaskParams { + if timeoutSeconds <= 0 { + timeoutSeconds = defaultBalanceTimeoutSeconds + } + return &worker_pb.TaskParams{ + VolumeId: move.VolumeId, + Collection: move.Collection, + VolumeSize: move.VolumeSize, + Sources: []*worker_pb.TaskSource{ + {Node: move.SourceNode, VolumeId: move.VolumeId}, + }, + Targets: []*worker_pb.TaskTarget{ + {Node: move.TargetNode, VolumeId: move.VolumeId}, + }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + TimeoutSeconds: timeoutSeconds, + }, + }, + } +} + func (h *VolumeBalanceHandler) collectVolumeMetrics( ctx context.Context, masterAddresses []string, @@ -654,9 +892,27 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume minIntervalSeconds = 0 } + maxConcurrentMoves := int(readInt64Config(values, "max_concurrent_moves", int64(defaultMaxConcurrentMoves))) + if maxConcurrentMoves < 1 { + maxConcurrentMoves = 1 + } + if maxConcurrentMoves > 50 { + maxConcurrentMoves = 50 + } + + batchSize := int(readInt64Config(values, "batch_size", 20)) + if batchSize < 1 { + batchSize = 1 + } + if batchSize > 100 { + batchSize = 100 + } + return &volumeBalanceWorkerConfig{ TaskConfig: taskConfig, MinIntervalSeconds: minIntervalSeconds, + MaxConcurrentMoves: maxConcurrentMoves, + BatchSize: batchSize, } } @@ -738,6 +994,136 @@ func buildVolumeBalanceProposal( }, nil } +// buildBatchVolumeBalanceProposals groups detection results into batch proposals. +// Each batch proposal encodes multiple moves in BalanceTaskParams.Moves. +func buildBatchVolumeBalanceProposals( + results []*workertypes.TaskDetectionResult, + batchSize int, + maxConcurrentMoves int, +) []*plugin_pb.JobProposal { + if batchSize <= 0 { + batchSize = 1 + } + if maxConcurrentMoves <= 0 { + maxConcurrentMoves = defaultMaxConcurrentMoves + } + + var proposals []*plugin_pb.JobProposal + + for batchStart := 0; batchStart < len(results); batchStart += batchSize { + batchEnd := batchStart + batchSize + if batchEnd > len(results) { + batchEnd = len(results) + } + batch := results[batchStart:batchEnd] + + // If only one result in this batch, emit a single-move proposal + if len(batch) == 1 { + proposal, err := buildVolumeBalanceProposal(batch[0]) + if err != nil { + glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", err) + continue + } + proposals = append(proposals, proposal) + continue + } + + // Build batch proposal with BalanceMoveSpec entries + moves := make([]*worker_pb.BalanceMoveSpec, 0, len(batch)) + var volumeIDs []string + var dedupeKeys []string + highestPriority := workertypes.TaskPriorityLow + + for _, result := range batch { + if result == nil || result.TypedParams == nil { + continue + } + sourceNode := "" + targetNode := "" + if len(result.TypedParams.Sources) > 0 { + sourceNode = result.TypedParams.Sources[0].Node + } + if len(result.TypedParams.Targets) > 0 { + targetNode = result.TypedParams.Targets[0].Node + } + moves = append(moves, &worker_pb.BalanceMoveSpec{ + VolumeId: uint32(result.VolumeID), + SourceNode: sourceNode, + TargetNode: targetNode, + Collection: result.Collection, + VolumeSize: result.TypedParams.VolumeSize, + }) + volumeIDs = append(volumeIDs, fmt.Sprintf("%d", result.VolumeID)) + + dedupeKey := fmt.Sprintf("volume_balance:%d", result.VolumeID) + if result.Collection != "" { + dedupeKey += ":" + result.Collection + } + dedupeKeys = append(dedupeKeys, dedupeKey) + + if result.Priority > highestPriority { + highestPriority = result.Priority + } + } + + if len(moves) == 0 { + continue + } + + // Serialize batch params + taskParams := &worker_pb.TaskParams{ + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + TimeoutSeconds: defaultBalanceTimeoutSeconds, + MaxConcurrentMoves: int32(maxConcurrentMoves), + Moves: moves, + }, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + glog.Warningf("Plugin worker failed to marshal batch balance proposal: %v", err) + continue + } + + proposalID := fmt.Sprintf("volume-balance-batch-%d-%d", batchStart, time.Now().UnixNano()) + summary := fmt.Sprintf("Batch balance %d volumes (%s)", len(moves), strings.Join(volumeIDs, ",")) + if len(summary) > 200 { + summary = fmt.Sprintf("Batch balance %d volumes", len(moves)) + } + + // Use composite dedupe key for the batch + compositeDedupeKey := fmt.Sprintf("volume_balance_batch:%s", strings.Join(dedupeKeys, "+")) + if len(compositeDedupeKey) > 200 { + compositeDedupeKey = fmt.Sprintf("volume_balance_batch:%d-%d", batchStart, time.Now().UnixNano()) + } + + proposals = append(proposals, &plugin_pb.JobProposal{ + ProposalId: proposalID, + DedupeKey: compositeDedupeKey, + JobType: "volume_balance", + Priority: mapTaskPriority(highestPriority), + Summary: summary, + Detail: fmt.Sprintf("Batch of %d volume moves with concurrency %d", len(moves), maxConcurrentMoves), + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": { + Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(moves))}, + }, + }, + Labels: map[string]string{ + "task_type": "balance", + "batch": "true", + "batch_size": fmt.Sprintf("%d", len(moves)), + }, + }) + } + + return proposals +} + func decodeVolumeBalanceTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) { if job == nil { return nil, fmt.Errorf("job spec is nil") diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index ace71ae3a..c08902c7b 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -2,7 +2,9 @@ package pluginworker import ( "context" + "fmt" "strings" + "sync" "testing" "time" @@ -111,6 +113,167 @@ func TestDeriveBalanceWorkerConfig(t *testing.T) { if cfg.MinIntervalSeconds != 33 { t.Fatalf("expected min_interval_seconds 33, got %d", cfg.MinIntervalSeconds) } + // Defaults for batch config when not specified + if cfg.MaxConcurrentMoves != defaultMaxConcurrentMoves { + t.Fatalf("expected default max_concurrent_moves %d, got %d", defaultMaxConcurrentMoves, cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 20 { + t.Fatalf("expected default batch_size 20, got %d", cfg.BatchSize) + } +} + +func TestDeriveBalanceWorkerConfigBatchFields(t *testing.T) { + values := map[string]*plugin_pb.ConfigValue{ + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 10}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}, + }, + } + + cfg := deriveBalanceWorkerConfig(values) + if cfg.MaxConcurrentMoves != 10 { + t.Fatalf("expected max_concurrent_moves 10, got %d", cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 50 { + t.Fatalf("expected batch_size 50, got %d", cfg.BatchSize) + } +} + +func TestDeriveBalanceWorkerConfigBatchClamping(t *testing.T) { + values := map[string]*plugin_pb.ConfigValue{ + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 999}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}, + }, + } + + cfg := deriveBalanceWorkerConfig(values) + if cfg.MaxConcurrentMoves != 50 { + t.Fatalf("expected max_concurrent_moves clamped to 50, got %d", cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 1 { + t.Fatalf("expected batch_size clamped to 1, got %d", cfg.BatchSize) + } +} + +func makeDetectionResult(volumeID uint32, source, target, collection string) *workertypes.TaskDetectionResult { + return &workertypes.TaskDetectionResult{ + TaskID: fmt.Sprintf("balance-%d", volumeID), + TaskType: workertypes.TaskTypeBalance, + VolumeID: volumeID, + Server: source, + Collection: collection, + Priority: workertypes.TaskPriorityNormal, + Reason: "imbalanced", + TypedParams: &worker_pb.TaskParams{ + VolumeId: volumeID, + Collection: collection, + VolumeSize: 1024, + Sources: []*worker_pb.TaskSource{ + {Node: source, VolumeId: volumeID}, + }, + Targets: []*worker_pb.TaskTarget{ + {Node: target, VolumeId: volumeID}, + }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{TimeoutSeconds: 600}, + }, + }, + } +} + +func TestBuildBatchVolumeBalanceProposals_SingleBatch(t *testing.T) { + results := []*workertypes.TaskDetectionResult{ + makeDetectionResult(1, "s1:8080", "t1:8080", "c1"), + makeDetectionResult(2, "s2:8080", "t2:8080", "c1"), + makeDetectionResult(3, "s1:8080", "t2:8080", "c1"), + } + + proposals := buildBatchVolumeBalanceProposals(results, 10, 5) + if len(proposals) != 1 { + t.Fatalf("expected 1 batch proposal, got %d", len(proposals)) + } + + p := proposals[0] + if p.Labels["batch"] != "true" { + t.Fatalf("expected batch label") + } + if p.Labels["batch_size"] != "3" { + t.Fatalf("expected batch_size label '3', got %q", p.Labels["batch_size"]) + } + + // Decode and verify moves + payload := p.Parameters["task_params_pb"].GetBytesValue() + if len(payload) == 0 { + t.Fatalf("expected task_params_pb payload") + } + decoded := &worker_pb.TaskParams{} + if err := proto.Unmarshal(payload, decoded); err != nil { + t.Fatalf("unmarshal: %v", err) + } + moves := decoded.GetBalanceParams().GetMoves() + if len(moves) != 3 { + t.Fatalf("expected 3 moves, got %d", len(moves)) + } + if moves[0].VolumeId != 1 || moves[1].VolumeId != 2 || moves[2].VolumeId != 3 { + t.Fatalf("unexpected volume IDs: %v", moves) + } + if decoded.GetBalanceParams().MaxConcurrentMoves != 5 { + t.Fatalf("expected MaxConcurrentMoves 5, got %d", decoded.GetBalanceParams().MaxConcurrentMoves) + } +} + +func TestBuildBatchVolumeBalanceProposals_MultipleBatches(t *testing.T) { + results := make([]*workertypes.TaskDetectionResult, 5) + for i := range results { + results[i] = makeDetectionResult(uint32(i+1), "s1:8080", "t1:8080", "c1") + } + + proposals := buildBatchVolumeBalanceProposals(results, 2, 3) + // 5 results / batch_size 2 = 3 proposals (2, 2, 1) + if len(proposals) != 3 { + t.Fatalf("expected 3 proposals, got %d", len(proposals)) + } + + // First two should be batch proposals + if proposals[0].Labels["batch"] != "true" { + t.Fatalf("first proposal should be batch") + } + if proposals[1].Labels["batch"] != "true" { + t.Fatalf("second proposal should be batch") + } + // Last one has only 1 result, should fall back to single-move proposal + if proposals[2].Labels["batch"] == "true" { + t.Fatalf("last proposal with 1 result should be single-move, not batch") + } +} + +func TestBuildBatchVolumeBalanceProposals_BatchSizeOne(t *testing.T) { + results := []*workertypes.TaskDetectionResult{ + makeDetectionResult(1, "s1:8080", "t1:8080", "c1"), + makeDetectionResult(2, "s2:8080", "t2:8080", "c1"), + } + + // batch_size=1 should not be called (Detect guards this), but test the function directly + proposals := buildBatchVolumeBalanceProposals(results, 1, 5) + // Each result becomes its own single-move proposal + if len(proposals) != 2 { + t.Fatalf("expected 2 proposals, got %d", len(proposals)) + } +} + +func TestVolumeBalanceDescriptorHasBatchFields(t *testing.T) { + descriptor := NewVolumeBalanceHandler(nil).Descriptor() + if !workerConfigFormHasField(descriptor.WorkerConfigForm, "max_concurrent_moves") { + t.Fatalf("expected max_concurrent_moves in worker config form") + } + if !workerConfigFormHasField(descriptor.WorkerConfigForm, "batch_size") { + t.Fatalf("expected batch_size in worker config form") + } } func TestBuildVolumeBalanceProposal(t *testing.T) { @@ -265,6 +428,197 @@ func TestVolumeBalanceDescriptorOmitsExecutionTuningFields(t *testing.T) { } } +type recordingExecutionSender struct { + mu sync.Mutex + progress []*plugin_pb.JobProgressUpdate + completed *plugin_pb.JobCompleted +} + +func (r *recordingExecutionSender) SendProgress(p *plugin_pb.JobProgressUpdate) error { + r.mu.Lock() + defer r.mu.Unlock() + r.progress = append(r.progress, p) + return nil +} + +func (r *recordingExecutionSender) SendCompleted(c *plugin_pb.JobCompleted) error { + r.mu.Lock() + defer r.mu.Unlock() + r.completed = c + return nil +} + +func TestBuildMoveTaskParams(t *testing.T) { + move := &worker_pb.BalanceMoveSpec{ + VolumeId: 42, + SourceNode: "10.0.0.1:8080", + TargetNode: "10.0.0.2:8080", + Collection: "photos", + VolumeSize: 1024 * 1024, + } + + params := buildMoveTaskParams(move, 300) + if params.VolumeId != 42 { + t.Fatalf("expected volume_id 42, got %d", params.VolumeId) + } + if params.Collection != "photos" { + t.Fatalf("expected collection photos, got %s", params.Collection) + } + if params.VolumeSize != 1024*1024 { + t.Fatalf("expected volume_size %d, got %d", 1024*1024, params.VolumeSize) + } + if len(params.Sources) != 1 || params.Sources[0].Node != "10.0.0.1:8080" { + t.Fatalf("unexpected sources: %+v", params.Sources) + } + if len(params.Targets) != 1 || params.Targets[0].Node != "10.0.0.2:8080" { + t.Fatalf("unexpected targets: %+v", params.Targets) + } + bp := params.GetBalanceParams() + if bp == nil { + t.Fatalf("expected balance params") + } + if bp.TimeoutSeconds != 300 { + t.Fatalf("expected timeout 300, got %d", bp.TimeoutSeconds) + } +} + +func TestBuildMoveTaskParamsDefaultTimeout(t *testing.T) { + move := &worker_pb.BalanceMoveSpec{ + VolumeId: 1, + SourceNode: "a:8080", + TargetNode: "b:8080", + } + params := buildMoveTaskParams(move, 0) + if params.GetBalanceParams().TimeoutSeconds != defaultBalanceTimeoutSeconds { + t.Fatalf("expected default timeout %d, got %d", defaultBalanceTimeoutSeconds, params.GetBalanceParams().TimeoutSeconds) + } +} + +func TestExecuteDispatchesBatchPath(t *testing.T) { + // Build a job with batch moves in BalanceTaskParams + bp := &worker_pb.BalanceTaskParams{ + TimeoutSeconds: 60, + MaxConcurrentMoves: 2, + Moves: []*worker_pb.BalanceMoveSpec{ + {VolumeId: 1, SourceNode: "s1:8080", TargetNode: "t1:8080", Collection: "c1"}, + {VolumeId: 2, SourceNode: "s2:8080", TargetNode: "t2:8080", Collection: "c1"}, + }, + } + taskParams := &worker_pb.TaskParams{ + TaskId: "batch-1", + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: bp, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + job := &plugin_pb.JobSpec{ + JobId: "batch-job-1", + JobType: "volume_balance", + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}}, + }, + } + + handler := NewVolumeBalanceHandler(nil) + sender := &recordingExecutionSender{} + + // Execute will enter the batch path. It will fail because there's no real gRPC server, + // but we verify it sends the assigned progress and eventually a completion. + err = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{ + Job: job, + }, sender) + + // Expect an error since no real volume servers exist + // But verify the batch path was taken by checking the assigned message + sender.mu.Lock() + defer sender.mu.Unlock() + + if len(sender.progress) == 0 { + t.Fatalf("expected progress messages from batch path") + } + + // First progress should be "assigned" with batch info + first := sender.progress[0] + if first.Stage != "assigned" { + t.Fatalf("expected first stage 'assigned', got %q", first.Stage) + } + if !strings.Contains(first.Message, "batch") || !strings.Contains(first.Message, "2 moves") { + t.Fatalf("expected batch assigned message, got %q", first.Message) + } + + // Should have a completion with failure details (since no servers) + if sender.completed == nil { + t.Fatalf("expected completion message") + } + if sender.completed.Success { + t.Fatalf("expected failure since no real gRPC servers") + } + // Should report 0 succeeded, 2 failed + if v, ok := sender.completed.Result.OutputValues["failed"]; !ok || v.GetInt64Value() != 2 { + t.Fatalf("expected 2 failed moves, got %+v", sender.completed.Result.OutputValues) + } +} + +func TestExecuteSingleMovePathUnchanged(t *testing.T) { + // Build a single-move job (no batch moves) + taskParams := &worker_pb.TaskParams{ + TaskId: "single-1", + VolumeId: 99, + Collection: "videos", + Sources: []*worker_pb.TaskSource{ + {Node: "src:8080", VolumeId: 99}, + }, + Targets: []*worker_pb.TaskTarget{ + {Node: "dst:8080", VolumeId: 99}, + }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + TimeoutSeconds: 60, + }, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + job := &plugin_pb.JobSpec{ + JobId: "single-job-1", + JobType: "volume_balance", + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}}, + }, + } + + handler := NewVolumeBalanceHandler(nil) + sender := &recordingExecutionSender{} + + // Execute single-move path. Will fail on gRPC but verify it takes the single-move path. + _ = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{ + Job: job, + }, sender) + + sender.mu.Lock() + defer sender.mu.Unlock() + + if len(sender.progress) == 0 { + t.Fatalf("expected progress messages") + } + + // Single-move path sends "volume balance job accepted" not "batch volume balance" + first := sender.progress[0] + if first.Stage != "assigned" { + t.Fatalf("expected first stage 'assigned', got %q", first.Stage) + } + if strings.Contains(first.Message, "batch") { + t.Fatalf("single-move path should not mention batch, got %q", first.Message) + } +} + func workerConfigFormHasField(form *plugin_pb.ConfigForm, fieldName string) bool { if form == nil { return false