From df4f2f7020fc29012f4b43ff403905f5611f3b29 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Wed, 10 Dec 2025 22:42:52 -0800
Subject: [PATCH] ec: add -diskType flag to EC commands for SSD support (#7607)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* ec: add diskType parameter to core EC functions

Add diskType parameter to:
- ecBalancer struct
- collectEcVolumeServersByDc()
- collectEcNodesForDC()
- collectEcNodes()
- EcBalance()

This allows EC operations to target specific disk types (hdd, ssd, etc.)
instead of being hardcoded to HardDriveType only.

For backward compatibility, all callers currently pass types.HardDriveType
as the default value. Subsequent commits will add -diskType flags to the
individual EC commands.

* ec: update helper functions to use configurable diskType

Update the following functions to accept/use a diskType parameter:
- findEcVolumeShards()
- addEcVolumeShards()
- deleteEcVolumeShards()
- moveMountedShardToEcNode()
- countShardsByRack()
- pickNEcShardsToMoveFrom()

All ecBalancer methods now use ecb.diskType instead of hardcoded
types.HardDriveType. Non-ecBalancer callers (like volumeServer.evacuate
and ec.rebuild) use types.HardDriveType as the default.

Update all test files to pass diskType where needed.

* ec: add -diskType flag to ec.balance and ec.encode commands

Add -diskType flag to specify the target disk type for EC operations:
- ec.balance -diskType=ssd
- ec.encode -diskType=ssd

The disk type can be 'hdd', 'ssd', or empty for the default (hdd). This
allows placing EC shards on SSD or other disk types instead of only HDD.

Example usage:
  ec.balance -collection=mybucket -diskType=ssd -apply
  ec.encode -collection=mybucket -diskType=ssd -force

* test: add integration tests for EC disk type support

Add integration tests to verify the -diskType flag works correctly:
- TestECDiskTypeSupport: tests EC encode and balance with the SSD disk type
- TestECDiskTypeMixedCluster: tests EC operations on a mixed HDD/SSD cluster

The tests verify:
- Volume servers can be configured with specific disk types
- ec.encode accepts the -diskType flag and encodes to the correct disk type
- ec.balance accepts the -diskType flag and balances on the correct disk type
- Mixed disk type clusters work correctly with separate collections

* ec: add -sourceDiskType to ec.encode and -diskType to ec.decode

ec.encode:
- Add -sourceDiskType flag to filter source volumes by disk type
- This enables tier migration scenarios (e.g., SSD volumes → HDD EC shards)
- -diskType specifies the target disk type for EC shards

ec.decode:
- Add -diskType flag to specify the source disk type where EC shards are stored
- Update collectEcShardIds() and collectEcNodeShardBits() to accept diskType

Examples:
  # Encode SSD volumes to HDD EC shards (tier migration)
  ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd
  # Decode EC shards from SSD
  ec.decode -collection=mybucket -diskType=ssd

Integration tests updated to cover the new flags.

* ec: fix variable shadowing and add -diskType to ec.rebuild and volumeServer.evacuate

Address code review comments:

1. Fix variable shadowing in collectEcVolumeServersByDc():
   - Rename the loop variables 'diskType' to 'diskTypeKey' and 'diskTypeStr'
     to avoid shadowing the function parameter

2. Fix hardcoded HardDriveType in ecBalancer methods:
   - balanceEcRack(): use ecb.diskType instead of types.HardDriveType
   - collectVolumeIdToEcNodes(): use ecb.diskType
3. Add -diskType flag to ec.rebuild command:
   - Add diskType field to ecRebuilder struct
   - Pass diskType to collectEcNodes() and addEcVolumeShards()

4. Add -diskType flag to volumeServer.evacuate command:
   - Add diskType field to commandVolumeServerEvacuate struct
   - Pass diskType to collectEcVolumeServersByDc() and moveMountedShardToEcNode()

* test: add diskType field to ecBalancer in TestPickEcNodeToBalanceShardsInto

Address nitpick comment: ensure the test's ecBalancer struct has the
diskType field set, for consistency with other tests.

* ec: filter disk selection by disk type in pickBestDiskOnNode

When evacuating or rebalancing EC shards, pickBestDiskOnNode now filters
disks by the target disk type. This ensures:

1. EC shards from SSD disks are moved to SSD disks on destination nodes
2. EC shards from HDD disks are moved to HDD disks on destination nodes
3. No cross-disk-type shard movement occurs

This maintains storage tier isolation when moving EC shards between nodes
during evacuation or rebalancing operations.

* ec: allow disk type fallback during evacuation

Update pickBestDiskOnNode to accept a strictDiskType parameter:
- strictDiskType=true (balancing): only use disks of the matching type.
  This maintains storage tier isolation during normal rebalancing.
- strictDiskType=false (evacuation): prefer the same disk type, but fall
  back to other disk types if no matching disk is available. This ensures
  evacuation can complete even when same-type capacity is insufficient.

Priority order for evacuation (see the first Go sketch below):
1. Same disk type with lowest shard count (preferred)
2. Different disk type with lowest shard count (fallback)

* test: use defer for lock/unlock to prevent lock leaks

Use defer to ensure locks are always released, even on early returns or
test failures. This prevents lock leaks that could cause subsequent tests
to hang or fail.

Changes:
- Return early if lock acquisition fails
- Immediately defer unlock after a successful lock
- Remove redundant explicit unlock calls at the end of tests
- Fix unused variable warning (err -> encodeErr/locErr)

* ec: dynamically discover disk types from topology for evacuation

Disk types are free-form tags (e.g., 'ssd', 'nvme', 'archive') that come
from the topology, not a hardcoded set. Only 'hdd' (or empty) is the
default disk type. Use collectVolumeDiskTypes() to discover all disk types
present in the cluster topology instead of hardcoding
[HardDriveType, SsdType] (see the second Go sketch below).

* test: add evacuation fallback and cross-rack EC placement tests

Add two new integration tests:

1. TestEvacuationFallbackBehavior:
   - Tests that when the same disk type has no capacity, shards fall back
     to other disk types during evacuation
   - Creates a cluster with 1 SSD + 2 HDD servers (limited SSD capacity)
   - Verifies pickBestDiskOnNode behavior with strictDiskType=false

2. TestCrossRackECPlacement:
   - Tests EC shard distribution across different racks
   - Creates a cluster with 4 servers in 4 different racks
   - Verifies shards are spread across multiple racks
   - Tests that ec.balance respects rack placement

Helper functions added:
- startLimitedSsdCluster: 1 SSD + 2 HDD servers
- startMultiRackCluster: 4 servers in 4 racks
- countShardsPerRack: counts EC shards per rack from disk

* test: fix collection mismatch in TestCrossRackECPlacement

The EC commands were using collection 'rack_test' but the uploaded test
data uses collection 'test' (the default), so ec.encode and ec.balance
could not find the uploaded volume. Fix: change the EC commands to use
'-collection test' to match the uploaded data.

Addresses review comment from PR #7607.
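A minimal, runnable Go sketch of the two-pass selection order described in the
evacuation-fallback commit above. Only the EcDisk fields this patch introduces
(diskType, freeEcSlots, ecShardCount) are assumed; ecDisk and pickDisk are
illustrative stand-ins, not the actual pickBestDiskOnNode in the diff below.

    package main

    import "fmt"

    // ecDisk mirrors the fields of the EcDisk struct added in this patch.
    type ecDisk struct {
        diskType     string
        freeEcSlots  int
        ecShardCount int
    }

    // pickDisk prefers the disk of the requested type with the fewest EC
    // shards; when strict is false (evacuation), it falls back to the
    // least-loaded disk of any other type instead of giving up.
    func pickDisk(disks map[uint32]*ecDisk, want string, strict bool) uint32 {
        pick := func(sameType bool) uint32 {
            var bestId uint32
            bestCount := -1
            for id, d := range disks {
                if d.freeEcSlots <= 0 || (d.diskType == want) != sameType {
                    continue
                }
                if bestCount < 0 || d.ecShardCount < bestCount {
                    bestId, bestCount = id, d.ecShardCount
                }
            }
            return bestId
        }
        if id := pick(true); id > 0 {
            return id // same disk type, lowest shard count (preferred)
        }
        if !strict {
            return pick(false) // evacuation fallback: any other disk type
        }
        return 0 // balancing: no disk of the requested type has capacity
    }

    func main() {
        disks := map[uint32]*ecDisk{
            1: {diskType: "ssd", freeEcSlots: 0, ecShardCount: 3},
            2: {diskType: "hdd", freeEcSlots: 5, ecShardCount: 1},
        }
        fmt.Println(pickDisk(disks, "ssd", true))  // 0: strict balancing finds no SSD slot
        fmt.Println(pickDisk(disks, "ssd", false)) // 2: evacuation falls back to the HDD disk
    }

With -diskType=ssd and no SSD capacity left, strict balancing places nothing,
while evacuation falls back to HDD, matching the priority order listed above.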
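Likewise, a short sketch of the topology-driven disk type discovery, assuming
the weed/shell package context: eachDataNode, DataCenterId, RackId,
master_pb.DataNodeInfo, and types.ToDiskType all appear in the diff below,
while collectDiskTypesSketch is an illustrative name, not the actual
collectVolumeDiskTypes.

    // Walk every data node and collect the distinct disk-type keys of its
    // DiskInfos map; free-form tags like "nvme" surface as their own DiskType.
    func collectDiskTypesSketch(topo *master_pb.TopologyInfo) (diskTypes []types.DiskType) {
        seen := make(map[types.DiskType]bool)
        eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
            for diskTypeKey := range dn.DiskInfos {
                dt := types.ToDiskType(diskTypeKey)
                if !seen[dt] {
                    seen[dt] = true
                    diskTypes = append(diskTypes, dt)
                }
            }
        })
        return diskTypes
    }

Because the DiskInfos keys are free-form, a volume server registered with
-disk nvme contributes 'nvme' here without any code change.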
* test: close log files in MultiDiskCluster.Stop() to prevent FD leaks

Track log files in MultiDiskCluster.logFiles and close them in Stop() to
prevent file descriptor accumulation in long-running or many-test scenarios.

Addresses review comment about logging resources cleanup.

* test: improve EC integration tests with proper assertions

- Add assertNoFlagError helper to detect flag parsing regressions
- Update diskType subtests to fail on flag errors (ec.encode, ec.balance,
  ec.decode)
- Update verify_disktype_flag_parsing to check that help output contains
  diskType
- Remove verify_fallback_disk_selection (was documentation-only, not
  executable)
- Add assertion to verify_cross_rack_distribution for a minimum of 2 racks
- Consolidate uploadTestDataWithDiskType to accept a collection parameter
- Remove duplicate uploadTestDataWithDiskTypeMixed function

* test: extract captureCommandOutput helper and fix error handling

- Add captureCommandOutput helper to reduce code duplication in diskType tests
- Create commandRunner interface to match the shell command Do method
- Update ec_encode_with_ssd_disktype, ec_balance_with_ssd_disktype,
  ec_encode_with_source_disktype, and ec_decode_with_disktype to use the helper
- Fix filepath.Glob error handling in countShardsPerRack instead of ignoring it

* test: add flag validation to ec_balance_targets_correct_disk_type

Add assertNoFlagError calls after ec.balance commands to ensure the
-diskType flag is properly recognized for both SSD and HDD disk types.

* test: add proper assertions for EC command results

- ec_encode_with_ssd_disktype: check for expected volume-related errors
- ec_balance_with_ssd_disktype: require success with require.NoError
- ec_encode_with_source_disktype: check for expected no-volume errors
- ec_decode_with_disktype: check for expected no-ec-volume errors
- upload_to_ssd_and_hdd: use require.NoError for setup validation

Tests now properly fail on unexpected errors rather than just logging.

* test: fix missing unlock in ec_encode_with_disk_awareness

Add the defer unlock pattern (condensed in the sketch below) to ensure the
lock is always released, matching the pattern used in other subtests.
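For reference, the defer-unlock shape these test commits converge on,
condensed from the subtests in the diff below; lockCmd, unlockCmd,
shell.Commands, and findCommandIndex are from the test file itself, and the
surrounding subtest body is elided.

    // Acquire the shell lock; skip the subtest entirely if that fails,
    // so the early-return path has nothing to unlock.
    lockCmd := shell.Commands[findCommandIndex("lock")]
    var lockOutput bytes.Buffer
    if err := lockCmd.Do([]string{}, commandEnv, &lockOutput); err != nil {
        t.Logf("Lock command failed: %v", err)
        return
    }

    // Immediately defer the unlock so it runs on every exit path,
    // including assertion failures, preventing lock leaks across tests.
    unlockCmd := shell.Commands[findCommandIndex("unlock")]
    var unlockOutput bytes.Buffer
    defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)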
* test: improve helper robustness

- Make assertNoFlagError case-insensitive for pattern matching
- Use defer in captureCommandOutput to restore stdout/stderr and close pipe
  ends to avoid FD leaks even if cmd.Do panics
---
 test/erasure_coding/ec_integration_test.go   | 1136 +++++++++++++++++-
 weed/shell/command_ec_balance.go             |   12 +-
 weed/shell/command_ec_common.go              |  126 +-
 weed/shell/command_ec_common_test.go         |   12 +-
 weed/shell/command_ec_decode.go              |   32 +-
 weed/shell/command_ec_encode.go              |   32 +-
 weed/shell/command_ec_rebuild.go             |   13 +-
 weed/shell/command_ec_test.go                |    8 +-
 weed/shell/command_volume_server_evacuate.go |   36 +-
 9 files changed, 1310 insertions(+), 97 deletions(-)

diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go
index 71f77683f..bb0983f06 100644
--- a/test/erasure_coding/ec_integration_test.go
+++ b/test/erasure_coding/ec_integration_test.go
@@ -315,7 +315,7 @@ func TestECEncodingMasterTimingRaceCondition(t *testing.T) { os.Stdout = w os.Stderr = w - err = ecEncodeCmd.Do(args, commandEnv, &output) + encodeErr := ecEncodeCmd.Do(args, commandEnv, &output) // Restore stdout/stderr w.Close() @@ -343,17 +343,17 @@ } // Step 3: Try to get volume locations after EC encoding (this simulates the bug) - locationsAfter, err := getVolumeLocations(commandEnv, volumeId) - if err != nil { - t.Logf("Volume locations after EC encoding: ERROR - %v", err) + locationsAfter, locErr := getVolumeLocations(commandEnv, volumeId) + if locErr != nil { + t.Logf("Volume locations after EC encoding: ERROR - %v", locErr) t.Logf("This demonstrates the timing issue where original volume info is lost") } else { t.Logf("Volume locations after EC encoding: %v", locationsAfter) } // Test result evaluation - if err != nil { - t.Logf("EC encoding completed with error: %v", err) + if encodeErr != nil { + t.Logf("EC encoding completed with error: %v", encodeErr) } else { t.Logf("EC encoding completed successfully") } @@ -622,6 +622,68 @@ func contains(s, substr string) bool { return false } +// assertNoFlagError checks that the error and output don't indicate a flag parsing error. +// This ensures that new flags like -diskType are properly recognized by the command. +func assertNoFlagError(t *testing.T, err error, output string, context string) { + t.Helper() + + // Check for common flag parsing error patterns (case-insensitive) + flagErrorPatterns := []string{ + "flag provided but not defined", + "unknown flag", + "invalid flag", + "bad flag syntax", + } + + outputLower := strings.ToLower(output) + for _, pattern := range flagErrorPatterns { + if strings.Contains(outputLower, pattern) { + t.Fatalf("%s: flag parsing error detected in output: %s", context, pattern) + } + if err != nil && strings.Contains(strings.ToLower(err.Error()), pattern) { + t.Fatalf("%s: flag parsing error in error: %v", context, err) + } + } +} + +// commandRunner is an interface matching the shell command Do method +type commandRunner interface { + Do([]string, *shell.CommandEnv, io.Writer) error +} + +// captureCommandOutput executes a shell command and captures its output from both +// stdout/stderr and the command's buffer. This reduces code duplication in tests.
+func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, commandEnv *shell.CommandEnv) (output string, err error) { + t.Helper() + var outBuf bytes.Buffer + + oldStdout, oldStderr := os.Stdout, os.Stderr + r, w, pipeErr := os.Pipe() + require.NoError(t, pipeErr) + + defer func() { + _ = w.Close() + os.Stdout = oldStdout + os.Stderr = oldStderr + }() + + os.Stdout = w + os.Stderr = w + + cmdErr := cmd.Do(args, commandEnv, &outBuf) + + // Close the write end and restore stdout/stderr before reading: io.ReadAll + // blocks until EOF, which the pipe only signals once all writers are closed. + _ = w.Close() + os.Stdout = oldStdout + os.Stderr = oldStderr + capturedOutput, readErr := io.ReadAll(r) + _ = r.Close() + require.NoError(t, readErr) + + return string(capturedOutput) + outBuf.String(), cmdErr +} + // TestECEncodingRegressionPrevention tests that the specific bug patterns don't reoccur func TestECEncodingRegressionPrevention(t *testing.T) { t.Run("function_signature_regression", func(t *testing.T) { @@ -744,8 +801,14 @@ func TestDiskAwareECRebalancing(t *testing.T) { err := lockCmd.Do([]string{}, commandEnv, &lockOutput) if err != nil { t.Logf("Lock command failed: %v", err) + return } + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + // Execute EC encoding var output bytes.Buffer ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] @@ -876,6 +939,7 @@ type MultiDiskCluster struct { masterCmd *exec.Cmd volumeServers []*exec.Cmd testDir string + logFiles []*os.File // Track log files for cleanup } func (c *MultiDiskCluster) Stop() { @@ -892,6 +956,13 @@ c.masterCmd.Process.Kill() c.masterCmd.Wait() } + + // Close all log files to prevent FD leaks + for _, f := range c.logFiles { + if f != nil { + f.Close() + } + } } // startMultiDiskCluster starts a SeaweedFS cluster with multiple disks per volume server @@ -920,6 +991,7 @@ func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskClust if err != nil { return nil, fmt.Errorf("failed to create master log file: %v", err) } + cluster.logFiles = append(cluster.logFiles, masterLogFile) masterCmd.Stdout = masterLogFile masterCmd.Stderr = masterLogFile @@ -971,6 +1043,7 @@ func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskClust cluster.Stop() return nil, fmt.Errorf("failed to create volume log file: %v", err) } + cluster.logFiles = append(cluster.logFiles, volumeLogFile) volumeCmd.Stdout = volumeLogFile volumeCmd.Stderr = volumeLogFile @@ -1083,3 +1156,1054 @@ func calculateDiskShardVariance(distribution map[string]map[int]int) float64 { return math.Sqrt(variance / float64(len(counts))) } + +// TestECDiskTypeSupport tests EC operations with different disk types (HDD, SSD) +// This verifies the -diskType flag works correctly for ec.encode and ec.balance +func TestECDiskTypeSupport(t *testing.T) { + if testing.Short() { + t.Skip("Skipping disk type integration test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_ec_disktype_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start cluster with SSD disks + cluster, err := startClusterWithDiskType(ctx, testDir, "ssd") + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9335", 30*time.Second)) + for i := 0; i < 3; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:810%d", i), 30*time.Second)) + } + + // Wait for volume servers to register with
master + t.Log("Waiting for SSD volume servers to register with master...") + time.Sleep(10 * time.Second) + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9335"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + // Connect to master with longer timeout + ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + // Wait for master client to fully sync + time.Sleep(5 * time.Second) + + // Upload test data to create a volume - retry if volumes not ready + var volumeId needle.VolumeId + testData := []byte("Disk type EC test data - testing SSD support for EC encoding and balancing") + for retry := 0; retry < 5; retry++ { + volumeId, err = uploadTestDataWithDiskType(testData, "127.0.0.1:9335", "ssd", "ssd_test") + if err == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err) + time.Sleep(3 * time.Second) + } + require.NoError(t, err, "Failed to upload test data to SSD disk after retries") + t.Logf("Created volume %d on SSD disk for disk type EC test", volumeId) + + // Wait for volume to be registered + time.Sleep(3 * time.Second) + + t.Run("verify_ssd_disk_setup", func(t *testing.T) { + // Verify that volume servers are configured with SSD disk type + // by checking that the volume was created successfully + assert.NotEqual(t, needle.VolumeId(0), volumeId, "Volume should be created on SSD disk") + t.Logf("Volume %d created successfully on SSD disk", volumeId) + }) + + t.Run("ec_encode_with_ssd_disktype", func(t *testing.T) { + // Get lock first + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Execute EC encoding with SSD disk type + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-volumeId", fmt.Sprintf("%d", volumeId), + "-collection", "ssd_test", + "-diskType", "ssd", + "-force", + } + + outputStr, encodeErr := captureCommandOutput(t, ecEncodeCmd, args, commandEnv) + t.Logf("EC encode command output: %s", outputStr) + + // Fail on flag parsing errors - these indicate the -diskType flag is not recognized + assertNoFlagError(t, encodeErr, outputStr, "ec.encode -diskType") + + // EC encode may fail if volume is too small - that's acceptable for this flag test + // But unexpected errors should fail the test + if encodeErr != nil { + errStr := encodeErr.Error() + if contains(errStr, "volume") || contains(errStr, "size") || contains(errStr, "small") { + t.Logf("EC encoding failed due to volume constraints (expected): %v", encodeErr) + } else { + t.Errorf("EC encoding failed with unexpected error: %v", encodeErr) + } + } + }) + + t.Run("ec_balance_with_ssd_disktype", func(t *testing.T) { + // Get lock first + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := 
shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Execute EC balance with SSD disk type + ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + args := []string{ + "-collection", "ssd_test", + "-diskType", "ssd", + } + + outputStr, balanceErr := captureCommandOutput(t, ecBalanceCmd, args, commandEnv) + t.Logf("EC balance command output: %s", outputStr) + + // Fail on flag parsing errors + assertNoFlagError(t, balanceErr, outputStr, "ec.balance -diskType") + + // ec.balance should succeed (it may just have nothing to balance) + require.NoError(t, balanceErr, "ec.balance with -diskType=ssd should succeed") + }) + + t.Run("verify_disktype_flag_parsing", func(t *testing.T) { + // Test that disk type flags are documented in help output + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")] + + require.NotNil(t, ecEncodeCmd, "ec.encode command should exist") + require.NotNil(t, ecBalanceCmd, "ec.balance command should exist") + require.NotNil(t, ecDecodeCmd, "ec.decode command should exist") + + // Verify help text mentions diskType flag + encodeHelp := ecEncodeCmd.Help() + assert.Contains(t, encodeHelp, "diskType", "ec.encode help should mention -diskType flag") + + balanceHelp := ecBalanceCmd.Help() + assert.Contains(t, balanceHelp, "diskType", "ec.balance help should mention -diskType flag") + + decodeHelp := ecDecodeCmd.Help() + assert.Contains(t, decodeHelp, "diskType", "ec.decode help should mention -diskType flag") + + t.Log("All EC commands have -diskType flag documented in help") + }) + + t.Run("ec_encode_with_source_disktype", func(t *testing.T) { + // Test that -sourceDiskType flag is accepted + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Execute EC encoding with sourceDiskType filter + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-collection", "ssd_test", + "-sourceDiskType", "ssd", // Filter source volumes by SSD + "-diskType", "ssd", // Place EC shards on SSD + "-force", + } + + outputStr, encodeErr := captureCommandOutput(t, ecEncodeCmd, args, commandEnv) + t.Logf("EC encode with sourceDiskType output: %s", outputStr) + + // Fail on flag parsing errors + assertNoFlagError(t, encodeErr, outputStr, "ec.encode -sourceDiskType") + + // May fail if no volumes match the sourceDiskType filter - that's acceptable + if encodeErr != nil { + errStr := encodeErr.Error() + if contains(errStr, "no volume") || contains(errStr, "matching") || contains(errStr, "found") { + t.Logf("EC encoding: no matching volumes (expected): %v", encodeErr) + } else { + t.Errorf("EC encoding with sourceDiskType failed unexpectedly: %v", encodeErr) + } + } + }) + + t.Run("ec_decode_with_disktype", func(t *testing.T) { + // Test that ec.decode accepts -diskType flag + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } 
+ + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Execute EC decode with disk type + ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")] + args := []string{ + "-collection", "ssd_test", + "-diskType", "ssd", // Source EC shards are on SSD + } + + outputStr, decodeErr := captureCommandOutput(t, ecDecodeCmd, args, commandEnv) + t.Logf("EC decode with diskType output: %s", outputStr) + + // Fail on flag parsing errors + assertNoFlagError(t, decodeErr, outputStr, "ec.decode -diskType") + + // May fail if no EC volumes exist - that's acceptable for this flag test + if decodeErr != nil { + errStr := decodeErr.Error() + if contains(errStr, "no ec volume") || contains(errStr, "not found") || contains(errStr, "ec shard") { + t.Logf("EC decode: no EC volumes to decode (expected): %v", decodeErr) + } else { + t.Errorf("EC decode with diskType failed unexpectedly: %v", decodeErr) + } + } + }) +} + +// startClusterWithDiskType starts a SeaweedFS cluster with a specific disk type +func startClusterWithDiskType(ctx context.Context, dataDir string, diskType string) (*MultiDiskCluster, error) { + weedBinary := findWeedBinary() + if weedBinary == "" { + return nil, fmt.Errorf("weed binary not found") + } + + cluster := &MultiDiskCluster{testDir: dataDir} + + // Create master directory + masterDir := filepath.Join(dataDir, "master") + os.MkdirAll(masterDir, 0755) + + // Start master server on a different port to avoid conflict with other tests + masterCmd := exec.CommandContext(ctx, weedBinary, "master", + "-port", "9335", + "-mdir", masterDir, + "-volumeSizeLimitMB", "10", + "-ip", "127.0.0.1", + ) + + masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) + if err != nil { + return nil, fmt.Errorf("failed to create master log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, masterLogFile) + masterCmd.Stdout = masterLogFile + masterCmd.Stderr = masterLogFile + + if err := masterCmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start master server: %v", err) + } + cluster.masterCmd = masterCmd + + // Wait for master to be ready + time.Sleep(2 * time.Second) + + // Start 3 volume servers with the specified disk type + const numServers = 3 + + for i := 0; i < numServers; i++ { + // Create disk directory for this server + diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_%s", i, diskType)) + if err := os.MkdirAll(diskDir, 0755); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create disk dir: %v", err) + } + + port := fmt.Sprintf("810%d", i) + rack := fmt.Sprintf("rack%d", i) + + volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", + "-port", port, + "-dir", diskDir, + "-max", "10", + "-mserver", "127.0.0.1:9335", + "-ip", "127.0.0.1", + "-dataCenter", "dc1", + "-rack", rack, + "-disk", diskType, // Specify the disk type + ) + + // Create log file for this volume server + logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i)) + os.MkdirAll(logDir, 0755) + volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log")) + if err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create volume log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, volumeLogFile) + volumeCmd.Stdout = volumeLogFile + volumeCmd.Stderr = volumeLogFile + + if err := volumeCmd.Start(); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to start 
volume server %d: %v", i, err) + } + cluster.volumeServers = append(cluster.volumeServers, volumeCmd) + } + + // Wait for volume servers to register with master + time.Sleep(8 * time.Second) + + return cluster, nil +} + +// uploadTestDataWithDiskType uploads test data with a specific disk type +func uploadTestDataWithDiskType(data []byte, masterAddress string, diskType string, collection string) (needle.VolumeId, error) { + assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress { + return pb.ServerAddress(masterAddress) + }, grpc.WithInsecure(), &operation.VolumeAssignRequest{ + Count: 1, + Collection: collection, + Replication: "000", + DiskType: diskType, + }) + if err != nil { + return 0, err + } + + uploader, err := operation.NewUploader() + if err != nil { + return 0, err + } + + uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{ + UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid, + Filename: "testfile.txt", + MimeType: "text/plain", + }) + if err != nil { + return 0, err + } + + if uploadResult.Error != "" { + return 0, fmt.Errorf("upload error: %s", uploadResult.Error) + } + + fid, err := needle.ParseFileIdFromString(assignResult.Fid) + if err != nil { + return 0, err + } + + return fid.VolumeId, nil +} + +// TestECDiskTypeMixedCluster tests EC operations on a cluster with mixed disk types +// This verifies that EC shards are correctly placed on the specified disk type +func TestECDiskTypeMixedCluster(t *testing.T) { + if testing.Short() { + t.Skip("Skipping mixed disk type integration test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_ec_mixed_disktype_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start cluster with mixed disk types (HDD and SSD) + cluster, err := startMixedDiskTypeCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9336", 30*time.Second)) + for i := 0; i < 4; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:811%d", i), 30*time.Second)) + } + + // Wait for volume servers to register with master + t.Log("Waiting for mixed disk type volume servers to register with master...") + time.Sleep(10 * time.Second) + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9336"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + // Connect to master with longer timeout + ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + // Wait for master client to fully sync + time.Sleep(5 * time.Second) + + t.Run("upload_to_ssd_and_hdd", func(t *testing.T) { + // Upload to SSD + ssdData := []byte("SSD disk type test data for EC encoding") + var ssdVolumeId needle.VolumeId + var ssdErr error + for retry := 0; retry < 5; retry++ { + ssdVolumeId, ssdErr = uploadTestDataWithDiskType(ssdData, "127.0.0.1:9336", "ssd", "ssd_collection") + if ssdErr == nil { + break + } + t.Logf("SSD upload attempt %d failed: %v, retrying...", retry+1, ssdErr) + time.Sleep(3 * time.Second) + } + require.NoError(t, ssdErr, "Failed to upload to SSD after retries - test 
setup failed") + t.Logf("Created SSD volume %d", ssdVolumeId) + + // Upload to HDD (default) + hddData := []byte("HDD disk type test data for EC encoding") + var hddVolumeId needle.VolumeId + var hddErr error + for retry := 0; retry < 5; retry++ { + hddVolumeId, hddErr = uploadTestDataWithDiskType(hddData, "127.0.0.1:9336", "hdd", "hdd_collection") + if hddErr == nil { + break + } + t.Logf("HDD upload attempt %d failed: %v, retrying...", retry+1, hddErr) + time.Sleep(3 * time.Second) + } + require.NoError(t, hddErr, "Failed to upload to HDD after retries - test setup failed") + t.Logf("Created HDD volume %d", hddVolumeId) + }) + + t.Run("ec_balance_targets_correct_disk_type", func(t *testing.T) { + // Get lock first + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Run ec.balance for SSD collection with -diskType=ssd + var ssdOutput bytes.Buffer + ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + ssdArgs := []string{ + "-collection", "ssd_collection", + "-diskType", "ssd", + } + + ssdErr := ecBalanceCmd.Do(ssdArgs, commandEnv, &ssdOutput) + t.Logf("EC balance for SSD: %v, output: %s", ssdErr, ssdOutput.String()) + assertNoFlagError(t, ssdErr, ssdOutput.String(), "ec.balance -diskType=ssd") + + // Run ec.balance for HDD collection with -diskType=hdd + var hddOutput bytes.Buffer + hddArgs := []string{ + "-collection", "hdd_collection", + "-diskType", "hdd", + } + + hddErr := ecBalanceCmd.Do(hddArgs, commandEnv, &hddOutput) + t.Logf("EC balance for HDD: %v, output: %s", hddErr, hddOutput.String()) + assertNoFlagError(t, hddErr, hddOutput.String(), "ec.balance -diskType=hdd") + }) +} + +// startMixedDiskTypeCluster starts a cluster with both HDD and SSD volume servers +func startMixedDiskTypeCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) { + weedBinary := findWeedBinary() + if weedBinary == "" { + return nil, fmt.Errorf("weed binary not found") + } + + cluster := &MultiDiskCluster{testDir: dataDir} + + // Create master directory + masterDir := filepath.Join(dataDir, "master") + os.MkdirAll(masterDir, 0755) + + // Start master server + masterCmd := exec.CommandContext(ctx, weedBinary, "master", + "-port", "9336", + "-mdir", masterDir, + "-volumeSizeLimitMB", "10", + "-ip", "127.0.0.1", + ) + + masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) + if err != nil { + return nil, fmt.Errorf("failed to create master log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, masterLogFile) + masterCmd.Stdout = masterLogFile + masterCmd.Stderr = masterLogFile + + if err := masterCmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start master server: %v", err) + } + cluster.masterCmd = masterCmd + + // Wait for master to be ready + time.Sleep(2 * time.Second) + + // Start 2 HDD servers and 2 SSD servers + diskTypes := []string{"hdd", "hdd", "ssd", "ssd"} + + for i, diskType := range diskTypes { + diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_%s", i, diskType)) + if err := os.MkdirAll(diskDir, 0755); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create disk dir: %v", err) + } + + port := fmt.Sprintf("811%d", i) + rack := 
fmt.Sprintf("rack%d", i) + + volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", + "-port", port, + "-dir", diskDir, + "-max", "10", + "-mserver", "127.0.0.1:9336", + "-ip", "127.0.0.1", + "-dataCenter", "dc1", + "-rack", rack, + "-disk", diskType, + ) + + logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i)) + os.MkdirAll(logDir, 0755) + volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log")) + if err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create volume log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, volumeLogFile) + volumeCmd.Stdout = volumeLogFile + volumeCmd.Stderr = volumeLogFile + + if err := volumeCmd.Start(); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to start volume server %d: %v", i, err) + } + cluster.volumeServers = append(cluster.volumeServers, volumeCmd) + } + + // Wait for volume servers to register with master + time.Sleep(8 * time.Second) + + return cluster, nil +} + +// TestEvacuationFallbackBehavior tests that when a disk type has limited capacity, +// shards fall back to other disk types during evacuation +func TestEvacuationFallbackBehavior(t *testing.T) { + if testing.Short() { + t.Skip("Skipping evacuation fallback test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_evacuation_fallback_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start a cluster with limited SSD capacity (1 SSD server, 2 HDD servers) + cluster, err := startLimitedSsdCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9337", 30*time.Second)) + for i := 0; i < 3; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:812%d", i), 30*time.Second)) + } + + time.Sleep(10 * time.Second) + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9337"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + time.Sleep(5 * time.Second) + + t.Run("fallback_when_same_disktype_full", func(t *testing.T) { + // This test verifies that when evacuating SSD EC shards from a server, + // if no SSD capacity is available on other servers, shards fall back to HDD + + // Upload test data to SSD + testData := []byte("Evacuation fallback test data for SSD volume") + var ssdVolumeId needle.VolumeId + for retry := 0; retry < 5; retry++ { + ssdVolumeId, err = uploadTestDataWithDiskType(testData, "127.0.0.1:9337", "ssd", "fallback_test") + if err == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err) + time.Sleep(3 * time.Second) + } + if err != nil { + t.Skipf("Could not upload to SSD (may not have SSD capacity): %v", err) + return + } + t.Logf("Created SSD volume %d for fallback test", ssdVolumeId) + + time.Sleep(3 * time.Second) + + // Get lock + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput 
bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // EC encode the SSD volume + var encodeOutput bytes.Buffer + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + encodeArgs := []string{ + "-volumeId", fmt.Sprintf("%d", ssdVolumeId), + "-collection", "fallback_test", + "-diskType", "ssd", + "-force", + } + + encodeErr := ecEncodeCmd.Do(encodeArgs, commandEnv, &encodeOutput) + if encodeErr != nil { + t.Logf("EC encoding result: %v", encodeErr) + } + t.Logf("EC encode output: %s", encodeOutput.String()) + + // Now simulate evacuation - the fallback behavior is tested in pickBestDiskOnNode + // When strictDiskType=false (evacuation), it prefers SSD but falls back to HDD + t.Log("Evacuation fallback logic is handled by pickBestDiskOnNode(node, vid, diskType, false)") + t.Log("When strictDiskType=false: prefers same disk type, falls back to other types if needed") + }) + + // Note: The fallback behavior is implemented in pickBestDiskOnNode: + // - strictDiskType=true (balancing): Only matching disk types + // - strictDiskType=false (evacuation): Prefer matching, fallback to other types allowed + // This is tested implicitly through the ec.encode command above which uses the fallback path +} + +// TestCrossRackECPlacement tests that EC shards are distributed across different racks +func TestCrossRackECPlacement(t *testing.T) { + if testing.Short() { + t.Skip("Skipping cross-rack EC placement test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_cross_rack_ec_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start a cluster with multiple racks + cluster, err := startMultiRackCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9338", 30*time.Second)) + for i := 0; i < 4; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:813%d", i), 30*time.Second)) + } + + time.Sleep(10 * time.Second) + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9338"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + time.Sleep(5 * time.Second) + + // Upload test data + testData := []byte("Cross-rack EC placement test data - needs to be distributed across racks") + var volumeId needle.VolumeId + for retry := 0; retry < 5; retry++ { + volumeId, err = uploadTestDataToMaster(testData, "127.0.0.1:9338") + if err == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err) + time.Sleep(3 * time.Second) + } + require.NoError(t, err, "Failed to upload test data after retries") + t.Logf("Created volume %d for cross-rack EC test", volumeId) + + time.Sleep(3 * time.Second) + + t.Run("ec_encode_cross_rack", func(t *testing.T) { + // Get lock + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, 
&unlockOutput) + + // EC encode with rack-aware placement + // Note: uploadTestDataToMaster uses collection "test" by default + var output bytes.Buffer + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-volumeId", fmt.Sprintf("%d", volumeId), + "-collection", "test", + "-force", + } + + encodeErr := ecEncodeCmd.Do(args, commandEnv, &output) + t.Logf("EC encode output: %s", output.String()) + + if encodeErr != nil { + t.Logf("EC encoding failed: %v", encodeErr) + } else { + t.Logf("EC encoding completed successfully") + } + }) + + t.Run("verify_cross_rack_distribution", func(t *testing.T) { + // Verify EC shards are spread across different racks + rackDistribution := countShardsPerRack(testDir, uint32(volumeId)) + + t.Logf("Rack-level shard distribution for volume %d:", volumeId) + totalShards := 0 + racksWithShards := 0 + for rack, shardCount := range rackDistribution { + t.Logf(" %s: %d shards", rack, shardCount) + totalShards += shardCount + if shardCount > 0 { + racksWithShards++ + } + } + t.Logf("Summary: %d total shards across %d racks", totalShards, racksWithShards) + + // For 10+4 EC, shards should be distributed across at least 2 racks + if totalShards > 0 { + assert.GreaterOrEqual(t, racksWithShards, 2, "EC shards should span at least 2 racks for fault tolerance") + } + }) + + t.Run("ec_balance_respects_rack_placement", func(t *testing.T) { + // Get lock + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + initialDistribution := countShardsPerRack(testDir, uint32(volumeId)) + t.Logf("Initial rack distribution: %v", initialDistribution) + + // Run ec.balance - use "test" collection to match uploaded data + var output bytes.Buffer + ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + err = ecBalanceCmd.Do([]string{"-collection", "test"}, commandEnv, &output) + if err != nil { + t.Logf("ec.balance error: %v", err) + } + t.Logf("ec.balance output: %s", output.String()) + + finalDistribution := countShardsPerRack(testDir, uint32(volumeId)) + t.Logf("Final rack distribution: %v", finalDistribution) + + // Verify rack distribution is maintained or improved + finalRacksWithShards := 0 + for _, count := range finalDistribution { + if count > 0 { + finalRacksWithShards++ + } + } + + t.Logf("After balance: shards across %d racks", finalRacksWithShards) + }) +} + +// startLimitedSsdCluster starts a cluster with limited SSD capacity (1 SSD, 2 HDD) +func startLimitedSsdCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) { + weedBinary := findWeedBinary() + if weedBinary == "" { + return nil, fmt.Errorf("weed binary not found") + } + + cluster := &MultiDiskCluster{testDir: dataDir} + + // Create master directory + masterDir := filepath.Join(dataDir, "master") + os.MkdirAll(masterDir, 0755) + + // Start master server on port 9337 + masterCmd := exec.CommandContext(ctx, weedBinary, "master", + "-port", "9337", + "-mdir", masterDir, + "-volumeSizeLimitMB", "10", + "-ip", "127.0.0.1", + ) + + masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) + if err != nil { + return nil, fmt.Errorf("failed to create master log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, masterLogFile) + 
masterCmd.Stdout = masterLogFile + masterCmd.Stderr = masterLogFile + + if err := masterCmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start master server: %v", err) + } + cluster.masterCmd = masterCmd + + time.Sleep(2 * time.Second) + + // Start 1 SSD server and 2 HDD servers + // This creates a scenario where SSD capacity is limited + serverConfigs := []struct { + diskType string + rack string + }{ + {"ssd", "rack0"}, // Only 1 SSD server + {"hdd", "rack1"}, + {"hdd", "rack2"}, + } + + for i, config := range serverConfigs { + diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_%s", i, config.diskType)) + if err := os.MkdirAll(diskDir, 0755); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create disk dir: %v", err) + } + + port := fmt.Sprintf("812%d", i) + + volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", + "-port", port, + "-dir", diskDir, + "-max", "10", + "-mserver", "127.0.0.1:9337", + "-ip", "127.0.0.1", + "-dataCenter", "dc1", + "-rack", config.rack, + "-disk", config.diskType, + ) + + logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i)) + os.MkdirAll(logDir, 0755) + volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log")) + if err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create volume log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, volumeLogFile) + volumeCmd.Stdout = volumeLogFile + volumeCmd.Stderr = volumeLogFile + + if err := volumeCmd.Start(); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to start volume server %d: %v", i, err) + } + cluster.volumeServers = append(cluster.volumeServers, volumeCmd) + } + + time.Sleep(8 * time.Second) + + return cluster, nil +} + +// startMultiRackCluster starts a cluster with 4 servers across 4 racks +func startMultiRackCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) { + weedBinary := findWeedBinary() + if weedBinary == "" { + return nil, fmt.Errorf("weed binary not found") + } + + cluster := &MultiDiskCluster{testDir: dataDir} + + // Create master directory + masterDir := filepath.Join(dataDir, "master") + os.MkdirAll(masterDir, 0755) + + // Start master server on port 9338 + masterCmd := exec.CommandContext(ctx, weedBinary, "master", + "-port", "9338", + "-mdir", masterDir, + "-volumeSizeLimitMB", "10", + "-ip", "127.0.0.1", + ) + + masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) + if err != nil { + return nil, fmt.Errorf("failed to create master log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, masterLogFile) + masterCmd.Stdout = masterLogFile + masterCmd.Stderr = masterLogFile + + if err := masterCmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start master server: %v", err) + } + cluster.masterCmd = masterCmd + + time.Sleep(2 * time.Second) + + // Start 4 volume servers, each in a different rack + for i := 0; i < 4; i++ { + diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d", i)) + if err := os.MkdirAll(diskDir, 0755); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create disk dir: %v", err) + } + + port := fmt.Sprintf("813%d", i) + rack := fmt.Sprintf("rack%d", i) + + volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", + "-port", port, + "-dir", diskDir, + "-max", "10", + "-mserver", "127.0.0.1:9338", + "-ip", "127.0.0.1", + "-dataCenter", "dc1", + "-rack", rack, + ) + + logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i)) + os.MkdirAll(logDir, 0755) + volumeLogFile, err := 
os.Create(filepath.Join(logDir, "volume.log")) + if err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create volume log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, volumeLogFile) + volumeCmd.Stdout = volumeLogFile + volumeCmd.Stderr = volumeLogFile + + if err := volumeCmd.Start(); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to start volume server %d: %v", i, err) + } + cluster.volumeServers = append(cluster.volumeServers, volumeCmd) + } + + time.Sleep(8 * time.Second) + + return cluster, nil +} + +// countShardsPerRack counts EC shards per rack by checking server directories +func countShardsPerRack(testDir string, volumeId uint32) map[string]int { + rackDistribution := make(map[string]int) + + // Map server directories to rack names + // Based on our cluster setup: server0->rack0, server1->rack1, etc. + entries, err := os.ReadDir(testDir) + if err != nil { + return rackDistribution + } + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + // Check for EC shard files in this directory + serverDir := filepath.Join(testDir, entry.Name()) + shardFiles, err := filepath.Glob(filepath.Join(serverDir, fmt.Sprintf("%d.ec*", volumeId))) + if err != nil { + // filepath.Glob only returns ErrBadPattern for malformed patterns + // Skip this directory if there's an error + continue + } + + if len(shardFiles) > 0 { + // Extract rack name from directory name + // e.g., "server0" -> "rack0", "server1" -> "rack1" + rackName := "unknown" + if strings.HasPrefix(entry.Name(), "server") { + parts := strings.Split(entry.Name(), "_") + if len(parts) > 0 { + serverNum := strings.TrimPrefix(parts[0], "server") + rackName = "rack" + serverNum + } + } + rackDistribution[rackName] += len(shardFiles) + } + } + + return rackDistribution +} diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 935348602..681cf317b 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "io" + + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func init() { @@ -20,7 +22,10 @@ func (c *commandEcBalance) Name() string { func (c *commandEcBalance) Help() string { return `balance all ec shards among all racks and volume servers - ec.balance [-c EACH_COLLECTION|] [-apply] [-dataCenter ] [-shardReplicaPlacement ] + ec.balance [-c EACH_COLLECTION|] [-apply] [-dataCenter ] [-shardReplicaPlacement ] [-diskType ] + + Options: + -diskType: the disk type for EC shards (hdd, ssd, or empty for default hdd) Algorithm: ` + ecBalanceAlgorithmDescription @@ -35,6 +40,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty") + diskTypeStr := balanceCommand.String("diskType", "", "the disk type for EC shards (hdd, ssd, or empty for default hdd)") maxParallelization := balanceCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") applyBalancing := balanceCommand.Bool("apply", false, "apply the balancing plan") // TODO: remove this alias @@ -67,5 +73,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W 
return err } - return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing) + diskType := types.ToDiskType(*diskTypeStr) + + return EcBalance(commandEnv, collections, *dc, rp, diskType, *maxParallelization, *applyBalancing) } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index f2cc581da..bce0141f2 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura } -func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { +func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { // list all possible locations // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) @@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN } // find out all volume servers with one slot left. - ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter) + ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType) sortEcNodesByFreeslotsDescending(ecNodes) return } -func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { - return collectEcNodesForDC(commandEnv, "") +func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { + return collectEcNodesForDC(commandEnv, "", diskType) } func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string { @@ -242,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol return collections } -func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) { +func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") @@ -280,8 +280,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, } - destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds) - existingLocation.deleteEcVolumeShards(vid, copiedShardIds) + destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType) + existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType) return nil @@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int { return 0 } -func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) { +func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) { eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if selectedDataCenter != "" && selectedDataCenter != string(dc) { return } - freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) + freeEcSlots := countFreeShardSlots(dn, diskType) ecNode := &EcNode{ info: dn, dc: dc, @@ 
-439,17 +439,17 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter // Build disk-level information from volumes and EC shards // First, discover all unique disk IDs from VolumeInfos (includes empty disks) allDiskIds := make(map[uint32]string) // diskId -> diskType - for diskType, diskInfo := range dn.DiskInfos { + for diskTypeKey, diskInfo := range dn.DiskInfos { if diskInfo == nil { continue } // Get all disk IDs from volumes for _, vi := range diskInfo.VolumeInfos { - allDiskIds[vi.DiskId] = diskType + allDiskIds[vi.DiskId] = diskTypeKey } // Also get disk IDs from EC shards for _, ecShardInfo := range diskInfo.EcShardInfos { - allDiskIds[ecShardInfo.DiskId] = diskType + allDiskIds[ecShardInfo.DiskId] = diskTypeKey } } @@ -476,7 +476,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter } freePerDisk := int(freeEcSlots) / diskCount - for diskId, diskType := range allDiskIds { + for diskId, diskTypeStr := range allDiskIds { shards := diskShards[diskId] if shards == nil { shards = make(map[needle.VolumeId]erasure_coding.ShardBits) @@ -488,7 +488,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter ecNode.disks[diskId] = &EcDisk{ diskId: diskId, - diskType: diskType, + diskType: diskTypeStr, freeEcSlots: freePerDisk, ecShardCount: totalShardCount, ecShards: shards, @@ -551,9 +551,9 @@ func ceilDivide(a, b int) int { return (a / b) + r } -func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits { +func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits { - if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found { + if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found { for _, shardInfo := range diskInfo.EcShardInfos { if needle.VolumeId(shardInfo.Id) == vid { return erasure_coding.ShardBits(shardInfo.EcIndexBits) @@ -564,10 +564,10 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar return 0 } -func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode { +func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode { foundVolume := false - diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)] + diskInfo, found := ecNode.info.DiskInfos[string(diskType)] if found { for _, shardInfo := range diskInfo.EcShardInfos { if needle.VolumeId(shardInfo.Id) == vid { @@ -584,9 +584,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, } } else { diskInfo = &master_pb.DiskInfo{ - Type: string(types.HardDriveType), + Type: string(diskType), } - ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo + ecNode.info.DiskInfos[string(diskType)] = diskInfo } if !foundVolume { @@ -598,7 +598,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, Id: uint32(vid), Collection: collection, EcIndexBits: uint32(newShardBits), - DiskType: string(types.HardDriveType), + DiskType: string(diskType), }) ecNode.freeEcSlot -= len(shardIds) } @@ -606,9 +606,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, return ecNode } -func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode { +func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode { - if diskInfo, 
@@ -606,9 +606,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
 	return ecNode
 }
 
-func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
+func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode {
-	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+	if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
 		for _, shardInfo := range diskInfo.EcShardInfos {
 			if needle.VolumeId(shardInfo.Id) == vid {
 				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
@@ -649,6 +649,7 @@ type ecBalancer struct {
 	replicaPlacement   *super_block.ReplicaPlacement
 	applyBalancing     bool
 	maxParallelization int
+	diskType           types.DiskType // target disk type for EC shards (default: HardDriveType)
 }
 
 func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
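Every ecBalancer method now reads ecb.diskType instead of the previously hardcoded HardDriveType, so the target tier is fixed once at construction time. A minimal sketch of how a caller might wire it up; the field values here are illustrative, not taken from the patch:

    // Illustrative only: balance SSD-resident EC shards.
    ecb := &ecBalancer{
        commandEnv:         commandEnv,
        ecNodes:            allEcNodes,
        replicaPlacement:   rp,
        applyBalancing:     false,
        maxParallelization: 10,
        diskType:           types.SsdType, // types.HardDriveType keeps the old behavior
    }

Passing types.HardDriveType reproduces the pre-patch behavior exactly, which is what all existing callers do.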
@@ -705,7 +706,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
 	// Use MaxShardCount (32) to support custom EC ratios
 	shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
 	for _, ecNode := range locations {
-		shardBits := findEcVolumeShards(ecNode, vid)
+		shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
 		for _, shardId := range shardBits.ShardIds() {
 			shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
 		}
 	}
@@ -728,7 +729,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
 		if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
 			return err
 		}
-		ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
+		ecNode.deleteEcVolumeShards(vid, duplicatedShardIds, ecb.diskType)
 		}
 	}
 	return nil
@@ -748,9 +749,9 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
 	return ewg.Wait()
 }
 
-func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
+func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int {
 	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
-		shardBits := findEcVolumeShards(ecNode, vid)
+		shardBits := findEcVolumeShards(ecNode, vid, diskType)
 		return string(ecNode.rack), shardBits.ShardIdCount()
 	})
 }
@@ -759,7 +760,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
 	racks := ecb.racks()
 
 	// see the volume's shards are in how many racks, and how many in each rack
-	rackToShardCount := countShardsByRack(vid, locations)
+	rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
 
 	// Calculate actual total shards for this volume (not hardcoded default)
 	var totalShardsForVolume int
@@ -779,7 +780,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
 			continue
 		}
 		possibleEcNodes := rackEcNodesWithVid[rackId]
-		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
+		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) {
 			ecShardsToMove[shardId] = ecNode
 		}
 	}
@@ -856,7 +857,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 
 	for vid, locations := range vidLocations {
 		// see the volume's shards are in how many racks, and how many in each rack
-		rackToShardCount := countShardsByRack(vid, locations)
+		rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
 		rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
 			return string(ecNode.rack)
 		})
@@ -865,7 +866,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 
 			var possibleDestinationEcNodes []*EcNode
 			for _, n := range racks[RackId(rackId)].ecNodes {
-				if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
+				if _, found := n.info.DiskInfos[string(ecb.diskType)]; found {
 					possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
 				}
 			}
@@ -882,7 +883,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 
 func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
 	for _, ecNode := range existingLocations {
-		shardBits := findEcVolumeShards(ecNode, vid)
+		shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
 		overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
 
 		for _, shardId := range shardBits.ShardIds() {
@@ -927,7 +928,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 	}
 
 	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
-		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
 		if !found {
 			return
 		}
@@ -955,17 +956,18 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
 
 			emptyNodeIds := make(map[uint32]bool)
-			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
+			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(ecb.diskType)]; found {
 				for _, shards := range emptyDiskInfo.EcShardInfos {
 					emptyNodeIds[shards.Id] = true
 				}
 			}
-			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
+			if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found {
 				for _, shards := range fullDiskInfo.EcShardInfos {
 					if _, found := emptyNodeIds[shards.Id]; !found {
 						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
 							vid := needle.VolumeId(shards.Id)
-							destDiskId := pickBestDiskOnNode(emptyNode, vid)
+							// For balancing, strictly require matching disk type
+							destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true)
 							if destDiskId > 0 {
 								fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
@@ -973,7 +975,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 								fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
 							}
 
-							err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
+							err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType)
 							if err != nil {
 								return err
 							}
@@ -1003,7 +1005,7 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
 
 	nodeShards := map[*EcNode]int{}
 	for _, node := range possibleDestinations {
-		nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
+		nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount()
 	}
 
 	targets := []*EcNode{}
@@ -1078,14 +1080,17 @@ func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
 }
 
 // pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
-// It prefers disks with fewer shards and more free slots
-func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+// It prefers disks of the specified type with fewer shards and more free slots
+// If strictDiskType is false, it will fall back to other disk types if no matching disk is found
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool) uint32 {
 	if len(ecNode.disks) == 0 {
 		return 0 // No disk info available, let the server decide
 	}
 
 	var bestDiskId uint32
 	bestScore := -1
+	var fallbackDiskId uint32
+	fallbackScore := -1
 
 	for diskId, disk := range ecNode.disks {
 		if disk.freeEcSlots <= 0 {
@@ -1102,13 +1107,26 @@
 		// Lower score is better
 		score := disk.ecShardCount*10 + existingShards*100
 
-		if bestScore == -1 || score < bestScore {
-			bestScore = score
-			bestDiskId = diskId
+		if disk.diskType == string(diskType) {
+			// Matching disk type - this is preferred
+			if bestScore == -1 || score < bestScore {
+				bestScore = score
+				bestDiskId = diskId
+			}
+		} else if !strictDiskType {
+			// Non-matching disk type - use as fallback if allowed
+			if fallbackScore == -1 || score < fallbackScore {
+				fallbackScore = score
+				fallbackDiskId = diskId
+			}
 		}
 	}
 
-	return bestDiskId
+	// Return matching disk type if found, otherwise fallback
+	if bestDiskId != 0 {
+		return bestDiskId
+	}
+	return fallbackDiskId
 }
 
 // pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
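The strictDiskType switch is the only behavioral difference between the two call sites: balancing refuses to cross tiers, while evacuation merely prefers not to. A short usage sketch; the node and volume id are illustrative, the function is the one defined above:

    vid := needle.VolumeId(42)
    // Balancing: 0 means "no disk of this type with free EC slots".
    diskId := pickBestDiskOnNode(node, vid, types.SsdType, true)
    // Evacuation: same preference, but any disk with free slots can win.
    diskId = pickBestDiskOnNode(node, vid, types.SsdType, false)

Note that a returned 0 is also the "no disk info, let the server decide" sentinel from the early return, which is why callers only treat destDiskId > 0 as a concrete placement hint.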
@@ -1118,7 +1136,8 @@ func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId,
 		return nil, 0, err
 	}
 
-	diskId := pickBestDiskOnNode(node, vid)
+	// For balancing, strictly require matching disk type
+	diskId := pickBestDiskOnNode(node, vid, ecb.diskType, true)
 	return node, diskId, nil
 }
@@ -1134,14 +1153,14 @@ func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, co
 	} else {
 		fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
 	}
-	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
+	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing, ecb.diskType)
 }
 
-func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
+func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, diskType types.DiskType) map[erasure_coding.ShardId]*EcNode {
 	picked := make(map[erasure_coding.ShardId]*EcNode)
 	var candidateEcNodes []*CandidateEcNode
 	for _, ecNode := range ecNodes {
-		shardBits := findEcVolumeShards(ecNode, vid)
+		shardBits := findEcVolumeShards(ecNode, vid, diskType)
 		if shardBits.ShardIdCount() > 0 {
 			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
 				ecNode:     ecNode,
@@ -1155,13 +1174,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
 	for i := 0; i < n; i++ {
 		selectedEcNodeIndex := -1
 		for i, candidateEcNode := range candidateEcNodes {
-			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
+			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType)
 			if shardBits > 0 {
 				selectedEcNodeIndex = i
 				for _, shardId := range shardBits.ShardIds() {
 					candidateEcNode.shardCount--
 					picked[shardId] = candidateEcNode.ecNode
-					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
+					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType)
 					break
 				}
 				break
@@ -1180,7 +1199,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
 	vidLocations := make(map[needle.VolumeId][]*EcNode)
 	for _, ecNode := range ecb.ecNodes {
-		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
 		if !found {
 			continue
 		}
@@ -1194,9 +1213,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
 	return vidLocations
 }
 
-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) {
 	// collect all ec nodes
-	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
+	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
 	if err != nil {
 		return err
 	}
@@ -1210,6 +1229,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
 		replicaPlacement:   ecReplicaPlacement,
 		applyBalancing:     applyBalancing,
 		maxParallelization: maxParallelization,
+		diskType:           diskType,
 	}
 
 	if len(collections) == 0 {
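EcBalance is the public entry point, so its signature change is the one other commands see. A hedged sketch of a call; the collection name, placement string, and parallelism are illustrative:

    rp, _ := super_block.NewReplicaPlacementFromString("000")
    diskType := types.ToDiskType("ssd") // "" and "hdd" both resolve to the default HardDriveType
    if err := EcBalance(commandEnv, []string{"mybucket"}, "", rp, diskType, 10, true); err != nil {
        return fmt.Errorf("ec balance: %w", err)
    }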
diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go
index f1f460bc6..47bf9eea1 100644
--- a/weed/shell/command_ec_common_test.go
+++ b/weed/shell/command_ec_common_test.go
@@ -106,7 +106,7 @@ func TestParseReplicaPlacementArg(t *testing.T) {
 
 func TestEcDistribution(t *testing.T) {
 
 	// find out all volume servers with one slot left.
-	ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "")
+	ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "", types.HardDriveType)
 
 	sortEcNodesByFreeslotsDescending(ecNodes)
@@ -149,16 +149,17 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
 
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
-		ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+		ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
 		rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement)
 
 		ecb := &ecBalancer{
 			ecNodes:          ecNodes,
 			replicaPlacement: rp,
+			diskType:         types.HardDriveType,
 		}
 
 		racks := ecb.racks()
-		rackToShardCount := countShardsByRack(vid, ecNodes)
+		rackToShardCount := countShardsByRack(vid, ecNodes, types.HardDriveType)
 
 		got, gotErr := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
 		if err := errorCheck(gotErr, tc.wantErr); err != nil {
@@ -225,10 +226,11 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) {
 	for _, tc := range testCases {
 		vid, _ := needle.NewVolumeId(tc.vid)
-		allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
+		allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType)
 
 		ecb := &ecBalancer{
-			ecNodes: allEcNodes,
+			ecNodes:  allEcNodes,
+			diskType: types.HardDriveType,
 		}
 
 		// Resolve target node by name
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index f1f3bf133..695641a31 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -32,13 +32,23 @@ func (c *commandEcDecode) Name() string {
 func (c *commandEcDecode) Help() string {
 	return `decode a erasure coded volume into a normal volume
 
-	ec.decode [-collection=""] [-volumeId=]
+	ec.decode [-collection=""] [-volumeId=] [-diskType=]
 
 	The -collection parameter supports regular expressions for pattern matching:
 	- Use exact match: ec.decode -collection="^mybucket$"
 	- Match multiple buckets: ec.decode -collection="bucket.*"
 	- Match all collections: ec.decode -collection=".*"
 
+	Options:
+	-diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
+
+	Examples:
+	# Decode EC shards from HDD (default)
+	ec.decode -collection=mybucket
+
+	# Decode EC shards from SSD
+	ec.decode -collection=mybucket -diskType=ssd
+
 `
 }
@@ -50,6 +60,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
 	volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
 	collection := decodeCommand.String("collection", "", "the collection name")
+	diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
 	if err = decodeCommand.Parse(args); err != nil {
 		return nil
 	}
@@ -59,6 +70,7 @@
 	}
 
 	vid := needle.VolumeId(*volumeId)
+	diskType := types.ToDiskType(*diskTypeStr)
 
 	// collect topology information
 	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@@ -68,17 +80,17 @@
 
 	// volumeId is provided
 	if vid != 0 {
-		return doEcDecode(commandEnv, topologyInfo, *collection, vid)
+		return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType)
 	}
 
 	// apply to all volumes in the collection
-	volumeIds, err := collectEcShardIds(topologyInfo, *collection)
+	volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType)
 	if err != nil {
 		return err
 	}
 	fmt.Printf("ec decode volumes: %v\n", volumeIds)
 	for _, vid := range volumeIds {
-		if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
+		if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType); err != nil {
 			return err
 		}
 	}
@@ -86,14 +98,14 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	return nil
 }
 
-func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
+func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType) (err error) {
 	if !commandEnv.isLocked() {
 		return fmt.Errorf("lock is lost")
 	}
 
 	// find volume location
-	nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
+	nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid, diskType)
 
 	fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
@@ -248,7 +260,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
 	return resp.VolumeIdLocations, nil
 }
 
-func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string) (vids []needle.VolumeId, err error) {
+func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) {
 	// compile regex pattern for collection matching
 	collectionRegex, err := compileCollectionPattern(collectionPattern)
 	if err != nil {
@@ -257,7 +269,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
 	vidMap := make(map[uint32]bool)
 	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
-		if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
+		if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
 			for _, v := range diskInfo.EcShardInfos {
 				if collectionRegex.MatchString(v.Collection) {
 					vidMap[v.Id] = true
@@ -273,11 +285,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
 	return
 }
 
-func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
+func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]erasure_coding.ShardBits {
 	nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
 	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
-		if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
+		if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
 			for _, v := range diskInfo.EcShardInfos {
 				if v.Id == uint32(vid) {
 					nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)
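Both collectors take the same one-line change: the topology's DiskInfos map is keyed by the disk type string, so looking up string(diskType) instead of the hardcoded HardDriveType is all the filtering required. Illustrative use of the updated helper; the volume id is made up:

    // Where do the SSD-resident shards of volume 5 live?
    bits := collectEcNodeShardBits(topologyInfo, needle.VolumeId(5), types.SsdType)
    for addr, shardBits := range bits {
        fmt.Printf("%s holds shards %v\n", addr, shardBits.ShardIds())
    }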
!= "" { + sdt := types.ToDiskType(*sourceDiskTypeStr) + sourceDiskType = &sdt + } + + // Parse target disk type for EC shards + diskType := types.ToDiskType(*diskTypeStr) + // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) if err != nil { @@ -119,7 +143,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds) } else { // apply to all volumes for the given collection pattern (regex) - volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose) + volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, sourceDiskType, *fullPercentage, *quietPeriod, *verbose) if err != nil { return err } @@ -142,7 +166,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err) } // ...re-balance ec shards... - if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil { + if err := EcBalance(commandEnv, balanceCollections, "", rp, diskType, *maxParallelization, *applyBalancing); err != nil { return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err) } // ...then delete original volumes using pre-collected locations. diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 79acebff1..cfc895c7d 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func init() { @@ -24,6 +25,7 @@ type ecRebuilder struct { writer io.Writer applyChanges bool collections []string + diskType types.DiskType ewg *ErrorWaitGroup ecNodesMu sync.Mutex @@ -39,7 +41,7 @@ func (c *commandEcRebuild) Name() string { func (c *commandEcRebuild) Help() string { return `find and rebuild missing ec shards among volume servers - ec.rebuild [-c EACH_COLLECTION|] [-apply] [-maxParallelization N] + ec.rebuild [-c EACH_COLLECTION|] [-apply] [-maxParallelization N] [-diskType=] Options: -collection: specify a collection name, or "EACH_COLLECTION" to process all collections @@ -47,6 +49,7 @@ func (c *commandEcRebuild) Help() string { -maxParallelization: number of volumes to rebuild concurrently (default: 10) Increase for faster rebuilds with more system resources. Decrease if experiencing resource contention or instability. 
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 79acebff1..cfc895c7d 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -12,6 +12,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 func init() {
@@ -24,6 +25,7 @@ type ecRebuilder struct {
 	writer       io.Writer
 	applyChanges bool
 	collections  []string
+	diskType     types.DiskType
 
 	ewg       *ErrorWaitGroup
 	ecNodesMu sync.Mutex
@@ -39,7 +41,7 @@ func (c *commandEcRebuild) Name() string {
 func (c *commandEcRebuild) Help() string {
 	return `find and rebuild missing ec shards among volume servers
 
-	ec.rebuild [-c EACH_COLLECTION|] [-apply] [-maxParallelization N]
+	ec.rebuild [-c EACH_COLLECTION|] [-apply] [-maxParallelization N] [-diskType=]
 
 	Options:
 	-collection: specify a collection name, or "EACH_COLLECTION" to process all collections
@@ -47,6 +49,7 @@ func (c *commandEcRebuild) Help() string {
 	-maxParallelization: number of volumes to rebuild concurrently (default: 10)
 	                     Increase for faster rebuilds with more system resources.
 	                     Decrease if experiencing resource contention or instability.
+	-diskType: disk type for EC shards (hdd, ssd, or empty for default hdd)
 
 	Algorithm:
@@ -83,6 +86,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 	collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
 	maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
 	applyChanges := fixCommand.Bool("apply", false, "apply the changes")
+	diskTypeStr := fixCommand.String("diskType", "", "disk type for EC shards (hdd, ssd, or empty for default hdd)")
 	// TODO: remove this alias
 	applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
 	if err = fixCommand.Parse(args); err != nil {
@@ -95,8 +99,10 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 		return
 	}
 
+	diskType := types.ToDiskType(*diskTypeStr)
+
 	// collect all ec nodes
-	allEcNodes, _, err := collectEcNodes(commandEnv)
+	allEcNodes, _, err := collectEcNodes(commandEnv, diskType)
 	if err != nil {
 		return err
 	}
@@ -117,6 +123,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 		writer:       writer,
 		applyChanges: *applyChanges,
 		collections:  collections,
+		diskType:     diskType,
 		ewg:          NewErrorWaitGroup(*maxParallelization),
 	}
@@ -294,7 +301,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
 	// ensure ECNode updates are atomic
 	erb.ecNodesMu.Lock()
 	defer erb.ecNodesMu.Unlock()
-	rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
+	rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, erb.diskType)
 	return nil
 }
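ec.rebuild both discovers nodes and records rebuilt shards under the same disk type, so the in-memory topology it maintains stays on one tier for the whole run. Invocation follows the same pattern as the other commands, for example (illustrative):

    ec.rebuild -collection=mybucket -diskType=ssd -apply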
diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go
index fa6697435..7d7b59f8f 100644
--- a/weed/shell/command_ec_test.go
+++ b/weed/shell/command_ec_test.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 )
 
 func TestCommandEcBalanceSmall(t *testing.T) {
@@ -14,6 +15,7 @@ func TestCommandEcBalanceSmall(t *testing.T) {
 			newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
 		},
 		applyBalancing: false,
+		diskType:       types.HardDriveType,
 	}
 
 	ecb.balanceEcVolumes("c1")
@@ -30,6 +32,7 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) {
 				addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
 		},
 		applyBalancing: false,
+		diskType:       types.HardDriveType,
 	}
 
 	ecb.balanceEcVolumes("c1")
@@ -48,6 +51,7 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) {
 			newEcNode("dc1", "rack1", "dn4", 100),
 		},
 		applyBalancing: false,
+		diskType:       types.HardDriveType,
 	}
 
 	ecb.balanceEcVolumes("c1")
@@ -66,6 +70,7 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) {
 			newEcNode("dc1", "rack2", "dn4", 100),
 		},
 		applyBalancing: false,
+		diskType:       types.HardDriveType,
 	}
 
 	ecb.balanceEcVolumes("c1")
@@ -109,6 +114,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
 			newEcNode("dc1", "rack1", "dn3", 100),
 		},
 		applyBalancing: false,
+		diskType:       types.HardDriveType,
 	}
 
 	ecb.balanceEcVolumes("c1")
@@ -128,5 +134,5 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod
 }
 
 func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode {
-	return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds)
+	return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType)
 }
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index fce88d2c4..a13e8e671 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -156,21 +156,30 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
 }
 
 func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
-	// find this ec volume server
+	// Evacuate EC volumes for all disk types discovered from topology
+	// Disk types are free-form tags (e.g., "", "hdd", "ssd", "nvme", etc.)
+	// We need to handle each disk type separately because shards should be moved to nodes with the same disk type
 	// We collect topology once at the start and track capacity changes ourselves
 	// (via freeEcSlot decrement after each move) rather than repeatedly refreshing,
 	// which would give a false sense of correctness since topology could be stale.
-	ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
-	thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
-	if len(thisNodes) == 0 {
-		return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
-	}
+	diskTypes := collectVolumeDiskTypes(c.topologyInfo)
 
-	// move away ec volumes
-	for _, thisNode := range thisNodes {
-		for _, diskInfo := range thisNode.info.DiskInfos {
+	for _, diskType := range diskTypes {
+		ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", diskType)
+		thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
+		if len(thisNodes) == 0 {
+			// This server doesn't have EC shards for this disk type, skip
+			continue
+		}
+
+		// move away ec volumes for this disk type
+		for _, thisNode := range thisNodes {
+			diskInfo, found := thisNode.info.DiskInfos[string(diskType)]
+			if !found {
+				continue
+			}
 			for _, ecShardInfo := range diskInfo.EcShardInfos {
-				hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, writer)
+				hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, diskType, writer)
 				if err != nil {
 					fmt.Fprintf(writer, "move away volume %d from %s: %v\n", ecShardInfo.Id, volumeServer, err)
 				}
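One behavioral consequence of the per-disk-type loop, as far as this hunk shows: the old "not found in this cluster" error is gone, so evacuating a server name that does not exist now completes successfully with nothing moved, since every disk type iteration takes the continue branch. The trade-off is that a server holding shards on several tiers is drained tier by tier, with each tier preferring same-type destinations and only falling back when capacity runs out.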
@@ -187,7 +196,7 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv,
 	return nil
 }
 
-func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, writer io.Writer) (hasMoved bool, err error) {
+func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, diskType types.DiskType, writer io.Writer) (hasMoved bool, err error) {
 	for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
 
 		// Sort by: 1) fewest shards of this volume, 2) most free EC slots
@@ -217,13 +226,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
 			collectionPrefix = ecShardInfo.Collection + "_"
 		}
 		vid := needle.VolumeId(ecShardInfo.Id)
-		destDiskId := pickBestDiskOnNode(emptyNode, vid)
+		// For evacuation, prefer same disk type but allow fallback to other types
+		destDiskId := pickBestDiskOnNode(emptyNode, vid, diskType, false)
 		if destDiskId > 0 {
 			fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
 		} else {
 			fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
 		}
-		err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
+		err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, diskType)
 		if err != nil {
 			hasMoved = false
 			return