diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go
index bb0983f06..9d7cde572 100644
--- a/test/erasure_coding/ec_integration_test.go
+++ b/test/erasure_coding/ec_integration_test.go
@@ -96,13 +96,12 @@ func TestECEncodingVolumeLocationTimingBug(t *testing.T) {
 	// This simulates the race condition where EC encoding updates master metadata
 	// but volume location collection happens after that update
 
-	// First acquire the lock (required for EC encode)
-	lockCmd := shell.Commands[findCommandIndex("lock")]
-	var lockOutput bytes.Buffer
-	err = lockCmd.Do([]string{}, commandEnv, &lockOutput)
-	if err != nil {
-		t.Logf("Lock command failed: %v", err)
+	// Try to get lock with timeout to avoid hanging
+	locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+	if !locked {
+		t.Skip("Could not acquire lock within timeout - master may not be ready")
 	}
+	defer unlock()
 
 	// Execute EC encoding - test the timing directly
 	var encodeOutput bytes.Buffer
@@ -651,6 +650,55 @@ type commandRunner interface {
 	Do([]string, *shell.CommandEnv, io.Writer) error
 }
 
+// tryLockWithTimeout attempts to acquire the shell lock with a timeout.
+// Returns true if lock was acquired, false if timeout or error occurred.
+// If lock was acquired, the caller must call the returned unlock function.
+func tryLockWithTimeout(t *testing.T, commandEnv *shell.CommandEnv, timeout time.Duration) (locked bool, unlock func()) {
+	t.Helper()
+
+	type lockResult struct {
+		err    error
+		output string
+	}
+
+	lockDone := make(chan lockResult, 1)
+	go func() {
+		lockCmd := shell.Commands[findCommandIndex("lock")]
+		var lockOutput bytes.Buffer
+		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
+		lockDone <- lockResult{err: err, output: lockOutput.String()}
+	}()
+
+	select {
+	case res := <-lockDone:
+		if res.err != nil {
+			t.Logf("lock command failed: %v, output: %s", res.err, res.output)
+			return false, nil
+		}
+		unlockCmd := shell.Commands[findCommandIndex("unlock")]
+		return true, func() {
+			var unlockOutput bytes.Buffer
+			_ = unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		}
+	case <-time.After(timeout):
+		t.Logf("timed out acquiring lock after %s", timeout)
+		return false, nil
+	}
+}
+
+// connectToMasterAndSync connects the command environment to the master and waits for sync.
+// This helper reduces code duplication across test functions.
+func connectToMasterAndSync(ctx context.Context, t *testing.T, commandEnv *shell.CommandEnv) {
+	t.Helper()
+	// Connect to master - use the main test context to avoid early disconnection
+	go commandEnv.MasterClient.KeepConnectedToMaster(ctx)
+	commandEnv.MasterClient.WaitUntilConnected(ctx)
+
+	// Wait for master client to fully sync
+	t.Log("Waiting for master client to sync...")
+	time.Sleep(5 * time.Second)
+}
+
 // captureCommandOutput executes a shell command and captures its output from both
 // stdout/stderr and the command's buffer. This reduces code duplication in tests.
 func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, commandEnv *shell.CommandEnv) (output string, err error) {
@@ -661,17 +709,16 @@ func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, comman
 	r, w, pipeErr := os.Pipe()
 	require.NoError(t, pipeErr)
 
-	defer func() {
-		_ = w.Close()
-		os.Stdout = oldStdout
-		os.Stderr = oldStderr
-	}()
-
 	os.Stdout = w
 	os.Stderr = w
 
 	cmdErr := cmd.Do(args, commandEnv, &outBuf)
 
+	// Close write end BEFORE reading to signal EOF to the reader
+	_ = w.Close()
+	os.Stdout = oldStdout
+	os.Stderr = oldStderr
+
 	capturedOutput, readErr := io.ReadAll(r)
 	_ = r.Close()
 	require.NoError(t, readErr)
@@ -753,14 +800,7 @@ func TestDiskAwareECRebalancing(t *testing.T) {
 	}
 	commandEnv := shell.NewCommandEnv(options)
 
-	// Connect to master with longer timeout
-	ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-	defer cancel2()
-	go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-	commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
-	// Wait for master client to fully sync
-	time.Sleep(5 * time.Second)
+	connectToMasterAndSync(ctx, t, commandEnv)
 
 	// Upload test data to create a volume - retry if volumes not ready
 	var volumeId needle.VolumeId
@@ -1194,14 +1234,7 @@ func TestECDiskTypeSupport(t *testing.T) {
 	}
 	commandEnv := shell.NewCommandEnv(options)
 
-	// Connect to master with longer timeout
-	ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-	defer cancel2()
-	go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-	commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
-	// Wait for master client to fully sync
-	time.Sleep(5 * time.Second)
+	connectToMasterAndSync(ctx, t, commandEnv)
 
 	// Upload test data to create a volume - retry if volumes not ready
 	var volumeId needle.VolumeId
@@ -1228,19 +1261,12 @@ func TestECDiskTypeSupport(t *testing.T) {
 	})
 
 	t.Run("ec_encode_with_ssd_disktype", func(t *testing.T) {
-		// Get lock first
-		lockCmd := shell.Commands[findCommandIndex("lock")]
-		var lockOutput bytes.Buffer
-		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-		if err != nil {
-			t.Logf("Lock command failed: %v", err)
-			return
+		// Try to get lock with timeout to avoid hanging
+		locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+		if !locked {
+			t.Skip("Could not acquire lock within timeout - master may not be ready")
 		}
-
-		// Defer unlock to ensure it's always released
-		unlockCmd := shell.Commands[findCommandIndex("unlock")]
-		var unlockOutput bytes.Buffer
-		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		defer unlock()
 
 		// Execute EC encoding with SSD disk type
 		ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
@@ -1270,19 +1296,12 @@ func TestECDiskTypeSupport(t *testing.T) {
 	})
 
 	t.Run("ec_balance_with_ssd_disktype", func(t *testing.T) {
-		// Get lock first
-		lockCmd := shell.Commands[findCommandIndex("lock")]
-		var lockOutput bytes.Buffer
-		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-		if err != nil {
-			t.Logf("Lock command failed: %v", err)
-			return
+		// Try to get lock with timeout to avoid hanging
+		locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+		if !locked {
+			t.Skip("Could not acquire lock within timeout - master may not be ready")
 		}
-
-		// Defer unlock to ensure it's always released
-		unlockCmd := shell.Commands[findCommandIndex("unlock")]
-		var unlockOutput bytes.Buffer
-		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		defer unlock()
 
 		// Execute EC balance with SSD disk type
 		ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
@@ -1325,19 +1344,12 @@ func TestECDiskTypeSupport(t *testing.T) {
 	})
 
 	t.Run("ec_encode_with_source_disktype", func(t *testing.T) {
-		// Test that -sourceDiskType flag is accepted
-		lockCmd := shell.Commands[findCommandIndex("lock")]
-		var lockOutput bytes.Buffer
-		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-		if err != nil {
-			t.Logf("Lock command failed: %v", err)
-			return
+		// Try to get lock with timeout to avoid hanging
+		locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+		if !locked {
+			t.Skip("Could not acquire lock within timeout - master may not be ready")
 		}
-
-		// Defer unlock to ensure it's always released
-		unlockCmd := shell.Commands[findCommandIndex("unlock")]
-		var unlockOutput bytes.Buffer
-		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		defer unlock()
 
 		// Execute EC encoding with sourceDiskType filter
 		ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
@@ -1366,19 +1378,12 @@ func TestECDiskTypeSupport(t *testing.T) {
 	})
 
 	t.Run("ec_decode_with_disktype", func(t *testing.T) {
-		// Test that ec.decode accepts -diskType flag
-		lockCmd := shell.Commands[findCommandIndex("lock")]
-		var lockOutput bytes.Buffer
-		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-		if err != nil {
-			t.Logf("Lock command failed: %v", err)
-			return
+		// Try to get lock with timeout to avoid hanging
+		locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+		if !locked {
+			t.Skip("Could not acquire lock within timeout - master may not be ready")
 		}
-
-		// Defer unlock to ensure it's always released
-		unlockCmd := shell.Commands[findCommandIndex("unlock")]
-		var unlockOutput bytes.Buffer
-		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		defer unlock()
 
 		// Execute EC decode with disk type
 		ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
@@ -1424,6 +1429,7 @@ func startClusterWithDiskType(ctx context.Context, dataDir string, diskType stri
 		"-mdir", masterDir,
 		"-volumeSizeLimitMB", "10",
 		"-ip", "127.0.0.1",
+		"-peers", "none",
 	)
 
 	masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -1569,14 +1575,7 @@ func TestECDiskTypeMixedCluster(t *testing.T) {
 	}
 	commandEnv := shell.NewCommandEnv(options)
 
-	// Connect to master with longer timeout
-	ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-	defer cancel2()
-	go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-	commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
-	// Wait for master client to fully sync
-	time.Sleep(5 * time.Second)
+	connectToMasterAndSync(ctx, t, commandEnv)
 
 	t.Run("upload_to_ssd_and_hdd", func(t *testing.T) {
 		// Upload to SSD
@@ -1611,19 +1610,12 @@ func TestECDiskTypeMixedCluster(t *testing.T) {
 	})
 
 	t.Run("ec_balance_targets_correct_disk_type", func(t *testing.T) {
-		// Get lock first
-		lockCmd := shell.Commands[findCommandIndex("lock")]
-		var lockOutput bytes.Buffer
-		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-		if err != nil {
-			t.Logf("Lock command failed: %v", err)
-			return
+		// Try to get lock with timeout to avoid hanging
+		locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+		if !locked {
+			t.Skip("Could not acquire lock within timeout - master may not be ready")
 		}
-
-		// Defer unlock to ensure it's always released
-		unlockCmd := shell.Commands[findCommandIndex("unlock")]
-		var unlockOutput bytes.Buffer
-		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		defer unlock()
 
 		// Run ec.balance for SSD collection with -diskType=ssd
 		var ssdOutput bytes.Buffer
@@ -1669,6 +1661,7 @@ func startMixedDiskTypeCluster(ctx context.Context, dataDir string) (*MultiDiskC
 		"-mdir", masterDir,
 		"-volumeSizeLimitMB", "10",
 		"-ip", "127.0.0.1",
+		"-peers", "none",
 	)
 
 	masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -1770,12 +1763,7 @@ func TestEvacuationFallbackBehavior(t *testing.T) {
 	}
 	commandEnv := shell.NewCommandEnv(options)
 
-	ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-	defer cancel2()
-	go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-	commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
-	time.Sleep(5 * time.Second)
+	connectToMasterAndSync(ctx, t, commandEnv)
 
 	t.Run("fallback_when_same_disktype_full", func(t *testing.T) {
 		// This test verifies that when evacuating SSD EC shards from a server,
@@ -1800,18 +1788,12 @@ func TestEvacuationFallbackBehavior(t *testing.T) {
 
 		time.Sleep(3 * time.Second)
 
-		// Get lock
-		lockCmd := shell.Commands[findCommandIndex("lock")]
-		var lockOutput bytes.Buffer
-		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-		if err != nil {
-			t.Logf("Lock command failed: %v", err)
-			return
+		// Try to get lock with timeout to avoid hanging
+		locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+		if !locked {
+			t.Skip("Could not acquire lock within timeout - master may not be ready")
 		}
-
-		unlockCmd := shell.Commands[findCommandIndex("unlock")]
-		var unlockOutput bytes.Buffer
-		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		defer unlock()
 
 		// EC encode the SSD volume
 		var encodeOutput bytes.Buffer
@@ -1875,12 +1857,7 @@ func TestCrossRackECPlacement(t *testing.T) {
 	}
 	commandEnv := shell.NewCommandEnv(options)
 
-	ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-	defer cancel2()
-	go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-	commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
-	time.Sleep(5 * time.Second)
+	connectToMasterAndSync(ctx, t, commandEnv)
 
 	// Upload test data
 	testData := []byte("Cross-rack EC placement test data - needs to be distributed across racks")
@@ -1899,18 +1876,12 @@ func TestCrossRackECPlacement(t *testing.T) {
 	time.Sleep(3 * time.Second)
 
 	t.Run("ec_encode_cross_rack", func(t *testing.T) {
-		// Get lock
-		lockCmd := shell.Commands[findCommandIndex("lock")]
-		var lockOutput bytes.Buffer
-		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-		if err != nil {
-			t.Logf("Lock command failed: %v", err)
-			return
+		// Try to get lock with timeout to avoid hanging
+		locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+		if !locked {
+			t.Skip("Could not acquire lock within timeout - master may not be ready")
 		}
-
-		unlockCmd := shell.Commands[findCommandIndex("unlock")]
-		var unlockOutput bytes.Buffer
-		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		defer unlock()
 
 		// EC encode with rack-aware placement
 		// Note: uploadTestDataToMaster uses collection "test" by default
@@ -1955,18 +1926,12 @@ func TestCrossRackECPlacement(t *testing.T) {
 	})
 
 	t.Run("ec_balance_respects_rack_placement", func(t *testing.T) {
-		// Get lock
-		lockCmd := shell.Commands[findCommandIndex("lock")]
-		var lockOutput bytes.Buffer
-		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-		if err != nil {
-			t.Logf("Lock command failed: %v", err)
-			return
+		// Try to get lock with timeout to avoid hanging
+		locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+		if !locked {
+			t.Skip("Could not acquire lock within timeout - master may not be ready")
 		}
-
-		unlockCmd := shell.Commands[findCommandIndex("unlock")]
-		var unlockOutput bytes.Buffer
-		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+		defer unlock()
 
 		initialDistribution := countShardsPerRack(testDir, uint32(volumeId))
 		t.Logf("Initial rack distribution: %v", initialDistribution)
@@ -2014,6 +1979,7 @@ func startLimitedSsdCluster(ctx context.Context, dataDir string) (*MultiDiskClus
 		"-mdir", masterDir,
 		"-volumeSizeLimitMB", "10",
 		"-ip", "127.0.0.1",
+		"-peers", "none",
 	)
 
 	masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -2104,6 +2070,7 @@ func startMultiRackCluster(ctx context.Context, dataDir string) (*MultiDiskClust
 		"-mdir", masterDir,
 		"-volumeSizeLimitMB", "10",
 		"-ip", "127.0.0.1",
+		"-peers", "none",
 	)
 
 	masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
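Note: a sketch of how the touched tests might be exercised locally, for reviewers who want to verify the lock-timeout and master-connection changes. The package path comes from the diff header; the -run pattern and -timeout value are assumptions, not part of this change, and may need adjusting to the repository's usual test invocation.

    go test -v -timeout 30m -run 'TestECDiskTypeSupport|TestECDiskTypeMixedCluster|TestEvacuationFallbackBehavior|TestCrossRackECPlacement' ./test/erasure_coding/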