From dc8a0fdf7729ad9f1bee6d52eb0744c176b2f34f Mon Sep 17 00:00:00 2001
From: chrislusf
Date: Tue, 2 Dec 2025 21:30:27 -0800
Subject: [PATCH] ec: fix variable shadowing and add -diskType to ec.rebuild
 and volumeServer.evacuate

Address code review comments:

1. Fix variable shadowing in collectEcVolumeServersByDc():
   - Rename the loop variable 'diskType' to 'diskTypeKey' in the
     dn.DiskInfos loop and to 'diskTypeStr' in the allDiskIds loop,
     so neither shadows the function's diskType parameter

2. Fix hardcoded HardDriveType in ecBalancer methods:
   - doBalanceEcRack(): use ecb.diskType instead of types.HardDriveType
   - collectVolumeIdToEcNodes(): use ecb.diskType instead of types.HardDriveType

3. Add -diskType flag to the ec.rebuild command:
   - Add a diskType field to the ecRebuilder struct
   - Pass diskType to collectEcNodes() and addEcVolumeShards()

4. Add -diskType flag to the volumeServer.evacuate command:
   - Add a diskType field to the commandVolumeServerEvacuate struct
   - Pass diskType to collectEcVolumeServersByDc() and moveMountedShardToEcNode()
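
Example usage in the weed shell (illustrative; the volume server
address below is a placeholder):

    > ec.rebuild -collection EACH_COLLECTION -diskType=ssd -apply
    > volumeServer.evacuate -node 192.168.1.5:8080 -diskType=ssd -apply

Omitting -diskType (or passing an empty value) keeps the previous
behavior of operating on the default hdd disk type.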
---
 weed/shell/command_ec_common.go              | 18 +++++++++---------
 weed/shell/command_ec_rebuild.go             | 12 +++++++++---
 weed/shell/command_volume_server_evacuate.go | 13 ++++++++++---
 3 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index aa9b49cb1..3c78dd3c1 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -439,17 +439,17 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
 		// Build disk-level information from volumes and EC shards
 		// First, discover all unique disk IDs from VolumeInfos (includes empty disks)
 		allDiskIds := make(map[uint32]string) // diskId -> diskType
-		for diskType, diskInfo := range dn.DiskInfos {
+		for diskTypeKey, diskInfo := range dn.DiskInfos {
 			if diskInfo == nil {
 				continue
 			}
 			// Get all disk IDs from volumes
 			for _, vi := range diskInfo.VolumeInfos {
-				allDiskIds[vi.DiskId] = diskType
+				allDiskIds[vi.DiskId] = diskTypeKey
 			}
 			// Also get disk IDs from EC shards
 			for _, ecShardInfo := range diskInfo.EcShardInfos {
-				allDiskIds[ecShardInfo.DiskId] = diskType
+				allDiskIds[ecShardInfo.DiskId] = diskTypeKey
 			}
 		}
@@ -476,7 +476,7 @@
 		}
 		freePerDisk := int(freeEcSlots) / diskCount

-		for diskId, diskType := range allDiskIds {
+		for diskId, diskTypeStr := range allDiskIds {
 			shards := diskShards[diskId]
 			if shards == nil {
 				shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
 			}
@@ -488,7 +488,7 @@
 			ecNode.disks[diskId] = &EcDisk{
 				diskId:       diskId,
-				diskType:     diskType,
+				diskType:     diskTypeStr,
 				freeEcSlots:  freePerDisk,
 				ecShardCount: totalShardCount,
 				ecShards:     shards,
 			}
@@ -928,7 +928,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 	}

 	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
-		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
 		if !found {
 			return
 		}
@@ -956,12 +956,12 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

 			emptyNodeIds := make(map[uint32]bool)
-			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
+			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(ecb.diskType)]; found {
 				for _, shards := range emptyDiskInfo.EcShardInfos {
 					emptyNodeIds[shards.Id] = true
 				}
 			}
-			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
+			if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found {
 				for _, shards := range fullDiskInfo.EcShardInfos {
 					if _, found := emptyNodeIds[shards.Id]; !found {
 						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
@@ -1181,7 +1181,7 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, disk
 func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
 	vidLocations := make(map[needle.VolumeId][]*EcNode)
 	for _, ecNode := range ecb.ecNodes {
-		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
 		if !found {
 			continue
 		}
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 97c689248..cfc895c7d 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -25,6 +25,7 @@ type ecRebuilder struct {
 	writer       io.Writer
 	applyChanges bool
 	collections  []string
+	diskType     types.DiskType

 	ewg       *ErrorWaitGroup
 	ecNodesMu sync.Mutex
@@ -40,7 +41,7 @@ func (c *commandEcRebuild) Name() string {
 func (c *commandEcRebuild) Help() string {
 	return `find and rebuild missing ec shards among volume servers

-	ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N]
+	ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N] [-diskType=<disk_type>]

 	Options:
 		-collection: specify a collection name, or "EACH_COLLECTION" to process all collections
@@ -48,6 +49,7 @@ func (c *commandEcRebuild) Help() string {
 		-maxParallelization: number of volumes to rebuild concurrently (default: 10)
 			Increase for faster rebuilds with more system resources.
 			Decrease if experiencing resource contention or instability.
+		-diskType: disk type for EC shards (hdd, ssd, or empty for default hdd)

 Algorithm:
@@ -84,6 +86,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 	collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
 	maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
 	applyChanges := fixCommand.Bool("apply", false, "apply the changes")
+	diskTypeStr := fixCommand.String("diskType", "", "disk type for EC shards (hdd, ssd, or empty for default hdd)")
 	// TODO: remove this alias
 	applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
 	if err = fixCommand.Parse(args); err != nil {
@@ -96,8 +99,10 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 		return
 	}

+	diskType := types.ToDiskType(*diskTypeStr)
+
 	// collect all ec nodes
-	allEcNodes, _, err := collectEcNodes(commandEnv, types.HardDriveType)
+	allEcNodes, _, err := collectEcNodes(commandEnv, diskType)
 	if err != nil {
 		return err
 	}
@@ -118,6 +123,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 		writer:       writer,
 		applyChanges: *applyChanges,
 		collections:  collections,
+		diskType:     diskType,
 		ewg:          NewErrorWaitGroup(*maxParallelization),
 	}
@@ -295,7 +301,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
 	// ensure ECNode updates are atomic
 	erb.ecNodesMu.Lock()
 	defer erb.ecNodesMu.Unlock()
-	rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, types.HardDriveType)
+	rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, erb.diskType)

 	return nil
 }
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index a074f4ff2..610de6c73 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -23,6 +23,7 @@ type commandVolumeServerEvacuate struct {
 	topologyInfo *master_pb.TopologyInfo
 	targetServer *string
 	volumeRack   *string
+	diskType     types.DiskType
 }

 func (c *commandVolumeServerEvacuate) Name() string {
@@ -32,7 +33,7 @@ func (c *commandVolumeServerEvacuate) Name() string {
 func (c *commandVolumeServerEvacuate) Help() string {
 	return `move out all data on a volume server

-	volumeServer.evacuate -node <host>:<port>
+	volumeServer.evacuate -node <host>:<port> [-diskType=<disk_type>]

 	This command moves all data away from the volume server.
 	The volumes on the volume servers will be redistributed.
@@ -44,6 +45,9 @@ func (c *commandVolumeServerEvacuate) Help() string {
 	E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
 	You can use "-skipNonMoveable" to move the rest volumes.

+	Options:
+		-diskType: disk type for EC shards (hdd, ssd, or empty for default hdd)
+
 `
 }

@@ -59,6 +63,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
 	c.targetServer = vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
 	skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
 	applyChange := vsEvacuateCommand.Bool("apply", false, "actually apply the changes")
+	diskTypeStr := vsEvacuateCommand.String("diskType", "", "disk type for EC shards (hdd, ssd, or empty for default hdd)")
 	// TODO: remove this alias
 	applyChangeAlias := vsEvacuateCommand.Bool("force", false, "actually apply the changes (alias for -apply)")
 	retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
@@ -69,6 +74,8 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
 	handleDeprecatedForceFlag(writer, vsEvacuateCommand, applyChangeAlias, applyChange)
 	infoAboutSimulationMode(writer, *applyChange, "-apply")

+	c.diskType = types.ToDiskType(*diskTypeStr)
+
 	if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
 		return
 	}
@@ -158,7 +165,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
 func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
 	// find this ec volume server
-	ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", types.HardDriveType)
+	ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", c.diskType)
 	thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
 	if len(thisNodes) == 0 {
 		return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
 	}
@@ -204,7 +211,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
 	} else {
 		fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
 	}
-	err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, types.HardDriveType)
+	err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, c.diskType)
 	if err != nil {
 		return
 	} else {
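
Reviewer note, not part of the patch: a minimal sketch of the expected
-diskType semantics. It assumes types.ToDiskType maps the empty string to
the default hard-drive type (as the flag help above states) and that the
types package lives at the usual SeaweedFS import path.

    package main

    import (
    	"fmt"

    	"github.com/seaweedfs/seaweedfs/weed/storage/types"
    )

    func main() {
    	// An empty -diskType should preserve the old behavior: the commands
    	// keep operating on the default hdd disk type, so existing scripts
    	// that never pass the flag are unaffected.
    	fmt.Println(types.ToDiskType("") == types.HardDriveType) // expected: true

    	// A non-empty value selects that disk type; its string form becomes
    	// the DiskInfos lookup key, as in string(ecb.diskType) above.
    	fmt.Println(string(types.ToDiskType("ssd"))) // expected: ssd
    }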