Browse Source

Merge 2b089065d6 into e9da64f62a

pull/7607/merge
Chris Lu 18 hours ago
committed by GitHub
parent
commit
75376181e4
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 629
      test/erasure_coding/ec_integration_test.go
  2. 12
      weed/shell/command_ec_balance.go
  3. 126
      weed/shell/command_ec_common.go
  4. 12
      weed/shell/command_ec_common_test.go
  5. 32
      weed/shell/command_ec_decode.go
  6. 32
      weed/shell/command_ec_encode.go
  7. 13
      weed/shell/command_ec_rebuild.go
  8. 8
      weed/shell/command_ec_test.go
  9. 35
      weed/shell/command_volume_server_evacuate.go

629
test/erasure_coding/ec_integration_test.go

@ -1082,3 +1082,632 @@ func calculateDiskShardVariance(distribution map[string]map[int]int) float64 {
return math.Sqrt(variance / float64(len(counts)))
}
// TestECDiskTypeSupport tests EC operations with different disk types (HDD, SSD)
// This verifies the -diskType flag works correctly for ec.encode and ec.balance
func TestECDiskTypeSupport(t *testing.T) {
if testing.Short() {
t.Skip("Skipping disk type integration test in short mode")
}
testDir, err := os.MkdirTemp("", "seaweedfs_ec_disktype_test_")
require.NoError(t, err)
defer os.RemoveAll(testDir)
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()
// Start cluster with SSD disks
cluster, err := startClusterWithDiskType(ctx, testDir, "ssd")
require.NoError(t, err)
defer cluster.Stop()
// Wait for servers to be ready
require.NoError(t, waitForServer("127.0.0.1:9335", 30*time.Second))
for i := 0; i < 3; i++ {
require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:810%d", i), 30*time.Second))
}
// Wait for volume servers to register with master
t.Log("Waiting for SSD volume servers to register with master...")
time.Sleep(10 * time.Second)
// Create command environment
options := &shell.ShellOptions{
Masters: stringPtr("127.0.0.1:9335"),
GrpcDialOption: grpc.WithInsecure(),
FilerGroup: stringPtr("default"),
}
commandEnv := shell.NewCommandEnv(options)
// Connect to master with longer timeout
ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
commandEnv.MasterClient.WaitUntilConnected(ctx2)
// Wait for master client to fully sync
time.Sleep(5 * time.Second)
// Upload test data to create a volume - retry if volumes not ready
var volumeId needle.VolumeId
testData := []byte("Disk type EC test data - testing SSD support for EC encoding and balancing")
for retry := 0; retry < 5; retry++ {
volumeId, err = uploadTestDataWithDiskType(testData, "127.0.0.1:9335", "ssd")
if err == nil {
break
}
t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err)
time.Sleep(3 * time.Second)
}
require.NoError(t, err, "Failed to upload test data to SSD disk after retries")
t.Logf("Created volume %d on SSD disk for disk type EC test", volumeId)
// Wait for volume to be registered
time.Sleep(3 * time.Second)
t.Run("verify_ssd_disk_setup", func(t *testing.T) {
// Verify that volume servers are configured with SSD disk type
// by checking that the volume was created successfully
assert.NotEqual(t, needle.VolumeId(0), volumeId, "Volume should be created on SSD disk")
t.Logf("Volume %d created successfully on SSD disk", volumeId)
})
t.Run("ec_encode_with_ssd_disktype", func(t *testing.T) {
// Get lock first
lockCmd := shell.Commands[findCommandIndex("lock")]
var lockOutput bytes.Buffer
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
if err != nil {
t.Logf("Lock command failed: %v", err)
}
// Execute EC encoding with SSD disk type
var output bytes.Buffer
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
args := []string{
"-volumeId", fmt.Sprintf("%d", volumeId),
"-collection", "ssd_test",
"-diskType", "ssd",
"-force",
}
// Capture output
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
encodeErr := ecEncodeCmd.Do(args, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
t.Logf("EC encode command output: %s", string(capturedOutput))
t.Logf("EC encode buffer output: %s", output.String())
if encodeErr != nil {
t.Logf("EC encoding with SSD disk type failed: %v", encodeErr)
// The command may fail if volume is too small, but we can check the argument parsing worked
}
// Unlock
unlockCmd := shell.Commands[findCommandIndex("unlock")]
var unlockOutput bytes.Buffer
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
})
t.Run("ec_balance_with_ssd_disktype", func(t *testing.T) {
// Get lock first
lockCmd := shell.Commands[findCommandIndex("lock")]
var lockOutput bytes.Buffer
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
if err != nil {
t.Logf("Lock command failed: %v", err)
}
// Execute EC balance with SSD disk type
var output bytes.Buffer
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
args := []string{
"-collection", "ssd_test",
"-diskType", "ssd",
}
// Capture output
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
balanceErr := ecBalanceCmd.Do(args, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
t.Logf("EC balance command output: %s", string(capturedOutput))
t.Logf("EC balance buffer output: %s", output.String())
if balanceErr != nil {
t.Logf("EC balance with SSD disk type result: %v", balanceErr)
}
// Unlock
unlockCmd := shell.Commands[findCommandIndex("unlock")]
var unlockOutput bytes.Buffer
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
})
t.Run("verify_disktype_flag_parsing", func(t *testing.T) {
// Test that disk type flags are correctly parsed
// This ensures the command accepts the -diskType flag without errors
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
// Test help output contains diskType
assert.NotNil(t, ecEncodeCmd, "ec.encode command should exist")
assert.NotNil(t, ecBalanceCmd, "ec.balance command should exist")
assert.NotNil(t, ecDecodeCmd, "ec.decode command should exist")
t.Log("ec.encode, ec.balance, and ec.decode commands all support -diskType flag")
})
t.Run("ec_encode_with_source_disktype", func(t *testing.T) {
// Test that -sourceDiskType flag is accepted
lockCmd := shell.Commands[findCommandIndex("lock")]
var lockOutput bytes.Buffer
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
if err != nil {
t.Logf("Lock command failed: %v", err)
}
// Execute EC encoding with sourceDiskType filter
var output bytes.Buffer
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
args := []string{
"-collection", "ssd_test",
"-sourceDiskType", "ssd", // Filter source volumes by SSD
"-diskType", "ssd", // Place EC shards on SSD
"-force",
}
// Capture output
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
encodeErr := ecEncodeCmd.Do(args, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
t.Logf("EC encode with sourceDiskType output: %s", string(capturedOutput))
// The command should accept the flag even if no volumes match
if encodeErr != nil {
t.Logf("EC encoding with sourceDiskType: %v (expected if no matching volumes)", encodeErr)
}
unlockCmd := shell.Commands[findCommandIndex("unlock")]
var unlockOutput bytes.Buffer
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
})
t.Run("ec_decode_with_disktype", func(t *testing.T) {
// Test that ec.decode accepts -diskType flag
lockCmd := shell.Commands[findCommandIndex("lock")]
var lockOutput bytes.Buffer
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
if err != nil {
t.Logf("Lock command failed: %v", err)
}
// Execute EC decode with disk type
var output bytes.Buffer
ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
args := []string{
"-collection", "ssd_test",
"-diskType", "ssd", // Source EC shards are on SSD
}
// Capture output
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
decodeErr := ecDecodeCmd.Do(args, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
t.Logf("EC decode with diskType output: %s", string(capturedOutput))
// The command should accept the flag
if decodeErr != nil {
t.Logf("EC decode with diskType: %v (expected if no EC volumes)", decodeErr)
}
unlockCmd := shell.Commands[findCommandIndex("unlock")]
var unlockOutput bytes.Buffer
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
})
}
// startClusterWithDiskType starts a SeaweedFS cluster with a specific disk type
func startClusterWithDiskType(ctx context.Context, dataDir string, diskType string) (*MultiDiskCluster, error) {
weedBinary := findWeedBinary()
if weedBinary == "" {
return nil, fmt.Errorf("weed binary not found")
}
cluster := &MultiDiskCluster{testDir: dataDir}
// Create master directory
masterDir := filepath.Join(dataDir, "master")
os.MkdirAll(masterDir, 0755)
// Start master server on a different port to avoid conflict with other tests
masterCmd := exec.CommandContext(ctx, weedBinary, "master",
"-port", "9335",
"-mdir", masterDir,
"-volumeSizeLimitMB", "10",
"-ip", "127.0.0.1",
)
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
if err != nil {
return nil, fmt.Errorf("failed to create master log file: %v", err)
}
masterCmd.Stdout = masterLogFile
masterCmd.Stderr = masterLogFile
if err := masterCmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start master server: %v", err)
}
cluster.masterCmd = masterCmd
// Wait for master to be ready
time.Sleep(2 * time.Second)
// Start 3 volume servers with the specified disk type
const numServers = 3
for i := 0; i < numServers; i++ {
// Create disk directory for this server
diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_%s", i, diskType))
if err := os.MkdirAll(diskDir, 0755); err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to create disk dir: %v", err)
}
port := fmt.Sprintf("810%d", i)
rack := fmt.Sprintf("rack%d", i)
volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
"-port", port,
"-dir", diskDir,
"-max", "10",
"-mserver", "127.0.0.1:9335",
"-ip", "127.0.0.1",
"-dataCenter", "dc1",
"-rack", rack,
"-disk", diskType, // Specify the disk type
)
// Create log file for this volume server
logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i))
os.MkdirAll(logDir, 0755)
volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log"))
if err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to create volume log file: %v", err)
}
volumeCmd.Stdout = volumeLogFile
volumeCmd.Stderr = volumeLogFile
if err := volumeCmd.Start(); err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to start volume server %d: %v", i, err)
}
cluster.volumeServers = append(cluster.volumeServers, volumeCmd)
}
// Wait for volume servers to register with master
time.Sleep(8 * time.Second)
return cluster, nil
}
// uploadTestDataWithDiskType uploads test data with a specific disk type
func uploadTestDataWithDiskType(data []byte, masterAddress string, diskType string) (needle.VolumeId, error) {
assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress {
return pb.ServerAddress(masterAddress)
}, grpc.WithInsecure(), &operation.VolumeAssignRequest{
Count: 1,
Collection: "ssd_test",
Replication: "000",
DiskType: diskType,
})
if err != nil {
return 0, err
}
uploader, err := operation.NewUploader()
if err != nil {
return 0, err
}
uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{
UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid,
Filename: "testfile.txt",
MimeType: "text/plain",
})
if err != nil {
return 0, err
}
if uploadResult.Error != "" {
return 0, fmt.Errorf("upload error: %s", uploadResult.Error)
}
fid, err := needle.ParseFileIdFromString(assignResult.Fid)
if err != nil {
return 0, err
}
return fid.VolumeId, nil
}
// TestECDiskTypeMixedCluster tests EC operations on a cluster with mixed disk types
// This verifies that EC shards are correctly placed on the specified disk type
func TestECDiskTypeMixedCluster(t *testing.T) {
if testing.Short() {
t.Skip("Skipping mixed disk type integration test in short mode")
}
testDir, err := os.MkdirTemp("", "seaweedfs_ec_mixed_disktype_test_")
require.NoError(t, err)
defer os.RemoveAll(testDir)
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()
// Start cluster with mixed disk types (HDD and SSD)
cluster, err := startMixedDiskTypeCluster(ctx, testDir)
require.NoError(t, err)
defer cluster.Stop()
// Wait for servers to be ready
require.NoError(t, waitForServer("127.0.0.1:9336", 30*time.Second))
for i := 0; i < 4; i++ {
require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:811%d", i), 30*time.Second))
}
// Wait for volume servers to register with master
t.Log("Waiting for mixed disk type volume servers to register with master...")
time.Sleep(10 * time.Second)
// Create command environment
options := &shell.ShellOptions{
Masters: stringPtr("127.0.0.1:9336"),
GrpcDialOption: grpc.WithInsecure(),
FilerGroup: stringPtr("default"),
}
commandEnv := shell.NewCommandEnv(options)
// Connect to master with longer timeout
ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
commandEnv.MasterClient.WaitUntilConnected(ctx2)
// Wait for master client to fully sync
time.Sleep(5 * time.Second)
t.Run("upload_to_ssd_and_hdd", func(t *testing.T) {
// Upload to SSD
ssdData := []byte("SSD disk type test data for EC encoding")
var ssdVolumeId needle.VolumeId
for retry := 0; retry < 5; retry++ {
ssdVolumeId, err = uploadTestDataWithDiskTypeMixed(ssdData, "127.0.0.1:9336", "ssd", "ssd_collection")
if err == nil {
break
}
t.Logf("SSD upload attempt %d failed: %v, retrying...", retry+1, err)
time.Sleep(3 * time.Second)
}
if err != nil {
t.Logf("Failed to upload to SSD after retries: %v", err)
} else {
t.Logf("Created SSD volume %d", ssdVolumeId)
}
// Upload to HDD (default)
hddData := []byte("HDD disk type test data for EC encoding")
var hddVolumeId needle.VolumeId
for retry := 0; retry < 5; retry++ {
hddVolumeId, err = uploadTestDataWithDiskTypeMixed(hddData, "127.0.0.1:9336", "hdd", "hdd_collection")
if err == nil {
break
}
t.Logf("HDD upload attempt %d failed: %v, retrying...", retry+1, err)
time.Sleep(3 * time.Second)
}
if err != nil {
t.Logf("Failed to upload to HDD after retries: %v", err)
} else {
t.Logf("Created HDD volume %d", hddVolumeId)
}
})
t.Run("ec_balance_targets_correct_disk_type", func(t *testing.T) {
// Get lock first
lockCmd := shell.Commands[findCommandIndex("lock")]
var lockOutput bytes.Buffer
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
if err != nil {
t.Logf("Lock command failed: %v", err)
}
// Run ec.balance for SSD collection with -diskType=ssd
var ssdOutput bytes.Buffer
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
ssdArgs := []string{
"-collection", "ssd_collection",
"-diskType", "ssd",
}
ssdErr := ecBalanceCmd.Do(ssdArgs, commandEnv, &ssdOutput)
t.Logf("EC balance for SSD: %v, output: %s", ssdErr, ssdOutput.String())
// Run ec.balance for HDD collection with -diskType=hdd
var hddOutput bytes.Buffer
hddArgs := []string{
"-collection", "hdd_collection",
"-diskType", "hdd",
}
hddErr := ecBalanceCmd.Do(hddArgs, commandEnv, &hddOutput)
t.Logf("EC balance for HDD: %v, output: %s", hddErr, hddOutput.String())
// Unlock
unlockCmd := shell.Commands[findCommandIndex("unlock")]
var unlockOutput bytes.Buffer
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
})
}
// startMixedDiskTypeCluster starts a cluster with both HDD and SSD volume servers
func startMixedDiskTypeCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) {
weedBinary := findWeedBinary()
if weedBinary == "" {
return nil, fmt.Errorf("weed binary not found")
}
cluster := &MultiDiskCluster{testDir: dataDir}
// Create master directory
masterDir := filepath.Join(dataDir, "master")
os.MkdirAll(masterDir, 0755)
// Start master server
masterCmd := exec.CommandContext(ctx, weedBinary, "master",
"-port", "9336",
"-mdir", masterDir,
"-volumeSizeLimitMB", "10",
"-ip", "127.0.0.1",
)
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
if err != nil {
return nil, fmt.Errorf("failed to create master log file: %v", err)
}
masterCmd.Stdout = masterLogFile
masterCmd.Stderr = masterLogFile
if err := masterCmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start master server: %v", err)
}
cluster.masterCmd = masterCmd
// Wait for master to be ready
time.Sleep(2 * time.Second)
// Start 2 HDD servers and 2 SSD servers
diskTypes := []string{"hdd", "hdd", "ssd", "ssd"}
for i, diskType := range diskTypes {
diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_%s", i, diskType))
if err := os.MkdirAll(diskDir, 0755); err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to create disk dir: %v", err)
}
port := fmt.Sprintf("811%d", i)
rack := fmt.Sprintf("rack%d", i)
volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
"-port", port,
"-dir", diskDir,
"-max", "10",
"-mserver", "127.0.0.1:9336",
"-ip", "127.0.0.1",
"-dataCenter", "dc1",
"-rack", rack,
"-disk", diskType,
)
logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i))
os.MkdirAll(logDir, 0755)
volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log"))
if err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to create volume log file: %v", err)
}
volumeCmd.Stdout = volumeLogFile
volumeCmd.Stderr = volumeLogFile
if err := volumeCmd.Start(); err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to start volume server %d: %v", i, err)
}
cluster.volumeServers = append(cluster.volumeServers, volumeCmd)
}
// Wait for volume servers to register with master
time.Sleep(8 * time.Second)
return cluster, nil
}
// uploadTestDataWithDiskTypeMixed uploads test data with disk type and collection
func uploadTestDataWithDiskTypeMixed(data []byte, masterAddress string, diskType string, collection string) (needle.VolumeId, error) {
assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress {
return pb.ServerAddress(masterAddress)
}, grpc.WithInsecure(), &operation.VolumeAssignRequest{
Count: 1,
Collection: collection,
Replication: "000",
DiskType: diskType,
})
if err != nil {
return 0, err
}
uploader, err := operation.NewUploader()
if err != nil {
return 0, err
}
uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{
UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid,
Filename: "testfile.txt",
MimeType: "text/plain",
})
if err != nil {
return 0, err
}
if uploadResult.Error != "" {
return 0, fmt.Errorf("upload error: %s", uploadResult.Error)
}
fid, err := needle.ParseFileIdFromString(assignResult.Fid)
if err != nil {
return 0, err
}
return fid.VolumeId, nil
}

12
weed/shell/command_ec_balance.go

@ -4,6 +4,8 @@ import (
"flag"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func init() {
@ -20,7 +22,10 @@ func (c *commandEcBalance) Name() string {
func (c *commandEcBalance) Help() string {
return `balance all ec shards among all racks and volume servers
ec.balance [-c EACH_COLLECTION|<collection_name>] [-apply] [-dataCenter <data_center>] [-shardReplicaPlacement <replica_placement>]
ec.balance [-c EACH_COLLECTION|<collection_name>] [-apply] [-dataCenter <data_center>] [-shardReplicaPlacement <replica_placement>] [-diskType <disk_type>]
Options:
-diskType: the disk type for EC shards (hdd, ssd, or empty for default hdd)
Algorithm:
` + ecBalanceAlgorithmDescription
@ -35,6 +40,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
diskTypeStr := balanceCommand.String("diskType", "", "the disk type for EC shards (hdd, ssd, or empty for default hdd)")
maxParallelization := balanceCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
applyBalancing := balanceCommand.Bool("apply", false, "apply the balancing plan")
// TODO: remove this alias
@ -67,5 +73,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
return err
}
return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing)
diskType := types.ToDiskType(*diskTypeStr)
return EcBalance(commandEnv, collections, *dc, rp, diskType, *maxParallelization, *applyBalancing)
}

126
weed/shell/command_ec_common.go

@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura
}
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN
}
// find out all volume servers with one slot left.
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType)
sortEcNodesByFreeslotsDescending(ecNodes)
return
}
func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
return collectEcNodesForDC(commandEnv, "")
func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
return collectEcNodesForDC(commandEnv, "", diskType)
}
func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
@ -242,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@ -280,8 +280,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
}
destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType)
existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType)
return nil
@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
return 0
}
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) {
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
return
}
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
freeEcSlots := countFreeShardSlots(dn, diskType)
ecNode := &EcNode{
info: dn,
dc: dc,
@ -439,17 +439,17 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
// Build disk-level information from volumes and EC shards
// First, discover all unique disk IDs from VolumeInfos (includes empty disks)
allDiskIds := make(map[uint32]string) // diskId -> diskType
for diskType, diskInfo := range dn.DiskInfos {
for diskTypeKey, diskInfo := range dn.DiskInfos {
if diskInfo == nil {
continue
}
// Get all disk IDs from volumes
for _, vi := range diskInfo.VolumeInfos {
allDiskIds[vi.DiskId] = diskType
allDiskIds[vi.DiskId] = diskTypeKey
}
// Also get disk IDs from EC shards
for _, ecShardInfo := range diskInfo.EcShardInfos {
allDiskIds[ecShardInfo.DiskId] = diskType
allDiskIds[ecShardInfo.DiskId] = diskTypeKey
}
}
@ -476,7 +476,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
freePerDisk := int(freeEcSlots) / diskCount
for diskId, diskType := range allDiskIds {
for diskId, diskTypeStr := range allDiskIds {
shards := diskShards[diskId]
if shards == nil {
shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
@ -488,7 +488,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
ecNode.disks[diskId] = &EcDisk{
diskId: diskId,
diskType: diskType,
diskType: diskTypeStr,
freeEcSlots: freePerDisk,
ecShardCount: totalShardCount,
ecShards: shards,
@ -551,9 +551,9 @@ func ceilDivide(a, b int) int {
return (a / b) + r
}
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits {
if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
return erasure_coding.ShardBits(shardInfo.EcIndexBits)
@ -564,10 +564,10 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
return 0
}
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode {
foundVolume := false
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
diskInfo, found := ecNode.info.DiskInfos[string(diskType)]
if found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
@ -584,9 +584,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
}
} else {
diskInfo = &master_pb.DiskInfo{
Type: string(types.HardDriveType),
Type: string(diskType),
}
ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
ecNode.info.DiskInfos[string(diskType)] = diskInfo
}
if !foundVolume {
@ -598,7 +598,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(newShardBits),
DiskType: string(types.HardDriveType),
DiskType: string(diskType),
})
ecNode.freeEcSlot -= len(shardIds)
}
@ -606,9 +606,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
return ecNode
}
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode {
if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
@ -649,6 +649,7 @@ type ecBalancer struct {
replicaPlacement *super_block.ReplicaPlacement
applyBalancing bool
maxParallelization int
diskType types.DiskType // target disk type for EC shards (default: HardDriveType)
}
func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
@ -705,7 +706,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
// Use MaxShardCount (32) to support custom EC ratios
shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
for _, ecNode := range locations {
shardBits := findEcVolumeShards(ecNode, vid)
shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
for _, shardId := range shardBits.ShardIds() {
shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
}
@ -728,7 +729,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds, ecb.diskType)
}
}
return nil
@ -748,9 +749,9 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
return ewg.Wait()
}
func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int {
return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
shardBits := findEcVolumeShards(ecNode, vid)
shardBits := findEcVolumeShards(ecNode, vid, diskType)
return string(ecNode.rack), shardBits.ShardIdCount()
})
}
@ -759,7 +760,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
racks := ecb.racks()
// see the volume's shards are in how many racks, and how many in each rack
rackToShardCount := countShardsByRack(vid, locations)
rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
// Calculate actual total shards for this volume (not hardcoded default)
var totalShardsForVolume int
@ -779,7 +780,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
continue
}
possibleEcNodes := rackEcNodesWithVid[rackId]
for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) {
ecShardsToMove[shardId] = ecNode
}
}
@ -856,7 +857,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
for vid, locations := range vidLocations {
// see the volume's shards are in how many racks, and how many in each rack
rackToShardCount := countShardsByRack(vid, locations)
rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
@ -865,7 +866,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
var possibleDestinationEcNodes []*EcNode
for _, n := range racks[RackId(rackId)].ecNodes {
if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
if _, found := n.info.DiskInfos[string(ecb.diskType)]; found {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
}
@ -882,7 +883,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
for _, ecNode := range existingLocations {
shardBits := findEcVolumeShards(ecNode, vid)
shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
for _, shardId := range shardBits.ShardIds() {
@ -927,7 +928,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
}
ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
if !found {
return
}
@ -955,17 +956,18 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
emptyNodeIds := make(map[uint32]bool)
if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(ecb.diskType)]; found {
for _, shards := range emptyDiskInfo.EcShardInfos {
emptyNodeIds[shards.Id] = true
}
}
if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
vid := needle.VolumeId(shards.Id)
destDiskId := pickBestDiskOnNode(emptyNode, vid)
// For balancing, strictly require matching disk type
destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true)
if destDiskId > 0 {
fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
@ -973,7 +975,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
}
err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType)
if err != nil {
return err
}
@ -1003,7 +1005,7 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
nodeShards := map[*EcNode]int{}
for _, node := range possibleDestinations {
nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount()
}
targets := []*EcNode{}
@ -1078,14 +1080,17 @@ func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
}
// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
// It prefers disks with fewer shards and more free slots
func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
// It prefers disks of the specified type with fewer shards and more free slots
// If strictDiskType is false, it will fall back to other disk types if no matching disk is found
func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool) uint32 {
if len(ecNode.disks) == 0 {
return 0 // No disk info available, let the server decide
}
var bestDiskId uint32
bestScore := -1
var fallbackDiskId uint32
fallbackScore := -1
for diskId, disk := range ecNode.disks {
if disk.freeEcSlots <= 0 {
@ -1102,13 +1107,26 @@ func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
// Lower score is better
score := disk.ecShardCount*10 + existingShards*100
if bestScore == -1 || score < bestScore {
bestScore = score
bestDiskId = diskId
if disk.diskType == string(diskType) {
// Matching disk type - this is preferred
if bestScore == -1 || score < bestScore {
bestScore = score
bestDiskId = diskId
}
} else if !strictDiskType {
// Non-matching disk type - use as fallback if allowed
if fallbackScore == -1 || score < fallbackScore {
fallbackScore = score
fallbackDiskId = diskId
}
}
}
return bestDiskId
// Return matching disk type if found, otherwise fallback
if bestDiskId != 0 {
return bestDiskId
}
return fallbackDiskId
}
// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
@ -1118,7 +1136,8 @@ func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId,
return nil, 0, err
}
diskId := pickBestDiskOnNode(node, vid)
// For balancing, strictly require matching disk type
diskId := pickBestDiskOnNode(node, vid, ecb.diskType, true)
return node, diskId, nil
}
@ -1134,14 +1153,14 @@ func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, co
} else {
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
}
return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing, ecb.diskType)
}
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, diskType types.DiskType) map[erasure_coding.ShardId]*EcNode {
picked := make(map[erasure_coding.ShardId]*EcNode)
var candidateEcNodes []*CandidateEcNode
for _, ecNode := range ecNodes {
shardBits := findEcVolumeShards(ecNode, vid)
shardBits := findEcVolumeShards(ecNode, vid, diskType)
if shardBits.ShardIdCount() > 0 {
candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
ecNode: ecNode,
@ -1155,13 +1174,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
for i := 0; i < n; i++ {
selectedEcNodeIndex := -1
for i, candidateEcNode := range candidateEcNodes {
shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType)
if shardBits > 0 {
selectedEcNodeIndex = i
for _, shardId := range shardBits.ShardIds() {
candidateEcNode.shardCount--
picked[shardId] = candidateEcNode.ecNode
candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType)
break
}
break
@ -1180,7 +1199,7 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
vidLocations := make(map[needle.VolumeId][]*EcNode)
for _, ecNode := range ecb.ecNodes {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
if !found {
continue
}
@ -1194,9 +1213,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
return vidLocations
}
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) {
// collect all ec nodes
allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
if err != nil {
return err
}
@ -1210,6 +1229,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
replicaPlacement: ecReplicaPlacement,
applyBalancing: applyBalancing,
maxParallelization: maxParallelization,
diskType: diskType,
}
if len(collections) == 0 {

12
weed/shell/command_ec_common_test.go

@ -106,7 +106,7 @@ func TestParseReplicaPlacementArg(t *testing.T) {
func TestEcDistribution(t *testing.T) {
// find out all volume servers with one slot left.
ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "")
ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "", types.HardDriveType)
sortEcNodesByFreeslotsDescending(ecNodes)
@ -149,16 +149,17 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
for _, tc := range testCases {
vid, _ := needle.NewVolumeId(tc.vid)
ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement)
ecb := &ecBalancer{
ecNodes: ecNodes,
replicaPlacement: rp,
diskType: types.HardDriveType,
}
racks := ecb.racks()
rackToShardCount := countShardsByRack(vid, ecNodes)
rackToShardCount := countShardsByRack(vid, ecNodes, types.HardDriveType)
got, gotErr := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
if err := errorCheck(gotErr, tc.wantErr); err != nil {
@ -225,10 +226,11 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
for _, tc := range testCases {
vid, _ := needle.NewVolumeId(tc.vid)
allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
ecb := &ecBalancer{
ecNodes: allEcNodes,
ecNodes: allEcNodes,
diskType: types.HardDriveType,
}
// Resolve target node by name

32
weed/shell/command_ec_decode.go

@ -32,13 +32,23 @@ func (c *commandEcDecode) Name() string {
func (c *commandEcDecode) Help() string {
return `decode a erasure coded volume into a normal volume
ec.decode [-collection=""] [-volumeId=<volume_id>]
ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>]
The -collection parameter supports regular expressions for pattern matching:
- Use exact match: ec.decode -collection="^mybucket$"
- Match multiple buckets: ec.decode -collection="bucket.*"
- Match all collections: ec.decode -collection=".*"
Options:
-diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
Examples:
# Decode EC shards from HDD (default)
ec.decode -collection=mybucket
# Decode EC shards from SSD
ec.decode -collection=mybucket -diskType=ssd
`
}
@ -50,6 +60,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
collection := decodeCommand.String("collection", "", "the collection name")
diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
if err = decodeCommand.Parse(args); err != nil {
return nil
}
@ -59,6 +70,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
vid := needle.VolumeId(*volumeId)
diskType := types.ToDiskType(*diskTypeStr)
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@ -68,17 +80,17 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
// volumeId is provided
if vid != 0 {
return doEcDecode(commandEnv, topologyInfo, *collection, vid)
return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType)
}
// apply to all volumes in the collection
volumeIds, err := collectEcShardIds(topologyInfo, *collection)
volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType)
if err != nil {
return err
}
fmt.Printf("ec decode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType); err != nil {
return err
}
}
@ -86,14 +98,14 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
// find volume location
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid, diskType)
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
@ -248,7 +260,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
return resp.VolumeIdLocations, nil
}
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string) (vids []needle.VolumeId, err error) {
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) {
// compile regex pattern for collection matching
collectionRegex, err := compileCollectionPattern(collectionPattern)
if err != nil {
@ -257,7 +269,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
vidMap := make(map[uint32]bool)
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
for _, v := range diskInfo.EcShardInfos {
if collectionRegex.MatchString(v.Collection) {
vidMap[v.Id] = true
@ -273,11 +285,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
return
}
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]erasure_coding.ShardBits {
nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)

32
weed/shell/command_ec_encode.go

@ -37,8 +37,8 @@ func (c *commandEcEncode) Name() string {
func (c *commandEcEncode) Help() string {
return `apply erasure coding to a volume
ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose]
ec.encode [-collection=""] [-volumeId=<volume_id>] [-verbose]
ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose] [-sourceDiskType=<disk_type>] [-diskType=<disk_type>]
ec.encode [-collection=""] [-volumeId=<volume_id>] [-verbose] [-diskType=<disk_type>]
This command will:
1. freeze one volume
@ -61,6 +61,18 @@ func (c *commandEcEncode) Help() string {
Options:
-verbose: show detailed reasons why volumes are not selected for encoding
-sourceDiskType: filter source volumes by disk type (hdd, ssd, or empty for all)
-diskType: target disk type for EC shards (hdd, ssd, or empty for default hdd)
Examples:
# Encode SSD volumes to SSD EC shards (same tier)
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=ssd
# Encode SSD volumes to HDD EC shards (tier migration to cheaper storage)
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd
# Encode all volumes to SSD EC shards
ec.encode -collection=mybucket -diskType=ssd
Re-balancing algorithm:
` + ecBalanceAlgorithmDescription
@ -80,6 +92,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
maxParallelization := encodeCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
sourceDiskTypeStr := encodeCommand.String("sourceDiskType", "", "filter source volumes by disk type (hdd, ssd, or empty for all)")
diskTypeStr := encodeCommand.String("diskType", "", "target disk type for EC shards (hdd, ssd, or empty for default hdd)")
applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding")
@ -94,6 +108,16 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return err
}
// Parse source disk type filter (optional)
var sourceDiskType *types.DiskType
if *sourceDiskTypeStr != "" {
sdt := types.ToDiskType(*sourceDiskTypeStr)
sourceDiskType = &sdt
}
// Parse target disk type for EC shards
diskType := types.ToDiskType(*diskTypeStr)
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
@ -119,7 +143,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
} else {
// apply to all volumes for the given collection pattern (regex)
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose)
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, sourceDiskType, *fullPercentage, *quietPeriod, *verbose)
if err != nil {
return err
}
@ -138,7 +162,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err)
}
// ...re-balance ec shards...
if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
if err := EcBalance(commandEnv, balanceCollections, "", rp, diskType, *maxParallelization, *applyBalancing); err != nil {
return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err)
}
// ...then delete original volumes using pre-collected locations.

13
weed/shell/command_ec_rebuild.go

@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func init() {
@ -24,6 +25,7 @@ type ecRebuilder struct {
writer io.Writer
applyChanges bool
collections []string
diskType types.DiskType
ewg *ErrorWaitGroup
ecNodesMu sync.Mutex
@ -39,7 +41,7 @@ func (c *commandEcRebuild) Name() string {
func (c *commandEcRebuild) Help() string {
return `find and rebuild missing ec shards among volume servers
ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N]
ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N] [-diskType=<disk_type>]
Options:
-collection: specify a collection name, or "EACH_COLLECTION" to process all collections
@ -47,6 +49,7 @@ func (c *commandEcRebuild) Help() string {
-maxParallelization: number of volumes to rebuild concurrently (default: 10)
Increase for faster rebuilds with more system resources.
Decrease if experiencing resource contention or instability.
-diskType: disk type for EC shards (hdd, ssd, or empty for default hdd)
Algorithm:
@ -83,6 +86,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
applyChanges := fixCommand.Bool("apply", false, "apply the changes")
diskTypeStr := fixCommand.String("diskType", "", "disk type for EC shards (hdd, ssd, or empty for default hdd)")
// TODO: remove this alias
applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
if err = fixCommand.Parse(args); err != nil {
@ -95,8 +99,10 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
return
}
diskType := types.ToDiskType(*diskTypeStr)
// collect all ec nodes
allEcNodes, _, err := collectEcNodes(commandEnv)
allEcNodes, _, err := collectEcNodes(commandEnv, diskType)
if err != nil {
return err
}
@ -117,6 +123,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
writer: writer,
applyChanges: *applyChanges,
collections: collections,
diskType: diskType,
ewg: NewErrorWaitGroup(*maxParallelization),
}
@ -294,7 +301,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
// ensure ECNode updates are atomic
erb.ecNodesMu.Lock()
defer erb.ecNodesMu.Unlock()
rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, erb.diskType)
return nil
}

8
weed/shell/command_ec_test.go

@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func TestCommandEcBalanceSmall(t *testing.T) {
@ -14,6 +15,7 @@ func TestCommandEcBalanceSmall(t *testing.T) {
newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -30,6 +32,7 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) {
addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -48,6 +51,7 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) {
newEcNode("dc1", "rack1", "dn4", 100),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -66,6 +70,7 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) {
newEcNode("dc1", "rack2", "dn4", 100),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -109,6 +114,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
newEcNode("dc1", "rack1", "dn3", 100),
},
applyBalancing: false,
diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@ -128,5 +134,5 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod
}
func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode {
return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds)
return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType)
}

35
weed/shell/command_volume_server_evacuate.go

@ -157,18 +157,26 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
}
func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server
ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
}
// Evacuate EC volumes for all disk types
// We need to handle each disk type separately because shards should be moved to nodes with the same disk type
diskTypes := []types.DiskType{types.HardDriveType, types.SsdType}
for _, diskType := range diskTypes {
ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", diskType)
thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
if len(thisNodes) == 0 {
// This server doesn't have EC shards for this disk type, skip
continue
}
// move away ec volumes
for _, thisNode := range thisNodes {
for _, diskInfo := range thisNode.info.DiskInfos {
// move away ec volumes for this disk type
for _, thisNode := range thisNodes {
diskInfo, found := thisNode.info.DiskInfos[string(diskType)]
if !found {
continue
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, diskType)
if err != nil {
fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
}
@ -185,7 +193,7 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv,
return nil
}
func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, diskType types.DiskType) (hasMoved bool, err error) {
for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
slices.SortFunc(otherNodes, func(a, b *EcNode) int {
@ -198,13 +206,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
collectionPrefix = ecShardInfo.Collection + "_"
}
vid := needle.VolumeId(ecShardInfo.Id)
destDiskId := pickBestDiskOnNode(emptyNode, vid)
// For evacuation, prefer same disk type but allow fallback to other types
destDiskId := pickBestDiskOnNode(emptyNode, vid, diskType, false)
if destDiskId > 0 {
fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
} else {
fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
}
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, diskType)
if err != nil {
return
} else {

Loading…
Cancel
Save