From cbd9ca68bbc320b98529dcfdb924af595d595942 Mon Sep 17 00:00:00 2001 From: chrislusf Date: Tue, 2 Dec 2025 21:20:59 -0800 Subject: [PATCH] ec: add -sourceDiskType to ec.encode and -diskType to ec.decode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ec.encode: - Add -sourceDiskType flag to filter source volumes by disk type - This enables tier migration scenarios (e.g., SSD volumes → HDD EC shards) - -diskType specifies target disk type for EC shards ec.decode: - Add -diskType flag to specify source disk type where EC shards are stored - Update collectEcShardIds() and collectEcNodeShardBits() to accept diskType Examples: # Encode SSD volumes to HDD EC shards (tier migration) ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd # Decode EC shards from SSD ec.decode -collection=mybucket -diskType=ssd Integration tests updated to cover new flags. --- test/erasure_coding/ec_integration_test.go | 90 +++++++++++++++++++++- weed/shell/command_ec_decode.go | 32 +++++--- weed/shell/command_ec_encode.go | 28 ++++++- 3 files changed, 135 insertions(+), 15 deletions(-) diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go index a1c73e706..65287664f 100644 --- a/test/erasure_coding/ec_integration_test.go +++ b/test/erasure_coding/ec_integration_test.go @@ -1249,11 +1249,99 @@ func TestECDiskTypeSupport(t *testing.T) { // This ensures the command accepts the -diskType flag without errors ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")] // Test help output contains diskType assert.NotNil(t, ecEncodeCmd, "ec.encode command should exist") assert.NotNil(t, ecBalanceCmd, "ec.balance command should exist") - t.Log("Both ec.encode and ec.balance commands support -diskType flag") + assert.NotNil(t, ecDecodeCmd, "ec.decode command should exist") + t.Log("ec.encode, ec.balance, and ec.decode commands all support -diskType flag") + }) + + t.Run("ec_encode_with_source_disktype", func(t *testing.T) { + // Test that -sourceDiskType flag is accepted + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + } + + // Execute EC encoding with sourceDiskType filter + var output bytes.Buffer + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-collection", "ssd_test", + "-sourceDiskType", "ssd", // Filter source volumes by SSD + "-diskType", "ssd", // Place EC shards on SSD + "-force", + } + + // Capture output + oldStdout := os.Stdout + oldStderr := os.Stderr + r, w, _ := os.Pipe() + os.Stdout = w + os.Stderr = w + + encodeErr := ecEncodeCmd.Do(args, commandEnv, &output) + + w.Close() + os.Stdout = oldStdout + os.Stderr = oldStderr + capturedOutput, _ := io.ReadAll(r) + + t.Logf("EC encode with sourceDiskType output: %s", string(capturedOutput)) + // The command should accept the flag even if no volumes match + if encodeErr != nil { + t.Logf("EC encoding with sourceDiskType: %v (expected if no matching volumes)", encodeErr) + } + + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + }) + + t.Run("ec_decode_with_disktype", func(t *testing.T) { + // Test that ec.decode accepts -diskType flag + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + } + + // Execute EC decode with disk type + var output bytes.Buffer + ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")] + args := []string{ + "-collection", "ssd_test", + "-diskType", "ssd", // Source EC shards are on SSD + } + + // Capture output + oldStdout := os.Stdout + oldStderr := os.Stderr + r, w, _ := os.Pipe() + os.Stdout = w + os.Stderr = w + + decodeErr := ecDecodeCmd.Do(args, commandEnv, &output) + + w.Close() + os.Stdout = oldStdout + os.Stderr = oldStderr + capturedOutput, _ := io.ReadAll(r) + + t.Logf("EC decode with diskType output: %s", string(capturedOutput)) + // The command should accept the flag + if decodeErr != nil { + t.Logf("EC decode with diskType: %v (expected if no EC volumes)", decodeErr) + } + + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + unlockCmd.Do([]string{}, commandEnv, &unlockOutput) }) } diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index f1f3bf133..695641a31 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -32,13 +32,23 @@ func (c *commandEcDecode) Name() string { func (c *commandEcDecode) Help() string { return `decode a erasure coded volume into a normal volume - ec.decode [-collection=""] [-volumeId=] + ec.decode [-collection=""] [-volumeId=] [-diskType=] The -collection parameter supports regular expressions for pattern matching: - Use exact match: ec.decode -collection="^mybucket$" - Match multiple buckets: ec.decode -collection="bucket.*" - Match all collections: ec.decode -collection=".*" + Options: + -diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd) + + Examples: + # Decode EC shards from HDD (default) + ec.decode -collection=mybucket + + # Decode EC shards from SSD + ec.decode -collection=mybucket -diskType=ssd + ` } @@ -50,6 +60,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := decodeCommand.Int("volumeId", 0, "the volume id") collection := decodeCommand.String("collection", "", "the collection name") + diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)") if err = decodeCommand.Parse(args); err != nil { return nil } @@ -59,6 +70,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr } vid := needle.VolumeId(*volumeId) + diskType := types.ToDiskType(*diskTypeStr) // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) @@ -68,17 +80,17 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr // volumeId is provided if vid != 0 { - return doEcDecode(commandEnv, topologyInfo, *collection, vid) + return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType) } // apply to all volumes in the collection - volumeIds, err := collectEcShardIds(topologyInfo, *collection) + volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType) if err != nil { return err } fmt.Printf("ec decode volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil { + if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType); err != nil { return err } } @@ -86,14 +98,14 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { +func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType) (err error) { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") } // find volume location - nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid) + nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid, diskType) fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits) @@ -248,7 +260,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati return resp.VolumeIdLocations, nil } -func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string) (vids []needle.VolumeId, err error) { +func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) { // compile regex pattern for collection matching collectionRegex, err := compileCollectionPattern(collectionPattern) if err != nil { @@ -257,7 +269,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin vidMap := make(map[uint32]bool) eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { - if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { + if diskInfo, found := dn.DiskInfos[string(diskType)]; found { for _, v := range diskInfo.EcShardInfos { if collectionRegex.MatchString(v.Collection) { vidMap[v.Id] = true @@ -273,11 +285,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin return } -func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits { +func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]erasure_coding.ShardBits { nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits) eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { - if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { + if diskInfo, found := dn.DiskInfos[string(diskType)]; found { for _, v := range diskInfo.EcShardInfos { if v.Id == uint32(vid) { nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index a2761201e..b60eccdc4 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -37,7 +37,7 @@ func (c *commandEcEncode) Name() string { func (c *commandEcEncode) Help() string { return `apply erasure coding to a volume - ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose] [-diskType=] + ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose] [-sourceDiskType=] [-diskType=] ec.encode [-collection=""] [-volumeId=] [-verbose] [-diskType=] This command will: @@ -61,7 +61,18 @@ func (c *commandEcEncode) Help() string { Options: -verbose: show detailed reasons why volumes are not selected for encoding - -diskType: the disk type for EC shards (hdd, ssd, or empty for default hdd) + -sourceDiskType: filter source volumes by disk type (hdd, ssd, or empty for all) + -diskType: target disk type for EC shards (hdd, ssd, or empty for default hdd) + + Examples: + # Encode SSD volumes to SSD EC shards (same tier) + ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=ssd + + # Encode SSD volumes to HDD EC shards (tier migration to cheaper storage) + ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd + + # Encode all volumes to SSD EC shards + ec.encode -collection=mybucket -diskType=ssd Re-balancing algorithm: ` + ecBalanceAlgorithmDescription @@ -81,7 +92,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr maxParallelization := encodeCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes") shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty") - diskTypeStr := encodeCommand.String("diskType", "", "the disk type for EC shards (hdd, ssd, or empty for default hdd)") + sourceDiskTypeStr := encodeCommand.String("sourceDiskType", "", "filter source volumes by disk type (hdd, ssd, or empty for all)") + diskTypeStr := encodeCommand.String("diskType", "", "target disk type for EC shards (hdd, ssd, or empty for default hdd)") applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation") verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding") @@ -96,6 +108,14 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return err } + // Parse source disk type filter (optional) + var sourceDiskType *types.DiskType + if *sourceDiskTypeStr != "" { + sdt := types.ToDiskType(*sourceDiskTypeStr) + sourceDiskType = &sdt + } + + // Parse target disk type for EC shards diskType := types.ToDiskType(*diskTypeStr) // collect topology information @@ -123,7 +143,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds) } else { // apply to all volumes for the given collection pattern (regex) - volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose) + volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, sourceDiskType, *fullPercentage, *quietPeriod, *verbose) if err != nil { return err }