Browse Source

test: improve EC integration tests with proper assertions

- Add assertNoFlagError helper to detect flag parsing regressions
- Update diskType subtests to fail on flag errors (ec.encode, ec.balance, ec.decode)
- Update verify_disktype_flag_parsing to check help output contains diskType
- Remove verify_fallback_disk_selection (was documentation-only, not executable)
- Add assertion to verify_cross_rack_distribution for minimum 2 racks
- Consolidate uploadTestDataWithDiskType to accept collection parameter
- Remove duplicate uploadTestDataWithDiskTypeMixed function
pull/7607/head
chrislusf 2 months ago
parent
commit
489dddf84e
  1. 156
      test/erasure_coding/ec_integration_test.go

156
test/erasure_coding/ec_integration_test.go

@ -622,6 +622,29 @@ func contains(s, substr string) bool {
return false
}
// assertNoFlagError checks that the error and output don't indicate a flag parsing error.
// This ensures that new flags like -diskType are properly recognized by the command.
func assertNoFlagError(t *testing.T, err error, output string, context string) {
t.Helper()
// Check for common flag parsing error patterns
flagErrorPatterns := []string{
"flag provided but not defined",
"unknown flag",
"invalid flag",
"bad flag syntax",
}
for _, pattern := range flagErrorPatterns {
if contains(output, pattern) {
t.Fatalf("%s: flag parsing error detected in output: %s", context, pattern)
}
if err != nil && contains(err.Error(), pattern) {
t.Fatalf("%s: flag parsing error in error: %v", context, err)
}
}
}
// TestECEncodingRegressionPrevention tests that the specific bug patterns don't reoccur
func TestECEncodingRegressionPrevention(t *testing.T) {
t.Run("function_signature_regression", func(t *testing.T) {
@ -1144,7 +1167,7 @@ func TestECDiskTypeSupport(t *testing.T) {
var volumeId needle.VolumeId
testData := []byte("Disk type EC test data - testing SSD support for EC encoding and balancing")
for retry := 0; retry < 5; retry++ {
volumeId, err = uploadTestDataWithDiskType(testData, "127.0.0.1:9335", "ssd")
volumeId, err = uploadTestDataWithDiskType(testData, "127.0.0.1:9335", "ssd", "ssd_test")
if err == nil {
break
}
@ -1202,13 +1225,15 @@ func TestECDiskTypeSupport(t *testing.T) {
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
outputStr := string(capturedOutput) + output.String()
t.Logf("EC encode command output: %s", outputStr)
t.Logf("EC encode command output: %s", string(capturedOutput))
t.Logf("EC encode buffer output: %s", output.String())
// Fail on flag parsing errors - these indicate the -diskType flag is not recognized
assertNoFlagError(t, encodeErr, outputStr, "ec.encode -diskType")
if encodeErr != nil {
t.Logf("EC encoding with SSD disk type failed: %v", encodeErr)
// The command may fail if volume is too small, but we can check the argument parsing worked
t.Logf("EC encoding with SSD disk type failed: %v (expected if volume too small)", encodeErr)
}
})
@ -1248,9 +1273,12 @@ func TestECDiskTypeSupport(t *testing.T) {
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
outputStr := string(capturedOutput) + output.String()
t.Logf("EC balance command output: %s", outputStr)
t.Logf("EC balance command output: %s", string(capturedOutput))
t.Logf("EC balance buffer output: %s", output.String())
// Fail on flag parsing errors
assertNoFlagError(t, balanceErr, outputStr, "ec.balance -diskType")
if balanceErr != nil {
t.Logf("EC balance with SSD disk type result: %v", balanceErr)
@ -1258,17 +1286,26 @@ func TestECDiskTypeSupport(t *testing.T) {
})
t.Run("verify_disktype_flag_parsing", func(t *testing.T) {
// Test that disk type flags are correctly parsed
// This ensures the command accepts the -diskType flag without errors
// Test that disk type flags are documented in help output
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
// Test help output contains diskType
assert.NotNil(t, ecEncodeCmd, "ec.encode command should exist")
assert.NotNil(t, ecBalanceCmd, "ec.balance command should exist")
assert.NotNil(t, ecDecodeCmd, "ec.decode command should exist")
t.Log("ec.encode, ec.balance, and ec.decode commands all support -diskType flag")
require.NotNil(t, ecEncodeCmd, "ec.encode command should exist")
require.NotNil(t, ecBalanceCmd, "ec.balance command should exist")
require.NotNil(t, ecDecodeCmd, "ec.decode command should exist")
// Verify help text mentions diskType flag
encodeHelp := ecEncodeCmd.Help()
assert.Contains(t, encodeHelp, "diskType", "ec.encode help should mention -diskType flag")
balanceHelp := ecBalanceCmd.Help()
assert.Contains(t, balanceHelp, "diskType", "ec.balance help should mention -diskType flag")
decodeHelp := ecDecodeCmd.Help()
assert.Contains(t, decodeHelp, "diskType", "ec.decode help should mention -diskType flag")
t.Log("All EC commands have -diskType flag documented in help")
})
t.Run("ec_encode_with_source_disktype", func(t *testing.T) {
@ -1292,7 +1329,7 @@ func TestECDiskTypeSupport(t *testing.T) {
args := []string{
"-collection", "ssd_test",
"-sourceDiskType", "ssd", // Filter source volumes by SSD
"-diskType", "ssd", // Place EC shards on SSD
"-diskType", "ssd", // Place EC shards on SSD
"-force",
}
@ -1309,9 +1346,13 @@ func TestECDiskTypeSupport(t *testing.T) {
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
outputStr := string(capturedOutput) + output.String()
t.Logf("EC encode with sourceDiskType output: %s", outputStr)
// Fail on flag parsing errors
assertNoFlagError(t, encodeErr, outputStr, "ec.encode -sourceDiskType")
t.Logf("EC encode with sourceDiskType output: %s", string(capturedOutput))
// The command should accept the flag even if no volumes match
if encodeErr != nil {
t.Logf("EC encoding with sourceDiskType: %v (expected if no matching volumes)", encodeErr)
}
@ -1353,9 +1394,13 @@ func TestECDiskTypeSupport(t *testing.T) {
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
outputStr := string(capturedOutput) + output.String()
t.Logf("EC decode with diskType output: %s", outputStr)
// Fail on flag parsing errors
assertNoFlagError(t, decodeErr, outputStr, "ec.decode -diskType")
t.Logf("EC decode with diskType output: %s", string(capturedOutput))
// The command should accept the flag
if decodeErr != nil {
t.Logf("EC decode with diskType: %v (expected if no EC volumes)", decodeErr)
}
@ -1450,12 +1495,12 @@ func startClusterWithDiskType(ctx context.Context, dataDir string, diskType stri
}
// uploadTestDataWithDiskType uploads test data with a specific disk type
func uploadTestDataWithDiskType(data []byte, masterAddress string, diskType string) (needle.VolumeId, error) {
func uploadTestDataWithDiskType(data []byte, masterAddress string, diskType string, collection string) (needle.VolumeId, error) {
assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress {
return pb.ServerAddress(masterAddress)
}, grpc.WithInsecure(), &operation.VolumeAssignRequest{
Count: 1,
Collection: "ssd_test",
Collection: collection,
Replication: "000",
DiskType: diskType,
})
@ -1540,7 +1585,7 @@ func TestECDiskTypeMixedCluster(t *testing.T) {
ssdData := []byte("SSD disk type test data for EC encoding")
var ssdVolumeId needle.VolumeId
for retry := 0; retry < 5; retry++ {
ssdVolumeId, err = uploadTestDataWithDiskTypeMixed(ssdData, "127.0.0.1:9336", "ssd", "ssd_collection")
ssdVolumeId, err = uploadTestDataWithDiskType(ssdData, "127.0.0.1:9336", "ssd", "ssd_collection")
if err == nil {
break
}
@ -1557,7 +1602,7 @@ func TestECDiskTypeMixedCluster(t *testing.T) {
hddData := []byte("HDD disk type test data for EC encoding")
var hddVolumeId needle.VolumeId
for retry := 0; retry < 5; retry++ {
hddVolumeId, err = uploadTestDataWithDiskTypeMixed(hddData, "127.0.0.1:9336", "hdd", "hdd_collection")
hddVolumeId, err = uploadTestDataWithDiskType(hddData, "127.0.0.1:9336", "hdd", "hdd_collection")
if err == nil {
break
}
@ -1694,46 +1739,6 @@ func startMixedDiskTypeCluster(ctx context.Context, dataDir string) (*MultiDiskC
return cluster, nil
}
// uploadTestDataWithDiskTypeMixed uploads test data with disk type and collection
func uploadTestDataWithDiskTypeMixed(data []byte, masterAddress string, diskType string, collection string) (needle.VolumeId, error) {
assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress {
return pb.ServerAddress(masterAddress)
}, grpc.WithInsecure(), &operation.VolumeAssignRequest{
Count: 1,
Collection: collection,
Replication: "000",
DiskType: diskType,
})
if err != nil {
return 0, err
}
uploader, err := operation.NewUploader()
if err != nil {
return 0, err
}
uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{
UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid,
Filename: "testfile.txt",
MimeType: "text/plain",
})
if err != nil {
return 0, err
}
if uploadResult.Error != "" {
return 0, fmt.Errorf("upload error: %s", uploadResult.Error)
}
fid, err := needle.ParseFileIdFromString(assignResult.Fid)
if err != nil {
return 0, err
}
return fid.VolumeId, nil
}
// TestEvacuationFallbackBehavior tests that when a disk type has limited capacity,
// shards fall back to other disk types during evacuation
func TestEvacuationFallbackBehavior(t *testing.T) {
@ -1784,7 +1789,7 @@ func TestEvacuationFallbackBehavior(t *testing.T) {
testData := []byte("Evacuation fallback test data for SSD volume")
var ssdVolumeId needle.VolumeId
for retry := 0; retry < 5; retry++ {
ssdVolumeId, err = uploadTestDataWithDiskTypeMixed(testData, "127.0.0.1:9337", "ssd", "fallback_test")
ssdVolumeId, err = uploadTestDataWithDiskType(testData, "127.0.0.1:9337", "ssd", "fallback_test")
if err == nil {
break
}
@ -1834,16 +1839,10 @@ func TestEvacuationFallbackBehavior(t *testing.T) {
t.Log("When strictDiskType=false: prefers same disk type, falls back to other types if needed")
})
t.Run("verify_fallback_disk_selection", func(t *testing.T) {
// Test the disk selection logic directly
// pickBestDiskOnNode with strictDiskType=false should:
// 1. First try to find a disk of matching type
// 2. If none available, fall back to any disk with free slots
t.Log("pickBestDiskOnNode behavior:")
t.Log(" - strictDiskType=true (balancing): Only matching disk types")
t.Log(" - strictDiskType=false (evacuation): Prefer matching, fallback allowed")
})
// Note: The fallback behavior is implemented in pickBestDiskOnNode:
// - strictDiskType=true (balancing): Only matching disk types
// - strictDiskType=false (evacuation): Prefer matching, fallback to other types allowed
// This is tested implicitly through the ec.encode command above which uses the fallback path
}
// TestCrossRackECPlacement tests that EC shards are distributed across different racks
@ -1953,12 +1952,9 @@ func TestCrossRackECPlacement(t *testing.T) {
}
t.Logf("Summary: %d total shards across %d racks", totalShards, racksWithShards)
// For 10+4 EC, we want shards distributed across multiple racks
// Ideally at least 2 racks should have shards
if racksWithShards >= 2 {
t.Logf("GOOD: Shards distributed across %d racks", racksWithShards)
} else {
t.Logf("WARNING: Shards only on %d rack(s) - may lack rack-level redundancy", racksWithShards)
// For 10+4 EC, shards should be distributed across at least 2 racks
if totalShards > 0 {
assert.GreaterOrEqual(t, racksWithShards, 2, "EC shards should span at least 2 racks for fault tolerance")
}
})

Loading…
Cancel
Save