4 changed files with 1063 additions and 8 deletions

- test/mq/Dockerfile.test (13 changes)
- test/mq/integration/performance_test.go (666 changes)
- test/mq/integration/resilient_publisher.go (336 changes)
- test/mq/simple_demo.go (56 changes)

@@ -0,0 +1,666 @@ test/mq/integration/performance_test.go

package integration

import (
    "fmt"
    "sort"
    "strings"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
    "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// PerformanceMetrics holds test metrics
type PerformanceMetrics struct {
    MessagesPublished int64
    MessagesConsumed  int64
    PublishLatencies  []time.Duration
    ConsumeLatencies  []time.Duration
    StartTime         time.Time
    EndTime           time.Time
    ErrorCount        int64
    mu                sync.RWMutex
}

func (m *PerformanceMetrics) AddPublishLatency(d time.Duration) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.PublishLatencies = append(m.PublishLatencies, d)
}

func (m *PerformanceMetrics) AddConsumeLatency(d time.Duration) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.ConsumeLatencies = append(m.ConsumeLatencies, d)
}

func (m *PerformanceMetrics) GetThroughput() float64 {
    duration := m.EndTime.Sub(m.StartTime).Seconds()
    if duration == 0 {
        return 0
    }
    return float64(atomic.LoadInt64(&m.MessagesPublished)) / duration
}

// GetP95Latency returns the 95th-percentile latency. It sorts a copy so the
// caller's slice is left untouched.
func (m *PerformanceMetrics) GetP95Latency(latencies []time.Duration) time.Duration {
    if len(latencies) == 0 {
        return 0
    }

    sorted := make([]time.Duration, len(latencies))
    copy(sorted, latencies)
    sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

    index := int(float64(len(sorted)) * 0.95)
    if index >= len(sorted) {
        index = len(sorted) - 1
    }
    return sorted[index]
}

// EnhancedPerformanceMetrics extends the basic metrics with connection error tracking.
type EnhancedPerformanceMetrics struct {
    MessagesPublished int64
    MessagesConsumed  int64
    PublishLatencies  []time.Duration
    ConsumeLatencies  []time.Duration
    StartTime         time.Time
    EndTime           time.Time
    ErrorCount        int64
    ConnectionErrors  int64
    ApplicationErrors int64
    RetryAttempts     int64
    mu                sync.RWMutex
}

func (m *EnhancedPerformanceMetrics) AddPublishLatency(d time.Duration) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.PublishLatencies = append(m.PublishLatencies, d)
}

func (m *EnhancedPerformanceMetrics) AddConsumeLatency(d time.Duration) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.ConsumeLatencies = append(m.ConsumeLatencies, d)
}

func (m *EnhancedPerformanceMetrics) GetThroughput() float64 {
    duration := m.EndTime.Sub(m.StartTime).Seconds()
    if duration == 0 {
        return 0
    }
    return float64(atomic.LoadInt64(&m.MessagesPublished)) / duration
}

// GetP95Latency returns the 95th-percentile latency from a sorted copy of the input.
func (m *EnhancedPerformanceMetrics) GetP95Latency(latencies []time.Duration) time.Duration {
    if len(latencies) == 0 {
        return 0
    }

    sorted := make([]time.Duration, len(latencies))
    copy(sorted, latencies)
    sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

    index := int(float64(len(sorted)) * 0.95)
    if index >= len(sorted) {
        index = len(sorted) - 1
    }
    return sorted[index]
}
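
A possible simplification for a follow-up: EnhancedPerformanceMetrics repeats every field and method of PerformanceMetrics, so embedding the base struct would remove the duplication.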

// isConnectionError determines whether an error is a connection-level error
// (as opposed to an application-level error that should not be retried).
func isConnectionError(err error) bool {
    if err == nil {
        return false
    }

    // Check for gRPC status codes that indicate transport problems
    if st, ok := status.FromError(err); ok {
        switch st.Code() {
        case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
            return true
        }
    }

    // Check for common connection error strings
    errStr := err.Error()
    connectionErrorPatterns := []string{
        "EOF",
        "error reading server preface",
        "connection refused",
        "connection reset",
        "broken pipe",
        "network is unreachable",
        "no route to host",
        "transport is closing",
        "connection error",
        "dial tcp",
        "context deadline exceeded",
    }

    for _, pattern := range connectionErrorPatterns {
        if containsString(errStr, pattern) {
            return true
        }
    }

    return false
}

// containsString reports whether s contains substr (case-sensitive).
func containsString(s, substr string) bool {
    return strings.Contains(s, substr)
}
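
As a quick illustration of the classification (a sketch using the grpc status package imported above), a transport-level failure is treated as retryable while a validation-style failure is not:

    isConnectionError(status.Error(codes.Unavailable, "transport is closing")) // true: retryable gRPC code
    isConnectionError(status.Error(codes.InvalidArgument, "bad record type"))  // false: application-level error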

func TestPerformanceThroughput(t *testing.T) {
    suite := NewIntegrationTestSuite(t)
    require.NoError(t, suite.Setup())

    topicName := "performance-throughput-test"
    namespace := "perf-test"

    metrics := &PerformanceMetrics{
        StartTime: time.Now(),
    }

    // Test parameters
    numMessages := 50000
    numPublishers := 5
    messageSize := 1024 // 1KB messages

    // Create the message payload
    payload := make([]byte, messageSize)
    for i := range payload {
        payload[i] = byte(i % 256)
    }

    t.Logf("Starting throughput test: %d messages, %d publishers, %d bytes per message",
        numMessages, numPublishers, messageSize)

    // Start publishers
    var publishWg sync.WaitGroup
    messagesPerPublisher := numMessages / numPublishers

    for i := 0; i < numPublishers; i++ {
        publishWg.Add(1)
        go func(publisherID int) {
            defer publishWg.Done()

            // Create a dedicated publisher for this goroutine
            pubConfig := &PublisherTestConfig{
                Namespace:      namespace,
                TopicName:      topicName,
                PartitionCount: 4, // multiple partitions for better throughput
                PublisherName:  fmt.Sprintf("perf-publisher-%d", publisherID),
                RecordType:     nil, // use raw publish
            }

            publisher, err := suite.CreatePublisher(pubConfig)
            if err != nil {
                atomic.AddInt64(&metrics.ErrorCount, 1)
                t.Errorf("Failed to create publisher %d: %v", publisherID, err)
                return
            }

            for j := 0; j < messagesPerPublisher; j++ {
                messageKey := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)

                start := time.Now()
                err := publisher.Publish([]byte(messageKey), payload)
                latency := time.Since(start)

                if err != nil {
                    atomic.AddInt64(&metrics.ErrorCount, 1)
                    continue
                }

                atomic.AddInt64(&metrics.MessagesPublished, 1)
                metrics.AddPublishLatency(latency)

                // Small delay to avoid overwhelming the system
                if j%1000 == 0 {
                    time.Sleep(1 * time.Millisecond)
                }
            }
        }(i)
    }

    // Wait for publishing to complete
    publishWg.Wait()
    metrics.EndTime = time.Now()

    // Verify results
    publishedCount := atomic.LoadInt64(&metrics.MessagesPublished)
    errorCount := atomic.LoadInt64(&metrics.ErrorCount)
    throughput := metrics.GetThroughput()

    t.Logf("Performance Results:")
    t.Logf("  Messages Published: %d", publishedCount)
    t.Logf("  Errors: %d", errorCount)
    t.Logf("  Throughput: %.2f messages/second", throughput)
    t.Logf("  Duration: %v", metrics.EndTime.Sub(metrics.StartTime))

    if len(metrics.PublishLatencies) > 0 {
        p95Latency := metrics.GetP95Latency(metrics.PublishLatencies)
        t.Logf("  P95 Publish Latency: %v", p95Latency)

        // Performance assertions
        assert.Less(t, p95Latency, 100*time.Millisecond, "P95 publish latency should be under 100ms")
    }

    // Throughput requirements
    expectedMinThroughput := 5000.0 // 5K messages/sec minimum (relaxed from 10K for initial testing)
    assert.Greater(t, throughput, expectedMinThroughput,
        "Throughput should exceed %.0f messages/second", expectedMinThroughput)

    // Error rate should be low
    errorRate := float64(errorCount) / float64(publishedCount+errorCount)
    assert.Less(t, errorRate, 0.05, "Error rate should be less than 5%")
}

func TestPerformanceLatency(t *testing.T) {
    suite := NewIntegrationTestSuite(t)
    require.NoError(t, suite.Setup())

    topicName := "performance-latency-test"
    namespace := "perf-test"

    // Create the publisher
    pubConfig := &PublisherTestConfig{
        Namespace:      namespace,
        TopicName:      topicName,
        PartitionCount: 1, // single partition for latency testing
        PublisherName:  "latency-publisher",
        RecordType:     nil,
    }

    publisher, err := suite.CreatePublisher(pubConfig)
    require.NoError(t, err)

    // Start the consumer first
    subConfig := &SubscriberTestConfig{
        Namespace:          namespace,
        TopicName:          topicName,
        ConsumerGroup:      "latency-test-group",
        ConsumerInstanceId: "latency-consumer-1",
        MaxPartitionCount:  1,
        SlidingWindowSize:  10,
        OffsetType:         schema_pb.OffsetType_RESET_TO_EARLIEST,
    }

    subscriber, err := suite.CreateSubscriber(subConfig)
    require.NoError(t, err)

    numMessages := 5000
    collector := NewMessageCollector(numMessages)

    // Set up the message handler
    subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
        collector.AddMessage(TestMessage{
            ID:        string(m.Data.Key),
            Content:   m.Data.Value,
            Timestamp: time.Unix(0, m.Data.TsNs),
            Key:       m.Data.Key,
        })
    })

    // Start the subscriber
    go func() {
        if err := subscriber.Subscribe(); err != nil {
            t.Logf("Subscriber error: %v", err)
        }
    }()

    // Wait for the consumer to be ready
    time.Sleep(2 * time.Second)

    metrics := &PerformanceMetrics{
        StartTime: time.Now(),
    }

    t.Logf("Starting latency test with %d messages", numMessages)

    // Publish messages with controlled timing
    for i := 0; i < numMessages; i++ {
        messageKey := fmt.Sprintf("latency-msg-%d", i)
        payload := fmt.Sprintf("test-payload-data-%d", i)

        start := time.Now()
        err := publisher.Publish([]byte(messageKey), []byte(payload))
        publishLatency := time.Since(start)

        require.NoError(t, err)
        metrics.AddPublishLatency(publishLatency)

        // Controlled rate for latency measurement
        if i%100 == 0 {
            time.Sleep(10 * time.Millisecond)
        }
    }

    metrics.EndTime = time.Now()

    // Wait for the messages to be consumed
    messages := collector.WaitForMessages(30 * time.Second)

    // Analyze latency results
    t.Logf("Latency Test Results:")
    t.Logf("  Messages Published: %d", numMessages)
    t.Logf("  Messages Consumed: %d", len(messages))

    if len(metrics.PublishLatencies) > 0 {
        p95PublishLatency := metrics.GetP95Latency(metrics.PublishLatencies)
        t.Logf("  P95 Publish Latency: %v", p95PublishLatency)

        // Latency assertions
        assert.Less(t, p95PublishLatency, 50*time.Millisecond,
            "P95 publish latency should be under 50ms")
    }

    // Verify message delivery
    deliveryRate := float64(len(messages)) / float64(numMessages)
    assert.Greater(t, deliveryRate, 0.80, "Should deliver at least 80% of messages")
}

func TestPerformanceConcurrentConsumers(t *testing.T) {
    suite := NewIntegrationTestSuite(t)
    require.NoError(t, suite.Setup())

    topicName := "performance-concurrent-test"
    namespace := "perf-test"
    numPartitions := int32(4)

    // Create the publisher first
    pubConfig := &PublisherTestConfig{
        Namespace:      namespace,
        TopicName:      topicName,
        PartitionCount: numPartitions,
        PublisherName:  "concurrent-publisher",
        RecordType:     nil,
    }

    publisher, err := suite.CreatePublisher(pubConfig)
    require.NoError(t, err)

    // Test parameters
    numConsumers := 8
    numMessages := 20000
    consumerGroup := "concurrent-perf-group"

    t.Logf("Starting concurrent consumer test: %d consumers, %d messages, %d partitions",
        numConsumers, numMessages, numPartitions)

    // Start multiple consumers
    var collectors []*MessageCollector

    for i := 0; i < numConsumers; i++ {
        collector := NewMessageCollector(numMessages / numConsumers) // expected per consumer
        collectors = append(collectors, collector)

        subConfig := &SubscriberTestConfig{
            Namespace:          namespace,
            TopicName:          topicName,
            ConsumerGroup:      consumerGroup,
            ConsumerInstanceId: fmt.Sprintf("consumer-%d", i),
            MaxPartitionCount:  numPartitions,
            SlidingWindowSize:  10,
            OffsetType:         schema_pb.OffsetType_RESET_TO_EARLIEST,
        }

        subscriber, err := suite.CreateSubscriber(subConfig)
        require.NoError(t, err)

        // Set up the message handler for this consumer
        func(consumerID int, c *MessageCollector) {
            subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
                c.AddMessage(TestMessage{
                    ID:        fmt.Sprintf("%d-%s", consumerID, string(m.Data.Key)),
                    Content:   m.Data.Value,
                    Timestamp: time.Unix(0, m.Data.TsNs),
                    Key:       m.Data.Key,
                })
            })
        }(i, collector)

        // Start the subscriber configured above; creating a fresh subscriber
        // here would discard the handler and collect nothing.
        go func(consumerID int) {
            if err := subscriber.Subscribe(); err != nil {
                t.Logf("Consumer %d error: %v", consumerID, err)
            }
        }(i)
    }

    // Wait for the consumers to initialize
    time.Sleep(3 * time.Second)

    // Start publishing
    startTime := time.Now()

    var publishWg sync.WaitGroup
    numPublishers := 3
    messagesPerPublisher := numMessages / numPublishers

    for i := 0; i < numPublishers; i++ {
        publishWg.Add(1)
        go func(publisherID int) {
            defer publishWg.Done()

            for j := 0; j < messagesPerPublisher; j++ {
                messageKey := fmt.Sprintf("concurrent-msg-%d-%d", publisherID, j)
                payload := fmt.Sprintf("publisher-%d-message-%d-data", publisherID, j)

                if err := publisher.Publish([]byte(messageKey), []byte(payload)); err != nil {
                    t.Logf("Publish error: %v", err)
                }

                // Rate limiting
                if j%1000 == 0 {
                    time.Sleep(2 * time.Millisecond)
                }
            }
        }(i)
    }

    publishWg.Wait()
    publishDuration := time.Since(startTime)

    // Allow time for message consumption
    time.Sleep(10 * time.Second)

    // Analyze results
    totalConsumed := int64(0)
    for i, collector := range collectors {
        consumed := int64(len(collector.GetMessages()))
        totalConsumed += consumed
        t.Logf("Consumer %d consumed %d messages", i, consumed)
    }

    publishThroughput := float64(numMessages) / publishDuration.Seconds()
    consumeThroughput := float64(totalConsumed) / publishDuration.Seconds()

    t.Logf("Concurrent Consumer Test Results:")
    t.Logf("  Total Published: %d", numMessages)
    t.Logf("  Total Consumed: %d", totalConsumed)
    t.Logf("  Publish Throughput: %.2f msg/sec", publishThroughput)
    t.Logf("  Consume Throughput: %.2f msg/sec", consumeThroughput)
    t.Logf("  Test Duration: %v", publishDuration)

    // Performance assertions (relaxed for initial testing)
    deliveryRate := float64(totalConsumed) / float64(numMessages)
    assert.Greater(t, deliveryRate, 0.70, "Should consume at least 70% of messages")

    expectedMinThroughput := 2000.0 // 2K messages/sec minimum for concurrent consumption
    assert.Greater(t, consumeThroughput, expectedMinThroughput,
        "Consume throughput should exceed %.0f messages/second", expectedMinThroughput)
}
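
One caveat worth noting: consumeThroughput divides totalConsumed by the publish window even though consumption is given an extra 10 seconds to drain, so the reported consume rate can overstate the steady-state rate.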

func TestPerformanceWithErrorHandling(t *testing.T) {
    suite := NewIntegrationTestSuite(t)
    require.NoError(t, suite.Setup())

    topicName := "performance-error-handling-test"
    namespace := "perf-test"

    metrics := &EnhancedPerformanceMetrics{
        StartTime: time.Now(),
    }

    // Test parameters
    numMessages := 50000
    numPublishers := 5
    messageSize := 1024 // 1KB messages

    // Create the message payload
    payload := make([]byte, messageSize)
    for i := range payload {
        payload[i] = byte(i % 256)
    }

    t.Logf("Starting performance test with enhanced error handling: %d messages, %d publishers, %d bytes per message",
        numMessages, numPublishers, messageSize)

    // Start publishers
    var publishWg sync.WaitGroup
    messagesPerPublisher := numMessages / numPublishers

    for i := 0; i < numPublishers; i++ {
        publishWg.Add(1)
        go func(publisherID int) {
            defer publishWg.Done()

            // Create a dedicated publisher for this goroutine
            pubConfig := &PublisherTestConfig{
                Namespace:      namespace,
                TopicName:      topicName,
                PartitionCount: 4, // multiple partitions for better throughput
                PublisherName:  fmt.Sprintf("error-aware-publisher-%d", publisherID),
                RecordType:     nil, // use raw publish
            }

            publisher, err := suite.CreatePublisher(pubConfig)
            if err != nil {
                atomic.AddInt64(&metrics.ErrorCount, 1)
                t.Errorf("Failed to create publisher %d: %v", publisherID, err)
                return
            }

            for j := 0; j < messagesPerPublisher; j++ {
                messageKey := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)

                start := time.Now()
                err := publisher.Publish([]byte(messageKey), payload)
                latency := time.Since(start)

                if err != nil {
                    atomic.AddInt64(&metrics.ErrorCount, 1)

                    // Classify the error type
                    if isConnectionError(err) {
                        atomic.AddInt64(&metrics.ConnectionErrors, 1)
                        t.Logf("Connection error (publisher %d, msg %d): %v", publisherID, j, err)
                    } else {
                        atomic.AddInt64(&metrics.ApplicationErrors, 1)
                        t.Logf("Application error (publisher %d, msg %d): %v", publisherID, j, err)
                    }
                    continue
                }

                atomic.AddInt64(&metrics.MessagesPublished, 1)
                metrics.AddPublishLatency(latency)

                // Small delay to avoid overwhelming the system
                if j%1000 == 0 {
                    time.Sleep(1 * time.Millisecond)
                }
            }
        }(i)
    }

    // Wait for publishing to complete
    publishWg.Wait()
    metrics.EndTime = time.Now()

    // Analyze results with enhanced error reporting
    publishedCount := atomic.LoadInt64(&metrics.MessagesPublished)
    totalErrors := atomic.LoadInt64(&metrics.ErrorCount)
    connectionErrors := atomic.LoadInt64(&metrics.ConnectionErrors)
    applicationErrors := atomic.LoadInt64(&metrics.ApplicationErrors)
    throughput := metrics.GetThroughput()

    t.Logf("Enhanced Performance Results:")
    t.Logf("  Messages Successfully Published: %d", publishedCount)
    t.Logf("  Total Errors: %d", totalErrors)
    t.Logf("  Connection-Level Errors: %d", connectionErrors)
    t.Logf("  Application-Level Errors: %d", applicationErrors)
    t.Logf("  Throughput: %.2f messages/second", throughput)
    t.Logf("  Duration: %v", metrics.EndTime.Sub(metrics.StartTime))

    if len(metrics.PublishLatencies) > 0 {
        p95Latency := metrics.GetP95Latency(metrics.PublishLatencies)
        t.Logf("  P95 Publish Latency: %v", p95Latency)

        // Performance assertions (adjusted for error handling overhead)
        assert.Less(t, p95Latency, 100*time.Millisecond, "P95 publish latency should be under 100ms")
    }

    // Enhanced error analysis
    totalAttempts := publishedCount + totalErrors
    if totalAttempts > 0 {
        successRate := float64(publishedCount) / float64(totalAttempts)
        connectionErrorRate := float64(connectionErrors) / float64(totalAttempts)
        applicationErrorRate := float64(applicationErrors) / float64(totalAttempts)

        t.Logf("Error Analysis:")
        t.Logf("  Success Rate: %.2f%%", successRate*100)
        t.Logf("  Connection Error Rate: %.2f%%", connectionErrorRate*100)
        t.Logf("  Application Error Rate: %.2f%%", applicationErrorRate*100)

        // Assertions based on error types
        assert.Greater(t, successRate, 0.80, "Success rate should be greater than 80%")
        assert.Less(t, applicationErrorRate, 0.01, "Application error rate should be less than 1%")

        // Connection errors are expected under high load but should be handled
        if connectionErrors > 0 {
            t.Logf("Note: %d connection errors detected - the test is successfully stressing the system", connectionErrors)
            t.Logf("Recommendation: implement retry logic in production applications to handle these connection errors")
        }
    }

    // Throughput requirements (adjusted for error handling)
    expectedMinThroughput := 5000.0 // 5K messages/sec minimum
    assert.Greater(t, throughput, expectedMinThroughput,
        "Throughput should exceed %.0f messages/second", expectedMinThroughput)
}
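
Assuming the integration suite's brokers are up (the Dockerfile.test environment added in this change), the four tests above should run with standard Go tooling, e.g.: go test -v -run TestPerformance ./test/mq/integration/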

@@ -0,0 +1,336 @@ test/mq/integration/resilient_publisher.go

package integration

import (
    "fmt"
    "math"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
    "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// ResilientPublisher wraps TopicPublisher with enhanced error handling.
type ResilientPublisher struct {
    publisher *pub_client.TopicPublisher
    config    *PublisherTestConfig
    suite     *IntegrationTestSuite

    // Error tracking
    connectionErrors  int64
    applicationErrors int64
    retryAttempts     int64
    totalPublishes    int64

    // Retry configuration
    maxRetries    int
    baseDelay     time.Duration
    maxDelay      time.Duration
    backoffFactor float64

    // Circuit breaker
    circuitOpen     bool
    circuitOpenTime time.Time
    circuitTimeout  time.Duration

    mu sync.RWMutex
}

// RetryConfig holds retry configuration.
type RetryConfig struct {
    MaxRetries     int
    BaseDelay      time.Duration
    MaxDelay       time.Duration
    BackoffFactor  float64
    CircuitTimeout time.Duration
}

// DefaultRetryConfig returns sensible defaults for retry configuration.
func DefaultRetryConfig() *RetryConfig {
    return &RetryConfig{
        MaxRetries:     5,
        BaseDelay:      10 * time.Millisecond,
        MaxDelay:       5 * time.Second,
        BackoffFactor:  2.0,
        CircuitTimeout: 30 * time.Second,
    }
}
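
With these defaults, calculateDelay (below) yields backoff delays of roughly 10ms, 20ms, 40ms, 80ms, and 160ms across the five retries, comfortably under the 5s cap.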

// CreateResilientPublisher creates a new resilient publisher.
func (its *IntegrationTestSuite) CreateResilientPublisher(config *PublisherTestConfig, retryConfig *RetryConfig) (*ResilientPublisher, error) {
    if retryConfig == nil {
        retryConfig = DefaultRetryConfig()
    }

    publisher, err := its.CreatePublisher(config)
    if err != nil {
        return nil, fmt.Errorf("failed to create base publisher: %v", err)
    }

    return &ResilientPublisher{
        publisher:      publisher,
        config:         config,
        suite:          its,
        maxRetries:     retryConfig.MaxRetries,
        baseDelay:      retryConfig.BaseDelay,
        maxDelay:       retryConfig.MaxDelay,
        backoffFactor:  retryConfig.BackoffFactor,
        circuitTimeout: retryConfig.CircuitTimeout,
    }, nil
}

// PublishWithRetry publishes a message with retry logic and error handling.
func (rp *ResilientPublisher) PublishWithRetry(key, value []byte) error {
    atomic.AddInt64(&rp.totalPublishes, 1)

    // Check the circuit breaker
    if rp.isCircuitOpen() {
        atomic.AddInt64(&rp.applicationErrors, 1)
        return fmt.Errorf("circuit breaker is open")
    }

    var lastErr error
    for attempt := 0; attempt <= rp.maxRetries; attempt++ {
        if attempt > 0 {
            atomic.AddInt64(&rp.retryAttempts, 1)
            delay := rp.calculateDelay(attempt)
            glog.V(1).Infof("Retrying publish after %v (attempt %d/%d)", delay, attempt, rp.maxRetries)
            time.Sleep(delay)
        }

        err := rp.publisher.Publish(key, value)
        if err == nil {
            // Success - reset the circuit breaker if it was open
            rp.resetCircuitBreaker()
            return nil
        }

        lastErr = err

        // Classify the error type
        if rp.isConnectionError(err) {
            atomic.AddInt64(&rp.connectionErrors, 1)
            glog.V(1).Infof("Connection error on attempt %d: %v", attempt+1, err)

            // For connection errors, try to recreate the publisher
            if attempt < rp.maxRetries {
                if recreateErr := rp.recreatePublisher(); recreateErr != nil {
                    glog.Warningf("Failed to recreate publisher: %v", recreateErr)
                }
            }
            continue
        }

        // Application error - don't retry
        atomic.AddInt64(&rp.applicationErrors, 1)
        glog.Warningf("Application error (not retrying): %v", err)
        break
    }

    // All retries exhausted, or a non-retryable error
    rp.openCircuitBreaker()
    return fmt.Errorf("publish failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
}

// PublishRecord publishes a record with the same retry logic.
func (rp *ResilientPublisher) PublishRecord(key []byte, record *schema_pb.RecordValue) error {
    atomic.AddInt64(&rp.totalPublishes, 1)

    if rp.isCircuitOpen() {
        atomic.AddInt64(&rp.applicationErrors, 1)
        return fmt.Errorf("circuit breaker is open")
    }

    var lastErr error
    for attempt := 0; attempt <= rp.maxRetries; attempt++ {
        if attempt > 0 {
            atomic.AddInt64(&rp.retryAttempts, 1)
            time.Sleep(rp.calculateDelay(attempt))
        }

        err := rp.publisher.PublishRecord(key, record)
        if err == nil {
            rp.resetCircuitBreaker()
            return nil
        }

        lastErr = err

        if rp.isConnectionError(err) {
            atomic.AddInt64(&rp.connectionErrors, 1)
            if attempt < rp.maxRetries {
                if recreateErr := rp.recreatePublisher(); recreateErr != nil {
                    glog.Warningf("Failed to recreate publisher: %v", recreateErr)
                }
            }
            continue
        }

        atomic.AddInt64(&rp.applicationErrors, 1)
        break
    }

    rp.openCircuitBreaker()
    return fmt.Errorf("publish record failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
}

// recreatePublisher attempts to recreate the underlying publisher.
func (rp *ResilientPublisher) recreatePublisher() error {
    rp.mu.Lock()
    defer rp.mu.Unlock()

    // Shut down the old publisher
    if rp.publisher != nil {
        rp.publisher.Shutdown()
    }

    // Create a new publisher
    newPublisher, err := rp.suite.CreatePublisher(rp.config)
    if err != nil {
        return fmt.Errorf("failed to recreate publisher: %v", err)
    }

    rp.publisher = newPublisher
    glog.V(1).Infof("Successfully recreated publisher")
    return nil
}

// isConnectionError determines whether an error is a connection-level error
// that should be retried.
func (rp *ResilientPublisher) isConnectionError(err error) bool {
    if err == nil {
        return false
    }

    // Check for gRPC status codes that indicate transport problems
    if st, ok := status.FromError(err); ok {
        switch st.Code() {
        case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
            return true
        }
    }

    // Check for common connection error strings
    errStr := err.Error()
    connectionErrorPatterns := []string{
        "EOF",
        "error reading server preface",
        "connection refused",
        "connection reset",
        "broken pipe",
        "network is unreachable",
        "no route to host",
        "transport is closing",
        "connection error",
        "dial tcp",
        "context deadline exceeded",
    }

    for _, pattern := range connectionErrorPatterns {
        if contains(errStr, pattern) {
            return true
        }
    }

    return false
}

// contains reports whether s contains substr (case-sensitive).
func contains(s, substr string) bool {
    return strings.Contains(s, substr)
}

// calculateDelay computes the exponential backoff delay for a given attempt.
func (rp *ResilientPublisher) calculateDelay(attempt int) time.Duration {
    delay := float64(rp.baseDelay) * math.Pow(rp.backoffFactor, float64(attempt-1))
    if delay > float64(rp.maxDelay) {
        delay = float64(rp.maxDelay)
    }
    return time.Duration(delay)
}
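
Plain exponential backoff can synchronize many retrying clients. A minimal jittered variant is sketched here; it is hypothetical, assuming math/rand is imported and acceptable as a randomness source for retry timing:

    // calculateDelayWithJitter is a hypothetical variant that spreads retries
    // over 0.5x-1.5x of the exponential delay to avoid thundering herds.
    // Assumes baseDelay > 0 so rand.Int63n never receives a non-positive bound.
    func (rp *ResilientPublisher) calculateDelayWithJitter(attempt int) time.Duration {
        base := rp.calculateDelay(attempt)
        jitter := time.Duration(rand.Int63n(int64(base))) // 0..base
        return base/2 + jitter
    }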

// Circuit breaker methods

func (rp *ResilientPublisher) isCircuitOpen() bool {
    rp.mu.RLock()
    defer rp.mu.RUnlock()

    if !rp.circuitOpen {
        return false
    }

    // Report the circuit as closed again once the timeout has elapsed
    if time.Since(rp.circuitOpenTime) > rp.circuitTimeout {
        return false
    }

    return true
}

func (rp *ResilientPublisher) openCircuitBreaker() {
    rp.mu.Lock()
    defer rp.mu.Unlock()

    rp.circuitOpen = true
    rp.circuitOpenTime = time.Now()
    glog.Warningf("Circuit breaker opened due to repeated failures")
}

func (rp *ResilientPublisher) resetCircuitBreaker() {
    rp.mu.Lock()
    defer rp.mu.Unlock()

    if rp.circuitOpen {
        rp.circuitOpen = false
        glog.V(1).Infof("Circuit breaker reset")
    }
}
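
Note the implicit half-open behavior here: once circuitTimeout elapses, isCircuitOpen starts returning false while the circuitOpen flag stays set, so the next publish goes through as a probe; only a success (via resetCircuitBreaker) actually clears the flag, and a failure re-opens the circuit with a fresh timestamp.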

// GetErrorStats returns a snapshot of the error statistics.
func (rp *ResilientPublisher) GetErrorStats() ErrorStats {
    return ErrorStats{
        ConnectionErrors:  atomic.LoadInt64(&rp.connectionErrors),
        ApplicationErrors: atomic.LoadInt64(&rp.applicationErrors),
        RetryAttempts:     atomic.LoadInt64(&rp.retryAttempts),
        TotalPublishes:    atomic.LoadInt64(&rp.totalPublishes),
        CircuitOpen:       rp.isCircuitOpen(),
    }
}

// ErrorStats holds error statistics.
type ErrorStats struct {
    ConnectionErrors  int64
    ApplicationErrors int64
    RetryAttempts     int64
    TotalPublishes    int64
    CircuitOpen       bool
}

// Shutdown gracefully shuts down the resilient publisher.
func (rp *ResilientPublisher) Shutdown() error {
    rp.mu.Lock()
    defer rp.mu.Unlock()

    if rp.publisher != nil {
        return rp.publisher.Shutdown()
    }
    return nil
}
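
A sketch of how a test could drive the resilient publisher end to end, using only names defined above (the suite, pubConfig, and testing plumbing are assumed to match the other integration tests):

    rp, err := suite.CreateResilientPublisher(pubConfig, DefaultRetryConfig())
    require.NoError(t, err)
    defer rp.Shutdown()

    for i := 0; i < 1000; i++ {
        key := fmt.Sprintf("resilient-msg-%d", i)
        if err := rp.PublishWithRetry([]byte(key), []byte("payload")); err != nil {
            t.Logf("gave up on %s: %v", key, err) // retries exhausted or circuit open
        }
    }

    stats := rp.GetErrorStats()
    t.Logf("publishes=%d retries=%d connErrs=%d appErrs=%d circuitOpen=%v",
        stats.TotalPublishes, stats.RetryAttempts, stats.ConnectionErrors,
        stats.ApplicationErrors, stats.CircuitOpen)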

@@ -0,0 +1,56 @@ test/mq/simple_demo.go

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
)

func main() {
    log.Println("Starting SeaweedMQ logging test...")

    // Create the publisher configuration
    config := &pub_client.PublisherConfiguration{
        Topic:          topic.NewTopic("test", "logging-demo"),
        PartitionCount: 3,
        Brokers:        []string{"127.0.0.1:17777"},
        PublisherName:  "logging-test-client",
    }

    log.Println("Creating topic publisher...")
    publisher, err := pub_client.NewTopicPublisher(config)
    if err != nil {
        log.Printf("Failed to create publisher: %v", err)
        return
    }
    defer publisher.Shutdown()

    log.Println("Publishing test messages...")

    // Publish some test messages
    for i := 0; i < 100; i++ {
        key := fmt.Sprintf("key-%d", i)
        value := fmt.Sprintf("message-%d-timestamp-%d", i, time.Now().Unix())

        if err := publisher.Publish([]byte(key), []byte(value)); err != nil {
            log.Printf("Failed to publish message %d: %v", i, err)
        }

        // Small delay to create some connection stress
        if i%10 == 0 {
            time.Sleep(10 * time.Millisecond)
        }
    }

    log.Println("Finishing publish...")
    if err := publisher.FinishPublish(); err != nil {
        log.Printf("Failed to finish publish: %v", err)
    }

    log.Println("Test completed successfully!")
}
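
Assuming a SeaweedMQ broker is already listening on 127.0.0.1:17777 (as hard-coded in the configuration above), the demo should run directly with: go run test/mq/simple_demo.go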