Chris Lu 1 day ago
committed by GitHub
parent
commit
6e86472570
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 49
      weed/admin/view/app/plugin.templ
  2. 2
      weed/admin/view/app/plugin_templ.go
  3. 11
      weed/pb/worker.proto
  4. 406
      weed/pb/worker_pb/worker.pb.go
  5. 404
      weed/plugin/worker/volume_balance_handler.go
  6. 354
      weed/plugin/worker/volume_balance_handler_test.go

49
weed/admin/view/app/plugin.templ

@ -1026,6 +1026,9 @@ templ Plugin(page string) {
}
var jobType = String(plan.job_type || '').trim().toLowerCase();
if (jobType === 'volume_balance') {
return renderBalanceExecutionPlan(plan);
}
if (jobType !== 'erasure_coding') {
var fallbackText = toPrettyJson(plan);
if (!fallbackText) {
@ -1107,6 +1110,44 @@ templ Plugin(page string) {
return html;
}
// Renders the "Execution Plan" panel for a volume_balance job.
// Supports both plan shapes: a single move described by fields on the plan
// itself, and a batch job carrying a `moves` array (rendered as a table).
// Relies on file-local helpers escapeHtml() and textOrDash().
function renderBalanceExecutionPlan(plan) {
var parts = ['<div class="mb-3"><h6>Execution Plan</h6>'];
var moveList = Array.isArray(plan.moves) ? plan.moves : [];
if (moveList.length === 0) {
// Single-move balance job: accept both *_node and *_server field spellings.
var sourceAddr = textOrDash(plan.source_node || plan.source_server);
var targetAddr = textOrDash(plan.target_node || plan.target_server);
var volumeText = textOrDash(plan.volume_id);
var collectionText = textOrDash(plan.collection);
parts.push('<div class="row g-2 mb-2">' +
'<div class="col-md-3"><small><strong>Volume:</strong> ' + escapeHtml(volumeText) + '</small></div>' +
'<div class="col-md-3"><small><strong>Collection:</strong> ' + escapeHtml(collectionText) + '</small></div>' +
'<div class="col-md-3"><small><strong>Source:</strong> <code>' + escapeHtml(sourceAddr) + '</code></small></div>' +
'<div class="col-md-3"><small><strong>Target:</strong> <code>' + escapeHtml(targetAddr) + '</code></small></div>' +
'</div>');
} else {
// Batch balance job: a summary badge plus one table row per move.
parts.push('<div class="mb-2"><span class="badge bg-info">' + escapeHtml(String(moveList.length)) + ' moves</span></div>');
parts.push('<div class="table-responsive"><table class="table table-sm table-striped mb-0">' +
'<thead><tr><th>#</th><th>Volume</th><th>Source</th><th>Target</th><th>Collection</th></tr></thead><tbody>');
for (var idx = 0; idx < moveList.length; idx++) {
var entry = moveList[idx] || {};
parts.push('<tr>' +
'<td>' + escapeHtml(String(idx + 1)) + '</td>' +
'<td>' + escapeHtml(textOrDash(entry.volume_id)) + '</td>' +
'<td><code>' + escapeHtml(textOrDash(entry.source_node)) + '</code></td>' +
'<td><code>' + escapeHtml(textOrDash(entry.target_node)) + '</code></td>' +
'<td>' + escapeHtml(textOrDash(entry.collection)) + '</td>' +
'</tr>');
}
parts.push('</tbody></table></div>');
}
parts.push('</div>');
return parts.join('');
}
function isActiveJobState(candidateState) {
var jobState = candidateState;
if (candidateState && typeof candidateState === 'object' && candidateState.state !== undefined) {
@ -1676,9 +1717,15 @@ templ Plugin(page string) {
barClass = 'bg-warning';
}
var jobTypeCell = escapeHtml(textOrDash(executionJob.job_type));
var execLabels = executionJob.labels || {};
if (execLabels.batch === 'true' && execLabels.batch_size) {
jobTypeCell += ' <span class="badge bg-info">' + escapeHtml(execLabels.batch_size) + ' moves</span>';
}
rows += '<tr>' +
'<td>' + renderJobLink(executionJob.job_id) + '</td>' +
'<td>' + escapeHtml(textOrDash(executionJob.job_type)) + '</td>' +
'<td>' + jobTypeCell + '</td>' +
'<td><span class="badge bg-light text-dark">' + escapeHtml(textOrDash(executionJob.state)) + '</span></td>' +
'<td class="plugin-job-progress"><div class="progress" style="height: 14px;"><div class="progress-bar ' + barClass + '" role="progressbar" style="width:' + progress + '%">' + Math.round(progress) + '%</div></div></td>' +
'<td><small>' + escapeHtml(textOrDash(executionJob.worker_id)) + '</small></td>' +

2
weed/admin/view/app/plugin_templ.go
File diff suppressed because it is too large
View File

11
weed/pb/worker.proto

@ -157,10 +157,21 @@ message TaskTarget {
// BalanceMoveSpec describes a single volume move within a batch balance job
// (one entry of BalanceTaskParams.moves). All fields are populated by the
// admin planner; volume_size is informational only and not required for
// executing the move.
message BalanceMoveSpec {
uint32 volume_id = 1; // Volume to move
string source_node = 2; // Source server address (host:port)
string target_node = 3; // Destination server address (host:port)
string collection = 4; // Collection name
uint64 volume_size = 5; // Volume size in bytes (informational)
}
// BalanceTaskParams for volume balancing operations.
// Fields 1-2 predate batch support (single-move jobs); fields 3-4 were added
// for batch jobs, where `moves` lists every volume move in the job and
// `max_concurrent_moves` bounds parallelism. An empty `moves` list means a
// legacy single-move job described elsewhere in the task.
message BalanceTaskParams {
bool force_move = 1; // Force move even with conflicts
int32 timeout_seconds = 2; // Operation timeout
int32 max_concurrent_moves = 3; // Max concurrent moves in a batch job (0 = default 5)
repeated BalanceMoveSpec moves = 4; // Batch: multiple volume moves in one job
}
// ReplicationTaskParams for adding replicas

406
weed/pb/worker_pb/worker.pb.go

@ -1331,18 +1331,97 @@ func (x *TaskTarget) GetEstimatedSize() uint64 {
return 0
}
// BalanceMoveSpec describes a single volume move within a batch balance job
// BalanceMoveSpec describes a single volume move within a batch balance job
//
// NOTE(review): protoc-generated code (worker.pb.go). Do not edit by hand;
// regenerate from weed/pb/worker.proto instead. Message index 13 in
// file_worker_proto_msgTypes must match the generator's output.
type BalanceMoveSpec struct {
state protoimpl.MessageState `protogen:"open.v1"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"` // Volume to move
SourceNode string `protobuf:"bytes,2,opt,name=source_node,json=sourceNode,proto3" json:"source_node,omitempty"` // Source server address (host:port)
TargetNode string `protobuf:"bytes,3,opt,name=target_node,json=targetNode,proto3" json:"target_node,omitempty"` // Destination server address (host:port)
Collection string `protobuf:"bytes,4,opt,name=collection,proto3" json:"collection,omitempty"` // Collection name
VolumeSize uint64 `protobuf:"varint,5,opt,name=volume_size,json=volumeSize,proto3" json:"volume_size,omitempty"` // Volume size in bytes (informational)
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
// Reset clears the message to its zero value while re-binding its message info.
func (x *BalanceMoveSpec) Reset() {
*x = BalanceMoveSpec{}
mi := &file_worker_proto_msgTypes[13]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
// String returns the standard protobuf text representation.
func (x *BalanceMoveSpec) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BalanceMoveSpec) ProtoMessage() {}
// ProtoReflect exposes the message through the protoreflect API.
func (x *BalanceMoveSpec) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[13]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BalanceMoveSpec.ProtoReflect.Descriptor instead.
func (*BalanceMoveSpec) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{13}
}
// The generated getters below are safe on a nil receiver and return the
// field's zero value in that case.
func (x *BalanceMoveSpec) GetVolumeId() uint32 {
if x != nil {
return x.VolumeId
}
return 0
}
func (x *BalanceMoveSpec) GetSourceNode() string {
if x != nil {
return x.SourceNode
}
return ""
}
func (x *BalanceMoveSpec) GetTargetNode() string {
if x != nil {
return x.TargetNode
}
return ""
}
func (x *BalanceMoveSpec) GetCollection() string {
if x != nil {
return x.Collection
}
return ""
}
func (x *BalanceMoveSpec) GetVolumeSize() uint64 {
if x != nil {
return x.VolumeSize
}
return 0
}
// BalanceTaskParams for volume balancing operations
type BalanceTaskParams struct {
state protoimpl.MessageState `protogen:"open.v1"`
ForceMove bool `protobuf:"varint,1,opt,name=force_move,json=forceMove,proto3" json:"force_move,omitempty"` // Force move even with conflicts
TimeoutSeconds int32 `protobuf:"varint,2,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` // Operation timeout
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
ForceMove bool `protobuf:"varint,1,opt,name=force_move,json=forceMove,proto3" json:"force_move,omitempty"` // Force move even with conflicts
TimeoutSeconds int32 `protobuf:"varint,2,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` // Operation timeout
MaxConcurrentMoves int32 `protobuf:"varint,3,opt,name=max_concurrent_moves,json=maxConcurrentMoves,proto3" json:"max_concurrent_moves,omitempty"` // Max concurrent moves in a batch job (0 = default 5)
Moves []*BalanceMoveSpec `protobuf:"bytes,4,rep,name=moves,proto3" json:"moves,omitempty"` // Batch: multiple volume moves in one job
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *BalanceTaskParams) Reset() {
*x = BalanceTaskParams{}
mi := &file_worker_proto_msgTypes[13]
mi := &file_worker_proto_msgTypes[14]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1354,7 +1433,7 @@ func (x *BalanceTaskParams) String() string {
func (*BalanceTaskParams) ProtoMessage() {}
func (x *BalanceTaskParams) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[13]
mi := &file_worker_proto_msgTypes[14]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1367,7 +1446,7 @@ func (x *BalanceTaskParams) ProtoReflect() protoreflect.Message {
// Deprecated: Use BalanceTaskParams.ProtoReflect.Descriptor instead.
func (*BalanceTaskParams) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{13}
return file_worker_proto_rawDescGZIP(), []int{14}
}
func (x *BalanceTaskParams) GetForceMove() bool {
@ -1384,6 +1463,20 @@ func (x *BalanceTaskParams) GetTimeoutSeconds() int32 {
return 0
}
// GetMaxConcurrentMoves returns the batch concurrency limit; nil-receiver
// safe, returning 0 (which callers treat as "use the default") when unset.
// NOTE(review): protoc-generated code — do not edit by hand.
func (x *BalanceTaskParams) GetMaxConcurrentMoves() int32 {
if x != nil {
return x.MaxConcurrentMoves
}
return 0
}
// GetMoves returns the batch move list; nil-receiver safe, returning nil
// (an empty batch) when the message or field is unset.
// NOTE(review): protoc-generated code — do not edit by hand.
func (x *BalanceTaskParams) GetMoves() []*BalanceMoveSpec {
if x != nil {
return x.Moves
}
return nil
}
// ReplicationTaskParams for adding replicas
type ReplicationTaskParams struct {
state protoimpl.MessageState `protogen:"open.v1"`
@ -1395,7 +1488,7 @@ type ReplicationTaskParams struct {
func (x *ReplicationTaskParams) Reset() {
*x = ReplicationTaskParams{}
mi := &file_worker_proto_msgTypes[14]
mi := &file_worker_proto_msgTypes[15]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1407,7 +1500,7 @@ func (x *ReplicationTaskParams) String() string {
func (*ReplicationTaskParams) ProtoMessage() {}
func (x *ReplicationTaskParams) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[14]
mi := &file_worker_proto_msgTypes[15]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1420,7 +1513,7 @@ func (x *ReplicationTaskParams) ProtoReflect() protoreflect.Message {
// Deprecated: Use ReplicationTaskParams.ProtoReflect.Descriptor instead.
func (*ReplicationTaskParams) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{14}
return file_worker_proto_rawDescGZIP(), []int{15}
}
func (x *ReplicationTaskParams) GetReplicaCount() int32 {
@ -1452,7 +1545,7 @@ type TaskUpdate struct {
func (x *TaskUpdate) Reset() {
*x = TaskUpdate{}
mi := &file_worker_proto_msgTypes[15]
mi := &file_worker_proto_msgTypes[16]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1464,7 +1557,7 @@ func (x *TaskUpdate) String() string {
func (*TaskUpdate) ProtoMessage() {}
func (x *TaskUpdate) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[15]
mi := &file_worker_proto_msgTypes[16]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1477,7 +1570,7 @@ func (x *TaskUpdate) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskUpdate.ProtoReflect.Descriptor instead.
func (*TaskUpdate) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{15}
return file_worker_proto_rawDescGZIP(), []int{16}
}
func (x *TaskUpdate) GetTaskId() string {
@ -1537,7 +1630,7 @@ type TaskComplete struct {
func (x *TaskComplete) Reset() {
*x = TaskComplete{}
mi := &file_worker_proto_msgTypes[16]
mi := &file_worker_proto_msgTypes[17]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1549,7 +1642,7 @@ func (x *TaskComplete) String() string {
func (*TaskComplete) ProtoMessage() {}
func (x *TaskComplete) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[16]
mi := &file_worker_proto_msgTypes[17]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1562,7 +1655,7 @@ func (x *TaskComplete) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskComplete.ProtoReflect.Descriptor instead.
func (*TaskComplete) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{16}
return file_worker_proto_rawDescGZIP(), []int{17}
}
func (x *TaskComplete) GetTaskId() string {
@ -1619,7 +1712,7 @@ type TaskCancellation struct {
func (x *TaskCancellation) Reset() {
*x = TaskCancellation{}
mi := &file_worker_proto_msgTypes[17]
mi := &file_worker_proto_msgTypes[18]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1631,7 +1724,7 @@ func (x *TaskCancellation) String() string {
func (*TaskCancellation) ProtoMessage() {}
func (x *TaskCancellation) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[17]
mi := &file_worker_proto_msgTypes[18]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1644,7 +1737,7 @@ func (x *TaskCancellation) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskCancellation.ProtoReflect.Descriptor instead.
func (*TaskCancellation) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{17}
return file_worker_proto_rawDescGZIP(), []int{18}
}
func (x *TaskCancellation) GetTaskId() string {
@ -1680,7 +1773,7 @@ type WorkerShutdown struct {
func (x *WorkerShutdown) Reset() {
*x = WorkerShutdown{}
mi := &file_worker_proto_msgTypes[18]
mi := &file_worker_proto_msgTypes[19]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1692,7 +1785,7 @@ func (x *WorkerShutdown) String() string {
func (*WorkerShutdown) ProtoMessage() {}
func (x *WorkerShutdown) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[18]
mi := &file_worker_proto_msgTypes[19]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1705,7 +1798,7 @@ func (x *WorkerShutdown) ProtoReflect() protoreflect.Message {
// Deprecated: Use WorkerShutdown.ProtoReflect.Descriptor instead.
func (*WorkerShutdown) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{18}
return file_worker_proto_rawDescGZIP(), []int{19}
}
func (x *WorkerShutdown) GetWorkerId() string {
@ -1740,7 +1833,7 @@ type AdminShutdown struct {
func (x *AdminShutdown) Reset() {
*x = AdminShutdown{}
mi := &file_worker_proto_msgTypes[19]
mi := &file_worker_proto_msgTypes[20]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1752,7 +1845,7 @@ func (x *AdminShutdown) String() string {
func (*AdminShutdown) ProtoMessage() {}
func (x *AdminShutdown) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[19]
mi := &file_worker_proto_msgTypes[20]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1765,7 +1858,7 @@ func (x *AdminShutdown) ProtoReflect() protoreflect.Message {
// Deprecated: Use AdminShutdown.ProtoReflect.Descriptor instead.
func (*AdminShutdown) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{19}
return file_worker_proto_rawDescGZIP(), []int{20}
}
func (x *AdminShutdown) GetReason() string {
@ -1798,7 +1891,7 @@ type TaskLogRequest struct {
func (x *TaskLogRequest) Reset() {
*x = TaskLogRequest{}
mi := &file_worker_proto_msgTypes[20]
mi := &file_worker_proto_msgTypes[21]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1810,7 +1903,7 @@ func (x *TaskLogRequest) String() string {
func (*TaskLogRequest) ProtoMessage() {}
func (x *TaskLogRequest) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[20]
mi := &file_worker_proto_msgTypes[21]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1823,7 +1916,7 @@ func (x *TaskLogRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskLogRequest.ProtoReflect.Descriptor instead.
func (*TaskLogRequest) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{20}
return file_worker_proto_rawDescGZIP(), []int{21}
}
func (x *TaskLogRequest) GetTaskId() string {
@ -1890,7 +1983,7 @@ type TaskLogResponse struct {
func (x *TaskLogResponse) Reset() {
*x = TaskLogResponse{}
mi := &file_worker_proto_msgTypes[21]
mi := &file_worker_proto_msgTypes[22]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1902,7 +1995,7 @@ func (x *TaskLogResponse) String() string {
func (*TaskLogResponse) ProtoMessage() {}
func (x *TaskLogResponse) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[21]
mi := &file_worker_proto_msgTypes[22]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1915,7 +2008,7 @@ func (x *TaskLogResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskLogResponse.ProtoReflect.Descriptor instead.
func (*TaskLogResponse) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{21}
return file_worker_proto_rawDescGZIP(), []int{22}
}
func (x *TaskLogResponse) GetTaskId() string {
@ -1983,7 +2076,7 @@ type TaskLogMetadata struct {
func (x *TaskLogMetadata) Reset() {
*x = TaskLogMetadata{}
mi := &file_worker_proto_msgTypes[22]
mi := &file_worker_proto_msgTypes[23]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1995,7 +2088,7 @@ func (x *TaskLogMetadata) String() string {
func (*TaskLogMetadata) ProtoMessage() {}
func (x *TaskLogMetadata) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[22]
mi := &file_worker_proto_msgTypes[23]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2008,7 +2101,7 @@ func (x *TaskLogMetadata) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskLogMetadata.ProtoReflect.Descriptor instead.
func (*TaskLogMetadata) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{22}
return file_worker_proto_rawDescGZIP(), []int{23}
}
func (x *TaskLogMetadata) GetTaskId() string {
@ -2124,7 +2217,7 @@ type TaskLogEntry struct {
func (x *TaskLogEntry) Reset() {
*x = TaskLogEntry{}
mi := &file_worker_proto_msgTypes[23]
mi := &file_worker_proto_msgTypes[24]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2136,7 +2229,7 @@ func (x *TaskLogEntry) String() string {
func (*TaskLogEntry) ProtoMessage() {}
func (x *TaskLogEntry) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[23]
mi := &file_worker_proto_msgTypes[24]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2149,7 +2242,7 @@ func (x *TaskLogEntry) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskLogEntry.ProtoReflect.Descriptor instead.
func (*TaskLogEntry) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{23}
return file_worker_proto_rawDescGZIP(), []int{24}
}
func (x *TaskLogEntry) GetTimestamp() int64 {
@ -2212,7 +2305,7 @@ type MaintenanceConfig struct {
func (x *MaintenanceConfig) Reset() {
*x = MaintenanceConfig{}
mi := &file_worker_proto_msgTypes[24]
mi := &file_worker_proto_msgTypes[25]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2224,7 +2317,7 @@ func (x *MaintenanceConfig) String() string {
func (*MaintenanceConfig) ProtoMessage() {}
func (x *MaintenanceConfig) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[24]
mi := &file_worker_proto_msgTypes[25]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2237,7 +2330,7 @@ func (x *MaintenanceConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use MaintenanceConfig.ProtoReflect.Descriptor instead.
func (*MaintenanceConfig) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{24}
return file_worker_proto_rawDescGZIP(), []int{25}
}
func (x *MaintenanceConfig) GetEnabled() bool {
@ -2316,7 +2409,7 @@ type MaintenancePolicy struct {
func (x *MaintenancePolicy) Reset() {
*x = MaintenancePolicy{}
mi := &file_worker_proto_msgTypes[25]
mi := &file_worker_proto_msgTypes[26]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2328,7 +2421,7 @@ func (x *MaintenancePolicy) String() string {
func (*MaintenancePolicy) ProtoMessage() {}
func (x *MaintenancePolicy) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[25]
mi := &file_worker_proto_msgTypes[26]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2341,7 +2434,7 @@ func (x *MaintenancePolicy) ProtoReflect() protoreflect.Message {
// Deprecated: Use MaintenancePolicy.ProtoReflect.Descriptor instead.
func (*MaintenancePolicy) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{25}
return file_worker_proto_rawDescGZIP(), []int{26}
}
func (x *MaintenancePolicy) GetTaskPolicies() map[string]*TaskPolicy {
@ -2394,7 +2487,7 @@ type TaskPolicy struct {
func (x *TaskPolicy) Reset() {
*x = TaskPolicy{}
mi := &file_worker_proto_msgTypes[26]
mi := &file_worker_proto_msgTypes[27]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2406,7 +2499,7 @@ func (x *TaskPolicy) String() string {
func (*TaskPolicy) ProtoMessage() {}
func (x *TaskPolicy) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[26]
mi := &file_worker_proto_msgTypes[27]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2419,7 +2512,7 @@ func (x *TaskPolicy) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskPolicy.ProtoReflect.Descriptor instead.
func (*TaskPolicy) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{26}
return file_worker_proto_rawDescGZIP(), []int{27}
}
func (x *TaskPolicy) GetEnabled() bool {
@ -2533,7 +2626,7 @@ type VacuumTaskConfig struct {
func (x *VacuumTaskConfig) Reset() {
*x = VacuumTaskConfig{}
mi := &file_worker_proto_msgTypes[27]
mi := &file_worker_proto_msgTypes[28]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2545,7 +2638,7 @@ func (x *VacuumTaskConfig) String() string {
func (*VacuumTaskConfig) ProtoMessage() {}
func (x *VacuumTaskConfig) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[27]
mi := &file_worker_proto_msgTypes[28]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2558,7 +2651,7 @@ func (x *VacuumTaskConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use VacuumTaskConfig.ProtoReflect.Descriptor instead.
func (*VacuumTaskConfig) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{27}
return file_worker_proto_rawDescGZIP(), []int{28}
}
func (x *VacuumTaskConfig) GetGarbageThreshold() float64 {
@ -2596,7 +2689,7 @@ type ErasureCodingTaskConfig struct {
func (x *ErasureCodingTaskConfig) Reset() {
*x = ErasureCodingTaskConfig{}
mi := &file_worker_proto_msgTypes[28]
mi := &file_worker_proto_msgTypes[29]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2608,7 +2701,7 @@ func (x *ErasureCodingTaskConfig) String() string {
func (*ErasureCodingTaskConfig) ProtoMessage() {}
func (x *ErasureCodingTaskConfig) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[28]
mi := &file_worker_proto_msgTypes[29]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2621,7 +2714,7 @@ func (x *ErasureCodingTaskConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use ErasureCodingTaskConfig.ProtoReflect.Descriptor instead.
func (*ErasureCodingTaskConfig) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{28}
return file_worker_proto_rawDescGZIP(), []int{29}
}
func (x *ErasureCodingTaskConfig) GetFullnessRatio() float64 {
@ -2670,7 +2763,7 @@ type BalanceTaskConfig struct {
func (x *BalanceTaskConfig) Reset() {
*x = BalanceTaskConfig{}
mi := &file_worker_proto_msgTypes[29]
mi := &file_worker_proto_msgTypes[30]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2682,7 +2775,7 @@ func (x *BalanceTaskConfig) String() string {
func (*BalanceTaskConfig) ProtoMessage() {}
func (x *BalanceTaskConfig) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[29]
mi := &file_worker_proto_msgTypes[30]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2695,7 +2788,7 @@ func (x *BalanceTaskConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use BalanceTaskConfig.ProtoReflect.Descriptor instead.
func (*BalanceTaskConfig) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{29}
return file_worker_proto_rawDescGZIP(), []int{30}
}
func (x *BalanceTaskConfig) GetImbalanceThreshold() float64 {
@ -2722,7 +2815,7 @@ type ReplicationTaskConfig struct {
func (x *ReplicationTaskConfig) Reset() {
*x = ReplicationTaskConfig{}
mi := &file_worker_proto_msgTypes[30]
mi := &file_worker_proto_msgTypes[31]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2734,7 +2827,7 @@ func (x *ReplicationTaskConfig) String() string {
func (*ReplicationTaskConfig) ProtoMessage() {}
func (x *ReplicationTaskConfig) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[30]
mi := &file_worker_proto_msgTypes[31]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2747,7 +2840,7 @@ func (x *ReplicationTaskConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use ReplicationTaskConfig.ProtoReflect.Descriptor instead.
func (*ReplicationTaskConfig) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{30}
return file_worker_proto_rawDescGZIP(), []int{31}
}
func (x *ReplicationTaskConfig) GetTargetReplicaCount() int32 {
@ -2791,7 +2884,7 @@ type MaintenanceTaskData struct {
func (x *MaintenanceTaskData) Reset() {
*x = MaintenanceTaskData{}
mi := &file_worker_proto_msgTypes[31]
mi := &file_worker_proto_msgTypes[32]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -2803,7 +2896,7 @@ func (x *MaintenanceTaskData) String() string {
func (*MaintenanceTaskData) ProtoMessage() {}
func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[31]
mi := &file_worker_proto_msgTypes[32]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2816,7 +2909,7 @@ func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message {
// Deprecated: Use MaintenanceTaskData.ProtoReflect.Descriptor instead.
func (*MaintenanceTaskData) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{31}
return file_worker_proto_rawDescGZIP(), []int{32}
}
func (x *MaintenanceTaskData) GetId() string {
@ -3001,7 +3094,7 @@ type TaskAssignmentRecord struct {
func (x *TaskAssignmentRecord) Reset() {
*x = TaskAssignmentRecord{}
mi := &file_worker_proto_msgTypes[32]
mi := &file_worker_proto_msgTypes[33]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -3013,7 +3106,7 @@ func (x *TaskAssignmentRecord) String() string {
func (*TaskAssignmentRecord) ProtoMessage() {}
func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[32]
mi := &file_worker_proto_msgTypes[33]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -3026,7 +3119,7 @@ func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskAssignmentRecord.ProtoReflect.Descriptor instead.
func (*TaskAssignmentRecord) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{32}
return file_worker_proto_rawDescGZIP(), []int{33}
}
func (x *TaskAssignmentRecord) GetWorkerId() string {
@ -3078,7 +3171,7 @@ type TaskCreationMetrics struct {
func (x *TaskCreationMetrics) Reset() {
*x = TaskCreationMetrics{}
mi := &file_worker_proto_msgTypes[33]
mi := &file_worker_proto_msgTypes[34]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -3090,7 +3183,7 @@ func (x *TaskCreationMetrics) String() string {
func (*TaskCreationMetrics) ProtoMessage() {}
func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[33]
mi := &file_worker_proto_msgTypes[34]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -3103,7 +3196,7 @@ func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskCreationMetrics.ProtoReflect.Descriptor instead.
func (*TaskCreationMetrics) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{33}
return file_worker_proto_rawDescGZIP(), []int{34}
}
func (x *TaskCreationMetrics) GetTriggerMetric() string {
@ -3160,7 +3253,7 @@ type VolumeHealthMetrics struct {
func (x *VolumeHealthMetrics) Reset() {
*x = VolumeHealthMetrics{}
mi := &file_worker_proto_msgTypes[34]
mi := &file_worker_proto_msgTypes[35]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -3172,7 +3265,7 @@ func (x *VolumeHealthMetrics) String() string {
func (*VolumeHealthMetrics) ProtoMessage() {}
func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[34]
mi := &file_worker_proto_msgTypes[35]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -3185,7 +3278,7 @@ func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message {
// Deprecated: Use VolumeHealthMetrics.ProtoReflect.Descriptor instead.
func (*VolumeHealthMetrics) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{34}
return file_worker_proto_rawDescGZIP(), []int{35}
}
func (x *VolumeHealthMetrics) GetTotalSize() uint64 {
@ -3270,7 +3363,7 @@ type TaskStateFile struct {
func (x *TaskStateFile) Reset() {
*x = TaskStateFile{}
mi := &file_worker_proto_msgTypes[35]
mi := &file_worker_proto_msgTypes[36]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -3282,7 +3375,7 @@ func (x *TaskStateFile) String() string {
func (*TaskStateFile) ProtoMessage() {}
func (x *TaskStateFile) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[35]
mi := &file_worker_proto_msgTypes[36]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -3295,7 +3388,7 @@ func (x *TaskStateFile) ProtoReflect() protoreflect.Message {
// Deprecated: Use TaskStateFile.ProtoReflect.Descriptor instead.
func (*TaskStateFile) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{35}
return file_worker_proto_rawDescGZIP(), []int{36}
}
func (x *TaskStateFile) GetTask() *MaintenanceTaskData {
@ -3441,11 +3534,24 @@ const file_worker_proto_rawDesc = "" +
"dataCenter\x12\x1b\n" +
"\tvolume_id\x18\x05 \x01(\rR\bvolumeId\x12\x1b\n" +
"\tshard_ids\x18\x06 \x03(\rR\bshardIds\x12%\n" +
"\x0eestimated_size\x18\a \x01(\x04R\restimatedSize\"[\n" +
"\x0eestimated_size\x18\a \x01(\x04R\restimatedSize\"\xb1\x01\n" +
"\x0fBalanceMoveSpec\x12\x1b\n" +
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x1f\n" +
"\vsource_node\x18\x02 \x01(\tR\n" +
"sourceNode\x12\x1f\n" +
"\vtarget_node\x18\x03 \x01(\tR\n" +
"targetNode\x12\x1e\n" +
"\n" +
"collection\x18\x04 \x01(\tR\n" +
"collection\x12\x1f\n" +
"\vvolume_size\x18\x05 \x01(\x04R\n" +
"volumeSize\"\xbf\x01\n" +
"\x11BalanceTaskParams\x12\x1d\n" +
"\n" +
"force_move\x18\x01 \x01(\bR\tforceMove\x12'\n" +
"\x0ftimeout_seconds\x18\x02 \x01(\x05R\x0etimeoutSeconds\"k\n" +
"\x0ftimeout_seconds\x18\x02 \x01(\x05R\x0etimeoutSeconds\x120\n" +
"\x14max_concurrent_moves\x18\x03 \x01(\x05R\x12maxConcurrentMoves\x120\n" +
"\x05moves\x18\x04 \x03(\v2\x1a.worker_pb.BalanceMoveSpecR\x05moves\"k\n" +
"\x15ReplicationTaskParams\x12#\n" +
"\rreplica_count\x18\x01 \x01(\x05R\freplicaCount\x12-\n" +
"\x12verify_consistency\x18\x02 \x01(\bR\x11verifyConsistency\"\x8e\x02\n" +
@ -3667,7 +3773,7 @@ func file_worker_proto_rawDescGZIP() []byte {
return file_worker_proto_rawDescData
}
var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 45)
var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 46)
var file_worker_proto_goTypes = []any{
(*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage
(*AdminMessage)(nil), // 1: worker_pb.AdminMessage
@ -3682,89 +3788,91 @@ var file_worker_proto_goTypes = []any{
(*ErasureCodingTaskParams)(nil), // 10: worker_pb.ErasureCodingTaskParams
(*TaskSource)(nil), // 11: worker_pb.TaskSource
(*TaskTarget)(nil), // 12: worker_pb.TaskTarget
(*BalanceTaskParams)(nil), // 13: worker_pb.BalanceTaskParams
(*ReplicationTaskParams)(nil), // 14: worker_pb.ReplicationTaskParams
(*TaskUpdate)(nil), // 15: worker_pb.TaskUpdate
(*TaskComplete)(nil), // 16: worker_pb.TaskComplete
(*TaskCancellation)(nil), // 17: worker_pb.TaskCancellation
(*WorkerShutdown)(nil), // 18: worker_pb.WorkerShutdown
(*AdminShutdown)(nil), // 19: worker_pb.AdminShutdown
(*TaskLogRequest)(nil), // 20: worker_pb.TaskLogRequest
(*TaskLogResponse)(nil), // 21: worker_pb.TaskLogResponse
(*TaskLogMetadata)(nil), // 22: worker_pb.TaskLogMetadata
(*TaskLogEntry)(nil), // 23: worker_pb.TaskLogEntry
(*MaintenanceConfig)(nil), // 24: worker_pb.MaintenanceConfig
(*MaintenancePolicy)(nil), // 25: worker_pb.MaintenancePolicy
(*TaskPolicy)(nil), // 26: worker_pb.TaskPolicy
(*VacuumTaskConfig)(nil), // 27: worker_pb.VacuumTaskConfig
(*ErasureCodingTaskConfig)(nil), // 28: worker_pb.ErasureCodingTaskConfig
(*BalanceTaskConfig)(nil), // 29: worker_pb.BalanceTaskConfig
(*ReplicationTaskConfig)(nil), // 30: worker_pb.ReplicationTaskConfig
(*MaintenanceTaskData)(nil), // 31: worker_pb.MaintenanceTaskData
(*TaskAssignmentRecord)(nil), // 32: worker_pb.TaskAssignmentRecord
(*TaskCreationMetrics)(nil), // 33: worker_pb.TaskCreationMetrics
(*VolumeHealthMetrics)(nil), // 34: worker_pb.VolumeHealthMetrics
(*TaskStateFile)(nil), // 35: worker_pb.TaskStateFile
nil, // 36: worker_pb.WorkerRegistration.MetadataEntry
nil, // 37: worker_pb.TaskAssignment.MetadataEntry
nil, // 38: worker_pb.TaskUpdate.MetadataEntry
nil, // 39: worker_pb.TaskComplete.ResultMetadataEntry
nil, // 40: worker_pb.TaskLogMetadata.CustomDataEntry
nil, // 41: worker_pb.TaskLogEntry.FieldsEntry
nil, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry
nil, // 43: worker_pb.MaintenanceTaskData.TagsEntry
nil, // 44: worker_pb.TaskCreationMetrics.AdditionalDataEntry
(*BalanceMoveSpec)(nil), // 13: worker_pb.BalanceMoveSpec
(*BalanceTaskParams)(nil), // 14: worker_pb.BalanceTaskParams
(*ReplicationTaskParams)(nil), // 15: worker_pb.ReplicationTaskParams
(*TaskUpdate)(nil), // 16: worker_pb.TaskUpdate
(*TaskComplete)(nil), // 17: worker_pb.TaskComplete
(*TaskCancellation)(nil), // 18: worker_pb.TaskCancellation
(*WorkerShutdown)(nil), // 19: worker_pb.WorkerShutdown
(*AdminShutdown)(nil), // 20: worker_pb.AdminShutdown
(*TaskLogRequest)(nil), // 21: worker_pb.TaskLogRequest
(*TaskLogResponse)(nil), // 22: worker_pb.TaskLogResponse
(*TaskLogMetadata)(nil), // 23: worker_pb.TaskLogMetadata
(*TaskLogEntry)(nil), // 24: worker_pb.TaskLogEntry
(*MaintenanceConfig)(nil), // 25: worker_pb.MaintenanceConfig
(*MaintenancePolicy)(nil), // 26: worker_pb.MaintenancePolicy
(*TaskPolicy)(nil), // 27: worker_pb.TaskPolicy
(*VacuumTaskConfig)(nil), // 28: worker_pb.VacuumTaskConfig
(*ErasureCodingTaskConfig)(nil), // 29: worker_pb.ErasureCodingTaskConfig
(*BalanceTaskConfig)(nil), // 30: worker_pb.BalanceTaskConfig
(*ReplicationTaskConfig)(nil), // 31: worker_pb.ReplicationTaskConfig
(*MaintenanceTaskData)(nil), // 32: worker_pb.MaintenanceTaskData
(*TaskAssignmentRecord)(nil), // 33: worker_pb.TaskAssignmentRecord
(*TaskCreationMetrics)(nil), // 34: worker_pb.TaskCreationMetrics
(*VolumeHealthMetrics)(nil), // 35: worker_pb.VolumeHealthMetrics
(*TaskStateFile)(nil), // 36: worker_pb.TaskStateFile
nil, // 37: worker_pb.WorkerRegistration.MetadataEntry
nil, // 38: worker_pb.TaskAssignment.MetadataEntry
nil, // 39: worker_pb.TaskUpdate.MetadataEntry
nil, // 40: worker_pb.TaskComplete.ResultMetadataEntry
nil, // 41: worker_pb.TaskLogMetadata.CustomDataEntry
nil, // 42: worker_pb.TaskLogEntry.FieldsEntry
nil, // 43: worker_pb.MaintenancePolicy.TaskPoliciesEntry
nil, // 44: worker_pb.MaintenanceTaskData.TagsEntry
nil, // 45: worker_pb.TaskCreationMetrics.AdditionalDataEntry
}
var file_worker_proto_depIdxs = []int32{
2, // 0: worker_pb.WorkerMessage.registration:type_name -> worker_pb.WorkerRegistration
4, // 1: worker_pb.WorkerMessage.heartbeat:type_name -> worker_pb.WorkerHeartbeat
6, // 2: worker_pb.WorkerMessage.task_request:type_name -> worker_pb.TaskRequest
15, // 3: worker_pb.WorkerMessage.task_update:type_name -> worker_pb.TaskUpdate
16, // 4: worker_pb.WorkerMessage.task_complete:type_name -> worker_pb.TaskComplete
18, // 5: worker_pb.WorkerMessage.shutdown:type_name -> worker_pb.WorkerShutdown
21, // 6: worker_pb.WorkerMessage.task_log_response:type_name -> worker_pb.TaskLogResponse
16, // 3: worker_pb.WorkerMessage.task_update:type_name -> worker_pb.TaskUpdate
17, // 4: worker_pb.WorkerMessage.task_complete:type_name -> worker_pb.TaskComplete
19, // 5: worker_pb.WorkerMessage.shutdown:type_name -> worker_pb.WorkerShutdown
22, // 6: worker_pb.WorkerMessage.task_log_response:type_name -> worker_pb.TaskLogResponse
3, // 7: worker_pb.AdminMessage.registration_response:type_name -> worker_pb.RegistrationResponse
5, // 8: worker_pb.AdminMessage.heartbeat_response:type_name -> worker_pb.HeartbeatResponse
7, // 9: worker_pb.AdminMessage.task_assignment:type_name -> worker_pb.TaskAssignment
17, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation
19, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown
20, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest
36, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry
18, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation
20, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown
21, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest
37, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry
8, // 14: worker_pb.TaskAssignment.params:type_name -> worker_pb.TaskParams
37, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry
38, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry
11, // 16: worker_pb.TaskParams.sources:type_name -> worker_pb.TaskSource
12, // 17: worker_pb.TaskParams.targets:type_name -> worker_pb.TaskTarget
9, // 18: worker_pb.TaskParams.vacuum_params:type_name -> worker_pb.VacuumTaskParams
10, // 19: worker_pb.TaskParams.erasure_coding_params:type_name -> worker_pb.ErasureCodingTaskParams
13, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams
14, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams
38, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry
39, // 23: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry
22, // 24: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata
23, // 25: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry
40, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry
41, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry
25, // 28: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy
42, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry
27, // 30: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig
28, // 31: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig
29, // 32: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig
30, // 33: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig
8, // 34: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams
32, // 35: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord
43, // 36: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry
33, // 37: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics
34, // 38: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics
44, // 39: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry
31, // 40: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData
26, // 41: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy
0, // 42: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage
1, // 43: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage
43, // [43:44] is the sub-list for method output_type
42, // [42:43] is the sub-list for method input_type
42, // [42:42] is the sub-list for extension type_name
42, // [42:42] is the sub-list for extension extendee
0, // [0:42] is the sub-list for field type_name
14, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams
15, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams
13, // 22: worker_pb.BalanceTaskParams.moves:type_name -> worker_pb.BalanceMoveSpec
39, // 23: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry
40, // 24: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry
23, // 25: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata
24, // 26: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry
41, // 27: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry
42, // 28: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry
26, // 29: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy
43, // 30: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry
28, // 31: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig
29, // 32: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig
30, // 33: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig
31, // 34: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig
8, // 35: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams
33, // 36: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord
44, // 37: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry
34, // 38: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics
35, // 39: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics
45, // 40: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry
32, // 41: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData
27, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy
0, // 43: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage
1, // 44: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage
44, // [44:45] is the sub-list for method output_type
43, // [43:44] is the sub-list for method input_type
43, // [43:43] is the sub-list for extension type_name
43, // [43:43] is the sub-list for extension extendee
0, // [0:43] is the sub-list for field type_name
}
func init() { file_worker_proto_init() }
@ -3795,7 +3903,7 @@ func file_worker_proto_init() {
(*TaskParams_BalanceParams)(nil),
(*TaskParams_ReplicationParams)(nil),
}
file_worker_proto_msgTypes[26].OneofWrappers = []any{
file_worker_proto_msgTypes[27].OneofWrappers = []any{
(*TaskPolicy_VacuumConfig)(nil),
(*TaskPolicy_ErasureCodingConfig)(nil),
(*TaskPolicy_BalanceConfig)(nil),
@ -3807,7 +3915,7 @@ func file_worker_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_worker_proto_rawDesc), len(file_worker_proto_rawDesc)),
NumEnums: 0,
NumMessages: 45,
NumMessages: 46,
NumExtensions: 0,
NumServices: 1,
},

404
weed/plugin/worker/volume_balance_handler.go

@ -5,6 +5,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
@ -35,6 +36,8 @@ func init() {
type volumeBalanceWorkerConfig struct {
TaskConfig *balancetask.Config
MinIntervalSeconds int
MaxConcurrentMoves int
BatchSize int
}
// VolumeBalanceHandler is the plugin job handler for volume balancing.
@ -133,6 +136,31 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
},
},
},
{
SectionId: "batch_execution",
Title: "Batch Execution",
Description: "Controls for running multiple volume moves per job. The worker coordinates moves via gRPC and is not on the data path.",
Fields: []*plugin_pb.ConfigField{
{
Name: "max_concurrent_moves",
Label: "Max Concurrent Moves",
Description: "Maximum number of volume moves to run concurrently within a single batch job.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}},
},
{
Name: "batch_size",
Label: "Batch Size",
Description: "Maximum number of volume moves to group into a single job. Set to 1 to disable batching.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 100}},
},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"imbalance_threshold": {
@ -144,6 +172,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
},
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)},
},
"batch_size": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20},
},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
@ -167,6 +201,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
},
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)},
},
"batch_size": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20},
},
},
}
}
@ -234,14 +274,19 @@ func (h *VolumeBalanceHandler) Detect(
glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr)
}
proposals := make([]*plugin_pb.JobProposal, 0, len(results))
for _, result := range results {
proposal, proposalErr := buildVolumeBalanceProposal(result)
if proposalErr != nil {
glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr)
continue
var proposals []*plugin_pb.JobProposal
if workerConfig.BatchSize > 1 && len(results) > 1 {
proposals = buildBatchVolumeBalanceProposals(results, workerConfig.BatchSize, workerConfig.MaxConcurrentMoves)
} else {
proposals = make([]*plugin_pb.JobProposal, 0, len(results))
for _, result := range results {
proposal, proposalErr := buildVolumeBalanceProposal(result)
if proposalErr != nil {
glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr)
continue
}
proposals = append(proposals, proposal)
}
proposals = append(proposals, proposal)
}
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
@ -511,6 +556,8 @@ func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int {
return len(diskTypes)
}
const defaultMaxConcurrentMoves = 5
func (h *VolumeBalanceHandler) Execute(
ctx context.Context,
request *plugin_pb.ExecuteJobRequest,
@ -530,6 +577,24 @@ func (h *VolumeBalanceHandler) Execute(
if err != nil {
return err
}
applyBalanceExecutionDefaults(params)
// Batch path: if BalanceTaskParams has moves, execute them concurrently
if bp := params.GetBalanceParams(); bp != nil && len(bp.Moves) > 0 {
return h.executeBatchMoves(ctx, request, params, sender)
}
// Single-move path (backward compatible)
return h.executeSingleMove(ctx, request, params, sender)
}
func (h *VolumeBalanceHandler) executeSingleMove(
ctx context.Context,
request *plugin_pb.ExecuteJobRequest,
params *worker_pb.TaskParams,
sender ExecutionSender,
) error {
if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
return fmt.Errorf("volume balance source node is required")
}
@ -537,8 +602,6 @@ func (h *VolumeBalanceHandler) Execute(
return fmt.Errorf("volume balance target node is required")
}
applyBalanceExecutionDefaults(params)
task := balancetask.NewBalanceTask(
request.Job.JobId,
params.Sources[0].Node,
@ -621,6 +684,181 @@ func (h *VolumeBalanceHandler) Execute(
})
}
// executeBatchMoves runs multiple volume moves concurrently within a single job.
// Concurrency is bounded by a semaphore channel sized to MaxConcurrentMoves
// (falling back to defaultMaxConcurrentMoves when unset). The function returns
// only after every move has reported a result, and the job is marked successful
// only if all moves succeed.
func (h *VolumeBalanceHandler) executeBatchMoves(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	params *worker_pb.TaskParams,
	sender ExecutionSender,
) error {
	// Execute only routes here when BalanceParams carries a non-empty Moves
	// list, so bp is non-nil and totalMoves > 0.
	bp := params.GetBalanceParams()
	moves := bp.Moves
	maxConcurrent := int(bp.MaxConcurrentMoves)
	if maxConcurrent <= 0 {
		maxConcurrent = defaultMaxConcurrentMoves
	}
	totalMoves := len(moves)

	glog.Infof("batch volume balance: %d moves, max concurrent %d", totalMoves, maxConcurrent)

	// Announce acceptance of the batch before any work starts; a failure to
	// report here aborts the job.
	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
		JobId:           request.Job.JobId,
		JobType:         request.Job.JobType,
		State:           plugin_pb.JobState_JOB_STATE_ASSIGNED,
		ProgressPercent: 0,
		Stage:           "assigned",
		Message:         fmt.Sprintf("batch volume balance accepted: %d moves", totalMoves),
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("assigned", fmt.Sprintf("batch volume balance: %d moves, concurrency %d", totalMoves, maxConcurrent)),
		},
	}); err != nil {
		return err
	}

	// Per-move progress tracking: moveProgress[i] holds the latest progress
	// value reported for move i; mu guards it against concurrent callbacks.
	var mu sync.Mutex
	moveProgress := make([]float64, totalMoves)

	// reportAggregate records one move's progress and publishes the mean
	// progress across all moves as the job-level percentage. The send error is
	// deliberately dropped: progress updates are best-effort.
	reportAggregate := func(moveIndex int, progress float64, stage string) {
		mu.Lock()
		moveProgress[moveIndex] = progress
		total := 0.0
		for _, p := range moveProgress {
			total += p
		}
		mu.Unlock()
		aggregate := total / float64(totalMoves)
		move := moves[moveIndex] // read-only access; the moves slice is never mutated
		message := fmt.Sprintf("[Move %d/%d vol:%d] %s", moveIndex+1, totalMoves, move.VolumeId, stage)
		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
			ProgressPercent: aggregate,
			Stage:           fmt.Sprintf("move %d/%d", moveIndex+1, totalMoves),
			Message:         message,
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity(fmt.Sprintf("move-%d", moveIndex+1), message),
			},
		})
	}

	// moveResult carries one move's outcome from its worker goroutine back to
	// the collector loop below.
	type moveResult struct {
		index    int
		volumeID uint32
		source   string
		target   string
		err      error
	}

	// sem bounds in-flight moves; results is buffered to totalMoves so no
	// goroutine can block on send even before the collector starts receiving.
	sem := make(chan struct{}, maxConcurrent)
	results := make(chan moveResult, totalMoves)

	for i, move := range moves {
		sem <- struct{}{} // acquire slot
		go func(idx int, m *worker_pb.BalanceMoveSpec) {
			defer func() { <-sem }() // release slot
			// Each move gets a derived task ID so its logs can be correlated
			// back to the parent job.
			task := balancetask.NewBalanceTask(
				fmt.Sprintf("%s-move-%d", request.Job.JobId, idx),
				m.SourceNode,
				m.VolumeId,
				m.Collection,
				h.grpcDialOption,
			)
			task.SetProgressCallback(func(progress float64, stage string) {
				reportAggregate(idx, progress, stage)
			})
			moveParams := buildMoveTaskParams(m, bp.TimeoutSeconds)
			err := task.Execute(ctx, moveParams)
			results <- moveResult{
				index:    idx,
				volumeID: m.VolumeId,
				source:   m.SourceNode,
				target:   m.TargetNode,
				err:      err,
			}
		}(i, move)
	}

	// Collect exactly one result per launched move. Failures are tallied and
	// every individual error is both logged and retained for the completion
	// message.
	var succeeded, failed int
	var errMessages []string
	var successDetails []string
	// NOTE(review): successDetails is accumulated but not used within this
	// function — consider surfacing it in the result or removing it.
	for range moves {
		r := <-results
		if r.err != nil {
			failed++
			errMessages = append(errMessages, fmt.Sprintf("volume %d (%s→%s): %v", r.volumeID, r.source, r.target, r.err))
			glog.Warningf("batch balance move %d failed: volume %d %s→%s: %v", r.index, r.volumeID, r.source, r.target, r.err)
		} else {
			succeeded++
			successDetails = append(successDetails, fmt.Sprintf("volume %d (%s→%s)", r.volumeID, r.source, r.target))
		}
	}

	// The job succeeds only when every move succeeded; partial failures join
	// all per-move errors into a single error message.
	summary := fmt.Sprintf("%d/%d volumes moved successfully", succeeded, totalMoves)
	if failed > 0 {
		summary += fmt.Sprintf("; %d failed", failed)
	}
	success := failed == 0
	var errMsg string
	if !success {
		errMsg = strings.Join(errMessages, "; ")
	}

	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId:        request.Job.JobId,
		JobType:      request.Job.JobType,
		Success:      success,
		ErrorMessage: errMsg,
		Result: &plugin_pb.JobResult{
			Summary: summary,
			OutputValues: map[string]*plugin_pb.ConfigValue{
				"total_moves": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalMoves)},
				},
				"succeeded": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(succeeded)},
				},
				"failed": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(failed)},
				},
			},
		},
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("completed", summary),
		},
	})
}
// buildMoveTaskParams constructs the TaskParams for one move of a batch job,
// mirroring the shape produced for a standalone single-move balance task.
func buildMoveTaskParams(move *worker_pb.BalanceMoveSpec, timeoutSeconds int32) *worker_pb.TaskParams {
	// Fall back to the package default when the caller supplies no positive timeout.
	timeout := timeoutSeconds
	if timeout <= 0 {
		timeout = defaultBalanceTimeoutSeconds
	}
	params := &worker_pb.TaskParams{
		VolumeId:   move.VolumeId,
		Collection: move.Collection,
		VolumeSize: move.VolumeSize,
		Sources:    []*worker_pb.TaskSource{{Node: move.SourceNode, VolumeId: move.VolumeId}},
		Targets:    []*worker_pb.TaskTarget{{Node: move.TargetNode, VolumeId: move.VolumeId}},
	}
	params.TaskParams = &worker_pb.TaskParams_BalanceParams{
		BalanceParams: &worker_pb.BalanceTaskParams{TimeoutSeconds: timeout},
	}
	return params
}
func (h *VolumeBalanceHandler) collectVolumeMetrics(
ctx context.Context,
masterAddresses []string,
@ -654,9 +892,27 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume
minIntervalSeconds = 0
}
maxConcurrentMoves := int(readInt64Config(values, "max_concurrent_moves", int64(defaultMaxConcurrentMoves)))
if maxConcurrentMoves < 1 {
maxConcurrentMoves = 1
}
if maxConcurrentMoves > 50 {
maxConcurrentMoves = 50
}
batchSize := int(readInt64Config(values, "batch_size", 20))
if batchSize < 1 {
batchSize = 1
}
if batchSize > 100 {
batchSize = 100
}
return &volumeBalanceWorkerConfig{
TaskConfig: taskConfig,
MinIntervalSeconds: minIntervalSeconds,
MaxConcurrentMoves: maxConcurrentMoves,
BatchSize: batchSize,
}
}
@ -738,6 +994,136 @@ func buildVolumeBalanceProposal(
}, nil
}
// buildBatchVolumeBalanceProposals groups detection results into batch
// proposals, each carrying multiple moves in BalanceTaskParams.Moves. A batch
// window that holds a single result degrades to a regular single-move proposal.
func buildBatchVolumeBalanceProposals(
	results []*workertypes.TaskDetectionResult,
	batchSize int,
	maxConcurrentMoves int,
) []*plugin_pb.JobProposal {
	if batchSize <= 0 {
		batchSize = 1
	}
	if maxConcurrentMoves <= 0 {
		maxConcurrentMoves = defaultMaxConcurrentMoves
	}

	var proposals []*plugin_pb.JobProposal
	for lo := 0; lo < len(results); lo += batchSize {
		hi := lo + batchSize
		if hi > len(results) {
			hi = len(results)
		}
		window := results[lo:hi]

		// One result in this window: fall back to the single-move shape.
		if len(window) == 1 {
			single, err := buildVolumeBalanceProposal(window[0])
			if err != nil {
				glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", err)
				continue
			}
			proposals = append(proposals, single)
			continue
		}

		// Collect one BalanceMoveSpec per usable detection result, along with
		// the per-volume dedupe keys and the highest priority seen.
		specs := make([]*worker_pb.BalanceMoveSpec, 0, len(window))
		idTexts := make([]string, 0, len(window))
		keys := make([]string, 0, len(window))
		topPriority := workertypes.TaskPriorityLow
		for _, item := range window {
			if item == nil || item.TypedParams == nil {
				continue
			}
			var src, dst string
			if len(item.TypedParams.Sources) > 0 {
				src = item.TypedParams.Sources[0].Node
			}
			if len(item.TypedParams.Targets) > 0 {
				dst = item.TypedParams.Targets[0].Node
			}
			specs = append(specs, &worker_pb.BalanceMoveSpec{
				VolumeId:   uint32(item.VolumeID),
				SourceNode: src,
				TargetNode: dst,
				Collection: item.Collection,
				VolumeSize: item.TypedParams.VolumeSize,
			})
			idTexts = append(idTexts, fmt.Sprintf("%d", item.VolumeID))
			key := fmt.Sprintf("volume_balance:%d", item.VolumeID)
			if item.Collection != "" {
				key += ":" + item.Collection
			}
			keys = append(keys, key)
			if item.Priority > topPriority {
				topPriority = item.Priority
			}
		}
		if len(specs) == 0 {
			continue
		}

		// Serialize the batch parameters for transport in the proposal.
		batchParams := &worker_pb.TaskParams{
			TaskParams: &worker_pb.TaskParams_BalanceParams{
				BalanceParams: &worker_pb.BalanceTaskParams{
					TimeoutSeconds:     defaultBalanceTimeoutSeconds,
					MaxConcurrentMoves: int32(maxConcurrentMoves),
					Moves:              specs,
				},
			},
		}
		payload, err := proto.Marshal(batchParams)
		if err != nil {
			glog.Warningf("Plugin worker failed to marshal batch balance proposal: %v", err)
			continue
		}

		// Human-readable summary; drop the volume list when it gets too long.
		summary := fmt.Sprintf("Batch balance %d volumes (%s)", len(specs), strings.Join(idTexts, ","))
		if len(summary) > 200 {
			summary = fmt.Sprintf("Batch balance %d volumes", len(specs))
		}
		// Composite dedupe key for the batch, with a length-capped fallback.
		dedupe := fmt.Sprintf("volume_balance_batch:%s", strings.Join(keys, "+"))
		if len(dedupe) > 200 {
			dedupe = fmt.Sprintf("volume_balance_batch:%d-%d", lo, time.Now().UnixNano())
		}

		proposals = append(proposals, &plugin_pb.JobProposal{
			ProposalId: fmt.Sprintf("volume-balance-batch-%d-%d", lo, time.Now().UnixNano()),
			DedupeKey:  dedupe,
			JobType:    "volume_balance",
			Priority:   mapTaskPriority(topPriority),
			Summary:    summary,
			Detail:     fmt.Sprintf("Batch of %d volume moves with concurrency %d", len(specs), maxConcurrentMoves),
			Parameters: map[string]*plugin_pb.ConfigValue{
				"task_params_pb": {
					Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload},
				},
				"batch_size": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(specs))},
				},
			},
			Labels: map[string]string{
				"task_type":  "balance",
				"batch":      "true",
				"batch_size": fmt.Sprintf("%d", len(specs)),
			},
		})
	}
	return proposals
}
func decodeVolumeBalanceTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
if job == nil {
return nil, fmt.Errorf("job spec is nil")

354
weed/plugin/worker/volume_balance_handler_test.go

@ -2,7 +2,9 @@ package pluginworker
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
@ -111,6 +113,167 @@ func TestDeriveBalanceWorkerConfig(t *testing.T) {
if cfg.MinIntervalSeconds != 33 {
t.Fatalf("expected min_interval_seconds 33, got %d", cfg.MinIntervalSeconds)
}
// Defaults for batch config when not specified
if cfg.MaxConcurrentMoves != defaultMaxConcurrentMoves {
t.Fatalf("expected default max_concurrent_moves %d, got %d", defaultMaxConcurrentMoves, cfg.MaxConcurrentMoves)
}
if cfg.BatchSize != 20 {
t.Fatalf("expected default batch_size 20, got %d", cfg.BatchSize)
}
}
func TestDeriveBalanceWorkerConfigBatchFields(t *testing.T) {
values := map[string]*plugin_pb.ConfigValue{
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 10},
},
"batch_size": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50},
},
}
cfg := deriveBalanceWorkerConfig(values)
if cfg.MaxConcurrentMoves != 10 {
t.Fatalf("expected max_concurrent_moves 10, got %d", cfg.MaxConcurrentMoves)
}
if cfg.BatchSize != 50 {
t.Fatalf("expected batch_size 50, got %d", cfg.BatchSize)
}
}
func TestDeriveBalanceWorkerConfigBatchClamping(t *testing.T) {
values := map[string]*plugin_pb.ConfigValue{
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 999},
},
"batch_size": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0},
},
}
cfg := deriveBalanceWorkerConfig(values)
if cfg.MaxConcurrentMoves != 50 {
t.Fatalf("expected max_concurrent_moves clamped to 50, got %d", cfg.MaxConcurrentMoves)
}
if cfg.BatchSize != 1 {
t.Fatalf("expected batch_size clamped to 1, got %d", cfg.BatchSize)
}
}
// makeDetectionResult builds a balance detection result fixture with a single
// source and target node for the given volume.
func makeDetectionResult(volumeID uint32, source, target, collection string) *workertypes.TaskDetectionResult {
	typed := &worker_pb.TaskParams{
		VolumeId:   volumeID,
		Collection: collection,
		VolumeSize: 1024,
		Sources:    []*worker_pb.TaskSource{{Node: source, VolumeId: volumeID}},
		Targets:    []*worker_pb.TaskTarget{{Node: target, VolumeId: volumeID}},
		TaskParams: &worker_pb.TaskParams_BalanceParams{
			BalanceParams: &worker_pb.BalanceTaskParams{TimeoutSeconds: 600},
		},
	}
	return &workertypes.TaskDetectionResult{
		TaskID:      fmt.Sprintf("balance-%d", volumeID),
		TaskType:    workertypes.TaskTypeBalance,
		VolumeID:    volumeID,
		Server:      source,
		Collection:  collection,
		Priority:    workertypes.TaskPriorityNormal,
		Reason:      "imbalanced",
		TypedParams: typed,
	}
}
// TestBuildBatchVolumeBalanceProposals_SingleBatch checks that three results
// fitting in one batch produce a single proposal whose serialized params
// round-trip with the expected moves and concurrency.
func TestBuildBatchVolumeBalanceProposals_SingleBatch(t *testing.T) {
	input := []*workertypes.TaskDetectionResult{
		makeDetectionResult(1, "s1:8080", "t1:8080", "c1"),
		makeDetectionResult(2, "s2:8080", "t2:8080", "c1"),
		makeDetectionResult(3, "s1:8080", "t2:8080", "c1"),
	}
	got := buildBatchVolumeBalanceProposals(input, 10, 5)
	if len(got) != 1 {
		t.Fatalf("expected 1 batch proposal, got %d", len(got))
	}
	batch := got[0]
	if batch.Labels["batch"] != "true" {
		t.Fatalf("expected batch label")
	}
	if size := batch.Labels["batch_size"]; size != "3" {
		t.Fatalf("expected batch_size label '3', got %q", size)
	}
	// Decode the serialized task params and verify the move list.
	raw := batch.Parameters["task_params_pb"].GetBytesValue()
	if len(raw) == 0 {
		t.Fatalf("expected task_params_pb payload")
	}
	var decoded worker_pb.TaskParams
	if err := proto.Unmarshal(raw, &decoded); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	bp := decoded.GetBalanceParams()
	moves := bp.GetMoves()
	if len(moves) != 3 {
		t.Fatalf("expected 3 moves, got %d", len(moves))
	}
	if moves[0].VolumeId != 1 || moves[1].VolumeId != 2 || moves[2].VolumeId != 3 {
		t.Fatalf("unexpected volume IDs: %v", moves)
	}
	if bp.MaxConcurrentMoves != 5 {
		t.Fatalf("expected MaxConcurrentMoves 5, got %d", bp.MaxConcurrentMoves)
	}
}
// TestBuildBatchVolumeBalanceProposals_MultipleBatches checks that five
// results with batch size 2 split into two batch proposals plus one
// single-move fallback for the trailing result.
func TestBuildBatchVolumeBalanceProposals_MultipleBatches(t *testing.T) {
	var results []*workertypes.TaskDetectionResult
	for v := 1; v <= 5; v++ {
		results = append(results, makeDetectionResult(uint32(v), "s1:8080", "t1:8080", "c1"))
	}
	got := buildBatchVolumeBalanceProposals(results, 2, 3)
	// 5 results split by batch size 2 → windows of 2, 2, and 1.
	if len(got) != 3 {
		t.Fatalf("expected 3 proposals, got %d", len(got))
	}
	if got[0].Labels["batch"] != "true" {
		t.Fatalf("first proposal should be batch")
	}
	if got[1].Labels["batch"] != "true" {
		t.Fatalf("second proposal should be batch")
	}
	if got[2].Labels["batch"] == "true" {
		t.Fatalf("last proposal with 1 result should be single-move, not batch")
	}
}
// TestBuildBatchVolumeBalanceProposals_BatchSizeOne exercises the helper
// directly with batch size 1 (Detect guards this path): each result should
// degrade to its own single-move proposal.
func TestBuildBatchVolumeBalanceProposals_BatchSizeOne(t *testing.T) {
	input := []*workertypes.TaskDetectionResult{
		makeDetectionResult(1, "s1:8080", "t1:8080", "c1"),
		makeDetectionResult(2, "s2:8080", "t2:8080", "c1"),
	}
	got := buildBatchVolumeBalanceProposals(input, 1, 5)
	if len(got) != 2 {
		t.Fatalf("expected 2 proposals, got %d", len(got))
	}
}
// TestVolumeBalanceDescriptorHasBatchFields verifies that the worker config
// form exposes the batch tuning knobs.
func TestVolumeBalanceDescriptorHasBatchFields(t *testing.T) {
	form := NewVolumeBalanceHandler(nil).Descriptor().WorkerConfigForm
	if !workerConfigFormHasField(form, "max_concurrent_moves") {
		t.Fatalf("expected max_concurrent_moves in worker config form")
	}
	if !workerConfigFormHasField(form, "batch_size") {
		t.Fatalf("expected batch_size in worker config form")
	}
}
func TestBuildVolumeBalanceProposal(t *testing.T) {
@ -265,6 +428,197 @@ func TestVolumeBalanceDescriptorOmitsExecutionTuningFields(t *testing.T) {
}
}
// recordingExecutionSender captures the progress and completion messages a
// handler emits during Execute, so tests can inspect them afterwards.
// Guard reads of progress/completed with mu, as Execute may send concurrently.
type recordingExecutionSender struct {
	mu        sync.Mutex
	progress  []*plugin_pb.JobProgressUpdate // progress updates, in arrival order
	completed *plugin_pb.JobCompleted        // last completion message, nil until one arrives
}
// SendProgress appends a progress update to the recorded list. Always nil.
func (r *recordingExecutionSender) SendProgress(update *plugin_pb.JobProgressUpdate) error {
	r.mu.Lock()
	r.progress = append(r.progress, update)
	r.mu.Unlock()
	return nil
}
// SendCompleted records the completion message, replacing any prior one. Always nil.
func (r *recordingExecutionSender) SendCompleted(msg *plugin_pb.JobCompleted) error {
	r.mu.Lock()
	r.completed = msg
	r.mu.Unlock()
	return nil
}
// TestBuildMoveTaskParams checks that a BalanceMoveSpec is translated into
// TaskParams carrying the volume, collection, size, source, target, and the
// explicit timeout.
func TestBuildMoveTaskParams(t *testing.T) {
	spec := &worker_pb.BalanceMoveSpec{
		VolumeId:   42,
		SourceNode: "10.0.0.1:8080",
		TargetNode: "10.0.0.2:8080",
		Collection: "photos",
		VolumeSize: 1024 * 1024,
	}
	got := buildMoveTaskParams(spec, 300)
	if got.VolumeId != 42 {
		t.Fatalf("expected volume_id 42, got %d", got.VolumeId)
	}
	if got.Collection != "photos" {
		t.Fatalf("expected collection photos, got %s", got.Collection)
	}
	if got.VolumeSize != 1024*1024 {
		t.Fatalf("expected volume_size %d, got %d", 1024*1024, got.VolumeSize)
	}
	if len(got.Sources) != 1 || got.Sources[0].Node != "10.0.0.1:8080" {
		t.Fatalf("unexpected sources: %+v", got.Sources)
	}
	if len(got.Targets) != 1 || got.Targets[0].Node != "10.0.0.2:8080" {
		t.Fatalf("unexpected targets: %+v", got.Targets)
	}
	balance := got.GetBalanceParams()
	if balance == nil {
		t.Fatalf("expected balance params")
	}
	if balance.TimeoutSeconds != 300 {
		t.Fatalf("expected timeout 300, got %d", balance.TimeoutSeconds)
	}
}
// TestBuildMoveTaskParamsDefaultTimeout checks that a zero timeout falls back
// to defaultBalanceTimeoutSeconds.
func TestBuildMoveTaskParamsDefaultTimeout(t *testing.T) {
	spec := &worker_pb.BalanceMoveSpec{VolumeId: 1, SourceNode: "a:8080", TargetNode: "b:8080"}
	params := buildMoveTaskParams(spec, 0)
	if params.GetBalanceParams().TimeoutSeconds != defaultBalanceTimeoutSeconds {
		t.Fatalf("expected default timeout %d, got %d", defaultBalanceTimeoutSeconds, params.GetBalanceParams().TimeoutSeconds)
	}
}
// TestExecuteDispatchesBatchPath verifies that a job whose BalanceTaskParams
// carries a Moves list is routed through the batch execution path: the first
// progress update announces the batch, and a completion message reports the
// per-move outcome even when every move fails.
func TestExecuteDispatchesBatchPath(t *testing.T) {
	// Build a job with batch moves in BalanceTaskParams
	bp := &worker_pb.BalanceTaskParams{
		TimeoutSeconds:     60,
		MaxConcurrentMoves: 2,
		Moves: []*worker_pb.BalanceMoveSpec{
			{VolumeId: 1, SourceNode: "s1:8080", TargetNode: "t1:8080", Collection: "c1"},
			{VolumeId: 2, SourceNode: "s2:8080", TargetNode: "t2:8080", Collection: "c1"},
		},
	}
	taskParams := &worker_pb.TaskParams{
		TaskId: "batch-1",
		TaskParams: &worker_pb.TaskParams_BalanceParams{
			BalanceParams: bp,
		},
	}
	payload, err := proto.Marshal(taskParams)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}
	job := &plugin_pb.JobSpec{
		JobId:   "batch-job-1",
		JobType: "volume_balance",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}},
		},
	}
	handler := NewVolumeBalanceHandler(nil)
	sender := &recordingExecutionSender{}
	// Execute enters the batch path. It fails because there is no real gRPC
	// server; the failure is surfaced through the completion message asserted
	// below, so the returned error itself is intentionally ignored (same
	// convention as TestExecuteSingleMovePathUnchanged).
	_ = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{
		Job: job,
	}, sender)
	// Verify the batch path was taken by checking the assigned message.
	sender.mu.Lock()
	defer sender.mu.Unlock()
	if len(sender.progress) == 0 {
		t.Fatalf("expected progress messages from batch path")
	}
	// First progress should be "assigned" with batch info
	first := sender.progress[0]
	if first.Stage != "assigned" {
		t.Fatalf("expected first stage 'assigned', got %q", first.Stage)
	}
	if !strings.Contains(first.Message, "batch") || !strings.Contains(first.Message, "2 moves") {
		t.Fatalf("expected batch assigned message, got %q", first.Message)
	}
	// Should have a completion with failure details (since no servers)
	if sender.completed == nil {
		t.Fatalf("expected completion message")
	}
	if sender.completed.Success {
		t.Fatalf("expected failure since no real gRPC servers")
	}
	// Should report 0 succeeded, 2 failed
	if v, ok := sender.completed.Result.OutputValues["failed"]; !ok || v.GetInt64Value() != 2 {
		t.Fatalf("expected 2 failed moves, got %+v", sender.completed.Result.OutputValues)
	}
}
// TestExecuteSingleMovePathUnchanged verifies that a job without a batch
// Moves list still goes through the legacy single-move execution path.
func TestExecuteSingleMovePathUnchanged(t *testing.T) {
	// Single-move shape: volume/source/target at the TaskParams top level.
	taskParams := &worker_pb.TaskParams{
		TaskId:     "single-1",
		VolumeId:   99,
		Collection: "videos",
		Sources:    []*worker_pb.TaskSource{{Node: "src:8080", VolumeId: 99}},
		Targets:    []*worker_pb.TaskTarget{{Node: "dst:8080", VolumeId: 99}},
		TaskParams: &worker_pb.TaskParams_BalanceParams{
			BalanceParams: &worker_pb.BalanceTaskParams{TimeoutSeconds: 60},
		},
	}
	payload, err := proto.Marshal(taskParams)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}
	sender := &recordingExecutionSender{}
	handler := NewVolumeBalanceHandler(nil)
	// Execute single-move path. Will fail on gRPC but verify it takes the single-move path.
	_ = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{
		Job: &plugin_pb.JobSpec{
			JobId:   "single-job-1",
			JobType: "volume_balance",
			Parameters: map[string]*plugin_pb.ConfigValue{
				"task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}},
			},
		},
	}, sender)
	sender.mu.Lock()
	defer sender.mu.Unlock()
	if len(sender.progress) == 0 {
		t.Fatalf("expected progress messages")
	}
	// Single-move path sends "volume balance job accepted" not "batch volume balance"
	initial := sender.progress[0]
	if initial.Stage != "assigned" {
		t.Fatalf("expected first stage 'assigned', got %q", initial.Stage)
	}
	if strings.Contains(initial.Message, "batch") {
		t.Fatalf("single-move path should not mention batch, got %q", initial.Message)
	}
}
func workerConfigFormHasField(form *plugin_pb.ConfigForm, fieldName string) bool {
if form == nil {
return false

Loading…
Cancel
Save