
ec: add -diskType flag to EC commands for SSD support (#7607)

* ec: add diskType parameter to core EC functions

Add diskType parameter to:
- ecBalancer struct
- collectEcVolumeServersByDc()
- collectEcNodesForDC()
- collectEcNodes()
- EcBalance()

This allows EC operations to target specific disk types (hdd, ssd, etc.)
instead of being hardcoded to HardDriveType only.

For backward compatibility, all callers currently pass types.HardDriveType
as the default value. Subsequent commits will add -diskType flags to
the individual EC commands.
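
As a rough sketch, a flag-driven caller converts the user-supplied string and
passes it through, while existing callers pass types.HardDriveType directly
(the wrapper below is illustrative, not part of this change; it assumes the
weed/shell package context):

  // Illustrative only: a caller that exposes -diskType, modeled on ec.balance.
  // types.ToDiskType maps "" and "hdd" to the HDD default, preserving old behavior.
  func runEcBalanceWithDiskType(commandEnv *CommandEnv, collections []string, dc string,
          rp *super_block.ReplicaPlacement, diskTypeStr string, parallel int, apply bool) error {
      diskType := types.ToDiskType(diskTypeStr)
      return EcBalance(commandEnv, collections, dc, rp, diskType, parallel, apply)
  }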

* ec: update helper functions to use configurable diskType

Update the following functions to accept or use a diskType parameter:
- findEcVolumeShards()
- addEcVolumeShards()
- deleteEcVolumeShards()
- moveMountedShardToEcNode()
- countShardsByRack()
- pickNEcShardsToMoveFrom()

All ecBalancer methods now use ecb.diskType instead of hardcoded
types.HardDriveType. Non-ecBalancer callers (like volumeServer.evacuate
and ec.rebuild) use types.HardDriveType as the default.

Update all test files to pass diskType where needed.

* ec: add -diskType flag to ec.balance and ec.encode commands

Add -diskType flag to specify the target disk type for EC operations:
- ec.balance -diskType=ssd
- ec.encode -diskType=ssd

The disk type can be 'hdd', 'ssd', or empty for default (hdd).
This allows placing EC shards on SSD or other disk types instead of
only HDD.

Example usage:
  ec.balance -collection=mybucket -diskType=ssd -apply
  ec.encode -collection=mybucket -diskType=ssd -force

* test: add integration tests for EC disk type support

Add integration tests to verify the -diskType flag works correctly:
- TestECDiskTypeSupport: Tests EC encode and balance with SSD disk type
- TestECDiskTypeMixedCluster: Tests EC operations on a mixed HDD/SSD cluster

The tests verify:
- Volume servers can be configured with specific disk types
- ec.encode accepts -diskType flag and encodes to the correct disk type
- ec.balance accepts -diskType flag and balances on the correct disk type
- Mixed disk type clusters work correctly with separate collections

* ec: add -sourceDiskType to ec.encode and -diskType to ec.decode

ec.encode:
- Add -sourceDiskType flag to filter source volumes by disk type
- This enables tier migration scenarios (e.g., SSD volumes → HDD EC shards)
- -diskType specifies target disk type for EC shards

ec.decode:
- Add -diskType flag to specify source disk type where EC shards are stored
- Update collectEcShardIds() and collectEcNodeShardBits() to accept diskType

Examples:
  # Encode SSD volumes to HDD EC shards (tier migration)
  ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd

  # Decode EC shards from SSD
  ec.decode -collection=mybucket -diskType=ssd

Integration tests updated to cover new flags.

* ec: fix variable shadowing and add -diskType to ec.rebuild and volumeServer.evacuate

Address code review comments:

1. Fix variable shadowing in collectEcVolumeServersByDc():
   - Rename loop variable 'diskType' to 'diskTypeKey' and 'diskTypeStr'
     to avoid shadowing the function parameter

2. Fix hardcoded HardDriveType in ecBalancer methods:
   - balanceEcRack(): use ecb.diskType instead of types.HardDriveType
   - collectVolumeIdToEcNodes(): use ecb.diskType

3. Add -diskType flag to ec.rebuild command:
   - Add diskType field to ecRebuilder struct
   - Pass diskType to collectEcNodes() and addEcVolumeShards()

4. Add -diskType flag to volumeServer.evacuate command:
   - Add diskType field to commandVolumeServerEvacuate struct
   - Pass diskType to collectEcVolumeServersByDc() and moveMountedShardToEcNode()

* test: add diskType field to ecBalancer in TestPickEcNodeToBalanceShardsInto

Address nitpick comment: ensure test ecBalancer struct has diskType
field set for consistency with other tests.

* ec: filter disk selection by disk type in pickBestDiskOnNode

When evacuating or rebalancing EC shards, pickBestDiskOnNode now
filters disks by the target disk type. This ensures:

1. EC shards from SSD disks are moved to SSD disks on destination nodes
2. EC shards from HDD disks are moved to HDD disks on destination nodes
3. No cross-disk-type shard movement occurs

This maintains the storage tier isolation when moving EC shards
between nodes during evacuation or rebalancing operations.

* ec: allow disk type fallback during evacuation

Update pickBestDiskOnNode to accept a strictDiskType parameter:

- strictDiskType=true (balancing): Only use disks of matching type.
  This maintains storage tier isolation during normal rebalancing.

- strictDiskType=false (evacuation): Prefer same disk type, but
  fall back to other disk types if no matching disk is available.
  This ensures evacuation can complete even when same-type capacity
  is insufficient.

Priority order for evacuation:
1. Same disk type with lowest shard count (preferred)
2. Different disk type with lowest shard count (fallback)

* test: use defer for lock/unlock to prevent lock leaks

Use defer to ensure locks are always released, even on early returns
or test failures; this prevents lock leaks that could cause subsequent
tests to hang or fail. A sketch of the pattern follows the list below.

Changes:
- Return early if lock acquisition fails
- Immediately defer unlock after successful lock
- Remove redundant explicit unlock calls at end of tests
- Fix unused variable warning (err -> encodeErr/locErr)
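
A minimal sketch of the pattern, assuming a hypothetical runShellCommand
helper (the real test helpers may differ):

  func TestEcCommandWithLock(t *testing.T) {
      // Return early if lock acquisition fails.
      if err := runShellCommand("lock"); err != nil {
          t.Fatalf("acquire lock: %v", err)
      }
      // Immediately defer the unlock so it runs even on early returns, failures, or panics.
      defer func() {
          if err := runShellCommand("unlock"); err != nil {
              t.Logf("release lock: %v", err)
          }
      }()

      // ... test body ...
  }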

* ec: dynamically discover disk types from topology for evacuation

Disk types are free-form tags (e.g., 'ssd', 'nvme', 'archive') that come
from the topology, not a hardcoded set. Only 'hdd' (or empty) is the
default disk type.

Use collectVolumeDiskTypes() to discover all disk types present in the
cluster topology instead of hardcoding [HardDriveType, SsdType].

* test: add evacuation fallback and cross-rack EC placement tests

Add two new integration tests:

1. TestEvacuationFallbackBehavior:
   - Tests that when same disk type has no capacity, shards fall back
     to other disk types during evacuation
   - Creates cluster with 1 SSD + 2 HDD servers (limited SSD capacity)
   - Verifies pickBestDiskOnNode behavior with strictDiskType=false

2. TestCrossRackECPlacement:
   - Tests EC shard distribution across different racks
   - Creates cluster with 4 servers in 4 different racks
   - Verifies shards are spread across multiple racks
   - Tests that ec.balance respects rack placement

Helper functions added:
- startLimitedSsdCluster: 1 SSD + 2 HDD servers
- startMultiRackCluster: 4 servers in 4 racks
- countShardsPerRack: counts EC shards per rack from disk
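
For the last helper, a sketch of how shard files could be counted per rack,
assuming a hypothetical rack-name-to-data-directory map (the real helper and
on-disk layout may differ):

  func countShardsPerRack(rackDirs map[string]string) (map[string]int, error) {
      counts := make(map[string]int)
      for rack, dir := range rackDirs {
          // EC shard files are named like <volumeId>.ec00 .. .ec13; skip .ecx/.ecj index files.
          matches, err := filepath.Glob(filepath.Join(dir, "*.ec[0-9][0-9]"))
          if err != nil {
              // Surface Glob errors instead of silently ignoring them.
              return nil, fmt.Errorf("glob %s: %w", dir, err)
          }
          counts[rack] = len(matches)
      }
      return counts, nil
  }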

* test: fix collection mismatch in TestCrossRackECPlacement

The EC commands were using collection 'rack_test' while the uploaded test data
uses collection 'test' (the default), so ec.encode/ec.balance could not find
the uploaded volume.

Fix: change the EC commands to use '-collection test' to match the uploaded data.

Addresses review comment from PR #7607.

* test: close log files in MultiDiskCluster.Stop() to prevent FD leaks

Track log files in MultiDiskCluster.logFiles and close them in Stop()
to prevent file descriptor accumulation in long-running or many-test
scenarios.

Addresses review comment about logging resources cleanup.

* test: improve EC integration tests with proper assertions

- Add assertNoFlagError helper to detect flag parsing regressions (sketched after this list)
- Update diskType subtests to fail on flag errors (ec.encode, ec.balance, ec.decode)
- Update verify_disktype_flag_parsing to check help output contains diskType
- Remove verify_fallback_disk_selection (was documentation-only, not executable)
- Add assertion to verify_cross_rack_distribution for minimum 2 racks
- Consolidate uploadTestDataWithDiskType to accept collection parameter
- Remove duplicate uploadTestDataWithDiskTypeMixed function
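
A sketch of the assertNoFlagError helper, assuming the command output is
captured as a string; the case-insensitive matching reflects the later
robustness commit (the exact signature in the test file may differ):

  func assertNoFlagError(t *testing.T, output string) {
      t.Helper()
      // Go's flag package reports unknown flags as "flag provided but not defined: -x".
      lower := strings.ToLower(output)
      if strings.Contains(lower, "flag provided but not defined") ||
          strings.Contains(lower, "flag needs an argument") {
          t.Fatalf("flag parsing regression detected: %s", output)
      }
  }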

* test: extract captureCommandOutput helper and fix error handling

- Add captureCommandOutput helper to reduce code duplication in diskType tests
- Create commandRunner interface to match shell command Do method
- Update ec_encode_with_ssd_disktype, ec_balance_with_ssd_disktype,
  ec_encode_with_source_disktype, ec_decode_with_disktype to use helper
- Fix filepath.Glob error handling in countShardsPerRack instead of ignoring it

* test: add flag validation to ec_balance_targets_correct_disk_type

Add assertNoFlagError calls after ec.balance commands to ensure
-diskType flag is properly recognized for both SSD and HDD disk types.

* test: add proper assertions for EC command results

- ec_encode_with_ssd_disktype: check for expected volume-related errors
- ec_balance_with_ssd_disktype: require success with require.NoError
- ec_encode_with_source_disktype: check for expected no-volume errors
- ec_decode_with_disktype: check for expected no-ec-volume errors
- upload_to_ssd_and_hdd: use require.NoError for setup validation

Tests now properly fail on unexpected errors rather than just logging.

* test: fix missing unlock in ec_encode_with_disk_awareness

Add defer unlock pattern to ensure lock is always released, matching
the pattern used in other subtests.

* test: improve helper robustness

- Make assertNoFlagError case-insensitive for pattern matching
- Use defer in captureCommandOutput to restore stdout/stderr and close
  pipe ends to avoid FD leaks even if cmd.Do panics
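
A sketch of the capture helper, assuming a commandRunner interface shaped like
the shell command Do method (details of the real helper may differ):

  type commandRunner interface {
      Do(args []string, commandEnv *CommandEnv, writer io.Writer) error
  }

  func captureCommandOutput(t *testing.T, cmd commandRunner, commandEnv *CommandEnv, args []string) (string, error) {
      t.Helper()
      r, w, err := os.Pipe()
      if err != nil {
          t.Fatalf("os.Pipe: %v", err)
      }
      oldStdout, oldStderr := os.Stdout, os.Stderr
      os.Stdout, os.Stderr = w, w
      defer func() {
          // Restore stdout/stderr and close both pipe ends even if cmd.Do panics.
          os.Stdout, os.Stderr = oldStdout, oldStderr
          w.Close()
          r.Close()
      }()

      // Drain the pipe concurrently so a chatty command cannot block on a full pipe buffer.
      done := make(chan string, 1)
      go func() {
          data, _ := io.ReadAll(r)
          done <- string(data)
      }()

      doErr := cmd.Do(args, commandEnv, w) // command output also goes to the pipe
      w.Close()                            // signal EOF to the reader
      return <-done, doErr
  }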
Commit df4f2f7020 by Chris Lu, committed via GitHub.
9 changed files:
- test/erasure_coding/ec_integration_test.go (1136 lines changed)
- weed/shell/command_ec_balance.go (12 lines changed)
- weed/shell/command_ec_common.go (118 lines changed)
- weed/shell/command_ec_common_test.go (10 lines changed)
- weed/shell/command_ec_decode.go (32 lines changed)
- weed/shell/command_ec_encode.go (32 lines changed)
- weed/shell/command_ec_rebuild.go (13 lines changed)
- weed/shell/command_ec_test.go (8 lines changed)
- weed/shell/command_volume_server_evacuate.go (28 lines changed)

test/erasure_coding/ec_integration_test.go (1136 lines changed)
File diff suppressed because it is too large.

weed/shell/command_ec_balance.go (12 lines changed)

@ -4,6 +4,8 @@ import (
"flag"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func init() {
@ -20,7 +22,10 @@ func (c *commandEcBalance) Name() string {
func (c *commandEcBalance) Help() string {
return `balance all ec shards among all racks and volume servers
ec.balance [-c EACH_COLLECTION|<collection_name>] [-apply] [-dataCenter <data_center>] [-shardReplicaPlacement <replica_placement>]
ec.balance [-c EACH_COLLECTION|<collection_name>] [-apply] [-dataCenter <data_center>] [-shardReplicaPlacement <replica_placement>] [-diskType <disk_type>]
Options:
-diskType: the disk type for EC shards (hdd, ssd, or empty for default hdd)
Algorithm:
` + ecBalanceAlgorithmDescription
@ -35,6 +40,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
diskTypeStr := balanceCommand.String("diskType", "", "the disk type for EC shards (hdd, ssd, or empty for default hdd)")
maxParallelization := balanceCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
applyBalancing := balanceCommand.Bool("apply", false, "apply the balancing plan")
// TODO: remove this alias
@ -67,5 +73,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
return err
}
return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing)
diskType := types.ToDiskType(*diskTypeStr)
return EcBalance(commandEnv, collections, *dc, rp, diskType, *maxParallelization, *applyBalancing)
}

weed/shell/command_ec_common.go (118 lines changed)

@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura
}
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN
}
// find out all volume servers with one slot left.
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType)
sortEcNodesByFreeslotsDescending(ecNodes)
return
}
func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
return collectEcNodesForDC(commandEnv, "")
func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
return collectEcNodesForDC(commandEnv, "", diskType)
}
func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
@ -242,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@ -280,8 +280,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
}
destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType)
existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType)
return nil
@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
return 0
}
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) {
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
return
}
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
freeEcSlots := countFreeShardSlots(dn, diskType)
ecNode := &EcNode{
info: dn,
dc: dc,
@ -439,17 +439,17 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
// Build disk-level information from volumes and EC shards
// First, discover all unique disk IDs from VolumeInfos (includes empty disks)
allDiskIds := make(map[uint32]string) // diskId -> diskType
for diskType, diskInfo := range dn.DiskInfos {
for diskTypeKey, diskInfo := range dn.DiskInfos {
if diskInfo == nil {
continue
}
// Get all disk IDs from volumes
for _, vi := range diskInfo.VolumeInfos {
allDiskIds[vi.DiskId] = diskType
allDiskIds[vi.DiskId] = diskTypeKey
}
// Also get disk IDs from EC shards
for _, ecShardInfo := range diskInfo.EcShardInfos {
allDiskIds[ecShardInfo.DiskId] = diskType
allDiskIds[ecShardInfo.DiskId] = diskTypeKey
}
}
@ -476,7 +476,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
freePerDisk := int(freeEcSlots) / diskCount
for diskId, diskType := range allDiskIds {
for diskId, diskTypeStr := range allDiskIds {
shards := diskShards[diskId]
if shards == nil {
shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
@ -488,7 +488,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
ecNode.disks[diskId] = &EcDisk{
diskId: diskId,
diskType: diskType,
diskType: diskTypeStr,
freeEcSlots: freePerDisk,
ecShardCount: totalShardCount,
ecShards: shards,
@ -551,9 +551,9 @@ func ceilDivide(a, b int) int {
return (a / b) + r
}
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits {
if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
return erasure_coding.ShardBits(shardInfo.EcIndexBits)
@ -564,10 +564,10 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
return 0
}
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode {
foundVolume := false
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
diskInfo, found := ecNode.info.DiskInfos[string(diskType)]
if found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
@ -584,9 +584,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
}
} else {
diskInfo = &master_pb.DiskInfo{
Type: string(types.HardDriveType),
Type: string(diskType),
}
ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
ecNode.info.DiskInfos[string(diskType)] = diskInfo
}
if !foundVolume {
@ -598,7 +598,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(newShardBits),
DiskType: string(types.HardDriveType),
DiskType: string(diskType),
})
ecNode.freeEcSlot -= len(shardIds)
}
@ -606,9 +606,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
return ecNode
}
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode {
if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
@ -649,6 +649,7 @@ type ecBalancer struct {
replicaPlacement *super_block.ReplicaPlacement
applyBalancing bool
maxParallelization int
diskType types.DiskType // target disk type for EC shards (default: HardDriveType)
}
func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
@ -705,7 +706,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
// Use MaxShardCount (32) to support custom EC ratios
shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
for _, ecNode := range locations {
shardBits := findEcVolumeShards(ecNode, vid)
shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
for _, shardId := range shardBits.ShardIds() {
shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
}
@ -728,7 +729,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds, ecb.diskType)
}
}
return nil
@ -748,9 +749,9 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
return ewg.Wait()
}
func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int {
return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
shardBits := findEcVolumeShards(ecNode, vid)
shardBits := findEcVolumeShards(ecNode, vid, diskType)
return string(ecNode.rack), shardBits.ShardIdCount()
})
}
@ -759,7 +760,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
racks := ecb.racks()
// see the volume's shards are in how many racks, and how many in each rack
rackToShardCount := countShardsByRack(vid, locations)
rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
// Calculate actual total shards for this volume (not hardcoded default)
var totalShardsForVolume int
@ -779,7 +780,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
continue
}
possibleEcNodes := rackEcNodesWithVid[rackId]
for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) {
ecShardsToMove[shardId] = ecNode
}
}
@ -856,7 +857,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
for vid, locations := range vidLocations {
// see the volume's shards are in how many racks, and how many in each rack
rackToShardCount := countShardsByRack(vid, locations)
rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
@ -865,7 +866,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
var possibleDestinationEcNodes []*EcNode
for _, n := range racks[RackId(rackId)].ecNodes {
if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
if _, found := n.info.DiskInfos[string(ecb.diskType)]; found {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
}
@ -882,7 +883,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
for _, ecNode := range existingLocations {
shardBits := findEcVolumeShards(ecNode, vid)
shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
for _, shardId := range shardBits.ShardIds() {
@ -927,7 +928,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
}
ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
if !found {
return
}
@ -955,17 +956,18 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
emptyNodeIds := make(map[uint32]bool)
if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(ecb.diskType)]; found {
for _, shards := range emptyDiskInfo.EcShardInfos {
emptyNodeIds[shards.Id] = true
}
}
if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
vid := needle.VolumeId(shards.Id)
destDiskId := pickBestDiskOnNode(emptyNode, vid)
// For balancing, strictly require matching disk type
destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true)
if destDiskId > 0 {
fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
@ -973,7 +975,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
}
err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType)
if err != nil {
return err
}
@ -1003,7 +1005,7 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
nodeShards := map[*EcNode]int{}
for _, node := range possibleDestinations {
nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount()
}
targets := []*EcNode{}
@ -1078,14 +1080,17 @@ func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
}
// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
// It prefers disks with fewer shards and more free slots
func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
// It prefers disks of the specified type with fewer shards and more free slots
// If strictDiskType is false, it will fall back to other disk types if no matching disk is found
func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool) uint32 {
if len(ecNode.disks) == 0 {
return 0 // No disk info available, let the server decide
}
var bestDiskId uint32
bestScore := -1
var fallbackDiskId uint32
fallbackScore := -1
for diskId, disk := range ecNode.disks {
if disk.freeEcSlots <= 0 {
@ -1102,14 +1107,27 @@ func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
// Lower score is better
score := disk.ecShardCount*10 + existingShards*100
if disk.diskType == string(diskType) {
// Matching disk type - this is preferred
if bestScore == -1 || score < bestScore {
bestScore = score
bestDiskId = diskId
}
} else if !strictDiskType {
// Non-matching disk type - use as fallback if allowed
if fallbackScore == -1 || score < fallbackScore {
fallbackScore = score
fallbackDiskId = diskId
}
}
}
// Return matching disk type if found, otherwise fallback
if bestDiskId != 0 {
return bestDiskId
}
return fallbackDiskId
}
// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) {
@ -1118,7 +1136,8 @@ func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId,
return nil, 0, err
}
diskId := pickBestDiskOnNode(node, vid)
// For balancing, strictly require matching disk type
diskId := pickBestDiskOnNode(node, vid, ecb.diskType, true)
return node, diskId, nil
}
@ -1134,14 +1153,14 @@ func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, co
} else {
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
}
return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing, ecb.diskType)
}
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, diskType types.DiskType) map[erasure_coding.ShardId]*EcNode {
picked := make(map[erasure_coding.ShardId]*EcNode)
var candidateEcNodes []*CandidateEcNode
for _, ecNode := range ecNodes {
shardBits := findEcVolumeShards(ecNode, vid)
shardBits := findEcVolumeShards(ecNode, vid, diskType)
if shardBits.ShardIdCount() > 0 {
candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
ecNode: ecNode,
@ -1155,13 +1174,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
for i := 0; i < n; i++ {
selectedEcNodeIndex := -1
for i, candidateEcNode := range candidateEcNodes {
shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType)
if shardBits > 0 {
selectedEcNodeIndex = i
for _, shardId := range shardBits.ShardIds() {
candidateEcNode.shardCount--
picked[shardId] = candidateEcNode.ecNode
candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType)
break
}
break
@ -1180,7 +1199,7 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
vidLocations := make(map[needle.VolumeId][]*EcNode)
for _, ecNode := range ecb.ecNodes {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
if !found {
continue
}
@ -1194,9 +1213,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
return vidLocations
}
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) {
// collect all ec nodes
allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
if err != nil {
return err
}
@ -1210,6 +1229,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
replicaPlacement: ecReplicaPlacement,
applyBalancing: applyBalancing,
maxParallelization: maxParallelization,
diskType: diskType,
}
if len(collections) == 0 {

weed/shell/command_ec_common_test.go (10 lines changed)

@ -106,7 +106,7 @@ func TestParseReplicaPlacementArg(t *testing.T) {
func TestEcDistribution(t *testing.T) {
// find out all volume servers with one slot left.
ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "")
ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "", types.HardDriveType)
sortEcNodesByFreeslotsDescending(ecNodes)
@ -149,16 +149,17 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
for _, tc := range testCases {
vid, _ := needle.NewVolumeId(tc.vid)
ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement)
ecb := &ecBalancer{
ecNodes: ecNodes,
replicaPlacement: rp,
diskType: types.HardDriveType,
}
racks := ecb.racks()
rackToShardCount := countShardsByRack(vid, ecNodes)
rackToShardCount := countShardsByRack(vid, ecNodes, types.HardDriveType)
got, gotErr := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
if err := errorCheck(gotErr, tc.wantErr); err != nil {
@ -225,10 +226,11 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
for _, tc := range testCases {
vid, _ := needle.NewVolumeId(tc.vid)
allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
ecb := &ecBalancer{
ecNodes: allEcNodes,
diskType: types.HardDriveType,
}
// Resolve target node by name

weed/shell/command_ec_decode.go (32 lines changed)

@ -32,13 +32,23 @@ func (c *commandEcDecode) Name() string {
func (c *commandEcDecode) Help() string {
return `decode a erasure coded volume into a normal volume
ec.decode [-collection=""] [-volumeId=<volume_id>]
ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>]
The -collection parameter supports regular expressions for pattern matching:
- Use exact match: ec.decode -collection="^mybucket$"
- Match multiple buckets: ec.decode -collection="bucket.*"
- Match all collections: ec.decode -collection=".*"
Options:
-diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
Examples:
# Decode EC shards from HDD (default)
ec.decode -collection=mybucket
# Decode EC shards from SSD
ec.decode -collection=mybucket -diskType=ssd
`
}
@ -50,6 +60,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
collection := decodeCommand.String("collection", "", "the collection name")
diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
if err = decodeCommand.Parse(args); err != nil {
return nil
}
@ -59,6 +70,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
vid := needle.VolumeId(*volumeId)
diskType := types.ToDiskType(*diskTypeStr)
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@ -68,17 +80,17 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
// volumeId is provided
if vid != 0 {
return doEcDecode(commandEnv, topologyInfo, *collection, vid)
return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType)
}
// apply to all volumes in the collection
volumeIds, err := collectEcShardIds(topologyInfo, *collection)
volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType)
if err != nil {
return err
}
fmt.Printf("ec decode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType); err != nil {
return err
}
}
@ -86,14 +98,14 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
// find volume location
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid, diskType)
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
@ -248,7 +260,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
return resp.VolumeIdLocations, nil
}
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string) (vids []needle.VolumeId, err error) {
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) {
// compile regex pattern for collection matching
collectionRegex, err := compileCollectionPattern(collectionPattern)
if err != nil {
@ -257,7 +269,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
vidMap := make(map[uint32]bool)
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
for _, v := range diskInfo.EcShardInfos {
if collectionRegex.MatchString(v.Collection) {
vidMap[v.Id] = true
@ -273,11 +285,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
return
}
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]erasure_coding.ShardBits {
nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)

weed/shell/command_ec_encode.go (32 lines changed)

@ -37,8 +37,8 @@ func (c *commandEcEncode) Name() string {
func (c *commandEcEncode) Help() string {
return `apply erasure coding to a volume
ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose]
ec.encode [-collection=""] [-volumeId=<volume_id>] [-verbose]
ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose] [-sourceDiskType=<disk_type>] [-diskType=<disk_type>]
ec.encode [-collection=""] [-volumeId=<volume_id>] [-verbose] [-diskType=<disk_type>]
This command will:
1. freeze one volume
@ -61,6 +61,18 @@ func (c *commandEcEncode) Help() string {
Options:
-verbose: show detailed reasons why volumes are not selected for encoding
-sourceDiskType: filter source volumes by disk type (hdd, ssd, or empty for all)
-diskType: target disk type for EC shards (hdd, ssd, or empty for default hdd)
Examples:
# Encode SSD volumes to SSD EC shards (same tier)
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=ssd
# Encode SSD volumes to HDD EC shards (tier migration to cheaper storage)
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd
# Encode all volumes to SSD EC shards
ec.encode -collection=mybucket -diskType=ssd
Re-balancing algorithm:
` + ecBalanceAlgorithmDescription
@ -80,6 +92,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
maxParallelization := encodeCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
sourceDiskTypeStr := encodeCommand.String("sourceDiskType", "", "filter source volumes by disk type (hdd, ssd, or empty for all)")
diskTypeStr := encodeCommand.String("diskType", "", "target disk type for EC shards (hdd, ssd, or empty for default hdd)")
applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding")
@ -94,6 +108,16 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return err
}
// Parse source disk type filter (optional)
var sourceDiskType *types.DiskType
if *sourceDiskTypeStr != "" {
sdt := types.ToDiskType(*sourceDiskTypeStr)
sourceDiskType = &sdt
}
// Parse target disk type for EC shards
diskType := types.ToDiskType(*diskTypeStr)
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
@ -119,7 +143,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
} else {
// apply to all volumes for the given collection pattern (regex)
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose)
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, sourceDiskType, *fullPercentage, *quietPeriod, *verbose)
if err != nil {
return err
}
@ -142,7 +166,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err)
}
// ...re-balance ec shards...
if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
if err := EcBalance(commandEnv, balanceCollections, "", rp, diskType, *maxParallelization, *applyBalancing); err != nil {
return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err)
}
// ...then delete original volumes using pre-collected locations.

weed/shell/command_ec_rebuild.go (13 lines changed)

@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func init() {
@ -24,6 +25,7 @@ type ecRebuilder struct {
writer io.Writer
applyChanges bool
collections []string
diskType types.DiskType
ewg *ErrorWaitGroup
ecNodesMu sync.Mutex
@ -39,7 +41,7 @@ func (c *commandEcRebuild) Name() string {
func (c *commandEcRebuild) Help() string {
return `find and rebuild missing ec shards among volume servers
ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N]
ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N] [-diskType=<disk_type>]
Options:
-collection: specify a collection name, or "EACH_COLLECTION" to process all collections
@ -47,6 +49,7 @@ func (c *commandEcRebuild) Help() string {
-maxParallelization: number of volumes to rebuild concurrently (default: 10)
Increase for faster rebuilds with more system resources.
Decrease if experiencing resource contention or instability.
-diskType: disk type for EC shards (hdd, ssd, or empty for default hdd)
Algorithm:
@ -83,6 +86,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
applyChanges := fixCommand.Bool("apply", false, "apply the changes")
diskTypeStr := fixCommand.String("diskType", "", "disk type for EC shards (hdd, ssd, or empty for default hdd)")
// TODO: remove this alias
applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
if err = fixCommand.Parse(args); err != nil {
@ -95,8 +99,10 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
return
}
diskType := types.ToDiskType(*diskTypeStr)
// collect all ec nodes
allEcNodes, _, err := collectEcNodes(commandEnv)
allEcNodes, _, err := collectEcNodes(commandEnv, diskType)
if err != nil {
return err
}
@ -117,6 +123,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
writer: writer,
applyChanges: *applyChanges,
collections: collections,
diskType: diskType,
ewg: NewErrorWaitGroup(*maxParallelization),
}
@ -294,7 +301,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
// ensure ECNode updates are atomic
erb.ecNodesMu.Lock()
defer erb.ecNodesMu.Unlock()
rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, erb.diskType)
return nil
}

weed/shell/command_ec_test.go (8 lines changed)

@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func TestCommandEcBalanceSmall(t *testing.T) {
@ -14,6 +15,7 @@ func TestCommandEcBalanceSmall(t *testing.T) {
newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -30,6 +32,7 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) {
addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -48,6 +51,7 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) {
newEcNode("dc1", "rack1", "dn4", 100),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -66,6 +70,7 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) {
newEcNode("dc1", "rack2", "dn4", 100),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -109,6 +114,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
newEcNode("dc1", "rack1", "dn3", 100),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -128,5 +134,5 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod
}
func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode {
return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds)
return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType)
}

weed/shell/command_volume_server_evacuate.go (28 lines changed)

@ -156,21 +156,30 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
}
func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server
// Evacuate EC volumes for all disk types discovered from topology
// Disk types are free-form tags (e.g., "", "hdd", "ssd", "nvme", etc.)
// We need to handle each disk type separately because shards should be moved to nodes with the same disk type
// We collect topology once at the start and track capacity changes ourselves
// (via freeEcSlot decrement after each move) rather than repeatedly refreshing,
// which would give a false sense of correctness since topology could be stale.
ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
diskTypes := collectVolumeDiskTypes(c.topologyInfo)
for _, diskType := range diskTypes {
ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", diskType)
thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
// This server doesn't have EC shards for this disk type, skip
continue
}
// move away ec volumes
// move away ec volumes for this disk type
for _, thisNode := range thisNodes {
for _, diskInfo := range thisNode.info.DiskInfos {
diskInfo, found := thisNode.info.DiskInfos[string(diskType)]
if !found {
continue
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, writer)
hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, diskType, writer)
if err != nil {
fmt.Fprintf(writer, "move away volume %d from %s: %v\n", ecShardInfo.Id, volumeServer, err)
}
@ -187,7 +196,7 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv,
return nil
}
func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, writer io.Writer) (hasMoved bool, err error) {
func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, diskType types.DiskType, writer io.Writer) (hasMoved bool, err error) {
for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
// Sort by: 1) fewest shards of this volume, 2) most free EC slots
@ -217,13 +226,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
collectionPrefix = ecShardInfo.Collection + "_"
}
vid := needle.VolumeId(ecShardInfo.Id)
destDiskId := pickBestDiskOnNode(emptyNode, vid)
// For evacuation, prefer same disk type but allow fallback to other types
destDiskId := pickBestDiskOnNode(emptyNode, vid, diskType, false)
if destDiskId > 0 {
fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
} else {
fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
}
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, diskType)
if err != nil {
hasMoved = false
return
