
ec: fix variable shadowing and add -diskType to ec.rebuild and volumeServer.evacuate

Address code review comments:

1. Fix variable shadowing in collectEcVolumeServersByDc():
   - Rename the loop variables named 'diskType' to 'diskTypeKey' and
     'diskTypeStr' to avoid shadowing the function parameter

2. Fix hardcoded HardDriveType in ecBalancer methods:
   - doBalanceEcRack(): use ecb.diskType instead of types.HardDriveType
   - collectVolumeIdToEcNodes(): use ecb.diskType

3. Add -diskType flag to ec.rebuild command:
   - Add diskType field to ecRebuilder struct
   - Pass diskType to collectEcNodes() and addEcVolumeShards()

4. Add -diskType flag to volumeServer.evacuate command:
   - Add diskType field to commandVolumeServerEvacuate struct
   - Pass diskType to collectEcVolumeServersByDc() and moveMountedShardToEcNode()
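
For context on item 1: this is the usual Go pitfall of a range variable reusing a parameter's name, which makes the parameter unreachable inside the loop body. A minimal standalone sketch of the fixed pattern (hypothetical, simplified names and types, not the actual SeaweedFS code):

    package main

    import "fmt"

    // collectDiskIds mirrors the shape of the fixed loop: the range key now uses
    // the distinct name diskTypeKey, so the diskType parameter stays visible.
    func collectDiskIds(diskType string, diskInfos map[string][]uint32) map[uint32]string {
        allDiskIds := make(map[uint32]string) // diskId -> diskType
        for diskTypeKey, diskIds := range diskInfos { // was also named diskType, shadowing the parameter
            for _, id := range diskIds {
                allDiskIds[id] = diskTypeKey
            }
        }
        fmt.Println("requested disk type:", diskType) // refers to the parameter, not the loop key
        return allDiskIds
    }

    func main() {
        fmt.Println(collectDiskIds("hdd", map[string][]uint32{"hdd": {0, 1}, "ssd": {2}}))
    }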
Branch: ec-disk-type-support
Author: chrislusf
Commit: dc8a0fdf77
Changed files:
  weed/shell/command_ec_common.go              | 18
  weed/shell/command_ec_rebuild.go             | 12
  weed/shell/command_volume_server_evacuate.go | 13

weed/shell/command_ec_common.go (18 lines changed)

@@ -439,17 +439,17 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
     // Build disk-level information from volumes and EC shards
     // First, discover all unique disk IDs from VolumeInfos (includes empty disks)
     allDiskIds := make(map[uint32]string) // diskId -> diskType
-    for diskType, diskInfo := range dn.DiskInfos {
+    for diskTypeKey, diskInfo := range dn.DiskInfos {
         if diskInfo == nil {
             continue
         }
         // Get all disk IDs from volumes
         for _, vi := range diskInfo.VolumeInfos {
-            allDiskIds[vi.DiskId] = diskType
+            allDiskIds[vi.DiskId] = diskTypeKey
         }
         // Also get disk IDs from EC shards
         for _, ecShardInfo := range diskInfo.EcShardInfos {
-            allDiskIds[ecShardInfo.DiskId] = diskType
+            allDiskIds[ecShardInfo.DiskId] = diskTypeKey
         }
     }
@@ -476,7 +476,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
     }
     freePerDisk := int(freeEcSlots) / diskCount
-    for diskId, diskType := range allDiskIds {
+    for diskId, diskTypeStr := range allDiskIds {
         shards := diskShards[diskId]
         if shards == nil {
             shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
@@ -488,7 +488,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
         ecNode.disks[diskId] = &EcDisk{
             diskId:       diskId,
-            diskType:     diskType,
+            diskType:     diskTypeStr,
             freeEcSlots:  freePerDisk,
             ecShardCount: totalShardCount,
             ecShards:     shards,
@@ -928,7 +928,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
    }
    ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
-       diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+       diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
        if !found {
            return
        }
@@ -956,12 +956,12 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
        if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
            emptyNodeIds := make(map[uint32]bool)
-           if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
+           if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(ecb.diskType)]; found {
                for _, shards := range emptyDiskInfo.EcShardInfos {
                    emptyNodeIds[shards.Id] = true
                }
            }
-           if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
+           if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found {
                for _, shards := range fullDiskInfo.EcShardInfos {
                    if _, found := emptyNodeIds[shards.Id]; !found {
                        for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
@@ -1181,7 +1181,7 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, disk
 func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
     vidLocations := make(map[needle.VolumeId][]*EcNode)
     for _, ecNode := range ecb.ecNodes {
-        diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+        diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
         if !found {
             continue
         }

weed/shell/command_ec_rebuild.go (12 lines changed)

@@ -25,6 +25,7 @@ type ecRebuilder struct {
    writer       io.Writer
    applyChanges bool
    collections  []string
+   diskType     types.DiskType
    ewg          *ErrorWaitGroup
    ecNodesMu    sync.Mutex
@@ -40,7 +41,7 @@ func (c *commandEcRebuild) Name() string {
 func (c *commandEcRebuild) Help() string {
    return `find and rebuild missing ec shards among volume servers
-   ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N]
+   ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N] [-diskType=<disk_type>]
    Options:
    -collection: specify a collection name, or "EACH_COLLECTION" to process all collections
@@ -48,6 +49,7 @@ func (c *commandEcRebuild) Help() string {
    -maxParallelization: number of volumes to rebuild concurrently (default: 10)
                         Increase for faster rebuilds with more system resources.
                         Decrease if experiencing resource contention or instability.
+   -diskType: disk type for EC shards (hdd, ssd, or empty for default hdd)
    Algorithm:
@@ -84,6 +86,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
    collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
    maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
    applyChanges := fixCommand.Bool("apply", false, "apply the changes")
+   diskTypeStr := fixCommand.String("diskType", "", "disk type for EC shards (hdd, ssd, or empty for default hdd)")
    // TODO: remove this alias
    applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
    if err = fixCommand.Parse(args); err != nil {
@@ -96,8 +99,10 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
        return
    }
+   diskType := types.ToDiskType(*diskTypeStr)
    // collect all ec nodes
-   allEcNodes, _, err := collectEcNodes(commandEnv, types.HardDriveType)
+   allEcNodes, _, err := collectEcNodes(commandEnv, diskType)
    if err != nil {
        return err
    }
@@ -118,6 +123,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
        writer:       writer,
        applyChanges: *applyChanges,
        collections:  collections,
+       diskType:     diskType,
        ewg:          NewErrorWaitGroup(*maxParallelization),
    }
@@ -295,7 +301,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
    // ensure ECNode updates are atomic
    erb.ecNodesMu.Lock()
    defer erb.ecNodesMu.Unlock()
-   rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, types.HardDriveType)
+   rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, erb.diskType)
    return nil
 }
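
The flag's documented semantics ("hdd, ssd, or empty for default hdd") come from the types.ToDiskType conversion shown above. A rough standalone sketch of just that fallback behavior, using a hypothetical stand-in helper rather than the real types package:

    package main

    import "fmt"

    // normalizeDiskType is a hypothetical stand-in illustrating the documented
    // -diskType fallback: an empty value selects the default hdd disk type.
    func normalizeDiskType(flagValue string) string {
        if flagValue == "" {
            return "hdd"
        }
        return flagValue
    }

    func main() {
        for _, v := range []string{"", "hdd", "ssd"} {
            fmt.Printf("-diskType=%q -> %s\n", v, normalizeDiskType(v))
        }
    }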

weed/shell/command_volume_server_evacuate.go (13 lines changed)

@@ -23,6 +23,7 @@ type commandVolumeServerEvacuate struct {
    topologyInfo *master_pb.TopologyInfo
    targetServer *string
    volumeRack   *string
+   diskType     types.DiskType
 }
 func (c *commandVolumeServerEvacuate) Name() string {
@@ -32,7 +33,7 @@ func (c *commandVolumeServerEvacuate) Name() string {
 func (c *commandVolumeServerEvacuate) Help() string {
    return `move out all data on a volume server
-   volumeServer.evacuate -node <host:port>
+   volumeServer.evacuate -node <host:port> [-diskType=<disk_type>]
    This command moves all data away from the volume server.
    The volumes on the volume servers will be redistributed.
@@ -44,6 +45,9 @@ func (c *commandVolumeServerEvacuate) Help() string {
    E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
    You can use "-skipNonMoveable" to move the rest volumes.
    Options:
+   -diskType: disk type for EC shards (hdd, ssd, or empty for default hdd)
 `
 }
@@ -59,6 +63,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
    c.targetServer = vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
    skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
    applyChange := vsEvacuateCommand.Bool("apply", false, "actually apply the changes")
+   diskTypeStr := vsEvacuateCommand.String("diskType", "", "disk type for EC shards (hdd, ssd, or empty for default hdd)")
    // TODO: remove this alias
    applyChangeAlias := vsEvacuateCommand.Bool("force", false, "actually apply the changes (alias for -apply)")
    retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
@@ -69,6 +74,8 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
    handleDeprecatedForceFlag(writer, vsEvacuateCommand, applyChangeAlias, applyChange)
    infoAboutSimulationMode(writer, *applyChange, "-apply")
+   c.diskType = types.ToDiskType(*diskTypeStr)
    if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
        return
    }
@@ -158,7 +165,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
 func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
    // find this ec volume server
-   ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", types.HardDriveType)
+   ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", c.diskType)
    thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
    if len(thisNodes) == 0 {
        return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
@@ -204,7 +211,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
    } else {
        fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
    }
-   err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, types.HardDriveType)
+   err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, c.diskType)
    if err != nil {
        return
    } else {
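
With both commands now accepting -diskType, typical invocations (placeholder server address; flag syntax as documented in the help text above) would look like:

    ec.rebuild -collection EACH_COLLECTION -diskType=ssd -apply
    volumeServer.evacuate -node 192.168.1.5:8080 -diskType=ssd -apply

Omitting -diskType, or passing an empty value, keeps the previous behavior of operating on the default hdd disk type.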
