Browse Source

ec: add -sourceDiskType to ec.encode and -diskType to ec.decode

ec.encode:
- Add -sourceDiskType flag to filter source volumes by disk type
- This enables tier migration scenarios (e.g., SSD volumes → HDD EC shards)
- -diskType specifies target disk type for EC shards

ec.decode:
- Add -diskType flag to specify source disk type where EC shards are stored
- Update collectEcShardIds() and collectEcNodeShardBits() to accept diskType

Examples:
  # Encode SSD volumes to HDD EC shards (tier migration)
  ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd

  # Decode EC shards from SSD
  ec.decode -collection=mybucket -diskType=ssd

Integration tests updated to cover new flags.
ec-disk-type-support
chrislusf 1 day ago
parent
commit
cbd9ca68bb
  1. 90
      test/erasure_coding/ec_integration_test.go
  2. 32
      weed/shell/command_ec_decode.go
  3. 28
      weed/shell/command_ec_encode.go

90
test/erasure_coding/ec_integration_test.go

@@ -1249,11 +1249,99 @@ func TestECDiskTypeSupport(t *testing.T) {
// This ensures the command accepts the -diskType flag without errors
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
// Test help output contains diskType
assert.NotNil(t, ecEncodeCmd, "ec.encode command should exist")
assert.NotNil(t, ecBalanceCmd, "ec.balance command should exist")
t.Log("Both ec.encode and ec.balance commands support -diskType flag")
assert.NotNil(t, ecDecodeCmd, "ec.decode command should exist")
t.Log("ec.encode, ec.balance, and ec.decode commands all support -diskType flag")
})
t.Run("ec_encode_with_source_disktype", func(t *testing.T) {
// Test that -sourceDiskType flag is accepted
lockCmd := shell.Commands[findCommandIndex("lock")]
var lockOutput bytes.Buffer
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
if err != nil {
t.Logf("Lock command failed: %v", err)
}
// Execute EC encoding with sourceDiskType filter
var output bytes.Buffer
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
args := []string{
"-collection", "ssd_test",
"-sourceDiskType", "ssd", // Filter source volumes by SSD
"-diskType", "ssd", // Place EC shards on SSD
"-force",
}
// Capture output
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
encodeErr := ecEncodeCmd.Do(args, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
t.Logf("EC encode with sourceDiskType output: %s", string(capturedOutput))
// The command should accept the flag even if no volumes match
if encodeErr != nil {
t.Logf("EC encoding with sourceDiskType: %v (expected if no matching volumes)", encodeErr)
}
unlockCmd := shell.Commands[findCommandIndex("unlock")]
var unlockOutput bytes.Buffer
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
})
t.Run("ec_decode_with_disktype", func(t *testing.T) {
// Test that ec.decode accepts -diskType flag
lockCmd := shell.Commands[findCommandIndex("lock")]
var lockOutput bytes.Buffer
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
if err != nil {
t.Logf("Lock command failed: %v", err)
}
// Execute EC decode with disk type
var output bytes.Buffer
ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
args := []string{
"-collection", "ssd_test",
"-diskType", "ssd", // Source EC shards are on SSD
}
// Capture output
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
decodeErr := ecDecodeCmd.Do(args, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
t.Logf("EC decode with diskType output: %s", string(capturedOutput))
// The command should accept the flag
if decodeErr != nil {
t.Logf("EC decode with diskType: %v (expected if no EC volumes)", decodeErr)
}
unlockCmd := shell.Commands[findCommandIndex("unlock")]
var unlockOutput bytes.Buffer
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
})
}

32
weed/shell/command_ec_decode.go

@@ -32,13 +32,23 @@ func (c *commandEcDecode) Name() string {
func (c *commandEcDecode) Help() string {
return `decode a erasure coded volume into a normal volume
ec.decode [-collection=""] [-volumeId=<volume_id>]
ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>]
The -collection parameter supports regular expressions for pattern matching:
- Use exact match: ec.decode -collection="^mybucket$"
- Match multiple buckets: ec.decode -collection="bucket.*"
- Match all collections: ec.decode -collection=".*"
Options:
-diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
Examples:
# Decode EC shards from HDD (default)
ec.decode -collection=mybucket
# Decode EC shards from SSD
ec.decode -collection=mybucket -diskType=ssd
`
}
@@ -50,6 +60,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
collection := decodeCommand.String("collection", "", "the collection name")
diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
if err = decodeCommand.Parse(args); err != nil {
return nil
}
@@ -59,6 +70,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
vid := needle.VolumeId(*volumeId)
diskType := types.ToDiskType(*diskTypeStr)
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@@ -68,17 +80,17 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
// volumeId is provided
if vid != 0 {
return doEcDecode(commandEnv, topologyInfo, *collection, vid)
return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType)
}
// apply to all volumes in the collection
volumeIds, err := collectEcShardIds(topologyInfo, *collection)
volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType)
if err != nil {
return err
}
fmt.Printf("ec decode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType); err != nil {
return err
}
}
@@ -86,14 +98,14 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
// find volume location
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid, diskType)
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
@@ -248,7 +260,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
return resp.VolumeIdLocations, nil
}
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string) (vids []needle.VolumeId, err error) {
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) {
// compile regex pattern for collection matching
collectionRegex, err := compileCollectionPattern(collectionPattern)
if err != nil {
@@ -257,7 +269,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
vidMap := make(map[uint32]bool)
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
for _, v := range diskInfo.EcShardInfos {
if collectionRegex.MatchString(v.Collection) {
vidMap[v.Id] = true
@@ -273,11 +285,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
return
}
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]erasure_coding.ShardBits {
nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)

28
weed/shell/command_ec_encode.go

@@ -37,7 +37,7 @@ func (c *commandEcEncode) Name() string {
func (c *commandEcEncode) Help() string {
return `apply erasure coding to a volume
ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose] [-diskType=<disk_type>]
ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose] [-sourceDiskType=<disk_type>] [-diskType=<disk_type>]
ec.encode [-collection=""] [-volumeId=<volume_id>] [-verbose] [-diskType=<disk_type>]
This command will:
@@ -61,7 +61,18 @@ func (c *commandEcEncode) Help() string {
Options:
-verbose: show detailed reasons why volumes are not selected for encoding
-diskType: the disk type for EC shards (hdd, ssd, or empty for default hdd)
-sourceDiskType: filter source volumes by disk type (hdd, ssd, or empty for all)
-diskType: target disk type for EC shards (hdd, ssd, or empty for default hdd)
Examples:
# Encode SSD volumes to SSD EC shards (same tier)
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=ssd
# Encode SSD volumes to HDD EC shards (tier migration to cheaper storage)
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd
# Encode all volumes to SSD EC shards
ec.encode -collection=mybucket -diskType=ssd
Re-balancing algorithm:
` + ecBalanceAlgorithmDescription
@@ -81,7 +92,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
maxParallelization := encodeCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
diskTypeStr := encodeCommand.String("diskType", "", "the disk type for EC shards (hdd, ssd, or empty for default hdd)")
sourceDiskTypeStr := encodeCommand.String("sourceDiskType", "", "filter source volumes by disk type (hdd, ssd, or empty for all)")
diskTypeStr := encodeCommand.String("diskType", "", "target disk type for EC shards (hdd, ssd, or empty for default hdd)")
applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding")
@@ -96,6 +108,14 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return err
}
// Parse source disk type filter (optional)
var sourceDiskType *types.DiskType
if *sourceDiskTypeStr != "" {
sdt := types.ToDiskType(*sourceDiskTypeStr)
sourceDiskType = &sdt
}
// Parse target disk type for EC shards
diskType := types.ToDiskType(*diskTypeStr)
// collect topology information
@@ -123,7 +143,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
} else {
// apply to all volumes for the given collection pattern (regex)
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose)
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, sourceDiskType, *fullPercentage, *quietPeriod, *verbose)
if err != nil {
return err
}

Loading…
Cancel
Save