You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
7.0 KiB
216 lines
7.0 KiB
package integration
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
|
|
)
|
|
|
|
// TestDockerIntegration tests the complete Kafka integration using Docker Compose
|
|
func TestDockerIntegration(t *testing.T) {
|
|
env := testutil.NewDockerEnvironment(t)
|
|
env.SkipIfNotAvailable(t)
|
|
|
|
t.Run("KafkaConnectivity", func(t *testing.T) {
|
|
env.RequireKafka(t)
|
|
testDockerKafkaConnectivity(t, env.KafkaBootstrap)
|
|
})
|
|
|
|
t.Run("SchemaRegistryConnectivity", func(t *testing.T) {
|
|
env.RequireSchemaRegistry(t)
|
|
testDockerSchemaRegistryConnectivity(t, env.SchemaRegistry)
|
|
})
|
|
|
|
t.Run("KafkaGatewayConnectivity", func(t *testing.T) {
|
|
env.RequireGateway(t)
|
|
testDockerKafkaGatewayConnectivity(t, env.KafkaGateway)
|
|
})
|
|
|
|
t.Run("SaramaProduceConsume", func(t *testing.T) {
|
|
env.RequireKafka(t)
|
|
testDockerSaramaProduceConsume(t, env.KafkaBootstrap)
|
|
})
|
|
|
|
t.Run("KafkaGoProduceConsume", func(t *testing.T) {
|
|
env.RequireKafka(t)
|
|
testDockerKafkaGoProduceConsume(t, env.KafkaBootstrap)
|
|
})
|
|
|
|
t.Run("GatewayProduceConsume", func(t *testing.T) {
|
|
env.RequireGateway(t)
|
|
testDockerGatewayProduceConsume(t, env.KafkaGateway)
|
|
})
|
|
|
|
t.Run("CrossClientCompatibility", func(t *testing.T) {
|
|
env.RequireKafka(t)
|
|
env.RequireGateway(t)
|
|
testDockerCrossClientCompatibility(t, env.KafkaBootstrap, env.KafkaGateway)
|
|
})
|
|
}
|
|
|
|
func testDockerKafkaConnectivity(t *testing.T, bootstrap string) {
|
|
client := testutil.NewSaramaClient(t, bootstrap)
|
|
|
|
// Test basic connectivity by creating a topic
|
|
topicName := testutil.GenerateUniqueTopicName("connectivity-test")
|
|
err := client.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic for connectivity test")
|
|
|
|
t.Logf("✅ Kafka connectivity test passed")
|
|
}
|
|
|
|
// testDockerSchemaRegistryConnectivity verifies that the Schema Registry at
// registryURL answers plain HTTP requests with JSON of the expected shape.
//
// Two endpoints are probed, in this order:
//   - GET registryURL + "/subjects" must return 200 and decode into []string.
//   - GET registryURL + "/config" must return 200 and decode into a JSON object.
//
// Any transport error, non-200 status, unreadable body, or body that does not
// decode into the expected Go type fails the test immediately via t.Fatalf.
func testDockerSchemaRegistryConnectivity(t *testing.T, registryURL string) {
	// A bounded timeout keeps the test from hanging on an unresponsive registry.
	client := &http.Client{Timeout: 10 * time.Second}

	// fetchJSON issues GET registryURL+path, requires a 200 response and
	// decodes the body into out. what names the payload in failure messages.
	// The response body is closed before fetchJSON returns, so no connection
	// is held open across the second request.
	fetchJSON := func(path, what string, out interface{}) {
		t.Helper()

		endpoint := registryURL + path
		resp, err := client.Get(endpoint)
		if err != nil {
			t.Fatalf("Failed to connect to Schema Registry at %s: %v", endpoint, err)
		}
		defer resp.Body.Close()

		if resp.StatusCode != http.StatusOK {
			// Best effort: the body is only diagnostic context here, so a read
			// error is deliberately ignored and the size is capped.
			snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
			t.Fatalf("Schema Registry %s endpoint %s returned status %d, expected 200; body: %q",
				what, endpoint, resp.StatusCode, snippet)
		}

		body, err := io.ReadAll(resp.Body)
		if err != nil {
			t.Fatalf("Failed to read Schema Registry %s response from %s: %v", what, endpoint, err)
		}

		if err := json.Unmarshal(body, out); err != nil {
			t.Fatalf("Schema Registry %s response from %s is not valid JSON of the expected shape: %v",
				what, endpoint, err)
		}
	}

	// Check 1: the registry responds and /subjects is a JSON array of strings.
	var subjects []string
	fetchJSON("/subjects", "subjects", &subjects)
	t.Logf("Schema Registry is accessible with %d subjects", len(subjects))

	// Check 2: the config endpoint responds with a JSON object.
	var config map[string]interface{}
	fetchJSON("/config", "config", &config)
	t.Logf("Schema Registry config: %v", config)

	t.Logf("Schema Registry connectivity test passed")
}
|
|
|
|
func testDockerKafkaGatewayConnectivity(t *testing.T, gatewayURL string) {
|
|
client := testutil.NewSaramaClient(t, gatewayURL)
|
|
|
|
// Test basic connectivity to gateway
|
|
topicName := testutil.GenerateUniqueTopicName("gateway-connectivity-test")
|
|
err := client.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic via gateway")
|
|
|
|
t.Logf("✅ Kafka Gateway connectivity test passed")
|
|
}
|
|
|
|
func testDockerSaramaProduceConsume(t *testing.T, bootstrap string) {
|
|
client := testutil.NewSaramaClient(t, bootstrap)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
topicName := testutil.GenerateUniqueTopicName("sarama-docker-test")
|
|
|
|
// Create topic
|
|
err := client.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic")
|
|
|
|
// Produce and consume messages
|
|
messages := msgGen.GenerateStringMessages(3)
|
|
err = client.ProduceMessages(topicName, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce messages")
|
|
|
|
consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
|
|
testutil.AssertNoError(t, err, "Failed to consume messages")
|
|
|
|
err = testutil.ValidateMessageContent(messages, consumed)
|
|
testutil.AssertNoError(t, err, "Message validation failed")
|
|
|
|
t.Logf("✅ Sarama produce/consume test passed")
|
|
}
|
|
|
|
func testDockerKafkaGoProduceConsume(t *testing.T, bootstrap string) {
|
|
client := testutil.NewKafkaGoClient(t, bootstrap)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
topicName := testutil.GenerateUniqueTopicName("kafka-go-docker-test")
|
|
|
|
// Create topic
|
|
err := client.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic")
|
|
|
|
// Produce and consume messages
|
|
messages := msgGen.GenerateKafkaGoMessages(3)
|
|
err = client.ProduceMessages(topicName, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce messages")
|
|
|
|
consumed, err := client.ConsumeMessages(topicName, len(messages))
|
|
testutil.AssertNoError(t, err, "Failed to consume messages")
|
|
|
|
err = testutil.ValidateKafkaGoMessageContent(messages, consumed)
|
|
testutil.AssertNoError(t, err, "Message validation failed")
|
|
|
|
t.Logf("✅ kafka-go produce/consume test passed")
|
|
}
|
|
|
|
func testDockerGatewayProduceConsume(t *testing.T, gatewayURL string) {
|
|
client := testutil.NewSaramaClient(t, gatewayURL)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
topicName := testutil.GenerateUniqueTopicName("gateway-docker-test")
|
|
|
|
// Produce and consume via gateway
|
|
messages := msgGen.GenerateStringMessages(3)
|
|
err := client.ProduceMessages(topicName, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce messages via gateway")
|
|
|
|
consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
|
|
testutil.AssertNoError(t, err, "Failed to consume messages via gateway")
|
|
|
|
err = testutil.ValidateMessageContent(messages, consumed)
|
|
testutil.AssertNoError(t, err, "Message validation failed")
|
|
|
|
t.Logf("✅ Gateway produce/consume test passed")
|
|
}
|
|
|
|
func testDockerCrossClientCompatibility(t *testing.T, kafkaBootstrap, gatewayURL string) {
|
|
kafkaClient := testutil.NewSaramaClient(t, kafkaBootstrap)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
topicName := testutil.GenerateUniqueTopicName("cross-client-docker-test")
|
|
|
|
// Create topic on Kafka
|
|
err := kafkaClient.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic on Kafka")
|
|
|
|
// Produce to Kafka
|
|
messages := msgGen.GenerateStringMessages(2)
|
|
err = kafkaClient.ProduceMessages(topicName, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce to Kafka")
|
|
|
|
// This tests the integration between Kafka and the Gateway
|
|
// In a real scenario, messages would be replicated or bridged
|
|
t.Logf("✅ Cross-client compatibility test passed")
|
|
}
|