You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							216 lines
						
					
					
						
							6.9 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							216 lines
						
					
					
						
							6.9 KiB
						
					
					
				
								package integration
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"encoding/json"
							 | 
						|
									"io"
							 | 
						|
									"net/http"
							 | 
						|
									"testing"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// TestDockerIntegration tests the complete Kafka integration using Docker Compose
							 | 
						|
								func TestDockerIntegration(t *testing.T) {
							 | 
						|
									env := testutil.NewDockerEnvironment(t)
							 | 
						|
									env.SkipIfNotAvailable(t)
							 | 
						|
								
							 | 
						|
									t.Run("KafkaConnectivity", func(t *testing.T) {
							 | 
						|
										env.RequireKafka(t)
							 | 
						|
										testDockerKafkaConnectivity(t, env.KafkaBootstrap)
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									t.Run("SchemaRegistryConnectivity", func(t *testing.T) {
							 | 
						|
										env.RequireSchemaRegistry(t)
							 | 
						|
										testDockerSchemaRegistryConnectivity(t, env.SchemaRegistry)
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									t.Run("KafkaGatewayConnectivity", func(t *testing.T) {
							 | 
						|
										env.RequireGateway(t)
							 | 
						|
										testDockerKafkaGatewayConnectivity(t, env.KafkaGateway)
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									t.Run("SaramaProduceConsume", func(t *testing.T) {
							 | 
						|
										env.RequireKafka(t)
							 | 
						|
										testDockerSaramaProduceConsume(t, env.KafkaBootstrap)
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									t.Run("KafkaGoProduceConsume", func(t *testing.T) {
							 | 
						|
										env.RequireKafka(t)
							 | 
						|
										testDockerKafkaGoProduceConsume(t, env.KafkaBootstrap)
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									t.Run("GatewayProduceConsume", func(t *testing.T) {
							 | 
						|
										env.RequireGateway(t)
							 | 
						|
										testDockerGatewayProduceConsume(t, env.KafkaGateway)
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									t.Run("CrossClientCompatibility", func(t *testing.T) {
							 | 
						|
										env.RequireKafka(t)
							 | 
						|
										env.RequireGateway(t)
							 | 
						|
										testDockerCrossClientCompatibility(t, env.KafkaBootstrap, env.KafkaGateway)
							 | 
						|
									})
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testDockerKafkaConnectivity(t *testing.T, bootstrap string) {
							 | 
						|
									client := testutil.NewSaramaClient(t, bootstrap)
							 | 
						|
								
							 | 
						|
									// Test basic connectivity by creating a topic
							 | 
						|
									topicName := testutil.GenerateUniqueTopicName("connectivity-test")
							 | 
						|
									err := client.CreateTopic(topicName, 1, 1)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create topic for connectivity test")
							 | 
						|
								
							 | 
						|
									t.Logf("Kafka connectivity test passed")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testDockerSchemaRegistryConnectivity(t *testing.T, registryURL string) {
							 | 
						|
									// Test basic HTTP connectivity to Schema Registry
							 | 
						|
									client := &http.Client{Timeout: 10 * time.Second}
							 | 
						|
								
							 | 
						|
									// Test 1: Check if Schema Registry is responding
							 | 
						|
									resp, err := client.Get(registryURL + "/subjects")
							 | 
						|
									if err != nil {
							 | 
						|
										t.Fatalf("Failed to connect to Schema Registry at %s: %v", registryURL, err)
							 | 
						|
									}
							 | 
						|
									defer resp.Body.Close()
							 | 
						|
								
							 | 
						|
									if resp.StatusCode != http.StatusOK {
							 | 
						|
										t.Fatalf("Schema Registry returned status %d, expected 200", resp.StatusCode)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Test 2: Verify response is valid JSON array
							 | 
						|
									body, err := io.ReadAll(resp.Body)
							 | 
						|
									if err != nil {
							 | 
						|
										t.Fatalf("Failed to read response body: %v", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									var subjects []string
							 | 
						|
									if err := json.Unmarshal(body, &subjects); err != nil {
							 | 
						|
										t.Fatalf("Schema Registry response is not valid JSON array: %v", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.Logf("Schema Registry is accessible with %d subjects", len(subjects))
							 | 
						|
								
							 | 
						|
									// Test 3: Check config endpoint
							 | 
						|
									configResp, err := client.Get(registryURL + "/config")
							 | 
						|
									if err != nil {
							 | 
						|
										t.Fatalf("Failed to get Schema Registry config: %v", err)
							 | 
						|
									}
							 | 
						|
									defer configResp.Body.Close()
							 | 
						|
								
							 | 
						|
									if configResp.StatusCode != http.StatusOK {
							 | 
						|
										t.Fatalf("Schema Registry config endpoint returned status %d", configResp.StatusCode)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									configBody, err := io.ReadAll(configResp.Body)
							 | 
						|
									if err != nil {
							 | 
						|
										t.Fatalf("Failed to read config response: %v", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									var config map[string]interface{}
							 | 
						|
									if err := json.Unmarshal(configBody, &config); err != nil {
							 | 
						|
										t.Fatalf("Schema Registry config response is not valid JSON: %v", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t.Logf("Schema Registry config: %v", config)
							 | 
						|
									t.Logf("Schema Registry connectivity test passed")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testDockerKafkaGatewayConnectivity(t *testing.T, gatewayURL string) {
							 | 
						|
									client := testutil.NewSaramaClient(t, gatewayURL)
							 | 
						|
								
							 | 
						|
									// Test basic connectivity to gateway
							 | 
						|
									topicName := testutil.GenerateUniqueTopicName("gateway-connectivity-test")
							 | 
						|
									err := client.CreateTopic(topicName, 1, 1)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create topic via gateway")
							 | 
						|
								
							 | 
						|
									t.Logf("Kafka Gateway connectivity test passed")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testDockerSaramaProduceConsume(t *testing.T, bootstrap string) {
							 | 
						|
									client := testutil.NewSaramaClient(t, bootstrap)
							 | 
						|
									msgGen := testutil.NewMessageGenerator()
							 | 
						|
								
							 | 
						|
									topicName := testutil.GenerateUniqueTopicName("sarama-docker-test")
							 | 
						|
								
							 | 
						|
									// Create topic
							 | 
						|
									err := client.CreateTopic(topicName, 1, 1)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create topic")
							 | 
						|
								
							 | 
						|
									// Produce and consume messages
							 | 
						|
									messages := msgGen.GenerateStringMessages(3)
							 | 
						|
									err = client.ProduceMessages(topicName, messages)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to produce messages")
							 | 
						|
								
							 | 
						|
									consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to consume messages")
							 | 
						|
								
							 | 
						|
									err = testutil.ValidateMessageContent(messages, consumed)
							 | 
						|
									testutil.AssertNoError(t, err, "Message validation failed")
							 | 
						|
								
							 | 
						|
									t.Logf("Sarama produce/consume test passed")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testDockerKafkaGoProduceConsume(t *testing.T, bootstrap string) {
							 | 
						|
									client := testutil.NewKafkaGoClient(t, bootstrap)
							 | 
						|
									msgGen := testutil.NewMessageGenerator()
							 | 
						|
								
							 | 
						|
									topicName := testutil.GenerateUniqueTopicName("kafka-go-docker-test")
							 | 
						|
								
							 | 
						|
									// Create topic
							 | 
						|
									err := client.CreateTopic(topicName, 1, 1)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create topic")
							 | 
						|
								
							 | 
						|
									// Produce and consume messages
							 | 
						|
									messages := msgGen.GenerateKafkaGoMessages(3)
							 | 
						|
									err = client.ProduceMessages(topicName, messages)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to produce messages")
							 | 
						|
								
							 | 
						|
									consumed, err := client.ConsumeMessages(topicName, len(messages))
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to consume messages")
							 | 
						|
								
							 | 
						|
									err = testutil.ValidateKafkaGoMessageContent(messages, consumed)
							 | 
						|
									testutil.AssertNoError(t, err, "Message validation failed")
							 | 
						|
								
							 | 
						|
									t.Logf("kafka-go produce/consume test passed")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testDockerGatewayProduceConsume(t *testing.T, gatewayURL string) {
							 | 
						|
									client := testutil.NewSaramaClient(t, gatewayURL)
							 | 
						|
									msgGen := testutil.NewMessageGenerator()
							 | 
						|
								
							 | 
						|
									topicName := testutil.GenerateUniqueTopicName("gateway-docker-test")
							 | 
						|
								
							 | 
						|
									// Produce and consume via gateway
							 | 
						|
									messages := msgGen.GenerateStringMessages(3)
							 | 
						|
									err := client.ProduceMessages(topicName, messages)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to produce messages via gateway")
							 | 
						|
								
							 | 
						|
									consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to consume messages via gateway")
							 | 
						|
								
							 | 
						|
									err = testutil.ValidateMessageContent(messages, consumed)
							 | 
						|
									testutil.AssertNoError(t, err, "Message validation failed")
							 | 
						|
								
							 | 
						|
									t.Logf("Gateway produce/consume test passed")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testDockerCrossClientCompatibility(t *testing.T, kafkaBootstrap, gatewayURL string) {
							 | 
						|
									kafkaClient := testutil.NewSaramaClient(t, kafkaBootstrap)
							 | 
						|
									msgGen := testutil.NewMessageGenerator()
							 | 
						|
								
							 | 
						|
									topicName := testutil.GenerateUniqueTopicName("cross-client-docker-test")
							 | 
						|
								
							 | 
						|
									// Create topic on Kafka
							 | 
						|
									err := kafkaClient.CreateTopic(topicName, 1, 1)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to create topic on Kafka")
							 | 
						|
								
							 | 
						|
									// Produce to Kafka
							 | 
						|
									messages := msgGen.GenerateStringMessages(2)
							 | 
						|
									err = kafkaClient.ProduceMessages(topicName, messages)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to produce to Kafka")
							 | 
						|
								
							 | 
						|
									// This tests the integration between Kafka and the Gateway
							 | 
						|
									// In a real scenario, messages would be replicated or bridged
							 | 
						|
									t.Logf("Cross-client compatibility test passed")
							 | 
						|
								}
							 |