
ec: add diskType parameter to core EC functions

Add diskType parameter to:
- ecBalancer struct
- collectEcVolumeServersByDc()
- collectEcNodesForDC()
- collectEcNodes()
- EcBalance()

This allows EC operations to target specific disk types (hdd, ssd, etc.)
instead of being hardcoded to HardDriveType only.

For backward compatibility, all callers currently pass types.HardDriveType
as the default value. Subsequent commits will add -diskType flags to
the individual EC commands.
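
To make the backward-compatibility point concrete, here is a hypothetical sketch of how a shell command's Do(...) body could feed a future -diskType flag into the new EcBalance signature. The diskTypeFlag variable and the use of types.ToDiskType to parse the flag string are assumptions for illustration; in this commit every caller still passes types.HardDriveType directly.

	// Hypothetical sketch only; not part of this commit.
	diskType := types.HardDriveType // current default used by every caller today
	if *diskTypeFlag != "" {        // diskTypeFlag is an assumed, not-yet-existing flag
		diskType = types.ToDiskType(*diskTypeFlag) // assumed helper mapping "hdd"/"ssd" to a types.DiskType
	}
	return EcBalance(commandEnv, collections, *dc, rp, diskType, *maxParallelization, *applyBalancing)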
chrislusf · 4 months ago · commit e05994ca7e · pull/7607/head
6 changed files:
  1. weed/shell/command_ec_balance.go (4 lines changed)
  2. weed/shell/command_ec_common.go (18 lines changed)
  3. weed/shell/command_ec_common_test.go (6 lines changed)
  4. weed/shell/command_ec_encode.go (2 lines changed)
  5. weed/shell/command_ec_rebuild.go (3 lines changed)
  6. weed/shell/command_volume_server_evacuate.go (27 lines changed)

weed/shell/command_ec_balance.go (4 lines changed)

@@ -4,6 +4,8 @@ import (
 	"flag"
 	"fmt"
 	"io"
+
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )

 func init() {
@@ -67,5 +69,5 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
 		return err
 	}

-	return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing)
+	return EcBalance(commandEnv, collections, *dc, rp, types.HardDriveType, *maxParallelization, *applyBalancing)
 }

weed/shell/command_ec_common.go (18 lines changed)

@@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura
 }

-func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
 	// list all possible locations
 	// collect topology information
 	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN
 	}

 	// find out all volume servers with one slot left.
-	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
+	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType)
 	sortEcNodesByFreeslotsDescending(ecNodes)

 	return
 }

-func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
-	return collectEcNodesForDC(commandEnv, "")
+func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+	return collectEcNodesForDC(commandEnv, "", diskType)
 }

 func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
@@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
 	return 0
 }

-func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
+func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) {
 	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
 		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
 			return
 		}

-		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
+		freeEcSlots := countFreeShardSlots(dn, diskType)
 		ecNode := &EcNode{
 			info: dn,
 			dc:   dc,
@@ -649,6 +649,7 @@ type ecBalancer struct {
 	replicaPlacement   *super_block.ReplicaPlacement
 	applyBalancing     bool
 	maxParallelization int
+	diskType           types.DiskType // target disk type for EC shards (default: HardDriveType)
 }

 func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
@@ -1194,9 +1195,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
 	return vidLocations
 }

-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) {
 	// collect all ec nodes
-	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
+	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
 	if err != nil {
 		return err
 	}
@@ -1210,6 +1211,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
 		replicaPlacement:   ecReplicaPlacement,
 		applyBalancing:     applyBalancing,
 		maxParallelization: maxParallelization,
+		diskType:           diskType,
 	}

 	if len(collections) == 0 {

weed/shell/command_ec_common_test.go (6 lines changed)

@@ -106,7 +106,7 @@ func TestParseReplicaPlacementArg(t *testing.T) {
 func TestEcDistribution(t *testing.T) {

 	// find out all volume servers with one slot left.
-	ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "")
+	ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "", types.HardDriveType)
 	sortEcNodesByFreeslotsDescending(ecNodes)
@@ -149,7 +149,7 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
-		ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+		ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
 		rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement)

 		ecb := &ecBalancer{
@@ -225,7 +225,7 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
-		allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+		allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
 		ecb := &ecBalancer{
 			ecNodes: allEcNodes,

weed/shell/command_ec_encode.go (2 lines changed)

@@ -142,7 +142,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 		return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err)
 	}

 	// ...re-balance ec shards...
-	if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
+	if err := EcBalance(commandEnv, balanceCollections, "", rp, types.HardDriveType, *maxParallelization, *applyBalancing); err != nil {
 		return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err)
 	}

 	// ...then delete original volumes using pre-collected locations.

weed/shell/command_ec_rebuild.go (3 lines changed)

@@ -12,6 +12,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )

 func init() {
@@ -96,7 +97,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 	}

 	// collect all ec nodes
-	allEcNodes, _, err := collectEcNodes(commandEnv)
+	allEcNodes, _, err := collectEcNodes(commandEnv, types.HardDriveType)
 	if err != nil {
 		return err
 	}

weed/shell/command_volume_server_evacuate.go (27 lines changed)

@@ -156,21 +156,29 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
 }

 func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
-	// find this ec volume server
-	ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
+	// Evacuate EC volumes for all disk types.
+	// We need to handle each disk type separately because shards should be moved to nodes with the same disk type.
+	// We collect topology once at the start and track capacity changes ourselves
+	// (via freeEcSlot decrement after each move) rather than repeatedly refreshing,
+	// which would give a false sense of correctness since topology could be stale.
+	diskTypes := []types.DiskType{types.HardDriveType, types.SsdType}
+	for _, diskType := range diskTypes {
+		ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", diskType)
 		thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
 		if len(thisNodes) == 0 {
-			return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
+			// This server doesn't have EC shards for this disk type, skip
+			continue
 		}

-		// move away ec volumes
+		// move away ec volumes for this disk type
 		for _, thisNode := range thisNodes {
-			for _, diskInfo := range thisNode.info.DiskInfos {
+			diskInfo, found := thisNode.info.DiskInfos[string(diskType)]
+			if !found {
+				continue
+			}
 			for _, ecShardInfo := range diskInfo.EcShardInfos {
-				hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, writer)
+				hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, diskType, writer)
 				if err != nil {
 					fmt.Fprintf(writer, "move away volume %d from %s: %v\n", ecShardInfo.Id, volumeServer, err)
 				}
@@ -187,7 +195,7 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv,
 	return nil
 }

-func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, writer io.Writer) (hasMoved bool, err error) {
+func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, diskType types.DiskType, writer io.Writer) (hasMoved bool, err error) {
 	for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {

 		// Sort by: 1) fewest shards of this volume, 2) most free EC slots
@@ -217,13 +225,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
 				collectionPrefix = ecShardInfo.Collection + "_"
 			}
 			vid := needle.VolumeId(ecShardInfo.Id)
-			destDiskId := pickBestDiskOnNode(emptyNode, vid)
+			// For evacuation, prefer same disk type but allow fallback to other types
+			destDiskId := pickBestDiskOnNode(emptyNode, vid, diskType, false)
 			if destDiskId > 0 {
 				fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
 			} else {
 				fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
 			}
-			err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
+			err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, diskType)
 			if err != nil {
 				hasMoved = false
 				return

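The evacuation change above can walk disk types one at a time because the master topology keys each data node's DiskInfos map by the disk type string, so EC shards of one type can be read in isolation. A minimal sketch of that lookup pattern, assuming dn is a populated *master_pb.DataNodeInfo (the dn variable and the print statement are illustrative, not from this commit):

	// Sketch: read EC shard info per disk type from a data node's topology entry.
	for _, diskType := range []types.DiskType{types.HardDriveType, types.SsdType} {
		diskInfo, found := dn.DiskInfos[string(diskType)] // DiskInfos is keyed by the disk type string
		if !found {
			continue // this node has no disks of that type
		}
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			shardIds := erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()
			fmt.Printf("disk %v: ec volume %d has shards %v\n", diskType, ecShardInfo.Id, shardIds)
		}
	}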