
ec: add diskType parameter to core EC functions

Add diskType parameter to:
- ecBalancer struct
- collectEcVolumeServersByDc()
- collectEcNodesForDC()
- collectEcNodes()
- EcBalance()

This allows EC operations to target specific disk types (hdd, ssd, etc.)
instead of being hardcoded to HardDriveType only.

For backward compatibility, all callers currently pass types.HardDriveType
as the default value. Subsequent commits will add -diskType flags to
the individual EC commands.
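For orientation, a minimal sketch of the call shape after this commit. The EcBalance signature below matches the diff; the SSD variant assumes types.ToDiskType from weed/storage/types, which is not touched by this commit:

```go
// Existing callers keep today's behavior by passing the hard-drive default explicitly.
err := EcBalance(commandEnv, collections, *dc, rp, types.HardDriveType, *maxParallelization, *applyBalancing)

// A later caller could target another tier instead, e.g. SSD
// (assumes types.ToDiskType, which maps "", "hdd", "ssd", or a custom tag to a DiskType).
err = EcBalance(commandEnv, collections, *dc, rp, types.ToDiskType("ssd"), *maxParallelization, *applyBalancing)
```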
Branch: ec-disk-type-support
chrislusf authored 2 days ago · commit 306bc31a28
Changed files:
  1. weed/shell/command_ec_balance.go (4 changes)
  2. weed/shell/command_ec_common.go (18 changes)
  3. weed/shell/command_ec_common_test.go (6 changes)
  4. weed/shell/command_ec_encode.go (2 changes)
  5. weed/shell/command_ec_rebuild.go (3 changes)
  6. weed/shell/command_volume_server_evacuate.go (2 changes)

weed/shell/command_ec_balance.go

@@ -4,6 +4,8 @@ import (
 	"flag"
 	"fmt"
 	"io"
+
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 func init() {
@@ -67,5 +69,5 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
 		return err
 	}
 
-	return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing)
+	return EcBalance(commandEnv, collections, *dc, rp, types.HardDriveType, *maxParallelization, *applyBalancing)
 }
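Since the -diskType flags themselves are deferred to subsequent commits, here is a rough sketch of how ec.balance might eventually expose one. The flag name, help text, and the balanceCommand flag-set variable are assumptions for illustration, not part of this commit:

```go
// Hypothetical follow-up (not in this commit): let ec.balance take -diskType
// and convert it before calling EcBalance.
diskTypeStr := balanceCommand.String("diskType", "", "[hdd|ssd|<tag>] disk type to balance EC shards on")
// ... parse the remaining flags (-collection, -dc, replica placement) as today ...
// types.ToDiskType maps "" and "hdd" to types.HardDriveType, so the default stays backward compatible.
return EcBalance(commandEnv, collections, *dc, rp, types.ToDiskType(*diskTypeStr), *maxParallelization, *applyBalancing)
```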

weed/shell/command_ec_common.go

@@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura
 }
 
-func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
 	// list all possible locations
 	// collect topology information
 	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN
 	}
 
 	// find out all volume servers with one slot left.
-	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
+	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType)
 	sortEcNodesByFreeslotsDescending(ecNodes)
 
 	return
 }
 
-func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
-	return collectEcNodesForDC(commandEnv, "")
+func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+	return collectEcNodesForDC(commandEnv, "", diskType)
 }
 
 func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
@@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
 	return 0
 }
 
-func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
+func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) {
 	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
 		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
 			return
 		}
 
-		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
+		freeEcSlots := countFreeShardSlots(dn, diskType)
 		ecNode := &EcNode{
 			info: dn,
 			dc:   dc,
@@ -649,6 +649,7 @@ type ecBalancer struct {
 	replicaPlacement   *super_block.ReplicaPlacement
 	applyBalancing     bool
 	maxParallelization int
+	diskType           types.DiskType // target disk type for EC shards (default: HardDriveType)
 }
 
 func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
@@ -1194,9 +1195,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
 	return vidLocations
 }
 
-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) {
 	// collect all ec nodes
-	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
+	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
 	if err != nil {
 		return err
 	}
@@ -1210,6 +1211,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
 		replicaPlacement:   ecReplicaPlacement,
 		applyBalancing:     applyBalancing,
 		maxParallelization: maxParallelization,
+		diskType:           diskType,
 	}
 
 	if len(collections) == 0 {

weed/shell/command_ec_common_test.go

@@ -106,7 +106,7 @@ func TestParseReplicaPlacementArg(t *testing.T) {
 
 func TestEcDistribution(t *testing.T) {
 
 	// find out all volume servers with one slot left.
-	ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "")
+	ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "", types.HardDriveType)
 	sortEcNodesByFreeslotsDescending(ecNodes)
@@ -149,7 +149,7 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
-		ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+		ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
 		rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement)
 
 		ecb := &ecBalancer{
@@ -225,7 +225,7 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
-		allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+		allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
 
 		ecb := &ecBalancer{
 			ecNodes: allEcNodes,

weed/shell/command_ec_encode.go

@@ -138,7 +138,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 		return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err)
 	}
 	// ...re-balance ec shards...
-	if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
+	if err := EcBalance(commandEnv, balanceCollections, "", rp, types.HardDriveType, *maxParallelization, *applyBalancing); err != nil {
 		return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err)
 	}
 	// ...then delete original volumes using pre-collected locations.

weed/shell/command_ec_rebuild.go

@@ -12,6 +12,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 func init() {
@@ -96,7 +97,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 	}
 
 	// collect all ec nodes
-	allEcNodes, _, err := collectEcNodes(commandEnv)
+	allEcNodes, _, err := collectEcNodes(commandEnv, types.HardDriveType)
 	if err != nil {
 		return err
 	}

weed/shell/command_volume_server_evacuate.go

@@ -158,7 +158,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
 
 func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
 	// find this ec volume server
-	ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
+	ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", types.HardDriveType)
 	thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
 	if len(thisNodes) == 0 {
 		return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
