From 306bc31a28416abeff6289205596c003fb4f673f Mon Sep 17 00:00:00 2001 From: chrislusf Date: Tue, 2 Dec 2025 12:43:11 -0800 Subject: [PATCH] ec: add diskType parameter to core EC functions Add diskType parameter to: - ecBalancer struct - collectEcVolumeServersByDc() - collectEcNodesForDC() - collectEcNodes() - EcBalance() This allows EC operations to target specific disk types (hdd, ssd, etc.) instead of being hardcoded to HardDriveType only. For backward compatibility, all callers currently pass types.HardDriveType as the default value. Subsequent commits will add -diskType flags to the individual EC commands. --- weed/shell/command_ec_balance.go | 4 +++- weed/shell/command_ec_common.go | 18 ++++++++++-------- weed/shell/command_ec_common_test.go | 6 +++--- weed/shell/command_ec_encode.go | 2 +- weed/shell/command_ec_rebuild.go | 3 ++- weed/shell/command_volume_server_evacuate.go | 2 +- 6 files changed, 20 insertions(+), 15 deletions(-) diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 935348602..4f1069bbb 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "io" + + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func init() { @@ -67,5 +69,5 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W return err } - return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing) + return EcBalance(commandEnv, collections, *dc, rp, types.HardDriveType, *maxParallelization, *applyBalancing) } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index f2cc581da..06284edf8 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura } -func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { +func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { // list all possible locations // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) @@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN } // find out all volume servers with one slot left. 
- ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter) + ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType) sortEcNodesByFreeslotsDescending(ecNodes) return } -func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { - return collectEcNodesForDC(commandEnv, "") +func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { + return collectEcNodesForDC(commandEnv, "", diskType) } func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string { @@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int { return 0 } -func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) { +func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) { eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if selectedDataCenter != "" && selectedDataCenter != string(dc) { return } - freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) + freeEcSlots := countFreeShardSlots(dn, diskType) ecNode := &EcNode{ info: dn, dc: dc, @@ -649,6 +649,7 @@ type ecBalancer struct { replicaPlacement *super_block.ReplicaPlacement applyBalancing bool maxParallelization int + diskType types.DiskType // target disk type for EC shards (default: HardDriveType) } func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup { @@ -1194,9 +1195,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo return vidLocations } -func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) { +func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) { // collect all ec nodes - allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc) + allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType) if err != nil { return err } @@ -1210,6 +1211,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic replicaPlacement: ecReplicaPlacement, applyBalancing: applyBalancing, maxParallelization: maxParallelization, + diskType: diskType, } if len(collections) == 0 { diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go index f1f460bc6..eac3d7860 100644 --- a/weed/shell/command_ec_common_test.go +++ b/weed/shell/command_ec_common_test.go @@ -106,7 +106,7 @@ func TestParseReplicaPlacementArg(t *testing.T) { func TestEcDistribution(t *testing.T) { // find out all volume servers with one slot left. 
- ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "") + ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "", types.HardDriveType) sortEcNodesByFreeslotsDescending(ecNodes) @@ -149,7 +149,7 @@ func TestPickRackToBalanceShardsInto(t *testing.T) { for _, tc := range testCases { vid, _ := needle.NewVolumeId(tc.vid) - ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType) rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement) ecb := &ecBalancer{ @@ -225,7 +225,7 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) { for _, tc := range testCases { vid, _ := needle.NewVolumeId(tc.vid) - allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType) ecb := &ecBalancer{ ecNodes: allEcNodes, diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index d6b6b17b3..7d9536af6 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -138,7 +138,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err) } // ...re-balance ec shards... - if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil { + if err := EcBalance(commandEnv, balanceCollections, "", rp, types.HardDriveType, *maxParallelization, *applyBalancing); err != nil { return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err) } // ...then delete original volumes using pre-collected locations. diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 79acebff1..698705853 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func init() { @@ -96,7 +97,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W } // collect all ec nodes - allEcNodes, _, err := collectEcNodes(commandEnv) + allEcNodes, _, err := collectEcNodes(commandEnv, types.HardDriveType) if err != nil { return err } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 6135eb3eb..a61593231 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -158,7 +158,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server - ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "") + ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", types.HardDriveType) thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
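
A minimal sketch of how a follow-up commit could expose the new parameter as a -diskType shell flag, since this patch only threads diskType through the core functions. The helper name parseDiskTypeFlag, the flag name, and its usage text are assumptions for illustration; types.ToDiskType and types.HardDriveType already exist in weed/storage/types, and an empty value maps to HardDriveType, preserving today's default behavior.

package shell // assumed placement alongside the existing EC shell commands

import (
	"flag"

	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

// parseDiskTypeFlag (hypothetical helper) registers a -diskType string flag on an
// EC command's FlagSet and returns a function that converts the parsed value into
// a types.DiskType once fs.Parse has run. An empty string converts to
// types.HardDriveType, so callers that omit the flag keep the current behavior.
func parseDiskTypeFlag(fs *flag.FlagSet) func() types.DiskType {
	diskTypeStr := fs.String("diskType", "", "[hdd|ssd|<tag>] disk type to hold the ec shards; empty means hdd")
	return func() types.DiskType {
		return types.ToDiskType(*diskTypeStr)
	}
}

A command such as ec.balance would then invoke the returned function after parsing its arguments and pass the result as the new diskType argument of EcBalance, replacing the hardcoded types.HardDriveType shown in this patch.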