diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 935348602..4f1069bbb 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "io" + + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func init() { @@ -67,5 +69,5 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W return err } - return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing) + return EcBalance(commandEnv, collections, *dc, rp, types.HardDriveType, *maxParallelization, *applyBalancing) } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index f2cc581da..06284edf8 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura } -func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { +func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { // list all possible locations // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) @@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN } // find out all volume servers with one slot left. - ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter) + ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType) sortEcNodesByFreeslotsDescending(ecNodes) return } -func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { - return collectEcNodesForDC(commandEnv, "") +func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { + return collectEcNodesForDC(commandEnv, "", diskType) } func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string { @@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int { return 0 } -func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) { +func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) { eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if selectedDataCenter != "" && selectedDataCenter != string(dc) { return } - freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) + freeEcSlots := countFreeShardSlots(dn, diskType) ecNode := &EcNode{ info: dn, dc: dc, @@ -649,6 +649,7 @@ type ecBalancer struct { replicaPlacement *super_block.ReplicaPlacement applyBalancing bool maxParallelization int + diskType types.DiskType // target disk type for EC shards (default: HardDriveType) } func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup { @@ -1194,9 +1195,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo return vidLocations } -func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) { +func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) { // collect all ec nodes - allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc) + allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType) if err != nil { return err } @@ -1210,6 +1211,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic replicaPlacement: ecReplicaPlacement, applyBalancing: applyBalancing, maxParallelization: maxParallelization, + diskType: diskType, } if len(collections) == 0 { diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go index f1f460bc6..eac3d7860 100644 --- a/weed/shell/command_ec_common_test.go +++ b/weed/shell/command_ec_common_test.go @@ -106,7 +106,7 @@ func TestParseReplicaPlacementArg(t *testing.T) { func TestEcDistribution(t *testing.T) { // find out all volume servers with one slot left. - ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "") + ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "", types.HardDriveType) sortEcNodesByFreeslotsDescending(ecNodes) @@ -149,7 +149,7 @@ func TestPickRackToBalanceShardsInto(t *testing.T) { for _, tc := range testCases { vid, _ := needle.NewVolumeId(tc.vid) - ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType) rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement) ecb := &ecBalancer{ @@ -225,7 +225,7 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) { for _, tc := range testCases { vid, _ := needle.NewVolumeId(tc.vid) - allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType) ecb := &ecBalancer{ ecNodes: allEcNodes, diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index d6b6b17b3..7d9536af6 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -138,7 +138,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err) } // ...re-balance ec shards... - if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil { + if err := EcBalance(commandEnv, balanceCollections, "", rp, types.HardDriveType, *maxParallelization, *applyBalancing); err != nil { return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err) } // ...then delete original volumes using pre-collected locations. diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 79acebff1..698705853 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func init() { @@ -96,7 +97,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W } // collect all ec nodes - allEcNodes, _, err := collectEcNodes(commandEnv) + allEcNodes, _, err := collectEcNodes(commandEnv, types.HardDriveType) if err != nil { return err } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 6135eb3eb..a61593231 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -158,7 +158,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server - ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "") + ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", types.HardDriveType) thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster\n", volumeServer)