
test: fix master client timeout causing test hangs (#7715)

* test: fix master client timeout causing test hangs

Use the main test context for KeepConnectedToMaster instead of creating
a separate 60s context. The tests have 180s outer timeouts, but the
master client was disconnecting after 60s, causing subsequent commands
to hang while waiting for reconnection.
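
For illustration only (not the test code itself), a minimal sketch of the lifetime mismatch, with keepConnected standing in for MasterClient.KeepConnectedToMaster:

```go
package main

import (
	"context"
	"time"
)

// keepConnected stands in for MasterClient.KeepConnectedToMaster: it holds
// the master connection open until its context is cancelled.
func keepConnected(ctx context.Context) {
	<-ctx.Done() // the client disconnects here
}

func main() {
	// The test's outer context: the tests run with 180s timeouts.
	ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
	defer cancel()

	// Buggy pattern: a separate 60s context just for the connection goroutine.
	// The client disconnects at 60s while the test keeps issuing commands,
	// which then hang waiting for a reconnection.
	ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel2()
	go keepConnected(ctx2)

	// Fix: drive the connection with the test context itself, so it stays
	// up exactly as long as the test does.
	go keepConnected(ctx)
}
```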

* test: add -peers=none to all test masters and timeout for lock

- Add -peers=none flag to all master servers for faster startup
- Add tryLockWithTimeout helper to avoid tests hanging on lock acquisition
- Skip tests if the lock cannot be acquired within 30 seconds

* test: extract connectToMasterAndSync helper to reduce duplication

* test: fix captureCommandOutput pipe deadlock

Close the write end of the pipe before calling io.ReadAll to signal EOF;
otherwise ReadAll blocks forever waiting for more data.
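
The underlying rule, shown in a minimal standalone sketch (not the test code itself): io.ReadAll on the read end of an os.Pipe only returns once every write end is closed.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	fmt.Fprint(w, "captured output")

	// Without this Close, io.ReadAll(r) blocks forever: the pipe still has
	// an open writer, so the reader never sees EOF.
	_ = w.Close()

	data, _ := io.ReadAll(r)
	_ = r.Close()
	fmt.Printf("read %q\n", data) // read "captured output"
}
```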

* test: fix tryLockWithTimeout to check lock command errors

Propagate the lock command's error through the channel and only treat the
lock as held if the command succeeded. Previously, any completion
(including errors) was treated as successful lock acquisition.
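
Roughly, the shape of the fix (a hedged sketch; runLock is a hypothetical stand-in for invoking the real "lock" command):

```go
package shellutil

import "time"

// tryLock reports whether the lock was acquired within the timeout.
// runLock stands in for the real lock command invocation.
func tryLock(runLock func() error, timeout time.Duration) bool {
	done := make(chan error, 1) // buffered: the goroutine can finish even after a timeout
	go func() { done <- runLock() }()

	select {
	case err := <-done:
		// Before the fix, merely reaching this case counted as success;
		// now the command's error decides the outcome.
		return err == nil
	case <-time.After(timeout):
		return false
	}
}
```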
Commit c6d6ee8297 by Chris Lu, committed via GitHub.
1 changed file: test/erasure_coding/ec_integration_test.go (249 lines changed)

@@ -96,13 +96,12 @@ func TestECEncodingVolumeLocationTimingBug(t *testing.T) {
     // This simulates the race condition where EC encoding updates master metadata
     // but volume location collection happens after that update

-    // First acquire the lock (required for EC encode)
-    lockCmd := shell.Commands[findCommandIndex("lock")]
-    var lockOutput bytes.Buffer
-    err = lockCmd.Do([]string{}, commandEnv, &lockOutput)
-    if err != nil {
-        t.Logf("Lock command failed: %v", err)
+    // Try to get lock with timeout to avoid hanging
+    locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+    if !locked {
+        t.Skip("Could not acquire lock within timeout - master may not be ready")
     }
+    defer unlock()

     // Execute EC encoding - test the timing directly
     var encodeOutput bytes.Buffer
@@ -651,6 +650,55 @@ type commandRunner interface {
     Do([]string, *shell.CommandEnv, io.Writer) error
 }

+// tryLockWithTimeout attempts to acquire the shell lock with a timeout.
+// Returns true if lock was acquired, false if timeout or error occurred.
+// If lock was acquired, the caller must call the returned unlock function.
+func tryLockWithTimeout(t *testing.T, commandEnv *shell.CommandEnv, timeout time.Duration) (locked bool, unlock func()) {
+    t.Helper()
+
+    type lockResult struct {
+        err    error
+        output string
+    }
+    lockDone := make(chan lockResult, 1)
+    go func() {
+        lockCmd := shell.Commands[findCommandIndex("lock")]
+        var lockOutput bytes.Buffer
+        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
+        lockDone <- lockResult{err: err, output: lockOutput.String()}
+    }()
+
+    select {
+    case res := <-lockDone:
+        if res.err != nil {
+            t.Logf("lock command failed: %v, output: %s", res.err, res.output)
+            return false, nil
+        }
+        unlockCmd := shell.Commands[findCommandIndex("unlock")]
+        return true, func() {
+            var unlockOutput bytes.Buffer
+            _ = unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        }
+    case <-time.After(timeout):
+        t.Logf("timed out acquiring lock after %s", timeout)
+        return false, nil
+    }
+}
+
+// connectToMasterAndSync connects the command environment to the master and waits for sync.
+// This helper reduces code duplication across test functions.
+func connectToMasterAndSync(ctx context.Context, t *testing.T, commandEnv *shell.CommandEnv) {
+    t.Helper()
+
+    // Connect to master - use the main test context to avoid early disconnection
+    go commandEnv.MasterClient.KeepConnectedToMaster(ctx)
+    commandEnv.MasterClient.WaitUntilConnected(ctx)
+
+    // Wait for master client to fully sync
+    t.Log("Waiting for master client to sync...")
+    time.Sleep(5 * time.Second)
+}
+
 // captureCommandOutput executes a shell command and captures its output from both
 // stdout/stderr and the command's buffer. This reduces code duplication in tests.
 func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, commandEnv *shell.CommandEnv) (output string, err error) {
@@ -661,17 +709,16 @@ func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, commandEnv *shell.CommandEnv) (output string, err error) {
     r, w, pipeErr := os.Pipe()
     require.NoError(t, pipeErr)
     defer func() {
-        _ = w.Close()
         os.Stdout = oldStdout
         os.Stderr = oldStderr
     }()

     os.Stdout = w
     os.Stderr = w

     cmdErr := cmd.Do(args, commandEnv, &outBuf)

+    // Close write end BEFORE reading to signal EOF to the reader
+    _ = w.Close()
+    os.Stdout = oldStdout
+    os.Stderr = oldStderr
+
     capturedOutput, readErr := io.ReadAll(r)
     _ = r.Close()
     require.NoError(t, readErr)
@@ -753,14 +800,7 @@ func TestDiskAwareECRebalancing(t *testing.T) {
     }
     commandEnv := shell.NewCommandEnv(options)

-    // Connect to master with longer timeout
-    ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-    defer cancel2()
-    go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-    commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
-    // Wait for master client to fully sync
-    time.Sleep(5 * time.Second)
+    connectToMasterAndSync(ctx, t, commandEnv)

     // Upload test data to create a volume - retry if volumes not ready
     var volumeId needle.VolumeId
@@ -1194,14 +1234,7 @@ func TestECDiskTypeSupport(t *testing.T) {
     }
     commandEnv := shell.NewCommandEnv(options)

-    // Connect to master with longer timeout
-    ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-    defer cancel2()
-    go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-    commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
-    // Wait for master client to fully sync
-    time.Sleep(5 * time.Second)
+    connectToMasterAndSync(ctx, t, commandEnv)

     // Upload test data to create a volume - retry if volumes not ready
     var volumeId needle.VolumeId
@@ -1228,19 +1261,12 @@
     })

     t.Run("ec_encode_with_ssd_disktype", func(t *testing.T) {
-        // Get lock first
-        lockCmd := shell.Commands[findCommandIndex("lock")]
-        var lockOutput bytes.Buffer
-        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-        if err != nil {
-            t.Logf("Lock command failed: %v", err)
-            return
+        // Try to get lock with timeout to avoid hanging
+        locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+        if !locked {
+            t.Skip("Could not acquire lock within timeout - master may not be ready")
         }
-        // Defer unlock to ensure it's always released
-        unlockCmd := shell.Commands[findCommandIndex("unlock")]
-        var unlockOutput bytes.Buffer
-        defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        defer unlock()

         // Execute EC encoding with SSD disk type
         ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
@@ -1270,19 +1296,12 @@
     })

     t.Run("ec_balance_with_ssd_disktype", func(t *testing.T) {
-        // Get lock first
-        lockCmd := shell.Commands[findCommandIndex("lock")]
-        var lockOutput bytes.Buffer
-        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-        if err != nil {
-            t.Logf("Lock command failed: %v", err)
-            return
+        // Try to get lock with timeout to avoid hanging
+        locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+        if !locked {
+            t.Skip("Could not acquire lock within timeout - master may not be ready")
         }
-        // Defer unlock to ensure it's always released
-        unlockCmd := shell.Commands[findCommandIndex("unlock")]
-        var unlockOutput bytes.Buffer
-        defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        defer unlock()

         // Execute EC balance with SSD disk type
         ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
@@ -1325,19 +1344,12 @@
     })

     t.Run("ec_encode_with_source_disktype", func(t *testing.T) {
         // Test that -sourceDiskType flag is accepted
-        lockCmd := shell.Commands[findCommandIndex("lock")]
-        var lockOutput bytes.Buffer
-        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-        if err != nil {
-            t.Logf("Lock command failed: %v", err)
-            return
+        // Try to get lock with timeout to avoid hanging
+        locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+        if !locked {
+            t.Skip("Could not acquire lock within timeout - master may not be ready")
         }
-        // Defer unlock to ensure it's always released
-        unlockCmd := shell.Commands[findCommandIndex("unlock")]
-        var unlockOutput bytes.Buffer
-        defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        defer unlock()

         // Execute EC encoding with sourceDiskType filter
         ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
@@ -1366,19 +1378,12 @@
     })

     t.Run("ec_decode_with_disktype", func(t *testing.T) {
         // Test that ec.decode accepts -diskType flag
-        lockCmd := shell.Commands[findCommandIndex("lock")]
-        var lockOutput bytes.Buffer
-        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-        if err != nil {
-            t.Logf("Lock command failed: %v", err)
-            return
+        // Try to get lock with timeout to avoid hanging
+        locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+        if !locked {
+            t.Skip("Could not acquire lock within timeout - master may not be ready")
         }
-        // Defer unlock to ensure it's always released
-        unlockCmd := shell.Commands[findCommandIndex("unlock")]
-        var unlockOutput bytes.Buffer
-        defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        defer unlock()

         // Execute EC decode with disk type
         ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
@@ -1424,6 +1429,7 @@ func startClusterWithDiskType(ctx context.Context, dataDir string, diskType stri
         "-mdir", masterDir,
         "-volumeSizeLimitMB", "10",
         "-ip", "127.0.0.1",
+        "-peers", "none",
     )

     masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -1569,14 +1575,7 @@ func TestECDiskTypeMixedCluster(t *testing.T) {
     }
     commandEnv := shell.NewCommandEnv(options)

-    // Connect to master with longer timeout
-    ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-    defer cancel2()
-    go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-    commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
-    // Wait for master client to fully sync
-    time.Sleep(5 * time.Second)
+    connectToMasterAndSync(ctx, t, commandEnv)

     t.Run("upload_to_ssd_and_hdd", func(t *testing.T) {
         // Upload to SSD
@@ -1611,19 +1610,12 @@
     })

     t.Run("ec_balance_targets_correct_disk_type", func(t *testing.T) {
-        // Get lock first
-        lockCmd := shell.Commands[findCommandIndex("lock")]
-        var lockOutput bytes.Buffer
-        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-        if err != nil {
-            t.Logf("Lock command failed: %v", err)
-            return
+        // Try to get lock with timeout to avoid hanging
+        locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+        if !locked {
+            t.Skip("Could not acquire lock within timeout - master may not be ready")
         }
-        // Defer unlock to ensure it's always released
-        unlockCmd := shell.Commands[findCommandIndex("unlock")]
-        var unlockOutput bytes.Buffer
-        defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        defer unlock()

         // Run ec.balance for SSD collection with -diskType=ssd
         var ssdOutput bytes.Buffer
@@ -1669,6 +1661,7 @@ func startMixedDiskTypeCluster(ctx context.Context, dataDir string) (*MultiDiskC
         "-mdir", masterDir,
         "-volumeSizeLimitMB", "10",
         "-ip", "127.0.0.1",
+        "-peers", "none",
     )

     masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -1770,12 +1763,7 @@ func TestEvacuationFallbackBehavior(t *testing.T) {
     }
     commandEnv := shell.NewCommandEnv(options)

-    ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-    defer cancel2()
-    go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-    commandEnv.MasterClient.WaitUntilConnected(ctx2)
-    time.Sleep(5 * time.Second)
+    connectToMasterAndSync(ctx, t, commandEnv)

     t.Run("fallback_when_same_disktype_full", func(t *testing.T) {
         // This test verifies that when evacuating SSD EC shards from a server,
@@ -1800,18 +1788,12 @@
         time.Sleep(3 * time.Second)

-        // Get lock
-        lockCmd := shell.Commands[findCommandIndex("lock")]
-        var lockOutput bytes.Buffer
-        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-        if err != nil {
-            t.Logf("Lock command failed: %v", err)
-            return
+        // Try to get lock with timeout to avoid hanging
+        locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+        if !locked {
+            t.Skip("Could not acquire lock within timeout - master may not be ready")
         }
-        unlockCmd := shell.Commands[findCommandIndex("unlock")]
-        var unlockOutput bytes.Buffer
-        defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        defer unlock()

         // EC encode the SSD volume
         var encodeOutput bytes.Buffer
@@ -1875,12 +1857,7 @@ func TestCrossRackECPlacement(t *testing.T) {
     }
     commandEnv := shell.NewCommandEnv(options)

-    ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-    defer cancel2()
-    go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
-    commandEnv.MasterClient.WaitUntilConnected(ctx2)
-    time.Sleep(5 * time.Second)
+    connectToMasterAndSync(ctx, t, commandEnv)

     // Upload test data
     testData := []byte("Cross-rack EC placement test data - needs to be distributed across racks")
@@ -1899,18 +1876,12 @@
     time.Sleep(3 * time.Second)

     t.Run("ec_encode_cross_rack", func(t *testing.T) {
-        // Get lock
-        lockCmd := shell.Commands[findCommandIndex("lock")]
-        var lockOutput bytes.Buffer
-        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-        if err != nil {
-            t.Logf("Lock command failed: %v", err)
-            return
+        // Try to get lock with timeout to avoid hanging
+        locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+        if !locked {
+            t.Skip("Could not acquire lock within timeout - master may not be ready")
         }
-        unlockCmd := shell.Commands[findCommandIndex("unlock")]
-        var unlockOutput bytes.Buffer
-        defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        defer unlock()

         // EC encode with rack-aware placement
         // Note: uploadTestDataToMaster uses collection "test" by default
@@ -1955,18 +1926,12 @@
     })

     t.Run("ec_balance_respects_rack_placement", func(t *testing.T) {
-        // Get lock
-        lockCmd := shell.Commands[findCommandIndex("lock")]
-        var lockOutput bytes.Buffer
-        err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
-        if err != nil {
-            t.Logf("Lock command failed: %v", err)
-            return
+        // Try to get lock with timeout to avoid hanging
+        locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+        if !locked {
+            t.Skip("Could not acquire lock within timeout - master may not be ready")
         }
-        unlockCmd := shell.Commands[findCommandIndex("unlock")]
-        var unlockOutput bytes.Buffer
-        defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+        defer unlock()

         initialDistribution := countShardsPerRack(testDir, uint32(volumeId))
         t.Logf("Initial rack distribution: %v", initialDistribution)
@@ -2014,6 +1979,7 @@ func startLimitedSsdCluster(ctx context.Context, dataDir string) (*MultiDiskClus
         "-mdir", masterDir,
         "-volumeSizeLimitMB", "10",
         "-ip", "127.0.0.1",
+        "-peers", "none",
     )

     masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -2104,6 +2070,7 @@ func startMultiRackCluster(ctx context.Context, dataDir string) (*MultiDiskClust
         "-mdir", masterDir,
         "-volumeSizeLimitMB", "10",
         "-ip", "127.0.0.1",
+        "-peers", "none",
     )

     masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
