From e4fe657bd953d4ce0c823f362584348bfb4903fb Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 25 Jun 2025 17:42:39 -0700
Subject: [PATCH] perf tests run fine

---
 test/mq/Dockerfile.test                    |  13 +-
 test/mq/integration/performance_test.go    | 666 +++++++++++++++++++++
 test/mq/integration/resilient_publisher.go | 336 +++++
 test/mq/simple_demo.go                     |  56 ++
 4 files changed, 1063 insertions(+), 8 deletions(-)
 create mode 100644 test/mq/integration/performance_test.go
 create mode 100644 test/mq/integration/resilient_publisher.go
 create mode 100644 test/mq/simple_demo.go

diff --git a/test/mq/Dockerfile.test b/test/mq/Dockerfile.test
index 8902df6af..d9d86a4e1 100644
--- a/test/mq/Dockerfile.test
+++ b/test/mq/Dockerfile.test
@@ -6,7 +6,8 @@ RUN apk add --no-cache \
     netcat-openbsd \
     bash \
     git \
-    build-base
+    build-base \
+    ca-certificates
 
 # Set working directory
 WORKDIR /workspace
@@ -19,12 +20,8 @@ RUN go mod download
 COPY weed/ ./weed/
 COPY test/mq/integration/ ./test/mq/integration/
 
-# Install test dependencies
-RUN go install github.com/onsi/ginkgo/v2/ginkgo@latest
-# Note: testify is a library, not a binary, so it's installed via go mod download
-
 # Build the weed binary for testing (optional - tests can use external cluster)
-RUN cd weed && go build -o ../weed .
+RUN cd weed && CGO_ENABLED=1 GOOS=linux go build -o ../weed .
 
 # Create test results directory
 RUN mkdir -p /test-results
@@ -37,5 +34,5 @@ ENV GO111MODULE=on
 # Default working directory for tests
 WORKDIR /workspace
 
-# Entry point for running tests
-ENTRYPOINT ["/bin/bash"]
\ No newline at end of file
+# Entry point for running tests - use explicit bash
+ENTRYPOINT ["/bin/bash", "-c"]
\ No newline at end of file
diff --git a/test/mq/integration/performance_test.go b/test/mq/integration/performance_test.go
new file mode 100644
index 000000000..bdf8bd82a
--- /dev/null
+++ b/test/mq/integration/performance_test.go
@@ -0,0 +1,666 @@
+package integration
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// PerformanceMetrics holds test metrics
+type PerformanceMetrics struct {
+	MessagesPublished int64
+	MessagesConsumed  int64
+	PublishLatencies  []time.Duration
+	ConsumeLatencies  []time.Duration
+	StartTime         time.Time
+	EndTime           time.Time
+	ErrorCount        int64
+	mu                sync.RWMutex
+}
+
+func (m *PerformanceMetrics) AddPublishLatency(d time.Duration) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.PublishLatencies = append(m.PublishLatencies, d)
+}
+
+func (m *PerformanceMetrics) AddConsumeLatency(d time.Duration) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.ConsumeLatencies = append(m.ConsumeLatencies, d)
+}
+
+func (m *PerformanceMetrics) GetThroughput() float64 {
+	duration := m.EndTime.Sub(m.StartTime).Seconds()
+	if duration == 0 {
+		return 0
+	}
+	return float64(atomic.LoadInt64(&m.MessagesPublished)) / duration
+}
+
+func (m *PerformanceMetrics) GetP95Latency(latencies []time.Duration) time.Duration {
+	if len(latencies) == 0 {
+		return 0
+	}
+
+	// Nearest-rank P95 over a sorted copy; sort.Slice keeps this O(n log n)
+	// for the 50K-sample runs below and leaves the caller's slice untouched.
+	sorted := make([]time.Duration, len(latencies))
+	copy(sorted, latencies)
+	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
+
+	index := int(float64(len(sorted)) * 0.95)
+	if index >= len(sorted) {
+		index = len(sorted) - 1
+	}
+	return sorted[index]
+}
+
+// Enhanced performance metrics with connection error tracking
+type EnhancedPerformanceMetrics struct {
+	MessagesPublished int64
+	MessagesConsumed  int64
+	PublishLatencies  []time.Duration
+	ConsumeLatencies  []time.Duration
+	StartTime         time.Time
+	EndTime           time.Time
+	ErrorCount        int64
+	ConnectionErrors  int64
+	ApplicationErrors int64
+	RetryAttempts     int64
+	mu                sync.RWMutex
+}
+
+func (m *EnhancedPerformanceMetrics) AddPublishLatency(d time.Duration) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.PublishLatencies = append(m.PublishLatencies, d)
+}
+
+func (m *EnhancedPerformanceMetrics) AddConsumeLatency(d time.Duration) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.ConsumeLatencies = append(m.ConsumeLatencies, d)
+}
+
+func (m *EnhancedPerformanceMetrics) GetThroughput() float64 {
+	duration := m.EndTime.Sub(m.StartTime).Seconds()
+	if duration == 0 {
+		return 0
+	}
+	return float64(atomic.LoadInt64(&m.MessagesPublished)) / duration
+}
+
+func (m *EnhancedPerformanceMetrics) GetP95Latency(latencies []time.Duration) time.Duration {
+	if len(latencies) == 0 {
+		return 0
+	}
+
+	// Same nearest-rank approach as PerformanceMetrics.GetP95Latency.
+	sorted := make([]time.Duration, len(latencies))
+	copy(sorted, latencies)
+	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
+
+	index := int(float64(len(sorted)) * 0.95)
+	if index >= len(sorted) {
+		index = len(sorted) - 1
+	}
+	return sorted[index]
+}
+
+// isConnectionError determines if an error is a connection-level error
+func isConnectionError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	errStr := err.Error()
+
+	// Check for gRPC status codes
+	if st, ok := status.FromError(err); ok {
+		switch st.Code() {
+		case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
+			return true
+		}
+	}
+
+	// Check for common connection error strings
+	connectionErrorPatterns := []string{
+		"EOF",
+		"error reading server preface",
+		"connection refused",
+		"connection reset",
+		"broken pipe",
+		"network is unreachable",
+		"no route to host",
+		"transport is closing",
+		"connection error",
+		"dial tcp",
+		"context deadline exceeded",
+	}
+
+	for _, pattern := range connectionErrorPatterns {
+		if strings.Contains(errStr, pattern) {
+			return true
+		}
+	}
+
+	return false
+}
+
+func TestPerformanceThroughput(t *testing.T) {
+	suite := NewIntegrationTestSuite(t)
+	require.NoError(t, suite.Setup())
+
+	topicName := "performance-throughput-test"
+	namespace := "perf-test"
+
+	metrics := &PerformanceMetrics{
+		StartTime: time.Now(),
+	}
+
+	// Test parameters
+	numMessages := 50000
+	numPublishers := 5
+	messageSize := 1024 // 1KB messages
+
+	// Create message payload
+	payload := make([]byte, messageSize)
+	for i := range payload {
+		payload[i] = byte(i % 256)
+	}
+
+	t.Logf("Starting throughput test: %d messages, %d publishers, %d bytes per message",
+		numMessages, numPublishers, messageSize)
+
+	// Start publishers
+	var publishWg sync.WaitGroup
+	messagesPerPublisher := numMessages / numPublishers
+
+	for i := 0; i < numPublishers; i++ {
+		publishWg.Add(1)
+		go func(publisherID int) {
+			defer publishWg.Done()
+
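+			// One publisher per goroutine: the test does not assume the
+			// underlying TopicPublisher is safe for concurrent use, so each
+			// worker gets its own client connection.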
+			// Create publisher for this goroutine
+			pubConfig := &PublisherTestConfig{
+				Namespace:      namespace,
+				TopicName:      topicName,
+				PartitionCount: 4, // Multiple partitions for better throughput
+				PublisherName:  fmt.Sprintf("perf-publisher-%d", publisherID),
+				RecordType:     nil, // Use raw publish
+			}
+
+			publisher, err := suite.CreatePublisher(pubConfig)
+			if err != nil {
+				atomic.AddInt64(&metrics.ErrorCount, 1)
+				t.Errorf("Failed to create publisher %d: %v", publisherID, err)
+				return
+			}
+
+			for j := 0; j < messagesPerPublisher; j++ {
+				messageKey := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)
+
+				start := time.Now()
+				err := publisher.Publish([]byte(messageKey), payload)
+				latency := time.Since(start)
+
+				if err != nil {
+					atomic.AddInt64(&metrics.ErrorCount, 1)
+					continue
+				}
+
+				atomic.AddInt64(&metrics.MessagesPublished, 1)
+				metrics.AddPublishLatency(latency)
+
+				// Small delay to prevent overwhelming the system
+				if j%1000 == 0 {
+					time.Sleep(1 * time.Millisecond)
+				}
+			}
+		}(i)
+	}
+
+	// Wait for publishing to complete
+	publishWg.Wait()
+	metrics.EndTime = time.Now()
+
+	// Verify results
+	publishedCount := atomic.LoadInt64(&metrics.MessagesPublished)
+	errorCount := atomic.LoadInt64(&metrics.ErrorCount)
+	throughput := metrics.GetThroughput()
+
+	t.Logf("Performance Results:")
+	t.Logf("  Messages Published: %d", publishedCount)
+	t.Logf("  Errors: %d", errorCount)
+	t.Logf("  Throughput: %.2f messages/second", throughput)
+	t.Logf("  Duration: %v", metrics.EndTime.Sub(metrics.StartTime))
+
+	if len(metrics.PublishLatencies) > 0 {
+		p95Latency := metrics.GetP95Latency(metrics.PublishLatencies)
+		t.Logf("  P95 Publish Latency: %v", p95Latency)
+
+		// Performance assertions
+		assert.Less(t, p95Latency, 100*time.Millisecond, "P95 publish latency should be under 100ms")
+	}
+
+	// Throughput requirements
+	expectedMinThroughput := 5000.0 // 5K messages/sec minimum (relaxed from 10K for initial testing)
+	assert.Greater(t, throughput, expectedMinThroughput,
+		"Throughput should exceed %.0f messages/second", expectedMinThroughput)
+
+	// Error rate should be low
+	errorRate := float64(errorCount) / float64(publishedCount+errorCount)
+	assert.Less(t, errorRate, 0.05, "Error rate should be less than 5%")
+}
+
+func TestPerformanceLatency(t *testing.T) {
+	suite := NewIntegrationTestSuite(t)
+	require.NoError(t, suite.Setup())
+
+	topicName := "performance-latency-test"
+	namespace := "perf-test"
+
+	// Create publisher
+	pubConfig := &PublisherTestConfig{
+		Namespace:      namespace,
+		TopicName:      topicName,
+		PartitionCount: 1, // Single partition for latency testing
+		PublisherName:  "latency-publisher",
+		RecordType:     nil,
+	}
+
+	publisher, err := suite.CreatePublisher(pubConfig)
+	require.NoError(t, err)
+
+	// Start consumer first
+	subConfig := &SubscriberTestConfig{
+		Namespace:          namespace,
+		TopicName:          topicName,
+		ConsumerGroup:      "latency-test-group",
+		ConsumerInstanceId: "latency-consumer-1",
+		MaxPartitionCount:  1,
+		SlidingWindowSize:  10,
+		OffsetType:         schema_pb.OffsetType_RESET_TO_EARLIEST,
+	}
+
+	subscriber, err := suite.CreateSubscriber(subConfig)
+	require.NoError(t, err)
+
+	numMessages := 5000
+	collector := NewMessageCollector(numMessages)
+
+	// Set up message handler
+	subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
+		collector.AddMessage(TestMessage{
+			ID:        string(m.Data.Key),
+			Content:   m.Data.Value,
+			Timestamp: time.Unix(0, m.Data.TsNs),
+			Key:       m.Data.Key,
+		})
+	})
+
+	// Start subscriber
+	go func() {
+		err := subscriber.Subscribe()
+		if err != nil {
+			t.Logf("Subscriber error: %v", err)
+		}
+	}()
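+
+	// Note: with OffsetType_RESET_TO_EARLIEST the subscriber should also pick
+	// up messages published before it finishes attaching, so the sleep below
+	// is a convenience rather than a correctness requirement.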
+
+	// Wait for consumer to be ready
+	time.Sleep(2 * time.Second)
+
+	metrics := &PerformanceMetrics{
+		StartTime: time.Now(),
+	}
+
+	t.Logf("Starting latency test with %d messages", numMessages)
+
+	// Publish messages with controlled timing
+	for i := 0; i < numMessages; i++ {
+		messageKey := fmt.Sprintf("latency-msg-%d", i)
+		payload := fmt.Sprintf("test-payload-data-%d", i)
+
+		start := time.Now()
+		err := publisher.Publish([]byte(messageKey), []byte(payload))
+		publishLatency := time.Since(start)
+
+		require.NoError(t, err)
+		metrics.AddPublishLatency(publishLatency)
+
+		// Controlled rate for latency measurement
+		if i%100 == 0 {
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+
+	metrics.EndTime = time.Now()
+
+	// Wait for messages to be consumed
+	messages := collector.WaitForMessages(30 * time.Second)
+
+	// Analyze latency results
+	t.Logf("Latency Test Results:")
+	t.Logf("  Messages Published: %d", numMessages)
+	t.Logf("  Messages Consumed: %d", len(messages))
+
+	if len(metrics.PublishLatencies) > 0 {
+		p95PublishLatency := metrics.GetP95Latency(metrics.PublishLatencies)
+		t.Logf("  P95 Publish Latency: %v", p95PublishLatency)
+
+		// Latency assertions
+		assert.Less(t, p95PublishLatency, 50*time.Millisecond,
+			"P95 publish latency should be under 50ms")
+	}
+
+	// Verify message delivery
+	deliveryRate := float64(len(messages)) / float64(numMessages)
+	assert.Greater(t, deliveryRate, 0.80, "Should deliver at least 80% of messages")
+}
+
+func TestPerformanceConcurrentConsumers(t *testing.T) {
+	suite := NewIntegrationTestSuite(t)
+	require.NoError(t, suite.Setup())
+
+	topicName := "performance-concurrent-test"
+	namespace := "perf-test"
+	numPartitions := int32(4)
+
+	// Create publisher first
+	pubConfig := &PublisherTestConfig{
+		Namespace:      namespace,
+		TopicName:      topicName,
+		PartitionCount: numPartitions,
+		PublisherName:  "concurrent-publisher",
+		RecordType:     nil,
+	}
+
+	publisher, err := suite.CreatePublisher(pubConfig)
+	require.NoError(t, err)
+
+	// Test parameters
+	numConsumers := 8
+	numMessages := 20000
+	consumerGroup := "concurrent-perf-group"
+
+	t.Logf("Starting concurrent consumer test: %d consumers, %d messages, %d partitions",
+		numConsumers, numMessages, numPartitions)
+
+	// Start multiple consumers
+	var collectors []*MessageCollector
+
+	for i := 0; i < numConsumers; i++ {
+		collector := NewMessageCollector(numMessages / numConsumers) // Expected per consumer
+		collectors = append(collectors, collector)
+
+		subConfig := &SubscriberTestConfig{
+			Namespace:          namespace,
+			TopicName:          topicName,
+			ConsumerGroup:      consumerGroup,
+			ConsumerInstanceId: fmt.Sprintf("consumer-%d", i),
+			MaxPartitionCount:  numPartitions,
+			SlidingWindowSize:  10,
+			OffsetType:         schema_pb.OffsetType_RESET_TO_EARLIEST,
+		}
+
+		subscriber, err := suite.CreateSubscriber(subConfig)
+		require.NoError(t, err)
+
+		// Set up message handler for this consumer
+		func(consumerID int, c *MessageCollector) {
+			subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
+				c.AddMessage(TestMessage{
+					ID:        fmt.Sprintf("%d-%s", consumerID, string(m.Data.Key)),
+					Content:   m.Data.Value,
+					Timestamp: time.Unix(0, m.Data.TsNs),
+					Key:       m.Data.Key,
+				})
+			})
+		}(i, collector)
+
+		// Start the subscriber that carries the handler above; creating a
+		// fresh subscriber here would subscribe without any handler and
+		// silently drop every message.
+		go func(consumerID int) {
+			if err := subscriber.Subscribe(); err != nil {
+				t.Logf("Consumer %d error: %v", consumerID, err)
+			}
+		}(i)
+	}
+
+	// Wait for consumers to initialize
+	time.Sleep(3 * time.Second)
+
+	// Start publishing
+	startTime := time.Now()
+
+	var publishWg sync.WaitGroup
+	numPublishers := 3
+	messagesPerPublisher := numMessages / numPublishers
+
+	for i := 0; i < numPublishers; i++ {
+		publishWg.Add(1)
+		go func(publisherID int) {
+			defer publishWg.Done()
+
+			for j := 0; j < messagesPerPublisher; j++ {
+				messageKey := fmt.Sprintf("concurrent-msg-%d-%d", publisherID, j)
+				payload := fmt.Sprintf("publisher-%d-message-%d-data", publisherID, j)
+
+				err := publisher.Publish([]byte(messageKey), []byte(payload))
+
+				if err != nil {
+					t.Logf("Publish error: %v", err)
+				}
+
+				// Rate limiting
+				if j%1000 == 0 {
+					time.Sleep(2 * time.Millisecond)
+				}
+			}
+		}(i)
+	}
+
+	publishWg.Wait()
+	publishDuration := time.Since(startTime)
+
+	// Allow time for message consumption
+	time.Sleep(10 * time.Second)
+
+	// Analyze results
+	totalConsumed := int64(0)
+	for i, collector := range collectors {
+		messages := collector.GetMessages()
+		consumed := int64(len(messages))
+		totalConsumed += consumed
+		t.Logf("Consumer %d consumed %d messages", i, consumed)
+	}
+
+	publishThroughput := float64(numMessages) / publishDuration.Seconds()
+	consumeThroughput := float64(totalConsumed) / publishDuration.Seconds()
+
+	t.Logf("Concurrent Consumer Test Results:")
+	t.Logf("  Total Published: %d", numMessages)
+	t.Logf("  Total Consumed: %d", totalConsumed)
+	t.Logf("  Publish Throughput: %.2f msg/sec", publishThroughput)
+	t.Logf("  Consume Throughput: %.2f msg/sec", consumeThroughput)
+	t.Logf("  Test Duration: %v", publishDuration)
+
+	// Performance assertions (relaxed for initial testing)
+	deliveryRate := float64(totalConsumed) / float64(numMessages)
+	assert.Greater(t, deliveryRate, 0.70, "Should consume at least 70% of messages")
+
+	expectedMinThroughput := 2000.0 // 2K messages/sec minimum for concurrent consumption
+	assert.Greater(t, consumeThroughput, expectedMinThroughput,
+		"Consume throughput should exceed %.0f messages/second", expectedMinThroughput)
+}
+
+func TestPerformanceWithErrorHandling(t *testing.T) {
+	suite := NewIntegrationTestSuite(t)
+	require.NoError(t, suite.Setup())
+
+	topicName := "performance-error-handling-test"
+	namespace := "perf-test"
+
+	metrics := &EnhancedPerformanceMetrics{
+		StartTime: time.Now(),
+	}
+
+	// Test parameters
+	numMessages := 50000
+	numPublishers := 5
+	messageSize := 1024 // 1KB messages
+
+	// Create message payload
+	payload := make([]byte, messageSize)
+	for i := range payload {
+		payload[i] = byte(i % 256)
+	}
+
+	t.Logf("Starting performance test with enhanced error handling: %d messages, %d publishers, %d bytes per message",
+		numMessages, numPublishers, messageSize)
+
+	// Start publishers
+	var publishWg sync.WaitGroup
+	messagesPerPublisher := numMessages / numPublishers
+
+	for i := 0; i < numPublishers; i++ {
+		publishWg.Add(1)
+		go func(publisherID int) {
+			defer publishWg.Done()
+
+			// Create publisher for this goroutine
+			pubConfig := &PublisherTestConfig{
+				Namespace:      namespace,
+				TopicName:      topicName,
+				PartitionCount: 4, // Multiple partitions for better throughput
+				PublisherName:  fmt.Sprintf("error-aware-publisher-%d", publisherID),
+				RecordType:     nil, // Use raw publish
+			}
+
+			publisher, err := suite.CreatePublisher(pubConfig)
+			if err != nil {
+				atomic.AddInt64(&metrics.ErrorCount, 1)
+				t.Errorf("Failed to create publisher %d: %v", publisherID, err)
+				return
+			}
+
+			for j := 0; j < messagesPerPublisher; j++ {
+				messageKey := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)
+
+				start := time.Now()
+				err := publisher.Publish([]byte(messageKey), payload)
+				latency := time.Since(start)
+
+				if err != nil {
+					atomic.AddInt64(&metrics.ErrorCount, 1)
+
+					// Classify the error type
+					if isConnectionError(err) {
+						atomic.AddInt64(&metrics.ConnectionErrors, 1)
+						t.Logf("Connection error (publisher %d, msg %d): %v", publisherID, j, err)
+					} else {
+						atomic.AddInt64(&metrics.ApplicationErrors, 1)
+						t.Logf("Application error (publisher %d, msg %d): %v", publisherID, j, err)
+					}
+					continue
+				}
+
+				atomic.AddInt64(&metrics.MessagesPublished, 1)
+				metrics.AddPublishLatency(latency)
+
+				// Small delay to prevent overwhelming the system
+				if j%1000 == 0 {
+					time.Sleep(1 * time.Millisecond)
+				}
+			}
+		}(i)
+	}
+
+	// Wait for publishing to complete
+	publishWg.Wait()
+	metrics.EndTime = time.Now()
+
+	// Analyze results with enhanced error reporting
+	publishedCount := atomic.LoadInt64(&metrics.MessagesPublished)
+	totalErrors := atomic.LoadInt64(&metrics.ErrorCount)
+	connectionErrors := atomic.LoadInt64(&metrics.ConnectionErrors)
+	applicationErrors := atomic.LoadInt64(&metrics.ApplicationErrors)
+	throughput := metrics.GetThroughput()
+
+	t.Logf("Enhanced Performance Results:")
+	t.Logf("  Messages Successfully Published: %d", publishedCount)
+	t.Logf("  Total Errors: %d", totalErrors)
+	t.Logf("  Connection-Level Errors: %d", connectionErrors)
+	t.Logf("  Application-Level Errors: %d", applicationErrors)
+	t.Logf("  Throughput: %.2f messages/second", throughput)
+	t.Logf("  Duration: %v", metrics.EndTime.Sub(metrics.StartTime))
+
+	if len(metrics.PublishLatencies) > 0 {
+		p95Latency := metrics.GetP95Latency(metrics.PublishLatencies)
+		t.Logf("  P95 Publish Latency: %v", p95Latency)
+
+		// Performance assertions (adjusted for error handling overhead)
+		assert.Less(t, p95Latency, 100*time.Millisecond, "P95 publish latency should be under 100ms")
+	}
+
+	// Enhanced error analysis
+	totalAttempts := publishedCount + totalErrors
+	if totalAttempts > 0 {
+		successRate := float64(publishedCount) / float64(totalAttempts)
+		connectionErrorRate := float64(connectionErrors) / float64(totalAttempts)
+		applicationErrorRate := float64(applicationErrors) / float64(totalAttempts)
+
+		t.Logf("Error Analysis:")
+		t.Logf("  Success Rate: %.2f%%", successRate*100)
+		t.Logf("  Connection Error Rate: %.2f%%", connectionErrorRate*100)
+		t.Logf("  Application Error Rate: %.2f%%", applicationErrorRate*100)
+
+		// Assertions based on error types
+		assert.Greater(t, successRate, 0.80, "Success rate should be greater than 80%")
+		assert.Less(t, applicationErrorRate, 0.01, "Application error rate should be less than 1%")
+
+		// Connection errors are expected under high load but should be handled
+		if connectionErrors > 0 {
+			t.Logf("Note: %d connection errors detected - this indicates the test is successfully stressing the system", connectionErrors)
+			t.Logf("Recommendation: Implement retry logic for production applications to handle these connection errors")
+		}
+	}
+
+	// Throughput requirements (adjusted for error handling)
+	expectedMinThroughput := 5000.0 // 5K messages/sec minimum
+	assert.Greater(t, throughput, expectedMinThroughput,
+		"Throughput should exceed %.0f messages/second", expectedMinThroughput)
+}
diff --git a/test/mq/integration/resilient_publisher.go b/test/mq/integration/resilient_publisher.go
new file mode 100644
index 000000000..38da73fd4
--- /dev/null
+++ b/test/mq/integration/resilient_publisher.go
@@ -0,0 +1,336 @@
+package integration
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
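+
+// Design note: publish failures below are classified as either
+// connection-level (retried with exponential backoff, and the underlying
+// publisher recreated) or application-level (failed fast); repeated failures
+// trip a simple time-based circuit breaker.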
+ + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// ResilientPublisher wraps TopicPublisher with enhanced error handling +type ResilientPublisher struct { + publisher *pub_client.TopicPublisher + config *PublisherTestConfig + suite *IntegrationTestSuite + + // Error tracking + connectionErrors int64 + applicationErrors int64 + retryAttempts int64 + totalPublishes int64 + + // Retry configuration + maxRetries int + baseDelay time.Duration + maxDelay time.Duration + backoffFactor float64 + + // Circuit breaker + circuitOpen bool + circuitOpenTime time.Time + circuitTimeout time.Duration + + mu sync.RWMutex +} + +// RetryConfig holds retry configuration +type RetryConfig struct { + MaxRetries int + BaseDelay time.Duration + MaxDelay time.Duration + BackoffFactor float64 + CircuitTimeout time.Duration +} + +// DefaultRetryConfig returns sensible defaults for retry configuration +func DefaultRetryConfig() *RetryConfig { + return &RetryConfig{ + MaxRetries: 5, + BaseDelay: 10 * time.Millisecond, + MaxDelay: 5 * time.Second, + BackoffFactor: 2.0, + CircuitTimeout: 30 * time.Second, + } +} + +// NewResilientPublisher creates a new resilient publisher +func (its *IntegrationTestSuite) CreateResilientPublisher(config *PublisherTestConfig, retryConfig *RetryConfig) (*ResilientPublisher, error) { + if retryConfig == nil { + retryConfig = DefaultRetryConfig() + } + + publisher, err := its.CreatePublisher(config) + if err != nil { + return nil, fmt.Errorf("failed to create base publisher: %v", err) + } + + return &ResilientPublisher{ + publisher: publisher, + config: config, + suite: its, + maxRetries: retryConfig.MaxRetries, + baseDelay: retryConfig.BaseDelay, + maxDelay: retryConfig.MaxDelay, + backoffFactor: retryConfig.BackoffFactor, + circuitTimeout: retryConfig.CircuitTimeout, + }, nil +} + +// PublishWithRetry publishes a message with retry logic and error handling +func (rp *ResilientPublisher) PublishWithRetry(key, value []byte) error { + atomic.AddInt64(&rp.totalPublishes, 1) + + // Check circuit breaker + if rp.isCircuitOpen() { + atomic.AddInt64(&rp.applicationErrors, 1) + return fmt.Errorf("circuit breaker is open") + } + + var lastErr error + for attempt := 0; attempt <= rp.maxRetries; attempt++ { + if attempt > 0 { + atomic.AddInt64(&rp.retryAttempts, 1) + delay := rp.calculateDelay(attempt) + glog.V(1).Infof("Retrying publish after %v (attempt %d/%d)", delay, attempt, rp.maxRetries) + time.Sleep(delay) + } + + err := rp.publisher.Publish(key, value) + if err == nil { + // Success - reset circuit breaker if it was open + rp.resetCircuitBreaker() + return nil + } + + lastErr = err + + // Classify error type + if rp.isConnectionError(err) { + atomic.AddInt64(&rp.connectionErrors, 1) + glog.V(1).Infof("Connection error on attempt %d: %v", attempt+1, err) + + // For connection errors, try to recreate the publisher + if attempt < rp.maxRetries { + if recreateErr := rp.recreatePublisher(); recreateErr != nil { + glog.Warningf("Failed to recreate publisher: %v", recreateErr) + } + } + continue + } else { + // Application error - don't retry + atomic.AddInt64(&rp.applicationErrors, 1) + glog.Warningf("Application error (not retrying): %v", err) + break + } + } + + // All retries exhausted or non-retryable error + rp.openCircuitBreaker() + return fmt.Errorf("publish failed after %d attempts, last error: %v", 
+
+// PublishRecord publishes a record with retry logic
+func (rp *ResilientPublisher) PublishRecord(key []byte, record *schema_pb.RecordValue) error {
+	atomic.AddInt64(&rp.totalPublishes, 1)
+
+	if rp.isCircuitOpen() {
+		atomic.AddInt64(&rp.applicationErrors, 1)
+		return fmt.Errorf("circuit breaker is open")
+	}
+
+	var lastErr error
+	for attempt := 0; attempt <= rp.maxRetries; attempt++ {
+		if attempt > 0 {
+			atomic.AddInt64(&rp.retryAttempts, 1)
+			delay := rp.calculateDelay(attempt)
+			time.Sleep(delay)
+		}
+
+		err := rp.publisher.PublishRecord(key, record)
+		if err == nil {
+			rp.resetCircuitBreaker()
+			return nil
+		}
+
+		lastErr = err
+
+		if rp.isConnectionError(err) {
+			atomic.AddInt64(&rp.connectionErrors, 1)
+			if attempt < rp.maxRetries {
+				if recreateErr := rp.recreatePublisher(); recreateErr != nil {
+					glog.Warningf("Failed to recreate publisher: %v", recreateErr)
+				}
+			}
+			continue
+		} else {
+			atomic.AddInt64(&rp.applicationErrors, 1)
+			break
+		}
+	}
+
+	rp.openCircuitBreaker()
+	return fmt.Errorf("publish record failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
+}
+
+// recreatePublisher attempts to recreate the underlying publisher
+func (rp *ResilientPublisher) recreatePublisher() error {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	// Shutdown old publisher
+	if rp.publisher != nil {
+		rp.publisher.Shutdown()
+	}
+
+	// Create new publisher
+	newPublisher, err := rp.suite.CreatePublisher(rp.config)
+	if err != nil {
+		return fmt.Errorf("failed to recreate publisher: %v", err)
+	}
+
+	rp.publisher = newPublisher
+	glog.V(1).Infof("Successfully recreated publisher")
+	return nil
+}
+
+// isConnectionError determines if an error is a connection-level error that should be retried
+func (rp *ResilientPublisher) isConnectionError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	errStr := err.Error()
+
+	// Check for gRPC status codes
+	if st, ok := status.FromError(err); ok {
+		switch st.Code() {
+		case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
+			return true
+		}
+	}
+
+	// Check for common connection error strings
+	connectionErrorPatterns := []string{
+		"EOF",
+		"error reading server preface",
+		"connection refused",
+		"connection reset",
+		"broken pipe",
+		"network is unreachable",
+		"no route to host",
+		"transport is closing",
+		"connection error",
+		"dial tcp",
+		"context deadline exceeded",
+	}
+
+	for _, pattern := range connectionErrorPatterns {
+		if strings.Contains(errStr, pattern) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// calculateDelay calculates exponential backoff delay
+func (rp *ResilientPublisher) calculateDelay(attempt int) time.Duration {
+	delay := float64(rp.baseDelay) * math.Pow(rp.backoffFactor, float64(attempt-1))
+	if delay > float64(rp.maxDelay) {
+		delay = float64(rp.maxDelay)
+	}
+	return time.Duration(delay)
+}
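+
+// With DefaultRetryConfig (10ms base, factor 2.0, 5s cap) the retry delays
+// work out to roughly 10ms, 20ms, 40ms, 80ms and 160ms for attempts 1-5.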
+
+// Circuit breaker methods
+func (rp *ResilientPublisher) isCircuitOpen() bool {
+	rp.mu.RLock()
+	defer rp.mu.RUnlock()
+
+	if !rp.circuitOpen {
+		return false
+	}
+
+	// Check if circuit should be reset
+	if time.Since(rp.circuitOpenTime) > rp.circuitTimeout {
+		return false
+	}
+
+	return true
+}
+
+func (rp *ResilientPublisher) openCircuitBreaker() {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	rp.circuitOpen = true
+	rp.circuitOpenTime = time.Now()
+	glog.Warningf("Circuit breaker opened due to repeated failures")
+}
+
+func (rp *ResilientPublisher) resetCircuitBreaker() {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	if rp.circuitOpen {
+		rp.circuitOpen = false
+		glog.V(1).Infof("Circuit breaker reset")
+	}
+}
+
+// GetErrorStats returns error statistics
+func (rp *ResilientPublisher) GetErrorStats() ErrorStats {
+	return ErrorStats{
+		ConnectionErrors:  atomic.LoadInt64(&rp.connectionErrors),
+		ApplicationErrors: atomic.LoadInt64(&rp.applicationErrors),
+		RetryAttempts:     atomic.LoadInt64(&rp.retryAttempts),
+		TotalPublishes:    atomic.LoadInt64(&rp.totalPublishes),
+		CircuitOpen:       rp.isCircuitOpen(),
+	}
+}
+
+// ErrorStats holds error statistics
+type ErrorStats struct {
+	ConnectionErrors  int64
+	ApplicationErrors int64
+	RetryAttempts     int64
+	TotalPublishes    int64
+	CircuitOpen       bool
+}
+
+// Shutdown gracefully shuts down the resilient publisher
+func (rp *ResilientPublisher) Shutdown() error {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	if rp.publisher != nil {
+		return rp.publisher.Shutdown()
+	}
+	return nil
+}
diff --git a/test/mq/simple_demo.go b/test/mq/simple_demo.go
new file mode 100644
index 000000000..35446e742
--- /dev/null
+++ b/test/mq/simple_demo.go
@@ -0,0 +1,56 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+)
+
+func main() {
+	log.Println("Starting SeaweedMQ logging test...")
+
+	// Create publisher configuration
+	config := &pub_client.PublisherConfiguration{
+		Topic:          topic.NewTopic("test", "logging-demo"),
+		PartitionCount: 3,
+		Brokers:        []string{"127.0.0.1:17777"},
+		PublisherName:  "logging-test-client",
+	}
+
+	log.Println("Creating topic publisher...")
+	publisher, err := pub_client.NewTopicPublisher(config)
+	if err != nil {
+		log.Printf("Failed to create publisher: %v", err)
+		return
+	}
+	defer publisher.Shutdown()
+
+	log.Println("Publishing test messages...")
+
+	// Publish some test messages
+	for i := 0; i < 100; i++ {
+		key := fmt.Sprintf("key-%d", i)
+		value := fmt.Sprintf("message-%d-timestamp-%d", i, time.Now().Unix())
+
+		err := publisher.Publish([]byte(key), []byte(value))
+		if err != nil {
+			log.Printf("Failed to publish message %d: %v", i, err)
+		}
+
+		// Small delay to create some connection stress
+		if i%10 == 0 {
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+
+	log.Println("Finishing publish...")
+	err = publisher.FinishPublish()
+	if err != nil {
+		log.Printf("Failed to finish publish: %v", err)
+	}
+
+	log.Println("Test completed successfully!")
+}