
fix flaky concurrency tests

pull/7160/head
chrislu 1 month ago
commit 65682eb853
test/s3/iam/s3_iam_distributed_test.go (193 lines changed)
@@ -103,15 +103,58 @@ func TestS3IAMDistributedTests(t *testing.T) {
 	})

 	t.Run("distributed_concurrent_operations", func(t *testing.T) {
-		// Test concurrent operations across distributed instances
-		// STRINGENT APPROACH: 8 total operations (4x2) - 33% more than original (6) with rigorous error detection
-		// Target >87.5% success rate to catch concurrency regressions while allowing minimal CI infrastructure issues
+		// Test concurrent operations across distributed instances with robust retry mechanisms
+		// This approach implements proper retry logic instead of tolerating errors, to catch real concurrency issues
 		const numGoroutines = 4             // Optimal concurrency for CI reliability
 		const numOperationsPerGoroutine = 2 // Minimal operations per goroutine
+		const maxRetries = 3                // Maximum retry attempts for transient failures
+		const retryDelay = 100 * time.Millisecond

 		var wg sync.WaitGroup
 		errors := make(chan error, numGoroutines*numOperationsPerGoroutine)

+		// Helper function to determine if an error is retryable
+		isRetryableError := func(err error) bool {
+			if err == nil {
+				return false
+			}
+			errorMsg := err.Error()
+			return strings.Contains(errorMsg, "timeout") ||
+				strings.Contains(errorMsg, "connection reset") ||
+				strings.Contains(errorMsg, "temporary failure") ||
+				strings.Contains(errorMsg, "TooManyRequests") ||
+				strings.Contains(errorMsg, "ServiceUnavailable") ||
+				strings.Contains(errorMsg, "InternalError")
+		}
+
+		// Helper function to execute operations with retry logic
+		executeWithRetry := func(operation func() error, operationName string) error {
+			var lastErr error
+			for attempt := 0; attempt <= maxRetries; attempt++ {
+				if attempt > 0 {
+					time.Sleep(retryDelay * time.Duration(attempt)) // Linear backoff: 100ms, 200ms, 300ms
+				}
+				lastErr = operation()
+				if lastErr == nil {
+					return nil // Success
+				}
+				if !isRetryableError(lastErr) {
+					// Non-retryable error - fail immediately
+					return fmt.Errorf("%s failed with non-retryable error: %w", operationName, lastErr)
+				}
+				// Retryable error - continue to next attempt
+				if attempt < maxRetries {
+					t.Logf("Retrying %s (attempt %d/%d) after error: %v", operationName, attempt+1, maxRetries, lastErr)
+				}
+			}
+			// All retries exhausted
+			return fmt.Errorf("%s failed after %d retries, last error: %w", operationName, maxRetries, lastErr)
+		}
+
 		for i := 0; i < numGoroutines; i++ {
			wg.Add(1)
			go func(goroutineID int) {
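
Note that the delay in executeWithRetry grows linearly with the attempt number (100ms, 200ms, 300ms). If true exponential backoff were preferred, a doubling-with-jitter helper could replace the Sleep call. A minimal sketch, where expBackoff is a hypothetical helper for illustration and not part of this commit:

	package main

	import (
		"fmt"
		"math/rand"
		"time"
	)

	// expBackoff returns base * 2^(attempt-1) plus up to 50% random jitter;
	// attempt is 1-based. Hypothetical helper; the committed test uses linear delays.
	func expBackoff(base time.Duration, attempt int) time.Duration {
		d := base << uint(attempt-1) // 100ms, 200ms, 400ms, ...
		return d + time.Duration(rand.Int63n(int64(d)/2))
	}

	func main() {
		for attempt := 1; attempt <= 3; attempt++ {
			fmt.Println("attempt", attempt, "wait", expBackoff(100*time.Millisecond, attempt))
		}
	}

The jitter spreads out retries from concurrent goroutines so they do not hammer the server in lockstep.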
@@ -119,55 +162,70 @@ func TestS3IAMDistributedTests(t *testing.T) {
 				client, err := framework.CreateS3ClientWithJWT(fmt.Sprintf("user-%d", goroutineID), "TestAdminRole")
 				if err != nil {
-					errors <- err
+					errors <- fmt.Errorf("failed to create S3 client for goroutine %d: %w", goroutineID, err)
 					return
 				}

 				for j := 0; j < numOperationsPerGoroutine; j++ {
 					bucketName := fmt.Sprintf("test-concurrent-%d-%d", goroutineID, j)
+					objectKey := "test-object.txt"
+					objectContent := fmt.Sprintf("content-%d-%d", goroutineID, j)
+
+					// Execute full operation sequence with individual retries
+					operationFailed := false

-					// Create bucket
-					if err := framework.CreateBucket(client, bucketName); err != nil {
+					// 1. Create bucket with retry
+					if err := executeWithRetry(func() error {
+						return framework.CreateBucket(client, bucketName)
+					}, fmt.Sprintf("CreateBucket-%s", bucketName)); err != nil {
 						errors <- err
-						continue
+						operationFailed = true
 					}

-					// Moderate delay to reduce server load and improve CI stability
-					time.Sleep(200 * time.Millisecond)
-
-					// Put object
-					objectKey := "test-object.txt"
-					if err := framework.PutTestObject(client, bucketName, objectKey, fmt.Sprintf("content-%d-%d", goroutineID, j)); err != nil {
+					if !operationFailed {
+						// 2. Put object with retry
+						if err := executeWithRetry(func() error {
+							return framework.PutTestObject(client, bucketName, objectKey, objectContent)
+						}, fmt.Sprintf("PutObject-%s/%s", bucketName, objectKey)); err != nil {
 							errors <- err
-							continue
+							operationFailed = true
 						}
+					}

-					// Moderate delay to reduce server load and improve CI stability
-					time.Sleep(200 * time.Millisecond)
-
-					// Get object
-					if _, err := framework.GetTestObject(client, bucketName, objectKey); err != nil {
+					if !operationFailed {
+						// 3. Get object with retry
+						if err := executeWithRetry(func() error {
+							_, err := framework.GetTestObject(client, bucketName, objectKey)
+							return err
+						}, fmt.Sprintf("GetObject-%s/%s", bucketName, objectKey)); err != nil {
 							errors <- err
-							continue
+							operationFailed = true
 						}
+					}

-					// Moderate delay to reduce server load and improve CI stability
-					time.Sleep(200 * time.Millisecond)
-
-					// Delete object
-					if err := framework.DeleteTestObject(client, bucketName, objectKey); err != nil {
+					if !operationFailed {
+						// 4. Delete object with retry
+						if err := executeWithRetry(func() error {
+							return framework.DeleteTestObject(client, bucketName, objectKey)
+						}, fmt.Sprintf("DeleteObject-%s/%s", bucketName, objectKey)); err != nil {
 							errors <- err
-							continue
+							operationFailed = true
 						}
+					}

-					// Moderate delay to reduce server load and improve CI stability
-					time.Sleep(200 * time.Millisecond)
-
-					// Delete bucket
-					if _, err := client.DeleteBucket(&s3.DeleteBucketInput{
+					// 5. Always attempt bucket cleanup, even if previous operations failed
+					if err := executeWithRetry(func() error {
+						_, err := client.DeleteBucket(&s3.DeleteBucketInput{
 							Bucket: aws.String(bucketName),
-					}); err != nil {
-						errors <- err
-						continue
+						})
+						return err
+					}, fmt.Sprintf("DeleteBucket-%s", bucketName)); err != nil {
+						// Only log cleanup failures, don't fail the test
+						t.Logf("Warning: Failed to cleanup bucket %s: %v", bucketName, err)
 					}

-					// Moderate delay to reduce server load and improve CI stability
-					time.Sleep(200 * time.Millisecond)
+					// Small delay between operation sequences to reduce server load
+					time.Sleep(50 * time.Millisecond)
 				}
			}(i)
		}
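
The operationFailed flag gives each sequence "stop at the first failure, but always run cleanup" semantics without early continue statements. The same control flow can be factored into a generic helper; a minimal sketch, where runSteps, step, and the step names are hypothetical illustrations rather than code from this commit:

	package main

	import "fmt"

	type step struct {
		name string
		fn   func() error
	}

	// runSteps executes steps in order, stopping at the first failure,
	// then always runs cleanup; cleanup failures are logged, never returned.
	func runSteps(steps []step, cleanup func() error) error {
		var firstErr error
		for _, s := range steps {
			if err := s.fn(); err != nil {
				firstErr = fmt.Errorf("%s: %w", s.name, err)
				break
			}
		}
		if err := cleanup(); err != nil {
			fmt.Println("warning: cleanup failed:", err)
		}
		return firstErr
	}

	func main() {
		err := runSteps([]step{
			{"create", func() error { return nil }},
			{"put", func() error { return fmt.Errorf("boom") }},
			{"get", func() error { return nil }}, // never runs; put failed
		}, func() error { return nil })
		fmt.Println(err) // put: boom
	}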
@@ -175,74 +233,37 @@ func TestS3IAMDistributedTests(t *testing.T) {
 		wg.Wait()
 		close(errors)

-		// Analyze errors with categorization for better diagnostics
+		// Collect and analyze errors - with retry logic, we should see very few errors
 		var errorList []error
-		var transientErrors []error
-		var seriousErrors []error
 		for err := range errors {
 			errorList = append(errorList, err)
-			errorMsg := err.Error()
-			// Categorize errors: transient vs serious
-			if strings.Contains(errorMsg, "timeout") ||
-				strings.Contains(errorMsg, "connection reset") ||
-				strings.Contains(errorMsg, "temporary failure") ||
-				strings.Contains(errorMsg, "TooManyRequests") {
-				transientErrors = append(transientErrors, err)
-			} else {
-				seriousErrors = append(seriousErrors, err)
-			}
 		}

 		totalOperations := numGoroutines * numOperationsPerGoroutine
-		errorRate := float64(len(errorList)) / float64(totalOperations)
-		seriousErrorRate := float64(len(seriousErrors)) / float64(totalOperations)
-		transientErrorRate := float64(len(transientErrors)) / float64(totalOperations)

-		// Detailed error reporting
-		if len(errorList) > 0 {
+		// Report results
+		if len(errorList) == 0 {
+			t.Logf("🎉 All %d concurrent operations completed successfully with retry mechanisms!", totalOperations)
+		} else {
 			t.Logf("Concurrent operations summary:")
 			t.Logf("  Total operations: %d", totalOperations)
-			t.Logf("  Failed operations: %d (%.1f%% error rate)", len(errorList), errorRate*100)
-			t.Logf("  Serious errors: %d (%.1f%% rate)", len(seriousErrors), seriousErrorRate*100)
-			t.Logf("  Transient errors: %d (%.1f%% rate)", len(transientErrors), transientErrorRate*100)
-			if len(seriousErrors) > 0 {
-				t.Logf("  First serious error: %v", seriousErrors[0])
-			}
-			if len(transientErrors) > 0 {
-				t.Logf("  First transient error: %v", transientErrors[0])
+			t.Logf("  Failed operations: %d (%.1f%% error rate)", len(errorList), float64(len(errorList))/float64(totalOperations)*100)
+
+			// Log first few errors for debugging
+			for i, err := range errorList {
+				if i >= 3 { // Limit to first 3 errors
+					t.Logf("  ... and %d more errors", len(errorList)-3)
+					break
+				}
+				t.Logf("  Error %d: %v", i+1, err)
 			}
 		}

-		// STRINGENT CONCURRENCY TESTING: More rigorous thresholds to catch regressions while accounting for CI variability
-		// For totalOperations=8, target >87.5% success rate (≤12.5% error rate) to detect concurrency issues
-		// Serious errors (race conditions, deadlocks) should be very limited - allow only 1 for CI infrastructure issues
-		// Based on observed data: 1-3 errors due to volume allocation constraints, not actual concurrency bugs
-		maxSeriousErrors := 1 // Allow 1 serious error (12.5%) for CI infrastructure limitations only
-		if len(seriousErrors) > maxSeriousErrors {
-			t.Errorf("❌ %d serious error(s) detected (%.1f%%), exceeding threshold of %d. This indicates potential concurrency bugs. First error: %v",
-				len(seriousErrors), float64(len(seriousErrors))/float64(totalOperations)*100, maxSeriousErrors, seriousErrors[0])
-		}
-
-		// For total errors, use stringent thresholds to catch regressions while allowing minimal CI infrastructure issues
-		// Target >87.5% success rate to ensure system reliability and catch concurrency problems early
-		maxTotalErrorsStrict := 1  // Allow max 1 total error (12.5% rate) - excellent performance target
-		maxTotalErrorsRelaxed := 2 // Allow max 2 total errors (25% rate) - acceptable with infrastructure constraints
-
-		if len(errorList) > maxTotalErrorsRelaxed {
-			t.Errorf("❌ Too many total errors: %d (%.1f%%) - exceeds threshold of %d (%.1f%%). System may have concurrency issues.",
-				len(errorList), errorRate*100, maxTotalErrorsRelaxed, float64(maxTotalErrorsRelaxed)/float64(totalOperations)*100)
-		} else if len(errorList) > maxTotalErrorsStrict {
-			t.Logf("⚠️ Concurrent operations completed with %d errors (%.1f%%) - acceptable but monitor for patterns.",
-				len(errorList), errorRate*100)
-		} else if len(errorList) > 0 {
-			t.Logf("✅ Concurrent operations completed with %d errors (%.1f%%) - excellent performance!",
-				len(errorList), errorRate*100)
-		} else {
-			t.Logf("🎉 All %d concurrent operations completed successfully - perfect concurrency handling!", totalOperations)
+		// With proper retry mechanisms, we should expect near-zero failures
+		// Any remaining errors likely indicate real concurrency issues or system problems
+		if len(errorList) > 0 {
+			t.Errorf("❌ %d operation(s) failed even after retry mechanisms (%.1f%% failure rate). This indicates potential system issues or race conditions that need investigation.",
+				len(errorList), float64(len(errorList))/float64(totalOperations)*100)
 		}
 	})
 }
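
The test's overall shape is the standard Go fan-out/collect pattern: worker goroutines send failures into a buffered channel sized so sends never block (at most one error per operation sequence, thanks to the operationFailed gating), the main goroutine waits on the WaitGroup, closes the channel, and drains it. A stripped-down sketch of that skeleton, with a hypothetical doWork standing in for the S3 operation sequence:

	package main

	import (
		"fmt"
		"sync"
	)

	// doWork is a hypothetical stand-in for one create/put/get/delete sequence.
	func doWork(id, op int) error { return nil }

	func main() {
		const workers, opsPerWorker = 4, 2

		var wg sync.WaitGroup
		// One buffer slot per operation sequence, so sends never block.
		errs := make(chan error, workers*opsPerWorker)

		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				for j := 0; j < opsPerWorker; j++ {
					if err := doWork(id, j); err != nil {
						errs <- err
					}
				}
			}(i)
		}

		wg.Wait()   // all sends have completed
		close(errs) // safe: no goroutine sends after Wait returns
		for err := range errs {
			fmt.Println("failed:", err)
		}
	}

Closing the channel only after Wait returns is what makes the final range loop terminate without a send-on-closed-channel panic.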
