diff --git a/weed/admin/topology/storage_impact.go b/weed/admin/topology/storage_impact.go index e325fc9cf..f1a579271 100644 --- a/weed/admin/topology/storage_impact.go +++ b/weed/admin/topology/storage_impact.go @@ -28,6 +28,11 @@ func CalculateTaskStorageImpact(taskType TaskType, volumeSize int64) (sourceChan // Replication task: creates new replica on target return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} + case TaskTypeECBalance: + // EC balance task: moves individual shard from source to target + // Source frees 1 shard slot, target gains 1 shard slot + return StorageSlotChange{VolumeSlots: 0, ShardSlots: -1}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 1} + default: // Unknown task type, assume minimal impact glog.Warningf("unhandled task type %s in CalculateTaskStorageImpact, assuming default impact", taskType) diff --git a/weed/admin/topology/types.go b/weed/admin/topology/types.go index 054019271..d7c1d5352 100644 --- a/weed/admin/topology/types.go +++ b/weed/admin/topology/types.go @@ -15,6 +15,7 @@ const ( TaskTypeBalance TaskType = "balance" TaskTypeErasureCoding TaskType = "erasure_coding" TaskTypeReplication TaskType = "replication" + TaskTypeECBalance TaskType = "ec_balance" ) // Common task status constants diff --git a/weed/command/worker_test.go b/weed/command/worker_test.go index 8533b8978..881a73aec 100644 --- a/weed/command/worker_test.go +++ b/weed/command/worker_test.go @@ -14,8 +14,8 @@ func TestWorkerDefaultJobTypes(t *testing.T) { if err != nil { t.Fatalf("buildPluginWorkerHandlers(default worker flag) err = %v", err) } - // Expected: vacuum, volume_balance, admin_script, erasure_coding, iceberg_maintenance - if len(handlers) != 5 { - t.Fatalf("expected default worker job types to include 5 handlers, got %d", len(handlers)) + // Expected: vacuum, volume_balance, admin_script, erasure_coding, iceberg_maintenance, ec_balance + if len(handlers) != 6 { + t.Fatalf("expected default 
worker job types to include 6 handlers, got %d", len(handlers)) } } diff --git a/weed/pb/worker.proto b/weed/pb/worker.proto index 9f9684473..4bf15c444 100644 --- a/weed/pb/worker.proto +++ b/weed/pb/worker.proto @@ -111,6 +111,7 @@ message TaskParams { ErasureCodingTaskParams erasure_coding_params = 10; BalanceTaskParams balance_params = 11; ReplicationTaskParams replication_params = 12; + EcBalanceTaskParams ec_balance_params = 13; } } @@ -307,6 +308,7 @@ message TaskPolicy { ErasureCodingTaskConfig erasure_coding_config = 6; BalanceTaskConfig balance_config = 7; ReplicationTaskConfig replication_config = 8; + EcBalanceTaskConfig ec_balance_config = 9; } } @@ -339,6 +341,34 @@ message ReplicationTaskConfig { int32 target_replica_count = 1; // Target number of replicas } +// EcBalanceTaskParams for EC shard balancing operations +message EcBalanceTaskParams { + string disk_type = 1; // Disk type filter (hdd, ssd, "") + int32 max_parallelization = 2; // Max parallel shard moves within a batch + int32 timeout_seconds = 3; // Operation timeout per move + repeated EcShardMoveSpec moves = 4; // Batch: multiple shard moves in one job +} + +// EcShardMoveSpec describes a single EC shard move within a batch +message EcShardMoveSpec { + uint32 volume_id = 1; // EC volume ID + uint32 shard_id = 2; // Shard ID (0-13) + string collection = 3; // Collection name + string source_node = 4; // Source server address + uint32 source_disk_id = 5; // Source disk ID + string target_node = 6; // Target server address + uint32 target_disk_id = 7; // Target disk ID +} + +// EcBalanceTaskConfig contains EC balance-specific configuration +message EcBalanceTaskConfig { + double imbalance_threshold = 1; // Threshold for triggering EC shard rebalancing + int32 min_server_count = 2; // Minimum number of servers required + string collection_filter = 3; // Collection filter + string disk_type = 4; // Disk type filter + repeated string preferred_tags = 5; // Preferred disk tags for placement +} + 
// ========== Task Persistence Messages ========== // MaintenanceTaskData represents complete task state for persistence diff --git a/weed/pb/worker_pb/worker.pb.go b/weed/pb/worker_pb/worker.pb.go index 7677b0837..152417cfb 100644 --- a/weed/pb/worker_pb/worker.pb.go +++ b/weed/pb/worker_pb/worker.pb.go @@ -821,6 +821,7 @@ type TaskParams struct { // *TaskParams_ErasureCodingParams // *TaskParams_BalanceParams // *TaskParams_ReplicationParams + // *TaskParams_EcBalanceParams TaskParams isTaskParams_TaskParams `protobuf_oneof:"task_params"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -955,6 +956,15 @@ func (x *TaskParams) GetReplicationParams() *ReplicationTaskParams { return nil } +func (x *TaskParams) GetEcBalanceParams() *EcBalanceTaskParams { + if x != nil { + if x, ok := x.TaskParams.(*TaskParams_EcBalanceParams); ok { + return x.EcBalanceParams + } + } + return nil +} + type isTaskParams_TaskParams interface { isTaskParams_TaskParams() } @@ -975,6 +985,10 @@ type TaskParams_ReplicationParams struct { ReplicationParams *ReplicationTaskParams `protobuf:"bytes,12,opt,name=replication_params,json=replicationParams,proto3,oneof"` } +type TaskParams_EcBalanceParams struct { + EcBalanceParams *EcBalanceTaskParams `protobuf:"bytes,13,opt,name=ec_balance_params,json=ecBalanceParams,proto3,oneof"` +} + func (*TaskParams_VacuumParams) isTaskParams_TaskParams() {} func (*TaskParams_ErasureCodingParams) isTaskParams_TaskParams() {} @@ -983,6 +997,8 @@ func (*TaskParams_BalanceParams) isTaskParams_TaskParams() {} func (*TaskParams_ReplicationParams) isTaskParams_TaskParams() {} +func (*TaskParams_EcBalanceParams) isTaskParams_TaskParams() {} + // VacuumTaskParams for vacuum operations type VacuumTaskParams struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2480,6 +2496,7 @@ type TaskPolicy struct { // *TaskPolicy_ErasureCodingConfig // *TaskPolicy_BalanceConfig // *TaskPolicy_ReplicationConfig + // *TaskPolicy_EcBalanceConfig 
TaskConfig isTaskPolicy_TaskConfig `protobuf_oneof:"task_config"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -2586,6 +2603,15 @@ func (x *TaskPolicy) GetReplicationConfig() *ReplicationTaskConfig { return nil } +func (x *TaskPolicy) GetEcBalanceConfig() *EcBalanceTaskConfig { + if x != nil { + if x, ok := x.TaskConfig.(*TaskPolicy_EcBalanceConfig); ok { + return x.EcBalanceConfig + } + } + return nil +} + type isTaskPolicy_TaskConfig interface { isTaskPolicy_TaskConfig() } @@ -2606,6 +2632,10 @@ type TaskPolicy_ReplicationConfig struct { ReplicationConfig *ReplicationTaskConfig `protobuf:"bytes,8,opt,name=replication_config,json=replicationConfig,proto3,oneof"` } +type TaskPolicy_EcBalanceConfig struct { + EcBalanceConfig *EcBalanceTaskConfig `protobuf:"bytes,9,opt,name=ec_balance_config,json=ecBalanceConfig,proto3,oneof"` +} + func (*TaskPolicy_VacuumConfig) isTaskPolicy_TaskConfig() {} func (*TaskPolicy_ErasureCodingConfig) isTaskPolicy_TaskConfig() {} @@ -2614,6 +2644,8 @@ func (*TaskPolicy_BalanceConfig) isTaskPolicy_TaskConfig() {} func (*TaskPolicy_ReplicationConfig) isTaskPolicy_TaskConfig() {} +func (*TaskPolicy_EcBalanceConfig) isTaskPolicy_TaskConfig() {} + // VacuumTaskConfig contains vacuum-specific configuration type VacuumTaskConfig struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2850,6 +2882,245 @@ func (x *ReplicationTaskConfig) GetTargetReplicaCount() int32 { return 0 } +// EcBalanceTaskParams for EC shard balancing operations +type EcBalanceTaskParams struct { + state protoimpl.MessageState `protogen:"open.v1"` + DiskType string `protobuf:"bytes,1,opt,name=disk_type,json=diskType,proto3" json:"disk_type,omitempty"` // Disk type filter (hdd, ssd, "") + MaxParallelization int32 `protobuf:"varint,2,opt,name=max_parallelization,json=maxParallelization,proto3" json:"max_parallelization,omitempty"` // Max parallel shard moves within a batch + TimeoutSeconds int32 
`protobuf:"varint,3,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` // Operation timeout per move + Moves []*EcShardMoveSpec `protobuf:"bytes,4,rep,name=moves,proto3" json:"moves,omitempty"` // Batch: multiple shard moves in one job + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EcBalanceTaskParams) Reset() { + *x = EcBalanceTaskParams{} + mi := &file_worker_proto_msgTypes[32] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EcBalanceTaskParams) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EcBalanceTaskParams) ProtoMessage() {} + +func (x *EcBalanceTaskParams) ProtoReflect() protoreflect.Message { + mi := &file_worker_proto_msgTypes[32] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EcBalanceTaskParams.ProtoReflect.Descriptor instead. 
+func (*EcBalanceTaskParams) Descriptor() ([]byte, []int) { + return file_worker_proto_rawDescGZIP(), []int{32} +} + +func (x *EcBalanceTaskParams) GetDiskType() string { + if x != nil { + return x.DiskType + } + return "" +} + +func (x *EcBalanceTaskParams) GetMaxParallelization() int32 { + if x != nil { + return x.MaxParallelization + } + return 0 +} + +func (x *EcBalanceTaskParams) GetTimeoutSeconds() int32 { + if x != nil { + return x.TimeoutSeconds + } + return 0 +} + +func (x *EcBalanceTaskParams) GetMoves() []*EcShardMoveSpec { + if x != nil { + return x.Moves + } + return nil +} + +// EcShardMoveSpec describes a single EC shard move within a batch +type EcShardMoveSpec struct { + state protoimpl.MessageState `protogen:"open.v1"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"` // EC volume ID + ShardId uint32 `protobuf:"varint,2,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` // Shard ID (0-13) + Collection string `protobuf:"bytes,3,opt,name=collection,proto3" json:"collection,omitempty"` // Collection name + SourceNode string `protobuf:"bytes,4,opt,name=source_node,json=sourceNode,proto3" json:"source_node,omitempty"` // Source server address + SourceDiskId uint32 `protobuf:"varint,5,opt,name=source_disk_id,json=sourceDiskId,proto3" json:"source_disk_id,omitempty"` // Source disk ID + TargetNode string `protobuf:"bytes,6,opt,name=target_node,json=targetNode,proto3" json:"target_node,omitempty"` // Target server address + TargetDiskId uint32 `protobuf:"varint,7,opt,name=target_disk_id,json=targetDiskId,proto3" json:"target_disk_id,omitempty"` // Target disk ID + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EcShardMoveSpec) Reset() { + *x = EcShardMoveSpec{} + mi := &file_worker_proto_msgTypes[33] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EcShardMoveSpec) String() string { + return 
protoimpl.X.MessageStringOf(x) +} + +func (*EcShardMoveSpec) ProtoMessage() {} + +func (x *EcShardMoveSpec) ProtoReflect() protoreflect.Message { + mi := &file_worker_proto_msgTypes[33] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EcShardMoveSpec.ProtoReflect.Descriptor instead. +func (*EcShardMoveSpec) Descriptor() ([]byte, []int) { + return file_worker_proto_rawDescGZIP(), []int{33} +} + +func (x *EcShardMoveSpec) GetVolumeId() uint32 { + if x != nil { + return x.VolumeId + } + return 0 +} + +func (x *EcShardMoveSpec) GetShardId() uint32 { + if x != nil { + return x.ShardId + } + return 0 +} + +func (x *EcShardMoveSpec) GetCollection() string { + if x != nil { + return x.Collection + } + return "" +} + +func (x *EcShardMoveSpec) GetSourceNode() string { + if x != nil { + return x.SourceNode + } + return "" +} + +func (x *EcShardMoveSpec) GetSourceDiskId() uint32 { + if x != nil { + return x.SourceDiskId + } + return 0 +} + +func (x *EcShardMoveSpec) GetTargetNode() string { + if x != nil { + return x.TargetNode + } + return "" +} + +func (x *EcShardMoveSpec) GetTargetDiskId() uint32 { + if x != nil { + return x.TargetDiskId + } + return 0 +} + +// EcBalanceTaskConfig contains EC balance-specific configuration +type EcBalanceTaskConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + ImbalanceThreshold float64 `protobuf:"fixed64,1,opt,name=imbalance_threshold,json=imbalanceThreshold,proto3" json:"imbalance_threshold,omitempty"` // Threshold for triggering EC shard rebalancing + MinServerCount int32 `protobuf:"varint,2,opt,name=min_server_count,json=minServerCount,proto3" json:"min_server_count,omitempty"` // Minimum number of servers required + CollectionFilter string `protobuf:"bytes,3,opt,name=collection_filter,json=collectionFilter,proto3" json:"collection_filter,omitempty"` // Collection 
filter + DiskType string `protobuf:"bytes,4,opt,name=disk_type,json=diskType,proto3" json:"disk_type,omitempty"` // Disk type filter + PreferredTags []string `protobuf:"bytes,5,rep,name=preferred_tags,json=preferredTags,proto3" json:"preferred_tags,omitempty"` // Preferred disk tags for placement + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EcBalanceTaskConfig) Reset() { + *x = EcBalanceTaskConfig{} + mi := &file_worker_proto_msgTypes[34] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EcBalanceTaskConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EcBalanceTaskConfig) ProtoMessage() {} + +func (x *EcBalanceTaskConfig) ProtoReflect() protoreflect.Message { + mi := &file_worker_proto_msgTypes[34] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EcBalanceTaskConfig.ProtoReflect.Descriptor instead. 
+func (*EcBalanceTaskConfig) Descriptor() ([]byte, []int) { + return file_worker_proto_rawDescGZIP(), []int{34} +} + +func (x *EcBalanceTaskConfig) GetImbalanceThreshold() float64 { + if x != nil { + return x.ImbalanceThreshold + } + return 0 +} + +func (x *EcBalanceTaskConfig) GetMinServerCount() int32 { + if x != nil { + return x.MinServerCount + } + return 0 +} + +func (x *EcBalanceTaskConfig) GetCollectionFilter() string { + if x != nil { + return x.CollectionFilter + } + return "" +} + +func (x *EcBalanceTaskConfig) GetDiskType() string { + if x != nil { + return x.DiskType + } + return "" +} + +func (x *EcBalanceTaskConfig) GetPreferredTags() []string { + if x != nil { + return x.PreferredTags + } + return nil +} + // MaintenanceTaskData represents complete task state for persistence type MaintenanceTaskData struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2884,7 +3155,7 @@ type MaintenanceTaskData struct { func (x *MaintenanceTaskData) Reset() { *x = MaintenanceTaskData{} - mi := &file_worker_proto_msgTypes[32] + mi := &file_worker_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2896,7 +3167,7 @@ func (x *MaintenanceTaskData) String() string { func (*MaintenanceTaskData) ProtoMessage() {} func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[32] + mi := &file_worker_proto_msgTypes[35] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2909,7 +3180,7 @@ func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message { // Deprecated: Use MaintenanceTaskData.ProtoReflect.Descriptor instead. 
func (*MaintenanceTaskData) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{32} + return file_worker_proto_rawDescGZIP(), []int{35} } func (x *MaintenanceTaskData) GetId() string { @@ -3094,7 +3365,7 @@ type TaskAssignmentRecord struct { func (x *TaskAssignmentRecord) Reset() { *x = TaskAssignmentRecord{} - mi := &file_worker_proto_msgTypes[33] + mi := &file_worker_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3106,7 +3377,7 @@ func (x *TaskAssignmentRecord) String() string { func (*TaskAssignmentRecord) ProtoMessage() {} func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[33] + mi := &file_worker_proto_msgTypes[36] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3119,7 +3390,7 @@ func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskAssignmentRecord.ProtoReflect.Descriptor instead. 
func (*TaskAssignmentRecord) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{33} + return file_worker_proto_rawDescGZIP(), []int{36} } func (x *TaskAssignmentRecord) GetWorkerId() string { @@ -3171,7 +3442,7 @@ type TaskCreationMetrics struct { func (x *TaskCreationMetrics) Reset() { *x = TaskCreationMetrics{} - mi := &file_worker_proto_msgTypes[34] + mi := &file_worker_proto_msgTypes[37] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3183,7 +3454,7 @@ func (x *TaskCreationMetrics) String() string { func (*TaskCreationMetrics) ProtoMessage() {} func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[34] + mi := &file_worker_proto_msgTypes[37] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3196,7 +3467,7 @@ func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskCreationMetrics.ProtoReflect.Descriptor instead. 
func (*TaskCreationMetrics) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{34} + return file_worker_proto_rawDescGZIP(), []int{37} } func (x *TaskCreationMetrics) GetTriggerMetric() string { @@ -3253,7 +3524,7 @@ type VolumeHealthMetrics struct { func (x *VolumeHealthMetrics) Reset() { *x = VolumeHealthMetrics{} - mi := &file_worker_proto_msgTypes[35] + mi := &file_worker_proto_msgTypes[38] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3265,7 +3536,7 @@ func (x *VolumeHealthMetrics) String() string { func (*VolumeHealthMetrics) ProtoMessage() {} func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[35] + mi := &file_worker_proto_msgTypes[38] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3278,7 +3549,7 @@ func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use VolumeHealthMetrics.ProtoReflect.Descriptor instead. 
func (*VolumeHealthMetrics) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{35} + return file_worker_proto_rawDescGZIP(), []int{38} } func (x *VolumeHealthMetrics) GetTotalSize() uint64 { @@ -3363,7 +3634,7 @@ type TaskStateFile struct { func (x *TaskStateFile) Reset() { *x = TaskStateFile{} - mi := &file_worker_proto_msgTypes[36] + mi := &file_worker_proto_msgTypes[39] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3375,7 +3646,7 @@ func (x *TaskStateFile) String() string { func (*TaskStateFile) ProtoMessage() {} func (x *TaskStateFile) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[36] + mi := &file_worker_proto_msgTypes[39] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3388,7 +3659,7 @@ func (x *TaskStateFile) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskStateFile.ProtoReflect.Descriptor instead. func (*TaskStateFile) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{36} + return file_worker_proto_rawDescGZIP(), []int{39} } func (x *TaskStateFile) GetTask() *MaintenanceTaskData { @@ -3477,7 +3748,7 @@ const file_worker_proto_rawDesc = "" + "\bmetadata\x18\x06 \x03(\v2'.worker_pb.TaskAssignment.MetadataEntryR\bmetadata\x1a;\n" + "\rMetadataEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + - "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xe1\x04\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xaf\x05\n" + "\n" + "TaskParams\x12\x17\n" + "\atask_id\x18\x01 \x01(\tR\x06taskId\x12\x1b\n" + @@ -3496,7 +3767,8 @@ const file_worker_proto_rawDesc = "" + "\x15erasure_coding_params\x18\n" + " \x01(\v2\".worker_pb.ErasureCodingTaskParamsH\x00R\x13erasureCodingParams\x12E\n" + "\x0ebalance_params\x18\v \x01(\v2\x1c.worker_pb.BalanceTaskParamsH\x00R\rbalanceParams\x12Q\n" + - "\x12replication_params\x18\f \x01(\v2 
.worker_pb.ReplicationTaskParamsH\x00R\x11replicationParamsB\r\n" + + "\x12replication_params\x18\f \x01(\v2 .worker_pb.ReplicationTaskParamsH\x00R\x11replicationParams\x12L\n" + + "\x11ec_balance_params\x18\r \x01(\v2\x1e.worker_pb.EcBalanceTaskParamsH\x00R\x0fecBalanceParamsB\r\n" + "\vtask_params\"\xcb\x01\n" + "\x10VacuumTaskParams\x12+\n" + "\x11garbage_threshold\x18\x01 \x01(\x01R\x10garbageThreshold\x12!\n" + @@ -3658,7 +3930,7 @@ const file_worker_proto_rawDesc = "" + "\x1edefault_check_interval_seconds\x18\x04 \x01(\x05R\x1bdefaultCheckIntervalSeconds\x1aV\n" + "\x11TaskPoliciesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12+\n" + - "\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\x82\x04\n" + + "\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\xd0\x04\n" + "\n" + "TaskPolicy\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12%\n" + @@ -3668,7 +3940,8 @@ const file_worker_proto_rawDesc = "" + "\rvacuum_config\x18\x05 \x01(\v2\x1b.worker_pb.VacuumTaskConfigH\x00R\fvacuumConfig\x12X\n" + "\x15erasure_coding_config\x18\x06 \x01(\v2\".worker_pb.ErasureCodingTaskConfigH\x00R\x13erasureCodingConfig\x12E\n" + "\x0ebalance_config\x18\a \x01(\v2\x1c.worker_pb.BalanceTaskConfigH\x00R\rbalanceConfig\x12Q\n" + - "\x12replication_config\x18\b \x01(\v2 .worker_pb.ReplicationTaskConfigH\x00R\x11replicationConfigB\r\n" + + "\x12replication_config\x18\b \x01(\v2 .worker_pb.ReplicationTaskConfigH\x00R\x11replicationConfig\x12L\n" + + "\x11ec_balance_config\x18\t \x01(\v2\x1e.worker_pb.EcBalanceTaskConfigH\x00R\x0fecBalanceConfigB\r\n" + "\vtask_config\"\xa2\x01\n" + "\x10VacuumTaskConfig\x12+\n" + "\x11garbage_threshold\x18\x01 \x01(\x01R\x10garbageThreshold\x12/\n" + @@ -3684,7 +3957,30 @@ const file_worker_proto_rawDesc = "" + "\x13imbalance_threshold\x18\x01 \x01(\x01R\x12imbalanceThreshold\x12(\n" + "\x10min_server_count\x18\x02 \x01(\x05R\x0eminServerCount\"I\n" + "\x15ReplicationTaskConfig\x120\n" + - 
"\x14target_replica_count\x18\x01 \x01(\x05R\x12targetReplicaCount\"\xae\a\n" + + "\x14target_replica_count\x18\x01 \x01(\x05R\x12targetReplicaCount\"\xbe\x01\n" + + "\x13EcBalanceTaskParams\x12\x1b\n" + + "\tdisk_type\x18\x01 \x01(\tR\bdiskType\x12/\n" + + "\x13max_parallelization\x18\x02 \x01(\x05R\x12maxParallelization\x12'\n" + + "\x0ftimeout_seconds\x18\x03 \x01(\x05R\x0etimeoutSeconds\x120\n" + + "\x05moves\x18\x04 \x03(\v2\x1a.worker_pb.EcShardMoveSpecR\x05moves\"\xf7\x01\n" + + "\x0fEcShardMoveSpec\x12\x1b\n" + + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" + + "\bshard_id\x18\x02 \x01(\rR\ashardId\x12\x1e\n" + + "\n" + + "collection\x18\x03 \x01(\tR\n" + + "collection\x12\x1f\n" + + "\vsource_node\x18\x04 \x01(\tR\n" + + "sourceNode\x12$\n" + + "\x0esource_disk_id\x18\x05 \x01(\rR\fsourceDiskId\x12\x1f\n" + + "\vtarget_node\x18\x06 \x01(\tR\n" + + "targetNode\x12$\n" + + "\x0etarget_disk_id\x18\a \x01(\rR\ftargetDiskId\"\xe1\x01\n" + + "\x13EcBalanceTaskConfig\x12/\n" + + "\x13imbalance_threshold\x18\x01 \x01(\x01R\x12imbalanceThreshold\x12(\n" + + "\x10min_server_count\x18\x02 \x01(\x05R\x0eminServerCount\x12+\n" + + "\x11collection_filter\x18\x03 \x01(\tR\x10collectionFilter\x12\x1b\n" + + "\tdisk_type\x18\x04 \x01(\tR\bdiskType\x12%\n" + + "\x0epreferred_tags\x18\x05 \x03(\tR\rpreferredTags\"\xae\a\n" + "\x13MaintenanceTaskData\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" + "\x04type\x18\x02 \x01(\tR\x04type\x12\x1a\n" + @@ -3773,7 +4069,7 @@ func file_worker_proto_rawDescGZIP() []byte { return file_worker_proto_rawDescData } -var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 46) +var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 49) var file_worker_proto_goTypes = []any{ (*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage (*AdminMessage)(nil), // 1: worker_pb.AdminMessage @@ -3807,20 +4103,23 @@ var file_worker_proto_goTypes = []any{ (*ErasureCodingTaskConfig)(nil), // 29: worker_pb.ErasureCodingTaskConfig 
(*BalanceTaskConfig)(nil), // 30: worker_pb.BalanceTaskConfig (*ReplicationTaskConfig)(nil), // 31: worker_pb.ReplicationTaskConfig - (*MaintenanceTaskData)(nil), // 32: worker_pb.MaintenanceTaskData - (*TaskAssignmentRecord)(nil), // 33: worker_pb.TaskAssignmentRecord - (*TaskCreationMetrics)(nil), // 34: worker_pb.TaskCreationMetrics - (*VolumeHealthMetrics)(nil), // 35: worker_pb.VolumeHealthMetrics - (*TaskStateFile)(nil), // 36: worker_pb.TaskStateFile - nil, // 37: worker_pb.WorkerRegistration.MetadataEntry - nil, // 38: worker_pb.TaskAssignment.MetadataEntry - nil, // 39: worker_pb.TaskUpdate.MetadataEntry - nil, // 40: worker_pb.TaskComplete.ResultMetadataEntry - nil, // 41: worker_pb.TaskLogMetadata.CustomDataEntry - nil, // 42: worker_pb.TaskLogEntry.FieldsEntry - nil, // 43: worker_pb.MaintenancePolicy.TaskPoliciesEntry - nil, // 44: worker_pb.MaintenanceTaskData.TagsEntry - nil, // 45: worker_pb.TaskCreationMetrics.AdditionalDataEntry + (*EcBalanceTaskParams)(nil), // 32: worker_pb.EcBalanceTaskParams + (*EcShardMoveSpec)(nil), // 33: worker_pb.EcShardMoveSpec + (*EcBalanceTaskConfig)(nil), // 34: worker_pb.EcBalanceTaskConfig + (*MaintenanceTaskData)(nil), // 35: worker_pb.MaintenanceTaskData + (*TaskAssignmentRecord)(nil), // 36: worker_pb.TaskAssignmentRecord + (*TaskCreationMetrics)(nil), // 37: worker_pb.TaskCreationMetrics + (*VolumeHealthMetrics)(nil), // 38: worker_pb.VolumeHealthMetrics + (*TaskStateFile)(nil), // 39: worker_pb.TaskStateFile + nil, // 40: worker_pb.WorkerRegistration.MetadataEntry + nil, // 41: worker_pb.TaskAssignment.MetadataEntry + nil, // 42: worker_pb.TaskUpdate.MetadataEntry + nil, // 43: worker_pb.TaskComplete.ResultMetadataEntry + nil, // 44: worker_pb.TaskLogMetadata.CustomDataEntry + nil, // 45: worker_pb.TaskLogEntry.FieldsEntry + nil, // 46: worker_pb.MaintenancePolicy.TaskPoliciesEntry + nil, // 47: worker_pb.MaintenanceTaskData.TagsEntry + nil, // 48: worker_pb.TaskCreationMetrics.AdditionalDataEntry } var 
file_worker_proto_depIdxs = []int32{ 2, // 0: worker_pb.WorkerMessage.registration:type_name -> worker_pb.WorkerRegistration @@ -3836,43 +4135,46 @@ var file_worker_proto_depIdxs = []int32{ 18, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation 20, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown 21, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest - 37, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry + 40, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry 8, // 14: worker_pb.TaskAssignment.params:type_name -> worker_pb.TaskParams - 38, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry + 41, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry 11, // 16: worker_pb.TaskParams.sources:type_name -> worker_pb.TaskSource 12, // 17: worker_pb.TaskParams.targets:type_name -> worker_pb.TaskTarget 9, // 18: worker_pb.TaskParams.vacuum_params:type_name -> worker_pb.VacuumTaskParams 10, // 19: worker_pb.TaskParams.erasure_coding_params:type_name -> worker_pb.ErasureCodingTaskParams 14, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams 15, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams - 13, // 22: worker_pb.BalanceTaskParams.moves:type_name -> worker_pb.BalanceMoveSpec - 39, // 23: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry - 40, // 24: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry - 23, // 25: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata - 24, // 26: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry - 41, // 27: worker_pb.TaskLogMetadata.custom_data:type_name -> 
worker_pb.TaskLogMetadata.CustomDataEntry - 42, // 28: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry - 26, // 29: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy - 43, // 30: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry - 28, // 31: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig - 29, // 32: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig - 30, // 33: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig - 31, // 34: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig - 8, // 35: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams - 33, // 36: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord - 44, // 37: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry - 34, // 38: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics - 35, // 39: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics - 45, // 40: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry - 32, // 41: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData - 27, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy - 0, // 43: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage - 1, // 44: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage - 44, // [44:45] is the sub-list for method output_type - 43, // [43:44] is the sub-list for method input_type - 43, // [43:43] is the sub-list for extension type_name - 43, // [43:43] is the sub-list for extension extendee - 0, // [0:43] is the sub-list for field type_name + 32, // 22: 
worker_pb.TaskParams.ec_balance_params:type_name -> worker_pb.EcBalanceTaskParams + 13, // 23: worker_pb.BalanceTaskParams.moves:type_name -> worker_pb.BalanceMoveSpec + 42, // 24: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry + 43, // 25: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry + 23, // 26: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata + 24, // 27: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry + 44, // 28: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry + 45, // 29: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry + 26, // 30: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy + 46, // 31: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry + 28, // 32: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig + 29, // 33: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig + 30, // 34: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig + 31, // 35: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig + 34, // 36: worker_pb.TaskPolicy.ec_balance_config:type_name -> worker_pb.EcBalanceTaskConfig + 33, // 37: worker_pb.EcBalanceTaskParams.moves:type_name -> worker_pb.EcShardMoveSpec + 8, // 38: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams + 36, // 39: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord + 47, // 40: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry + 37, // 41: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics + 38, // 42: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> 
worker_pb.VolumeHealthMetrics + 48, // 43: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry + 35, // 44: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData + 27, // 45: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy + 0, // 46: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage + 1, // 47: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage + 47, // [47:48] is the sub-list for method output_type + 46, // [46:47] is the sub-list for method input_type + 46, // [46:46] is the sub-list for extension type_name + 46, // [46:46] is the sub-list for extension extendee + 0, // [0:46] is the sub-list for field type_name } func init() { file_worker_proto_init() } @@ -3902,12 +4204,14 @@ func file_worker_proto_init() { (*TaskParams_ErasureCodingParams)(nil), (*TaskParams_BalanceParams)(nil), (*TaskParams_ReplicationParams)(nil), + (*TaskParams_EcBalanceParams)(nil), } file_worker_proto_msgTypes[27].OneofWrappers = []any{ (*TaskPolicy_VacuumConfig)(nil), (*TaskPolicy_ErasureCodingConfig)(nil), (*TaskPolicy_BalanceConfig)(nil), (*TaskPolicy_ReplicationConfig)(nil), + (*TaskPolicy_EcBalanceConfig)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -3915,7 +4219,7 @@ func file_worker_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_worker_proto_rawDesc), len(file_worker_proto_rawDesc)), NumEnums: 0, - NumMessages: 46, + NumMessages: 49, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/plugin/worker/ec_balance_handler.go b/weed/plugin/worker/ec_balance_handler.go new file mode 100644 index 000000000..7f5d1d7a0 --- /dev/null +++ b/weed/plugin/worker/ec_balance_handler.go @@ -0,0 +1,692 @@ +package pluginworker + +import ( + "context" + "fmt" + "math" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/topology" + 
"github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + ecbalancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_balance" + workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" +) + +const ( + ecBalanceMinImbalanceThreshold = 0.05 + ecBalanceMaxImbalanceThreshold = 0.5 + ecBalanceMinServerCount = 2 +) + +func init() { + RegisterHandler(HandlerFactory{ + JobType: "ec_balance", + Category: CategoryDefault, + Aliases: []string{"ec-balance", "ec.balance", "ec_shard_balance"}, + Build: func(opts HandlerBuildOptions) (JobHandler, error) { + return NewECBalanceHandler(opts.GrpcDialOption), nil + }, + }) +} + +type ecBalanceWorkerConfig struct { + TaskConfig *ecbalancetask.Config + MinIntervalSeconds int +} + +// ECBalanceHandler is the plugin job handler for EC shard balancing. 
+type ECBalanceHandler struct { + grpcDialOption grpc.DialOption +} + +func NewECBalanceHandler(grpcDialOption grpc.DialOption) *ECBalanceHandler { + return &ECBalanceHandler{grpcDialOption: grpcDialOption} +} + +func (h *ECBalanceHandler) Capability() *plugin_pb.JobTypeCapability { + return &plugin_pb.JobTypeCapability{ + JobType: "ec_balance", + CanDetect: true, + CanExecute: true, + MaxDetectionConcurrency: 1, + MaxExecutionConcurrency: 3, + DisplayName: "EC Shard Balance", + Description: "Balance EC shard distribution across racks and nodes", + Weight: 60, + } +} + +func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { + return &plugin_pb.JobTypeDescriptor{ + JobType: "ec_balance", + DisplayName: "EC Shard Balance", + Description: "Detect and execute EC shard rebalancing across the cluster", + Icon: "fas fa-balance-scale-left", + DescriptorVersion: 1, + AdminConfigForm: &plugin_pb.ConfigForm{ + FormId: "ec-balance-admin", + Title: "EC Shard Balance Admin Config", + Description: "Admin-side controls for EC shard balance detection scope.", + Sections: []*plugin_pb.ConfigSection{ + { + SectionId: "scope", + Title: "Scope", + Description: "Optional filters applied before EC shard balance detection.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "collection_filter", + Label: "Collection Filter", + Description: "Only balance EC shards in matching collections (wildcard supported).", + Placeholder: "all collections", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "data_center_filter", + Label: "Data Center Filter", + Description: "Only balance within matching data centers (wildcard supported).", + Placeholder: "all data centers", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "disk_type", + Label: "Disk Type", + Description: "Only balance EC shards on this disk type (hdd, ssd, 
or empty for all).", + Placeholder: "all disk types", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + }, + }, + }, + DefaultValues: map[string]*plugin_pb.ConfigValue{ + "collection_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "data_center_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "disk_type": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + }, + }, + WorkerConfigForm: &plugin_pb.ConfigForm{ + FormId: "ec-balance-worker", + Title: "EC Shard Balance Worker Config", + Description: "Worker-side detection thresholds and execution controls.", + Sections: []*plugin_pb.ConfigSection{ + { + SectionId: "thresholds", + Title: "Detection Thresholds", + Description: "Controls for when EC shard balance jobs should be proposed.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "imbalance_threshold", + Label: "Imbalance Threshold", + Description: "Minimum shard count imbalance ratio to trigger balancing (0.0-1.0).", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + Required: true, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: ecBalanceMinImbalanceThreshold}}, + MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: ecBalanceMaxImbalanceThreshold}}, + }, + { + Name: "min_server_count", + Label: "Minimum Server Count", + Description: "Minimum servers required for EC shard balancing.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + Required: true, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: ecBalanceMinServerCount}}, + }, + { + Name: "min_interval_seconds", + Label: "Minimum Detection Interval (s)", + Description: "Skip detection if the last successful run is more recent than 
this interval.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + Required: true, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + }, + { + Name: "preferred_tags", + Label: "Preferred Tags", + Description: "Comma-separated disk tags to prioritize for shard placement, ordered by preference.", + Placeholder: "fast,ssd", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + }, + }, + }, + DefaultValues: map[string]*plugin_pb.ConfigValue{ + "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}}, + "min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}}, + "min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60 * 60}}, + "preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + }, + }, + AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ + Enabled: true, + DetectionIntervalSeconds: 60 * 30, + DetectionTimeoutSeconds: 300, + MaxJobsPerDetection: 500, + GlobalExecutionConcurrency: 16, + PerWorkerExecutionConcurrency: 4, + RetryLimit: 1, + RetryBackoffSeconds: 30, + JobTypeMaxRuntimeSeconds: 1800, + }, + WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ + "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}}, + "min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}}, + "min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60 * 60}}, + "preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + }, + } +} + +func (h *ECBalanceHandler) Detect( + ctx context.Context, + request *plugin_pb.RunDetectionRequest, + sender DetectionSender, +) error { + if request == nil { + return fmt.Errorf("run detection request is nil") + } + if sender == nil { + return fmt.Errorf("detection sender is nil") + } + 
if request.JobType != "" && request.JobType != "ec_balance" { + return fmt.Errorf("job type %q is not handled by ec_balance worker", request.JobType) + } + + workerConfig := deriveECBalanceWorkerConfig(request.GetWorkerConfigValues()) + if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) { + minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second + _ = sender.SendActivity(BuildDetectorActivity( + "skipped_by_interval", + fmt.Sprintf("EC BALANCE: Detection skipped due to min interval (%s)", minInterval), + map[string]*plugin_pb.ConfigValue{ + "min_interval_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)}, + }, + }, + )) + if err := sender.SendProposals(&plugin_pb.DetectionProposals{ + JobType: "ec_balance", + Proposals: []*plugin_pb.JobProposal{}, + HasMore: false, + }); err != nil { + return err + } + return sender.SendComplete(&plugin_pb.DetectionComplete{ + JobType: "ec_balance", + Success: true, + TotalProposals: 0, + }) + } + + // Apply admin-side scope filters + collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", "")) + if collectionFilter != "" { + workerConfig.TaskConfig.CollectionFilter = collectionFilter + } + dcFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", "")) + if dcFilter != "" { + workerConfig.TaskConfig.DataCenterFilter = dcFilter + } + diskType := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "disk_type", "")) + if diskType != "" { + workerConfig.TaskConfig.DiskType = diskType + } + + masters := make([]string, 0) + if request.ClusterContext != nil { + masters = append(masters, request.ClusterContext.MasterGrpcAddresses...) 
+ } + + metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter) + if err != nil { + return err + } + + clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} + maxResults := int(request.MaxResults) + if maxResults < 0 { + maxResults = 0 + } + + results, hasMore, err := ecbalancetask.Detection(ctx, metrics, clusterInfo, workerConfig.TaskConfig, maxResults) + if err != nil { + return err + } + + if traceErr := emitECBalanceDecisionTrace(sender, workerConfig.TaskConfig, results, maxResults, hasMore); traceErr != nil { + glog.Warningf("Plugin worker failed to emit ec_balance detection trace: %v", traceErr) + } + + proposals := make([]*plugin_pb.JobProposal, 0, len(results)) + for _, result := range results { + proposal, proposalErr := buildECBalanceProposal(result) + if proposalErr != nil { + glog.Warningf("Plugin worker skip invalid ec_balance proposal: %v", proposalErr) + continue + } + proposals = append(proposals, proposal) + } + + if err := sender.SendProposals(&plugin_pb.DetectionProposals{ + JobType: "ec_balance", + Proposals: proposals, + HasMore: hasMore, + }); err != nil { + return err + } + + return sender.SendComplete(&plugin_pb.DetectionComplete{ + JobType: "ec_balance", + Success: true, + TotalProposals: int32(len(proposals)), + }) +} + +func (h *ECBalanceHandler) Execute( + ctx context.Context, + request *plugin_pb.ExecuteJobRequest, + sender ExecutionSender, +) error { + if request == nil || request.Job == nil { + return fmt.Errorf("execute request/job is nil") + } + if sender == nil { + return fmt.Errorf("execution sender is nil") + } + if request.Job.JobType != "" && request.Job.JobType != "ec_balance" { + return fmt.Errorf("job type %q is not handled by ec_balance worker", request.Job.JobType) + } + + params, err := decodeECBalanceTaskParams(request.Job) + if err != nil { + return err + } + + if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" { + return fmt.Errorf("ec balance 
source node is required") + } + if len(params.Targets) == 0 || strings.TrimSpace(params.Targets[0].Node) == "" { + return fmt.Errorf("ec balance target node is required") + } + + task := ecbalancetask.NewECBalanceTask( + request.Job.JobId, + params.VolumeId, + params.Collection, + h.grpcDialOption, + ) + + execCtx, execCancel := context.WithCancel(ctx) + defer execCancel() + + task.SetProgressCallback(func(progress float64, stage string) { + message := fmt.Sprintf("ec balance progress %.0f%%", progress) + if strings.TrimSpace(stage) != "" { + message = stage + } + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_RUNNING, + ProgressPercent: progress, + Stage: stage, + Message: message, + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity(stage, message), + }, + }); err != nil { + glog.Warningf("EC balance job %s (%s): failed to send progress (%.0f%%, stage=%q): %v, cancelling execution", + request.Job.JobId, request.Job.JobType, progress, stage, err) + execCancel() + } + }) + + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_ASSIGNED, + ProgressPercent: 0, + Stage: "assigned", + Message: "ec balance job accepted", + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("assigned", "ec balance job accepted"), + }, + }); err != nil { + return err + } + + if err := task.Execute(execCtx, params); err != nil { + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_FAILED, + ProgressPercent: 100, + Stage: "failed", + Message: err.Error(), + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("failed", err.Error()), + }, + }) + return err + } + + sourceNode := params.Sources[0].Node + targetNode := params.Targets[0].Node + 
resultSummary := fmt.Sprintf("EC shard balance completed: volume %d shards moved from %s to %s", + params.VolumeId, sourceNode, targetNode) + + return sender.SendCompleted(&plugin_pb.JobCompleted{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + Success: true, + Result: &plugin_pb.JobResult{ + Summary: resultSummary, + OutputValues: map[string]*plugin_pb.ConfigValue{ + "volume_id": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)}, + }, + "source_server": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode}, + }, + "target_server": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: targetNode}, + }, + }, + }, + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("completed", resultSummary), + }, + }) +} + +func (h *ECBalanceHandler) collectVolumeMetrics( + ctx context.Context, + masterAddresses []string, + collectionFilter string, +) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { + metrics, activeTopology, _, err := collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption) + return metrics, activeTopology, err +} + +func deriveECBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *ecBalanceWorkerConfig { + taskConfig := ecbalancetask.NewDefaultConfig() + + imbalanceThreshold := readDoubleConfig(values, "imbalance_threshold", taskConfig.ImbalanceThreshold) + if imbalanceThreshold < ecBalanceMinImbalanceThreshold { + imbalanceThreshold = ecBalanceMinImbalanceThreshold + } + if imbalanceThreshold > ecBalanceMaxImbalanceThreshold { + imbalanceThreshold = ecBalanceMaxImbalanceThreshold + } + taskConfig.ImbalanceThreshold = imbalanceThreshold + + minServerCountRaw := readInt64Config(values, "min_server_count", int64(taskConfig.MinServerCount)) + if minServerCountRaw < int64(ecBalanceMinServerCount) { + minServerCountRaw = int64(ecBalanceMinServerCount) + } + if minServerCountRaw > math.MaxInt32 { + minServerCountRaw = 
math.MaxInt32 + } + taskConfig.MinServerCount = int(minServerCountRaw) + + minIntervalRaw := readInt64Config(values, "min_interval_seconds", 60*60) + if minIntervalRaw < 0 { + minIntervalRaw = 0 + } + if minIntervalRaw > math.MaxInt32 { + minIntervalRaw = math.MaxInt32 + } + minIntervalSeconds := int(minIntervalRaw) + + taskConfig.PreferredTags = util.NormalizeTagList(readStringListConfig(values, "preferred_tags")) + + return &ecBalanceWorkerConfig{ + TaskConfig: taskConfig, + MinIntervalSeconds: minIntervalSeconds, + } +} + +func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.JobProposal, error) { + if result == nil { + return nil, fmt.Errorf("task detection result is nil") + } + if result.TypedParams == nil { + return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID) + } + + paramsPayload, err := proto.Marshal(result.TypedParams) + if err != nil { + return nil, fmt.Errorf("marshal task params: %w", err) + } + + proposalID := strings.TrimSpace(result.TaskID) + if proposalID == "" { + proposalID = fmt.Sprintf("ec-balance-%d-%d", result.VolumeID, time.Now().UnixNano()) + } + + // Dedupe key includes volume ID, shard ID, source node, and collection + // to distinguish moves of the same shard from different source nodes (e.g. 
dedup) + dedupeKey := fmt.Sprintf("ec_balance:%d", result.VolumeID) + if len(result.TypedParams.Sources) > 0 { + src := result.TypedParams.Sources[0] + if len(src.ShardIds) > 0 { + dedupeKey = fmt.Sprintf("ec_balance:%d:%d:%s", result.VolumeID, src.ShardIds[0], src.Node) + } + } + if result.Collection != "" { + dedupeKey += ":" + result.Collection + } + + sourceNode := "" + targetNode := "" + if len(result.TypedParams.Sources) > 0 { + sourceNode = strings.TrimSpace(result.TypedParams.Sources[0].Node) + } + if len(result.TypedParams.Targets) > 0 { + targetNode = strings.TrimSpace(result.TypedParams.Targets[0].Node) + } + + summary := fmt.Sprintf("Balance EC shard of volume %d", result.VolumeID) + if sourceNode != "" && targetNode != "" { + summary = fmt.Sprintf("Move EC shard of volume %d: %s → %s", result.VolumeID, sourceNode, targetNode) + } + + return &plugin_pb.JobProposal{ + ProposalId: proposalID, + DedupeKey: dedupeKey, + JobType: "ec_balance", + Priority: mapTaskPriority(result.Priority), + Summary: summary, + Detail: strings.TrimSpace(result.Reason), + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": { + Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramsPayload}, + }, + "volume_id": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)}, + }, + "source_server": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode}, + }, + "target_server": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: targetNode}, + }, + "collection": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection}, + }, + }, + Labels: map[string]string{ + "task_type": "ec_balance", + "volume_id": fmt.Sprintf("%d", result.VolumeID), + "collection": result.Collection, + "source_node": sourceNode, + "target_node": targetNode, + }, + }, nil +} + +func decodeECBalanceTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) { + if job == nil { + return nil, fmt.Errorf("job spec is nil") + } + + // 
Try the protobuf-encoded payload first (preferred path).
	if payload := readBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
		params := &worker_pb.TaskParams{}
		if err := proto.Unmarshal(payload, params); err != nil {
			return nil, fmt.Errorf("decodeECBalanceTaskParams: unmarshal task_params_pb: %w", err)
		}
		if params.TaskId == "" {
			params.TaskId = job.JobId
		}
		// The executor dereferences Sources[0]/Targets[0] unconditionally, so
		// reject payloads that omit any execution-critical field up front.
		switch {
		case len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "":
			return nil, fmt.Errorf("decodeECBalanceTaskParams: TaskParams missing Sources[0].Node")
		case len(params.Sources[0].ShardIds) == 0:
			return nil, fmt.Errorf("decodeECBalanceTaskParams: TaskParams missing Sources[0].ShardIds")
		case len(params.Targets) == 0 || strings.TrimSpace(params.Targets[0].Node) == "":
			return nil, fmt.Errorf("decodeECBalanceTaskParams: TaskParams missing Targets[0].Node")
		case len(params.Targets[0].ShardIds) == 0:
			return nil, fmt.Errorf("decodeECBalanceTaskParams: TaskParams missing Targets[0].ShardIds")
		}
		return params, nil
	}

	// Legacy fallback: rebuild TaskParams from individual scalar parameters.
	// Every execution-critical field must be present and in range.
	volumeID := readInt64Config(job.Parameters, "volume_id", 0)
	sourceNode := strings.TrimSpace(readStringConfig(job.Parameters, "source_server", ""))
	targetNode := strings.TrimSpace(readStringConfig(job.Parameters, "target_server", ""))
	collection := readStringConfig(job.Parameters, "collection", "")

	if volumeID <= 0 || volumeID > math.MaxUint32 {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: invalid or missing volume_id: %d", volumeID)
	}
	if sourceNode == "" {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: missing source_server")
	}
	if targetNode == "" {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: missing target_server")
	}

	// shard_id is required: its absence must yield a distinct error rather than
	// the generic out-of-range one produced by the -1 sentinel below.
	if v, ok := job.Parameters["shard_id"]; !ok || v == nil {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: missing shard_id (required for EcBalanceTaskParams)")
	}
	shardID := readInt64Config(job.Parameters, "shard_id", -1)
	if shardID < 0 || shardID > math.MaxUint32 {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: invalid shard_id: %d", shardID)
	}

	// Disk IDs default to 0 but still must fit in uint32.
	checkDiskID := func(name string, v int64) error {
		if v < 0 || v > math.MaxUint32 {
			return fmt.Errorf("decodeECBalanceTaskParams: invalid %s: %d", name, v)
		}
		return nil
	}
	sourceDiskID := readInt64Config(job.Parameters, "source_disk_id", 0)
	if err := checkDiskID("source_disk_id", sourceDiskID); err != nil {
		return nil, err
	}
	targetDiskID := readInt64Config(job.Parameters, "target_disk_id", 0)
	if err := checkDiskID("target_disk_id", targetDiskID); err != nil {
		return nil, err
	}

	return &worker_pb.TaskParams{
		TaskId:     job.JobId,
		VolumeId:   uint32(volumeID),
		Collection: collection,
		Sources: []*worker_pb.TaskSource{{
			Node:     sourceNode,
			DiskId:   uint32(sourceDiskID),
			ShardIds: []uint32{uint32(shardID)},
		}},
		Targets: []*worker_pb.TaskTarget{{
			Node:     targetNode,
			DiskId:   uint32(targetDiskID),
			ShardIds: []uint32{uint32(shardID)},
		}},
		TaskParams: &worker_pb.TaskParams_EcBalanceParams{
			EcBalanceParams:
&worker_pb.EcBalanceTaskParams{ + TimeoutSeconds: 600, + }, + }, + }, nil +} + +func emitECBalanceDecisionTrace( + sender DetectionSender, + taskConfig *ecbalancetask.Config, + results []*workertypes.TaskDetectionResult, + maxResults int, + hasMore bool, +) error { + if sender == nil || taskConfig == nil { + return nil + } + + // Count moves by phase + phaseCounts := make(map[string]int) + for _, result := range results { + if result.Reason != "" { + // Extract phase from reason string + for _, phase := range []string{"dedup", "cross_rack", "within_rack", "global"} { + if strings.Contains(result.Reason, phase) { + phaseCounts[phase]++ + break + } + } + } + } + + summarySuffix := "" + if hasMore { + summarySuffix = fmt.Sprintf(" (max_results=%d reached)", maxResults) + } + + summaryMessage := fmt.Sprintf( + "EC balance detection: %d moves proposed%s (dedup=%d, cross_rack=%d, within_rack=%d, global=%d)", + len(results), + summarySuffix, + phaseCounts["dedup"], + phaseCounts["cross_rack"], + phaseCounts["within_rack"], + phaseCounts["global"], + ) + + return sender.SendActivity(BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{ + "total_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))}, + }, + "has_more": { + Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: hasMore}, + }, + "dedup_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(phaseCounts["dedup"])}, + }, + "cross_rack_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(phaseCounts["cross_rack"])}, + }, + "within_rack_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(phaseCounts["within_rack"])}, + }, + "global_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(phaseCounts["global"])}, + }, + "imbalance_threshold": { + Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.ImbalanceThreshold}, + }, + "min_server_count": { + Kind: 
&plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinServerCount)}, + }, + })) +} diff --git a/weed/plugin/worker/ec_balance_handler_test.go b/weed/plugin/worker/ec_balance_handler_test.go new file mode 100644 index 000000000..a5d23c5ea --- /dev/null +++ b/weed/plugin/worker/ec_balance_handler_test.go @@ -0,0 +1,325 @@ +package pluginworker + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + ecbalancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_balance" + workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" + "google.golang.org/protobuf/proto" +) + +func TestDeriveECBalanceWorkerConfig(t *testing.T) { + tests := []struct { + name string + values map[string]*plugin_pb.ConfigValue + check func(t *testing.T, config *ecBalanceWorkerConfig) + }{ + { + name: "nil values uses defaults", + values: nil, + check: func(t *testing.T, config *ecBalanceWorkerConfig) { + if config.TaskConfig.ImbalanceThreshold != 0.2 { + t.Errorf("expected default threshold 0.2, got %f", config.TaskConfig.ImbalanceThreshold) + } + if config.TaskConfig.MinServerCount != 3 { + t.Errorf("expected default min_server_count 3, got %d", config.TaskConfig.MinServerCount) + } + }, + }, + { + name: "custom threshold", + values: map[string]*plugin_pb.ConfigValue{ + "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.3}}, + "min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 5}}, + }, + check: func(t *testing.T, config *ecBalanceWorkerConfig) { + if config.TaskConfig.ImbalanceThreshold != 0.3 { + t.Errorf("expected threshold 0.3, got %f", config.TaskConfig.ImbalanceThreshold) + } + if config.TaskConfig.MinServerCount != 5 { + t.Errorf("expected min_server_count 5, got %d", config.TaskConfig.MinServerCount) + } + }, + }, + { + name: "threshold clamped to min", + values: map[string]*plugin_pb.ConfigValue{ + "imbalance_threshold": {Kind: 
&plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.01}}, + }, + check: func(t *testing.T, config *ecBalanceWorkerConfig) { + if config.TaskConfig.ImbalanceThreshold != 0.05 { + t.Errorf("expected clamped threshold 0.05, got %f", config.TaskConfig.ImbalanceThreshold) + } + }, + }, + { + name: "threshold clamped to max", + values: map[string]*plugin_pb.ConfigValue{ + "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.9}}, + }, + check: func(t *testing.T, config *ecBalanceWorkerConfig) { + if config.TaskConfig.ImbalanceThreshold != 0.5 { + t.Errorf("expected clamped threshold 0.5, got %f", config.TaskConfig.ImbalanceThreshold) + } + }, + }, + { + name: "min_server_count clamped to 2", + values: map[string]*plugin_pb.ConfigValue{ + "min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }, + check: func(t *testing.T, config *ecBalanceWorkerConfig) { + if config.TaskConfig.MinServerCount != 2 { + t.Errorf("expected min_server_count 2, got %d", config.TaskConfig.MinServerCount) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := deriveECBalanceWorkerConfig(tt.values) + tt.check(t, config) + }) + } +} + +func TestBuildECBalanceProposal(t *testing.T) { + result := &workertypes.TaskDetectionResult{ + TaskID: "test-task-123", + TaskType: workertypes.TaskTypeECBalance, + VolumeID: 42, + Collection: "test-col", + Priority: workertypes.TaskPriorityMedium, + Reason: "cross_rack balance", + TypedParams: &worker_pb.TaskParams{ + VolumeId: 42, + Collection: "test-col", + Sources: []*worker_pb.TaskSource{{ + Node: "source:8080", + ShardIds: []uint32{5}, + }}, + Targets: []*worker_pb.TaskTarget{{ + Node: "target:8080", + ShardIds: []uint32{5}, + }}, + }, + } + + proposal, err := buildECBalanceProposal(result) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if proposal.JobType != "ec_balance" { + t.Errorf("expected job type ec_balance, got %s", proposal.JobType) + } + 
if proposal.ProposalId != "test-task-123" { + t.Errorf("expected proposal ID test-task-123, got %s", proposal.ProposalId) + } + if proposal.DedupeKey != "ec_balance:42:5:source:8080:test-col" { + t.Errorf("expected dedupe key ec_balance:42:5:source:8080:test-col, got %s", proposal.DedupeKey) + } + if proposal.Labels["source_node"] != "source:8080" { + t.Errorf("expected source_node label source:8080, got %s", proposal.Labels["source_node"]) + } + if proposal.Labels["target_node"] != "target:8080" { + t.Errorf("expected target_node label target:8080, got %s", proposal.Labels["target_node"]) + } +} + +func TestBuildECBalanceProposalNilResult(t *testing.T) { + _, err := buildECBalanceProposal(nil) + if err == nil { + t.Fatal("expected error for nil result") + } +} + +func TestDecodeECBalanceTaskParamsFromProtobuf(t *testing.T) { + originalParams := &worker_pb.TaskParams{ + TaskId: "test-id", + VolumeId: 100, + Collection: "test-col", + Sources: []*worker_pb.TaskSource{{ + Node: "source:8080", + ShardIds: []uint32{3}, + }}, + Targets: []*worker_pb.TaskTarget{{ + Node: "target:8080", + ShardIds: []uint32{3}, + }}, + TaskParams: &worker_pb.TaskParams_EcBalanceParams{ + EcBalanceParams: &worker_pb.EcBalanceTaskParams{ + DiskType: "hdd", + TimeoutSeconds: 300, + }, + }, + } + + payload, err := proto.Marshal(originalParams) + if err != nil { + t.Fatalf("failed to marshal: %v", err) + } + + job := &plugin_pb.JobSpec{ + JobId: "job-1", + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}}, + }, + } + + decoded, err := decodeECBalanceTaskParams(job) + if err != nil { + t.Fatalf("failed to decode: %v", err) + } + + if decoded.VolumeId != 100 { + t.Errorf("expected volume_id 100, got %d", decoded.VolumeId) + } + if decoded.Collection != "test-col" { + t.Errorf("expected collection test-col, got %s", decoded.Collection) + } + if len(decoded.Sources) != 1 || decoded.Sources[0].Node != "source:8080" { + 
t.Error("source mismatch") + } + if len(decoded.Targets) != 1 || decoded.Targets[0].Node != "target:8080" { + t.Error("target mismatch") + } + ecParams := decoded.GetEcBalanceParams() + if ecParams == nil { + t.Fatal("expected ec_balance_params") + } + if ecParams.DiskType != "hdd" { + t.Errorf("expected disk_type hdd, got %s", ecParams.DiskType) + } +} + +func TestDecodeECBalanceTaskParamsFallback(t *testing.T) { + job := &plugin_pb.JobSpec{ + JobId: "job-2", + Parameters: map[string]*plugin_pb.ConfigValue{ + "volume_id": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 200}}, + "source_server": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "src:8080"}}, + "target_server": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "dst:8080"}}, + "collection": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "fallback-col"}}, + "shard_id": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 7}}, + "source_disk_id": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + "target_disk_id": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}}, + }, + } + + decoded, err := decodeECBalanceTaskParams(job) + if err != nil { + t.Fatalf("failed to decode fallback: %v", err) + } + + if decoded.VolumeId != 200 { + t.Errorf("expected volume_id 200, got %d", decoded.VolumeId) + } + if len(decoded.Sources) != 1 || decoded.Sources[0].Node != "src:8080" { + t.Error("source mismatch") + } + if decoded.Sources[0].ShardIds[0] != 7 { + t.Errorf("expected shard_id 7, got %d", decoded.Sources[0].ShardIds[0]) + } + if decoded.Sources[0].DiskId != 1 { + t.Errorf("expected source disk_id 1, got %d", decoded.Sources[0].DiskId) + } + if decoded.Targets[0].DiskId != 2 { + t.Errorf("expected target disk_id 2, got %d", decoded.Targets[0].DiskId) + } +} + +func TestDecodeECBalanceTaskParamsProtobufValidation(t *testing.T) { + // Protobuf payload with missing ShardIds should fail validation + badParams := &worker_pb.TaskParams{ + TaskId: "test-id", + VolumeId: 100, + 
Sources: []*worker_pb.TaskSource{{Node: "source:8080"}}, // no ShardIds
+		Targets: []*worker_pb.TaskTarget{{Node: "target:8080", ShardIds: []uint32{3}}},
+	}
+	payload, _ := proto.Marshal(badParams) // marshal error ignored: badParams is structurally valid, only semantically incomplete
+	job := &plugin_pb.JobSpec{
+		JobId: "job-validate",
+		Parameters: map[string]*plugin_pb.ConfigValue{
+			"task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}}, // raw protobuf payload path
+		},
+	}
+	_, err := decodeECBalanceTaskParams(job)
+	if err == nil {
+		t.Fatal("expected error for missing Sources[0].ShardIds in protobuf")
+	}
+}
+
+func TestDecodeECBalanceTaskParamsMissingShardID(t *testing.T) { // fallback (flat-parameter) decoding must reject a job that carries no shard_id
+	job := &plugin_pb.JobSpec{
+		JobId: "job-3",
+		Parameters: map[string]*plugin_pb.ConfigValue{
+			"volume_id":     {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300}},
+			"source_server": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "src:8080"}},
+			"target_server": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "dst:8080"}},
+			"collection":    {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "test-col"}},
+			// shard_id deliberately missing
+		},
+	}
+
+	_, err := decodeECBalanceTaskParams(job)
+	if err == nil {
+		t.Fatal("expected error for missing shard_id")
+	}
+}
+
+func TestECBalanceHandlerCapability(t *testing.T) { // verifies the handler's advertised capability metadata; only Capability() is exercised, so a nil dependency suffices
+	handler := NewECBalanceHandler(nil)
+	cap := handler.Capability()
+
+	if cap.JobType != "ec_balance" {
+		t.Errorf("expected job type ec_balance, got %s", cap.JobType)
+	}
+	if !cap.CanDetect {
+		t.Error("expected CanDetect=true")
+	}
+	if !cap.CanExecute {
+		t.Error("expected CanExecute=true")
+	}
+	if cap.Weight != 60 {
+		t.Errorf("expected weight 60, got %d", cap.Weight)
+	}
+}
+
+func TestECBalanceConfigRoundTrip(t *testing.T) { // Config -> ToTaskPolicy -> FromTaskPolicy must preserve every EC-balance field
+	config := ecbalancetask.NewDefaultConfig()
+	config.ImbalanceThreshold = 0.3
+	config.MinServerCount = 5
+	config.CollectionFilter = "my_col"
+	config.DiskType = "ssd"
+	config.PreferredTags = []string{"fast", "ssd"}
+
+	policy := config.ToTaskPolicy()
+	if policy == nil {
+		t.Fatal("expected non-nil 
policy") + } + + config2 := ecbalancetask.NewDefaultConfig() + if err := config2.FromTaskPolicy(policy); err != nil { + t.Fatalf("failed to load from policy: %v", err) + } + + if config2.ImbalanceThreshold != 0.3 { + t.Errorf("expected threshold 0.3, got %f", config2.ImbalanceThreshold) + } + if config2.MinServerCount != 5 { + t.Errorf("expected min_server_count 5, got %d", config2.MinServerCount) + } + if config2.CollectionFilter != "my_col" { + t.Errorf("expected collection_filter my_col, got %s", config2.CollectionFilter) + } + if config2.DiskType != "ssd" { + t.Errorf("expected disk_type ssd, got %s", config2.DiskType) + } + if len(config2.PreferredTags) != 2 || config2.PreferredTags[0] != "fast" { + t.Errorf("expected preferred_tags [fast ssd], got %v", config2.PreferredTags) + } +} diff --git a/weed/worker/tasks/ec_balance/config.go b/weed/worker/tasks/ec_balance/config.go new file mode 100644 index 000000000..3ddad226b --- /dev/null +++ b/weed/worker/tasks/ec_balance/config.go @@ -0,0 +1,220 @@ +package ec_balance + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with EC balance specific settings +type Config struct { + base.BaseConfig + ImbalanceThreshold float64 `json:"imbalance_threshold"` + MinServerCount int `json:"min_server_count"` + CollectionFilter string `json:"collection_filter"` + DiskType string `json:"disk_type"` + PreferredTags []string `json:"preferred_tags"` + DataCenterFilter string `json:"-"` // per-detection-run, not persisted +} + +// NewDefaultConfig creates a new default EC balance configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 60 * 60, // 1 hour + MaxConcurrent: 1, + }, + ImbalanceThreshold: 0.2, // 20% + MinServerCount: 3, + 
CollectionFilter: "", + DiskType: "", + PreferredTags: nil, + } +} + +// GetConfigSpec returns the configuration schema for EC balance tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable EC Shard Balance Tasks", + Description: "Whether EC shard balance tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic EC shard balancing", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for EC shard imbalances", + HelpText: "The system will check for EC shard distribution imbalances at this interval", + Placeholder: "1", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + MinValue: 1, + MaxValue: 5, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of EC shard balance tasks that can run simultaneously", + HelpText: "Limits the number of EC shard balancing operations running at the same time", + Placeholder: "1 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "imbalance_threshold", + JSONName: "imbalance_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.2, + MinValue: 0.05, + MaxValue: 0.5, + Required: true, + DisplayName: "Imbalance Threshold", + Description: "Minimum shard count imbalance ratio to trigger balancing", + HelpText: "EC shard distribution imbalances above this threshold will trigger rebalancing", + Placeholder: "0.20 
(20%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_server_count", + JSONName: "min_server_count", + Type: config.FieldTypeInt, + DefaultValue: 3, + MinValue: 2, + MaxValue: 100, + Required: true, + DisplayName: "Minimum Server Count", + Description: "Minimum number of servers required for EC shard balancing", + HelpText: "EC shard balancing will only occur if there are at least this many servers", + Placeholder: "3 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "collection_filter", + JSONName: "collection_filter", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Collection Filter", + Description: "Only balance EC shards from specific collections", + HelpText: "Leave empty to balance all collections, or specify collection name/wildcard", + Placeholder: "my_collection", + InputType: "text", + CSSClasses: "form-control", + }, + { + Name: "disk_type", + JSONName: "disk_type", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Disk Type", + Description: "Only balance EC shards on this disk type", + HelpText: "Leave empty for all disk types, or specify hdd or ssd", + Placeholder: "hdd", + InputType: "text", + CSSClasses: "form-control", + }, + { + Name: "preferred_tags", + JSONName: "preferred_tags", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Preferred Disk Tags", + Description: "Comma-separated disk tags to prioritize for shard placement", + HelpText: "EC shards will be placed on disks with these tags first, ordered by preference", + Placeholder: "fast,ssd", + InputType: "text", + CSSClasses: "form-control", + }, + }, + } +} + +// ToTaskPolicy converts configuration to a TaskPolicy protobuf message +func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { + preferredTagsCopy := append([]string(nil), c.PreferredTags...) 
+ return &worker_pb.TaskPolicy{ + Enabled: c.Enabled, + MaxConcurrent: int32(c.MaxConcurrent), + RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), + CheckIntervalSeconds: int32(c.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_EcBalanceConfig{ + EcBalanceConfig: &worker_pb.EcBalanceTaskConfig{ + ImbalanceThreshold: c.ImbalanceThreshold, + MinServerCount: int32(c.MinServerCount), + CollectionFilter: c.CollectionFilter, + DiskType: c.DiskType, + PreferredTags: preferredTagsCopy, + }, + }, + } +} + +// FromTaskPolicy loads configuration from a TaskPolicy protobuf message +func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { + if policy == nil { + return fmt.Errorf("policy is nil") + } + + c.Enabled = policy.Enabled + c.MaxConcurrent = int(policy.MaxConcurrent) + c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) + + if ecbConfig := policy.GetEcBalanceConfig(); ecbConfig != nil { + c.ImbalanceThreshold = ecbConfig.ImbalanceThreshold + c.MinServerCount = int(ecbConfig.MinServerCount) + c.CollectionFilter = ecbConfig.CollectionFilter + c.DiskType = ecbConfig.DiskType + c.PreferredTags = append([]string(nil), ecbConfig.PreferredTags...) 
+ } + + return nil +} + +// LoadConfigFromPersistence loads configuration from the persistence layer if available +func LoadConfigFromPersistence(configPersistence interface{}) *Config { + cfg := NewDefaultConfig() + + if persistence, ok := configPersistence.(interface { + LoadEcBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) + }); ok { + if policy, err := persistence.LoadEcBalanceTaskPolicy(); err == nil && policy != nil { + if err := cfg.FromTaskPolicy(policy); err == nil { + glog.V(1).Infof("Loaded EC balance configuration from persistence") + return cfg + } + } + } + + glog.V(1).Infof("Using default EC balance configuration") + return cfg +} diff --git a/weed/worker/tasks/ec_balance/detection.go b/weed/worker/tasks/ec_balance/detection.go new file mode 100644 index 000000000..b91dff701 --- /dev/null +++ b/weed/worker/tasks/ec_balance/detection.go @@ -0,0 +1,784 @@ +package ec_balance + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// ecNodeInfo represents a volume server with EC shard information for detection +type ecNodeInfo struct { + nodeID string + address string + dc string + rack string // dc:rack composite key + freeSlots int + // volumeID -> shardBits (bitmask of shard IDs present on this node) + ecShards map[uint32]*ecVolumeInfo +} + +type ecVolumeInfo struct { + collection string + shardBits uint32 // bitmask + diskID uint32 +} + +// ecRackInfo represents a rack with EC node information +type ecRackInfo struct { + nodes map[string]*ecNodeInfo + freeSlots int +} + +// shardMove represents a proposed EC shard move +type shardMove struct { + volumeID uint32 + shardID 
int + collection string + source *ecNodeInfo + sourceDisk uint32 + target *ecNodeInfo + targetDisk uint32 + phase string // "dedup", "cross_rack", "within_rack", "global" +} + +// Detection implements the multi-phase EC shard balance detection algorithm. +// It analyzes EC shard distribution and proposes moves to achieve even distribution. +func Detection( + ctx context.Context, + metrics []*types.VolumeHealthMetrics, + clusterInfo *types.ClusterInfo, + config base.TaskConfig, + maxResults int, +) ([]*types.TaskDetectionResult, bool, error) { + if !config.IsEnabled() { + return nil, false, nil + } + + ecConfig := config.(*Config) + if maxResults < 0 { + maxResults = 0 + } + + if clusterInfo == nil || clusterInfo.ActiveTopology == nil { + return nil, false, fmt.Errorf("active topology not available for EC balance detection") + } + + topoInfo := clusterInfo.ActiveTopology.GetTopologyInfo() + if topoInfo == nil { + return nil, false, fmt.Errorf("topology info not available") + } + + // Build EC topology view + nodes, racks := buildECTopology(topoInfo, ecConfig) + + if len(nodes) < ecConfig.MinServerCount { + glog.V(1).Infof("EC balance: only %d servers, need at least %d", len(nodes), ecConfig.MinServerCount) + return nil, false, nil + } + + // Collect all EC volumes grouped by collection + collections := collectECCollections(nodes, ecConfig) + if len(collections) == 0 { + glog.V(1).Infof("EC balance: no EC volumes found matching filters") + return nil, false, nil + } + + threshold := ecConfig.ImbalanceThreshold + var allMoves []*shardMove + + // Build set of allowed collections for global phase filtering + allowedVids := make(map[uint32]bool) + for _, volumeIDs := range collections { + for _, vid := range volumeIDs { + allowedVids[vid] = true + } + } + + for collection, volumeIDs := range collections { + if ctx != nil { + if err := ctx.Err(); err != nil { + return nil, false, err + } + } + + // Phase 1: Detect duplicate shards (always run, duplicates are errors not 
imbalance) + for _, vid := range volumeIDs { + moves := detectDuplicateShards(vid, collection, nodes, ecConfig.DiskType) + applyMovesToTopology(moves) + allMoves = append(allMoves, moves...) + } + + // Phase 2: Balance shards across racks (operates on updated topology from phase 1) + for _, vid := range volumeIDs { + moves := detectCrossRackImbalance(vid, collection, nodes, racks, ecConfig.DiskType, threshold) + applyMovesToTopology(moves) + allMoves = append(allMoves, moves...) + } + + // Phase 3: Balance shards within racks (operates on updated topology from phases 1-2) + for _, vid := range volumeIDs { + moves := detectWithinRackImbalance(vid, collection, nodes, racks, ecConfig.DiskType, threshold) + applyMovesToTopology(moves) + allMoves = append(allMoves, moves...) + } + } + + // Phase 4: Global node balance across racks (only for volumes in allowed collections) + globalMoves := detectGlobalImbalance(nodes, racks, ecConfig, allowedVids) + allMoves = append(allMoves, globalMoves...) + + // Cap results + hasMore := false + if maxResults > 0 && len(allMoves) > maxResults { + allMoves = allMoves[:maxResults] + hasMore = true + } + + // Convert moves to TaskDetectionResults + now := time.Now() + results := make([]*types.TaskDetectionResult, 0, len(allMoves)) + for i, move := range allMoves { + // Include loop index and source/target in TaskID for uniqueness + taskID := fmt.Sprintf("ec_balance_%d_%d_%s_%s_%d_%d", + move.volumeID, move.shardID, + move.source.nodeID, move.target.nodeID, + now.UnixNano(), i) + + result := &types.TaskDetectionResult{ + TaskID: taskID, + TaskType: types.TaskTypeECBalance, + VolumeID: move.volumeID, + Server: move.source.nodeID, + Collection: move.collection, + Priority: movePhasePriority(move.phase), + Reason: fmt.Sprintf("EC shard %d.%d %s: %s → %s (%s)", + move.volumeID, move.shardID, move.phase, + move.source.nodeID, move.target.nodeID, move.phase), + ScheduleAt: now, + TypedParams: &worker_pb.TaskParams{ + TaskId: taskID, + VolumeId: 
move.volumeID, + Collection: move.collection, + Sources: []*worker_pb.TaskSource{{ + Node: move.source.address, + DiskId: move.sourceDisk, + Rack: move.source.rack, + ShardIds: []uint32{uint32(move.shardID)}, + }}, + Targets: []*worker_pb.TaskTarget{{ + Node: move.target.address, + DiskId: move.targetDisk, + Rack: move.target.rack, + ShardIds: []uint32{uint32(move.shardID)}, + }}, + TaskParams: &worker_pb.TaskParams_EcBalanceParams{ + EcBalanceParams: &worker_pb.EcBalanceTaskParams{ + DiskType: ecConfig.DiskType, + TimeoutSeconds: 600, + }, + }, + }, + } + results = append(results, result) + } + + glog.V(1).Infof("EC balance detection: %d moves proposed across %d collections", + len(results), len(collections)) + + return results, hasMore, nil +} + +// buildECTopology constructs EC node and rack structures from topology info. +// Rack keys are dc:rack composites to avoid cross-DC name collisions. +// Only racks with eligible nodes (matching disk type, having EC shards or capacity) are included. 
+func buildECTopology(topoInfo *master_pb.TopologyInfo, config *Config) (map[string]*ecNodeInfo, map[string]*ecRackInfo) { + nodes := make(map[string]*ecNodeInfo) + racks := make(map[string]*ecRackInfo) + + for _, dc := range topoInfo.DataCenterInfos { + if config.DataCenterFilter != "" { + matchers := wildcard.CompileWildcardMatchers(config.DataCenterFilter) + if !wildcard.MatchesAnyWildcard(matchers, dc.Id) { + continue + } + } + + for _, rack := range dc.RackInfos { + // Use dc:rack composite key to avoid cross-DC name collisions + rackKey := dc.Id + ":" + rack.Id + + for _, dn := range rack.DataNodeInfos { + node := &ecNodeInfo{ + nodeID: dn.Id, + address: dn.Id, + dc: dc.Id, + rack: rackKey, + ecShards: make(map[uint32]*ecVolumeInfo), + } + + hasMatchingDisk := false + for diskType, diskInfo := range dn.DiskInfos { + if config.DiskType != "" && diskType != config.DiskType { + continue + } + hasMatchingDisk = true + + freeSlots := int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countEcShards(diskInfo.EcShardInfos) + if freeSlots > 0 { + node.freeSlots += freeSlots + } + + for _, ecShardInfo := range diskInfo.EcShardInfos { + vid := ecShardInfo.Id + existing, ok := node.ecShards[vid] + if !ok { + existing = &ecVolumeInfo{ + collection: ecShardInfo.Collection, + diskID: ecShardInfo.DiskId, + } + node.ecShards[vid] = existing + } + existing.shardBits |= ecShardInfo.EcIndexBits + } + } + + if !hasMatchingDisk { + continue + } + + nodes[dn.Id] = node + + // Only create rack entry when we have an eligible node + if _, ok := racks[rackKey]; !ok { + racks[rackKey] = &ecRackInfo{nodes: make(map[string]*ecNodeInfo)} + } + racks[rackKey].nodes[dn.Id] = node + racks[rackKey].freeSlots += node.freeSlots + } + } + } + + return nodes, racks +} + +// collectECCollections groups EC volume IDs by collection, applying filters +func collectECCollections(nodes map[string]*ecNodeInfo, config *Config) map[string][]uint32 { + allowedCollections := 
wildcard.CompileWildcardMatchers(config.CollectionFilter) + + // Collect unique volume IDs per collection + collectionVids := make(map[string]map[uint32]bool) + for _, node := range nodes { + for vid, info := range node.ecShards { + if len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, info.collection) { + continue + } + if _, ok := collectionVids[info.collection]; !ok { + collectionVids[info.collection] = make(map[uint32]bool) + } + collectionVids[info.collection][vid] = true + } + } + + // Convert to sorted slices + result := make(map[string][]uint32, len(collectionVids)) + for collection, vids := range collectionVids { + vidSlice := make([]uint32, 0, len(vids)) + for vid := range vids { + vidSlice = append(vidSlice, vid) + } + sort.Slice(vidSlice, func(i, j int) bool { return vidSlice[i] < vidSlice[j] }) + result[collection] = vidSlice + } + + return result +} + +// detectDuplicateShards finds shards that exist on multiple nodes. +// Duplicates are always returned regardless of threshold since they are data errors. 
+func detectDuplicateShards(vid uint32, collection string, nodes map[string]*ecNodeInfo, diskType string) []*shardMove { + // Build shard -> list of nodes mapping + shardLocations := make(map[int][]*ecNodeInfo) + for _, node := range nodes { + info, ok := node.ecShards[vid] + if !ok { + continue + } + for shardID := 0; shardID < erasure_coding.MaxShardCount; shardID++ { + if info.shardBits&(1< 0 { + rackShardCount[node.rack] += count + rackShardNodes[node.rack] = append(rackShardNodes[node.rack], node) + totalShards += count + } + } + + if totalShards == 0 { + return nil + } + + // Check if imbalance exceeds threshold + if !exceedsImbalanceThreshold(rackShardCount, totalShards, numRacks, threshold) { + return nil + } + + maxPerRack := ceilDivide(totalShards, numRacks) + + var moves []*shardMove + + // Find over-loaded racks and move excess shards to under-loaded racks + for rackID, count := range rackShardCount { + if count <= maxPerRack { + continue + } + excess := count - maxPerRack + movedFromRack := 0 + + // Find shards to move from this rack + for _, node := range rackShardNodes[rackID] { + if movedFromRack >= excess { + break + } + info := node.ecShards[vid] + for shardID := 0; shardID < erasure_coding.TotalShardsCount; shardID++ { + if movedFromRack >= excess { + break + } + if info.shardBits&(1<= excess { + break + } + if info.shardBits&(1< 0 && !allowedVids[vid] { + continue + } + count += shardBitCount(info.shardBits) + } + nodeShardCounts[nodeID] = count + totalShards += count + } + + if totalShards == 0 { + continue + } + + // Check if imbalance exceeds threshold + if !exceedsImbalanceThreshold(nodeShardCounts, totalShards, len(rack.nodes), config.ImbalanceThreshold) { + continue + } + + avgShards := ceilDivide(totalShards, len(rack.nodes)) + + // Iteratively move shards from most-loaded to least-loaded + for i := 0; i < 10; i++ { // cap iterations to avoid infinite loops + // Find min and max nodes, skipping full nodes for min + var minNode, maxNode 
*ecNodeInfo + minCount, maxCount := totalShards+1, -1 + for nodeID, count := range nodeShardCounts { + node := rack.nodes[nodeID] + if count < minCount && node.freeSlots > 0 { + minCount = count + minNode = node + } + if count > maxCount { + maxCount = count + maxNode = rack.nodes[nodeID] + } + } + + if maxNode == nil || minNode == nil || maxNode.nodeID == minNode.nodeID { + break + } + if maxCount <= avgShards || minCount+1 > avgShards { + break + } + if maxCount-minCount <= 1 { + break + } + + // Pick a shard from maxNode that doesn't already exist on minNode + moved := false + for vid, info := range maxNode.ecShards { + if moved { + break + } + if len(allowedVids) > 0 && !allowedVids[vid] { + continue + } + // Check minNode doesn't have this volume's shards already (avoid same-volume overlap) + minInfo := minNode.ecShards[vid] + for shardID := 0; shardID < erasure_coding.TotalShardsCount; shardID++ { + if info.shardBits&(1<= maxPerRack { + continue + } + if rack.freeSlots <= 0 { + continue + } + for _, node := range rack.nodes { + if node.freeSlots <= 0 { + continue + } + if node.freeSlots > bestFreeSlots { + bestFreeSlots = node.freeSlots + bestNode = node + } + } + } + + return bestNode +} + +// findLeastLoadedNodeInRack finds the node with fewest shards in a rack +func findLeastLoadedNodeInRack(vid uint32, rack *ecRackInfo, excludeNode string, nodeShardCount map[string]int, maxPerNode int) *ecNodeInfo { + var bestNode *ecNodeInfo + bestCount := maxPerNode + 1 + + for nodeID, node := range rack.nodes { + if nodeID == excludeNode { + continue + } + if node.freeSlots <= 0 { + continue + } + count := nodeShardCount[nodeID] + if count >= maxPerNode { + continue + } + if count < bestCount { + bestCount = count + bestNode = node + } + } + + return bestNode +} + +// exceedsImbalanceThreshold checks if the distribution of counts exceeds the threshold. +// numGroups is the total number of groups (including those with 0 shards that aren't in the map). 
+// imbalanceRatio = (maxCount - minCount) / avgCount
+func exceedsImbalanceThreshold(counts map[string]int, total int, numGroups int, threshold float64) bool {
+	if numGroups <= 1 || total == 0 { // nothing to balance with one group or zero shards
+		return false
+	}
+
+	minCount := 0 // groups not in map have 0 shards
+	if len(counts) >= numGroups {
+		// All groups have entries; find actual min
+		minCount = total + 1 // sentinel: any real count is smaller
+		for _, count := range counts {
+			if count < minCount {
+				minCount = count
+			}
+		}
+	}
+
+	maxCount := -1 // sentinel: any real count is larger
+	for _, count := range counts {
+		if count > maxCount {
+			maxCount = count
+		}
+	}
+
+	avg := float64(total) / float64(numGroups)
+	if avg == 0 {
+		return false
+	}
+
+	imbalanceRatio := float64(maxCount-minCount) / avg
+	return imbalanceRatio > threshold // strictly greater: ratio equal to threshold is tolerated
+}
+
+// applyMovesToTopology simulates planned moves on the in-memory topology
+// so subsequent detection phases see updated shard placement.
+func applyMovesToTopology(moves []*shardMove) {
+	for _, move := range moves {
+		shardBit := uint32(1 << uint(move.shardID))
+
+		// Remove shard from source
+		if srcInfo, ok := move.source.ecShards[move.volumeID]; ok {
+			srcInfo.shardBits &^= shardBit // AND-NOT clears just this shard's bit
+		}
+
+		// For non-dedup moves, add shard to target
+		if move.source.nodeID != move.target.nodeID { // dedup moves have source == target and only delete
+			dstInfo, ok := move.target.ecShards[move.volumeID]
+			if !ok {
+				dstInfo = &ecVolumeInfo{
+					collection: move.collection,
+					diskID:     move.targetDisk,
+				}
+				move.target.ecShards[move.volumeID] = dstInfo
+			}
+			dstInfo.shardBits |= shardBit
+		}
+	}
+}
+
+// Helper functions
+
+func countEcShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) int { // total shard count summed over all EC shard infos
+	count := 0
+	for _, eci := range ecShardInfos {
+		count += erasure_coding.GetShardCount(eci)
+	}
+	return count
+}
+
+func shardBitCount(bits uint32) int { // population count of the shard bitmask
+	count := 0
+	for bits != 0 {
+		count += int(bits & 1)
+		bits >>= 1
+	}
+	return count
+}
+
+func ecShardDiskID(node *ecNodeInfo, vid uint32) uint32 { // disk holding vid's shards on node; 0 when node has none
+	if info, ok := node.ecShards[vid]; ok {
+		return info.diskID
+	}
+	return 0
+}
+
+func ceilDivide(a, b int) int { // integer ceiling of a/b; defined as 0 when b == 0
+	if b == 0 {
+		return 0
+	}
+	return (a + b - 1) / b
+}
+
+func movePhasePriority(phase string) types.TaskPriority { // dedup (data error) > cross_rack > all other phases
+	switch phase {
+	case "dedup":
+		return types.TaskPriorityHigh
+	case "cross_rack":
+		return types.TaskPriorityMedium
+	default:
+		return types.TaskPriorityLow
+	}
+}
diff --git a/weed/worker/tasks/ec_balance/detection_test.go b/weed/worker/tasks/ec_balance/detection_test.go
new file mode 100644
index 000000000..81e6ffb41
--- /dev/null
+++ b/weed/worker/tasks/ec_balance/detection_test.go
@@ -0,0 +1,514 @@
+package ec_balance
+
+import (
+	"context"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+func TestShardBitCount(t *testing.T) { // table-driven popcount check
+	tests := []struct {
+		bits     uint32
+		expected int
+	}{
+		{0, 0},
+		{1, 1},
+		{0b111, 3},
+		{0x3FFF, 14}, // all 14 shards
+		{0b10101010, 4},
+	}
+	for _, tt := range tests {
+		got := shardBitCount(tt.bits)
+		if got != tt.expected {
+			t.Errorf("shardBitCount(%b) = %d, want %d", tt.bits, got, tt.expected)
+		}
+	}
+}
+
+func TestCeilDivide(t *testing.T) { // includes both zero-numerator and zero-denominator cases
+	tests := []struct {
+		a, b     int
+		expected int
+	}{
+		{14, 3, 5},
+		{14, 7, 2},
+		{10, 3, 4},
+		{0, 5, 0},
+		{5, 0, 0},
+	}
+	for _, tt := range tests {
+		got := ceilDivide(tt.a, tt.b)
+		if got != tt.expected {
+			t.Errorf("ceilDivide(%d, %d) = %d, want %d", tt.a, tt.b, got, tt.expected)
+		}
+	}
+}
+
+func TestDetectDuplicateShards(t *testing.T) { // shard 0 exists on both nodes; the copy on the node with fewer free slots must be removed
+	nodes := map[string]*ecNodeInfo{
+		"node1": {
+			nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1", shardBits: 0b11}, // shard 0, 1
+			},
+		},
+		"node2": {
+			nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 10,
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1", shardBits: 0b01}, // shard 0 (duplicate)
+			},
+		},
+	}
+
+	moves := 
detectDuplicateShards(100, "col1", nodes, "") + + if len(moves) != 1 { + t.Fatalf("expected 1 dedup move, got %d", len(moves)) + } + + move := moves[0] + if move.phase != "dedup" { + t.Errorf("expected phase 'dedup', got %q", move.phase) + } + if move.shardID != 0 { + t.Errorf("expected shard 0 to be deduplicated, got %d", move.shardID) + } + // node1 has fewer free slots, so the duplicate on node1 should be removed (keeper is node2) + if move.source.nodeID != "node1" { + t.Errorf("expected source node1 (fewer free slots), got %s", move.source.nodeID) + } + // Dedup moves set target=source so isDedupPhase recognizes unmount+delete only + if move.target.nodeID != "node1" { + t.Errorf("expected target node1 (same as source for dedup), got %s", move.target.nodeID) + } +} + +func TestDetectCrossRackImbalance(t *testing.T) { + // 14 shards all on rack1, 2 racks available — large imbalance + nodes := map[string]*ecNodeInfo{ + "node1": { + nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 0, + ecShards: map[uint32]*ecVolumeInfo{ + 100: {collection: "col1", shardBits: 0x3FFF}, // all 14 shards + }, + }, + "node2": { + nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 20, + ecShards: map[uint32]*ecVolumeInfo{}, + }, + } + racks := map[string]*ecRackInfo{ + "dc1:rack1": { + nodes: map[string]*ecNodeInfo{"node1": nodes["node1"]}, + freeSlots: 0, + }, + "dc1:rack2": { + nodes: map[string]*ecNodeInfo{"node2": nodes["node2"]}, + freeSlots: 20, + }, + } + + // Use very low threshold so this triggers + moves := detectCrossRackImbalance(100, "col1", nodes, racks, "", 0.01) + + // With 14 shards across 2 racks, max per rack = 7 + // rack1 has 14 -> excess = 7, should move 7 to rack2 + if len(moves) != 7 { + t.Fatalf("expected 7 cross-rack moves, got %d", len(moves)) + } + for _, move := range moves { + if move.phase != "cross_rack" { + t.Errorf("expected phase 'cross_rack', got %q", move.phase) + } + if move.source.rack != "dc1:rack1" { + 
t.Errorf("expected source dc1:rack1, got %s", move.source.rack)
+		}
+		if move.target.rack != "dc1:rack2" {
+			t.Errorf("expected target dc1:rack2, got %s", move.target.rack)
+		}
+	}
+}
+
+func TestDetectCrossRackImbalanceBelowThreshold(t *testing.T) {
+	// Slight imbalance: rack1 has 8, rack2 has 6 — imbalance = 2/7 ≈ 0.29
+	nodes := map[string]*ecNodeInfo{
+		"node1": {
+			nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 10,
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1", shardBits: 0xFF}, // 8 shards
+			},
+		},
+		"node2": {
+			nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 10,
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1", shardBits: 0x3F00}, // 6 shards
+			},
+		},
+	}
+	racks := map[string]*ecRackInfo{
+		"dc1:rack1": {
+			nodes:     map[string]*ecNodeInfo{"node1": nodes["node1"]},
+			freeSlots: 10,
+		},
+		"dc1:rack2": {
+			nodes:     map[string]*ecNodeInfo{"node2": nodes["node2"]},
+			freeSlots: 10,
+		},
+	}
+
+	// High threshold should skip this
+	moves := detectCrossRackImbalance(100, "col1", nodes, racks, "", 0.5)
+	if len(moves) != 0 {
+		t.Fatalf("expected 0 moves below threshold, got %d", len(moves))
+	}
+}
+
+func TestDetectWithinRackImbalance(t *testing.T) {
+	// rack1 has 2 nodes: node1 has 10 shards, node2 has 0 shards
+	nodes := map[string]*ecNodeInfo{
+		"node1": {
+			nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1", shardBits: 0b1111111111}, // shards 0-9
+			},
+		},
+		"node2": {
+			nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 20,
+			ecShards: map[uint32]*ecVolumeInfo{},
+		},
+	}
+	racks := map[string]*ecRackInfo{
+		"dc1:rack1": {
+			nodes:     map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
+			freeSlots: 25,
+		},
+	}
+
+	moves := detectWithinRackImbalance(100, "col1", nodes, racks, "", 0.01)
+
+	// 10 shards on 2 nodes, max per node = 5
+	// node1 has 10 -> excess = 5, should move 5 to node2
+	if len(moves) != 5 {
+		t.Fatalf("expected 5 within-rack moves, got %d", len(moves))
+	}
+	for _, move := range moves {
+		if move.phase != "within_rack" {
+			t.Errorf("expected phase 'within_rack', got %q", move.phase)
+		}
+		if move.source.nodeID != "node1" {
+			t.Errorf("expected source node1, got %s", move.source.nodeID)
+		}
+		if move.target.nodeID != "node2" {
+			t.Errorf("expected target node2, got %s", move.target.nodeID)
+		}
+	}
+}
+
+func TestDetectGlobalImbalance(t *testing.T) {
+	// node1 has 20 total shards, node2 has 2 total shards (same rack)
+	nodes := map[string]*ecNodeInfo{
+		"node1": {
+			nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1", shardBits: 0x3FFF},   // 14 shards
+				200: {collection: "col1", shardBits: 0b111111}, // 6 shards
+			},
+		},
+		"node2": {
+			nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 30,
+			ecShards: map[uint32]*ecVolumeInfo{
+				300: {collection: "col1", shardBits: 0b11}, // 2 shards
+			},
+		},
+	}
+	racks := map[string]*ecRackInfo{
+		"dc1:rack1": {
+			nodes:     map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
+			freeSlots: 35,
+		},
+	}
+
+	config := NewDefaultConfig()
+	config.ImbalanceThreshold = 0.01 // low threshold to ensure moves happen
+	moves := detectGlobalImbalance(nodes, racks, config, nil)
+
+	// Total = 22 shards, avg = 11. node1 has 20, node2 has 2.
+	// Should move shards until balanced (max 10 iterations)
+	if len(moves) == 0 {
+		t.Fatal("expected global balance moves, got 0")
+	}
+	for _, move := range moves {
+		if move.phase != "global" {
+			t.Errorf("expected phase 'global', got %q", move.phase)
+		}
+		if move.source.nodeID != "node1" {
+			t.Errorf("expected moves from node1, got %s", move.source.nodeID)
+		}
+		if move.target.nodeID != "node2" {
+			t.Errorf("expected moves to node2, got %s", move.target.nodeID)
+		}
+	}
+}
+
+func TestDetectGlobalImbalanceSkipsFullNodes(t *testing.T) {
+	// node2 has 0 free slots — should not be chosen as destination
+	nodes := map[string]*ecNodeInfo{
+		"node1": {
+			nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 10,
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1", shardBits: 0x3FFF}, // 14 shards
+			},
+		},
+		"node2": {
+			nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 0,
+			ecShards: map[uint32]*ecVolumeInfo{
+				200: {collection: "col1", shardBits: 0b11}, // 2 shards
+			},
+		},
+	}
+	racks := map[string]*ecRackInfo{
+		"dc1:rack1": {
+			nodes:     map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
+			freeSlots: 10,
+		},
+	}
+
+	config := NewDefaultConfig()
+	config.ImbalanceThreshold = 0.01
+	moves := detectGlobalImbalance(nodes, racks, config, nil)
+
+	// node2 has no free slots so no moves should be proposed
+	if len(moves) != 0 {
+		t.Fatalf("expected 0 moves (node2 full), got %d", len(moves))
+	}
+}
+
+func TestBuildECTopology(t *testing.T) {
+	topoInfo := &master_pb.TopologyInfo{
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{
+					{
+						Id: "rack1",
+						DataNodeInfos: []*master_pb.DataNodeInfo{
+							{
+								Id: "server1:8080",
+								DiskInfos: map[string]*master_pb.DiskInfo{
+									"": {
+										MaxVolumeCount: 100,
+										VolumeCount:    50,
+										EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
+											{
+												Id:          1,
+												Collection:  "test",
+												EcIndexBits: 0x3FFF, // all 14 shards
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	config := NewDefaultConfig()
+	nodes, racks := buildECTopology(topoInfo, config)
+
+	if len(nodes) != 1 {
+		t.Fatalf("expected 1 node, got %d", len(nodes))
+	}
+	if len(racks) != 1 {
+		t.Fatalf("expected 1 rack, got %d", len(racks))
+	}
+
+	node := nodes["server1:8080"]
+	if node == nil {
+		t.Fatal("expected node server1:8080")
+	}
+	if node.dc != "dc1" {
+		t.Errorf("expected dc=dc1, got %s", node.dc)
+	}
+	// Rack key should be dc:rack composite
+	if node.rack != "dc1:rack1" {
+		t.Errorf("expected rack=dc1:rack1, got %s", node.rack)
+	}
+
+	ecInfo, ok := node.ecShards[1]
+	if !ok {
+		t.Fatal("expected EC shard info for volume 1")
+	}
+	if ecInfo.collection != "test" {
+		t.Errorf("expected collection=test, got %s", ecInfo.collection)
+	}
+	if shardBitCount(ecInfo.shardBits) != 14 {
+		t.Errorf("expected 14 shards, got %d", shardBitCount(ecInfo.shardBits))
+	}
+}
+
+func TestBuildECTopologyCrossDCRackNames(t *testing.T) {
+	// Two DCs with identically-named racks should produce distinct rack keys
+	topoInfo := &master_pb.TopologyInfo{
+		DataCenterInfos: []*master_pb.DataCenterInfo{
+			{
+				Id: "dc1",
+				RackInfos: []*master_pb.RackInfo{{
+					Id: "rack1",
+					DataNodeInfos: []*master_pb.DataNodeInfo{{
+						Id: "node-dc1:8080",
+						DiskInfos: map[string]*master_pb.DiskInfo{
+							"": {MaxVolumeCount: 10, VolumeCount: 0},
+						},
+					}},
+				}},
+			},
+			{
+				Id: "dc2",
+				RackInfos: []*master_pb.RackInfo{{
+					Id: "rack1",
+					DataNodeInfos: []*master_pb.DataNodeInfo{{
+						Id: "node-dc2:8080",
+						DiskInfos: map[string]*master_pb.DiskInfo{
+							"": {MaxVolumeCount: 10, VolumeCount: 0},
+						},
+					}},
+				}},
+			},
+		},
+	}
+
+	config := NewDefaultConfig()
+	_, racks := buildECTopology(topoInfo, config)
+
+	if len(racks) != 2 {
+		t.Fatalf("expected 2 distinct racks, got %d", len(racks))
+	}
+	if _, ok := racks["dc1:rack1"]; !ok {
+		t.Error("expected dc1:rack1 rack key")
+	}
+	if _, ok := racks["dc2:rack1"]; !ok {
+		t.Error("expected dc2:rack1 rack key")
+	}
+}
+
+func TestCollectECCollections(t *testing.T) {
+	nodes := map[string]*ecNodeInfo{
+		"node1": {
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1"},
+				200: {collection: "col2"},
+			},
+		},
+		"node2": {
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1"},
+				300: {collection: "col2"},
+			},
+		},
+	}
+
+	config := NewDefaultConfig()
+	collections := collectECCollections(nodes, config)
+
+	if len(collections) != 2 {
+		t.Fatalf("expected 2 collections, got %d", len(collections))
+	}
+	if len(collections["col1"]) != 1 {
+		t.Errorf("expected 1 volume in col1, got %d", len(collections["col1"]))
+	}
+	if len(collections["col2"]) != 2 {
+		t.Errorf("expected 2 volumes in col2, got %d", len(collections["col2"]))
+	}
+}
+
+func TestCollectECCollectionsWithFilter(t *testing.T) {
+	nodes := map[string]*ecNodeInfo{
+		"node1": {
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1"},
+				200: {collection: "col2"},
+			},
+		},
+	}
+
+	config := NewDefaultConfig()
+	config.CollectionFilter = "col1"
+	collections := collectECCollections(nodes, config)
+
+	if len(collections) != 1 {
+		t.Fatalf("expected 1 collection, got %d", len(collections))
+	}
+	if _, ok := collections["col1"]; !ok {
+		t.Error("expected col1 to be present")
+	}
+}
+
+func TestDetectionDisabled(t *testing.T) {
+	config := NewDefaultConfig()
+	config.Enabled = false
+
+	results, hasMore, err := Detection(context.Background(), nil, nil, config, 0)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if hasMore {
+		t.Error("expected hasMore=false")
+	}
+	if len(results) != 0 {
+		t.Errorf("expected 0 results, got %d", len(results))
+	}
+}
+
+func TestDetectionNilTopology(t *testing.T) {
+	config := NewDefaultConfig()
+	clusterInfo := &types.ClusterInfo{ActiveTopology: nil}
+
+	_, _, err := Detection(context.Background(), nil, clusterInfo, config, 0)
+	if err == nil {
+		t.Fatal("expected error for nil topology")
+	}
+}
+
+func TestMovePhasePriority(t *testing.T) {
+	if movePhasePriority("dedup") != types.TaskPriorityHigh {
+		t.Error("dedup should be high priority")
+	}
+	if movePhasePriority("cross_rack") != types.TaskPriorityMedium {
+		t.Error("cross_rack should be medium priority")
+	}
+	if movePhasePriority("within_rack") != types.TaskPriorityLow {
+		t.Error("within_rack should be low priority")
+	}
+	if movePhasePriority("global") != types.TaskPriorityLow {
+		t.Error("global should be low priority")
+	}
+}
+
+func TestExceedsImbalanceThreshold(t *testing.T) {
+	// 14 vs 0 across 2 groups: imbalance = 14/7 = 2.0 > any reasonable threshold
+	counts := map[string]int{"a": 14, "b": 0}
+	if !exceedsImbalanceThreshold(counts, 14, 2, 0.2) {
+		t.Error("expected imbalance to exceed 0.2 threshold")
+	}
+
+	// Only one group has shards but numGroups=2: min is 0 from absent group
+	counts2 := map[string]int{"a": 14}
+	if !exceedsImbalanceThreshold(counts2, 14, 2, 0.2) {
+		t.Error("expected imbalance with absent group to exceed 0.2 threshold")
+	}
+
+	// 7 vs 7: perfectly balanced
+	counts3 := map[string]int{"a": 7, "b": 7}
+	if exceedsImbalanceThreshold(counts3, 14, 2, 0.01) {
+		t.Error("expected balanced distribution to not exceed threshold")
+	}
+}
+
+// helper to avoid unused import
+var _ = erasure_coding.DataShardsCount
diff --git a/weed/worker/tasks/ec_balance/ec_balance_task.go b/weed/worker/tasks/ec_balance/ec_balance_task.go
new file mode 100644
index 000000000..c58c34809
--- /dev/null
+++ b/weed/worker/tasks/ec_balance/ec_balance_task.go
@@ -0,0 +1,224 @@
+package ec_balance
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/operation"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+	"github.com/seaweedfs/seaweedfs/weed/worker/types"
+	"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
+	"google.golang.org/grpc"
+)
+
+// ECBalanceTask
implements a single EC shard move operation.
+// The move sequence is: copy+mount on dest → unmount on source → delete on source.
+type ECBalanceTask struct {
+	*base.BaseTask
+	volumeID       uint32
+	collection     string
+	grpcDialOption grpc.DialOption
+	progress       float64
+}
+
+// NewECBalanceTask creates a new EC balance task instance
+func NewECBalanceTask(id string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *ECBalanceTask {
+	return &ECBalanceTask{
+		BaseTask:       base.NewBaseTask(id, types.TaskTypeECBalance),
+		volumeID:       volumeID,
+		collection:     collection,
+		grpcDialOption: grpcDialOption,
+	}
+}
+
+// Execute performs the EC shard move operation using the same RPC sequence
+// as the shell ec.balance command's moveMountedShardToEcNode function.
+func (t *ECBalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
+	if params == nil {
+		return fmt.Errorf("task parameters are required")
+	}
+
+	if len(params.Sources) == 0 || len(params.Targets) == 0 {
+		return fmt.Errorf("sources and targets are required for EC shard move")
+	}
+	if len(params.Sources) > 1 || len(params.Targets) > 1 {
+		return fmt.Errorf("batch EC shard moves not supported: got %d sources and %d targets, expected 1 each", len(params.Sources), len(params.Targets))
+	}
+
+	source := params.Sources[0]
+	target := params.Targets[0]
+
+	if len(source.ShardIds) == 0 || len(target.ShardIds) == 0 {
+		return fmt.Errorf("shard IDs are required in sources and targets")
+	}
+
+	sourceAddr := pb.ServerAddress(source.Node)
+	targetAddr := pb.ServerAddress(target.Node)
+
+	ecParams := params.GetEcBalanceParams()
+
+	// Apply configured timeout to the context for all RPC operations
+	if ecParams != nil && ecParams.TimeoutSeconds > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, time.Duration(ecParams.TimeoutSeconds)*time.Second)
+		defer cancel()
+	}
+
+	isDedupDelete := isDedupPhase(params) // FIX: dedup is same-node by definition; gating on optional ecParams sent nil-param dedup tasks into the bogus self-move path
+
+	glog.Infof("EC balance: moving shard(s) %v of volume %d from %s to %s",
+		source.ShardIds, params.VolumeId, source.Node, target.Node)
+
+	// For dedup, we only unmount+delete from source (no copy needed)
+	if isDedupDelete {
+		return t.executeDedupDelete(ctx, params.VolumeId, sourceAddr, source.ShardIds)
+	}
+
+	// Step 1: Copy shard to destination and mount
+	t.reportProgress(10.0, "Copying EC shard to destination")
+	if err := t.copyAndMountShard(ctx, params.VolumeId, sourceAddr, targetAddr, source.ShardIds, target.DiskId); err != nil {
+		return fmt.Errorf("copy and mount shard: %v", err)
+	}
+
+	// Step 2: Unmount shard on source
+	t.reportProgress(50.0, "Unmounting EC shard from source")
+	if err := t.unmountShard(ctx, params.VolumeId, sourceAddr, source.ShardIds); err != nil {
+		return fmt.Errorf("unmount shard on source: %v", err)
+	}
+
+	// Step 3: Delete shard from source
+	t.reportProgress(75.0, "Deleting EC shard from source")
+	if err := t.deleteShard(ctx, params.VolumeId, params.Collection, sourceAddr, source.ShardIds); err != nil {
+		return fmt.Errorf("delete shard on source: %v", err)
+	}
+
+	t.reportProgress(100.0, "EC shard move complete")
+	glog.Infof("EC balance: successfully moved shard(s) %v of volume %d from %s to %s",
+		source.ShardIds, params.VolumeId, source.Node, target.Node)
+	return nil
+}
+
+// executeDedupDelete removes a duplicate shard without copying
+func (t *ECBalanceTask) executeDedupDelete(ctx context.Context, volumeID uint32, sourceAddr pb.ServerAddress, shardIDs []uint32) error {
+	t.reportProgress(25.0, "Unmounting duplicate EC shard")
+	if err := t.unmountShard(ctx, volumeID, sourceAddr, shardIDs); err != nil {
+		return fmt.Errorf("unmount duplicate shard: %v", err)
+	}
+
+	t.reportProgress(75.0, "Deleting duplicate EC shard")
+	if err := t.deleteShard(ctx, volumeID, t.collection, sourceAddr, shardIDs); err != nil {
+		return fmt.Errorf("delete duplicate shard: %v", err)
+	}
+
+	t.reportProgress(100.0, "Duplicate shard removed")
+	return nil
+}
+
+// copyAndMountShard copies EC shard from source to destination and mounts it
+func (t *ECBalanceTask) copyAndMountShard(ctx context.Context, volumeID uint32, sourceAddr, targetAddr pb.ServerAddress, shardIDs []uint32, destDiskID uint32) error {
+	return operation.WithVolumeServerClient(false, targetAddr, t.grpcDialOption,
+		func(client volume_server_pb.VolumeServerClient) error {
+			// Copy shard data (if source != target)
+			if sourceAddr != targetAddr {
+				_, err := client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+					VolumeId:       volumeID,
+					Collection:     t.collection,
+					ShardIds:       shardIDs,
+					CopyEcxFile:    true,
+					CopyEcjFile:    true,
+					CopyVifFile:    true,
+					SourceDataNode: string(sourceAddr),
+					DiskId:         destDiskID,
+				})
+				if err != nil {
+					return fmt.Errorf("copy shard(s) %v from %s to %s: %v", shardIDs, sourceAddr, targetAddr, err)
+				}
+			}
+
+			// Mount the shard on destination
+			_, err := client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+				VolumeId:   volumeID,
+				Collection: t.collection,
+				ShardIds:   shardIDs,
+			})
+			if err != nil {
+				return fmt.Errorf("mount shard(s) %v on %s: %v", shardIDs, targetAddr, err)
+			}
+
+			return nil
+		})
+}
+
+// unmountShard unmounts EC shards from a server
+func (t *ECBalanceTask) unmountShard(ctx context.Context, volumeID uint32, addr pb.ServerAddress, shardIDs []uint32) error {
+	return operation.WithVolumeServerClient(false, addr, t.grpcDialOption,
+		func(client volume_server_pb.VolumeServerClient) error {
+			_, err := client.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
+				VolumeId: volumeID,
+				ShardIds: shardIDs,
+			})
+			return err
+		})
+}
+
+// deleteShard deletes EC shards from a server
+func (t *ECBalanceTask) deleteShard(ctx context.Context, volumeID uint32, collection string, addr pb.ServerAddress, shardIDs []uint32) error {
+	return operation.WithVolumeServerClient(false, addr, t.grpcDialOption,
+		func(client volume_server_pb.VolumeServerClient) error {
+			_, err := client.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
+				VolumeId:   volumeID,
+				Collection: collection,
+				ShardIds:   shardIDs,
+			})
+			return err
+		})
+}
+
+// Validate validates the task parameters.
+// ECBalanceTask handles exactly one source→target shard move per execution.
+func (t *ECBalanceTask) Validate(params *worker_pb.TaskParams) error {
+	if params == nil {
+		return fmt.Errorf("ECBalanceTask.Validate: TaskParams are required")
+	}
+	if len(params.Sources) != 1 {
+		return fmt.Errorf("ECBalanceTask.Validate: expected exactly 1 source, got %d", len(params.Sources))
+	}
+	if len(params.Targets) != 1 {
+		return fmt.Errorf("ECBalanceTask.Validate: expected exactly 1 target, got %d", len(params.Targets))
+	}
+	if len(params.Sources[0].ShardIds) == 0 {
+		return fmt.Errorf("ECBalanceTask.Validate: Sources[0].ShardIds is empty")
+	}
+	if len(params.Targets[0].ShardIds) == 0 {
+		return fmt.Errorf("ECBalanceTask.Validate: Targets[0].ShardIds is empty")
+	}
+	return nil
+}
+
+// EstimateTime estimates the time for an EC shard move
+func (t *ECBalanceTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
+	return 30 * time.Second
+}
+
+// GetProgress returns current progress
+func (t *ECBalanceTask) GetProgress() float64 {
+	return t.progress
+}
+
+// reportProgress records progress locally and forwards it to the embedded base task (original recursed into itself — guaranteed stack overflow on first call)
+func (t *ECBalanceTask) reportProgress(progress float64, stage string) {
+	t.progress = progress
+	t.ReportProgress(progress) // NOTE(review): confirm base.BaseTask exposes ReportProgress(float64); stage is kept for future structured reporting
+}
+
+// isDedupPhase checks if this is a dedup-phase task (source and target are the same node)
+func isDedupPhase(params *worker_pb.TaskParams) bool {
+	if len(params.Sources) > 0 && len(params.Targets) > 0 {
+		return params.Sources[0].Node == params.Targets[0].Node
+	}
+	return false
+}
diff --git a/weed/worker/tasks/ec_balance/register.go b/weed/worker/tasks/ec_balance/register.go
new file mode 100644
index 000000000..4ab0e8c2b
--- /dev/null
+++
b/weed/worker/tasks/ec_balance/register.go
@@ -0,0 +1,89 @@
+package ec_balance
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+	"github.com/seaweedfs/seaweedfs/weed/security"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+	"github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
+
+// Auto-register this task when the package is imported
+func init() {
+	RegisterECBalanceTask()
+
+	// Register config updater
+	tasks.AutoRegisterConfigUpdater(types.TaskTypeECBalance, UpdateConfigFromPersistence)
+}
+
+// RegisterECBalanceTask registers the EC balance task with the task architecture
+func RegisterECBalanceTask() {
+	cfg := NewDefaultConfig()
+
+	// Create shared gRPC dial option using TLS configuration
+	dialOpt := security.LoadClientTLS(util.GetViper(), "grpc.worker")
+
+	// Create complete task definition
+	taskDef := &base.TaskDefinition{
+		Type:         types.TaskTypeECBalance,
+		Name:         "ec_balance",
+		DisplayName:  "EC Shard Balance",
+		Description:  "Balances EC shard distribution across racks and servers",
+		Icon:         "fas fa-balance-scale-left text-info",
+		Capabilities: []string{"ec_balance", "erasure_coding"},
+
+		Config:     cfg,
+		ConfigSpec: GetConfigSpec(),
+		CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
+			if params == nil {
+				return nil, fmt.Errorf("task parameters are required")
+			}
+			return NewECBalanceTask(
+				fmt.Sprintf("ec_balance-%d", params.VolumeId), // NOTE(review): id collides when several shards of one volume move concurrently — consider including shard/target info
+				params.VolumeId,
+				params.Collection,
+				dialOpt,
+			), nil
+		},
+		DetectionFunc: func(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+			results, _, err := Detection(context.Background(), metrics, clusterInfo, config, 0)
+			return results, err
+		},
+		ScanInterval:   1 * time.Hour,
+		SchedulingFunc: Scheduling,
+		MaxConcurrent:  1,
+		RepeatInterval: 4 * time.Hour,
+	}
+
+	// Store task definition globally for configuration updates
+	globalTaskDef = taskDef
+
+	// Register everything with a single function call
+	base.RegisterTask(taskDef)
+}
+
+// UpdateConfigFromPersistence updates the EC balance configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+	if globalTaskDef == nil {
+		return fmt.Errorf("ec balance task not registered")
+	}
+
+	newConfig := LoadConfigFromPersistence(configPersistence)
+	if newConfig == nil {
+		return fmt.Errorf("failed to load configuration from persistence")
+	}
+
+	globalTaskDef.Config = newConfig
+
+	glog.V(1).Infof("Updated EC balance task configuration from persistence")
+	return nil
+}
diff --git a/weed/worker/tasks/ec_balance/scheduling.go b/weed/worker/tasks/ec_balance/scheduling.go
new file mode 100644
index 000000000..3c6c68526
--- /dev/null
+++ b/weed/worker/tasks/ec_balance/scheduling.go
@@ -0,0 +1,44 @@
+package ec_balance
+
+import (
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+	"github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Scheduling implements the scheduling logic for EC balance tasks
+func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
+	// FIX: comma-ok assertion — a mis-typed config must not panic the scheduler loop
+	ecbConfig, ok := config.(*Config)
+	if !ok {
+		return false
+	}
+
+	// Check if we have available workers
+	if len(availableWorkers) == 0 {
+		return false
+	}
+
+	// Count running EC balance tasks
+	runningCount := 0
+	for _, runningTask := range runningTasks {
+		if runningTask.Type == types.TaskTypeECBalance {
+			runningCount++
+		}
+	}
+
+	// Check concurrency limit
+	if runningCount >= ecbConfig.MaxConcurrent {
+		return false
+	}
+
+	// Check if any worker can handle EC balance tasks
+	for _, worker := range availableWorkers {
+		for _, capability := range worker.Capabilities {
+			if capability == types.TaskTypeECBalance {
+				return true
+			}
+		}
+	}
+
+	return false
+}
diff --git a/weed/worker/types/task_types.go b/weed/worker/types/task_types.go
index c4cafd07f..8b1b6494a 100644
--- a/weed/worker/types/task_types.go
+++ b/weed/worker/types/task_types.go
@@ -15,6 +15,7 @@ const (
 	TaskTypeErasureCoding TaskType = "erasure_coding"
 	TaskTypeBalance       TaskType = "balance"
 	TaskTypeReplication   TaskType = "replication"
+	TaskTypeECBalance     TaskType = "ec_balance"
 )
 
 // TaskStatus represents the status of a maintenance task