You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
131 lines
4.1 KiB
131 lines
4.1 KiB
package e2e
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
|
|
)
|
|
|
|
// TestComprehensiveE2E tests complete end-to-end workflows
|
|
// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock
|
|
func TestComprehensiveE2E(t *testing.T) {
|
|
gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
|
|
defer gateway.CleanupAndClose()
|
|
|
|
addr := gateway.StartAndWait()
|
|
|
|
// Log which backend we're using
|
|
if gateway.IsSMQMode() {
|
|
t.Logf("Running comprehensive E2E tests with SMQ backend")
|
|
} else {
|
|
t.Logf("Running comprehensive E2E tests with mock backend")
|
|
}
|
|
|
|
// Create topics for different test scenarios
|
|
topics := []string{
|
|
testutil.GenerateUniqueTopicName("e2e-kafka-go"),
|
|
testutil.GenerateUniqueTopicName("e2e-sarama"),
|
|
testutil.GenerateUniqueTopicName("e2e-mixed"),
|
|
}
|
|
gateway.AddTestTopics(topics...)
|
|
|
|
t.Run("KafkaGo_to_KafkaGo", func(t *testing.T) {
|
|
testKafkaGoToKafkaGo(t, addr, topics[0])
|
|
})
|
|
|
|
t.Run("Sarama_to_Sarama", func(t *testing.T) {
|
|
testSaramaToSarama(t, addr, topics[1])
|
|
})
|
|
|
|
t.Run("KafkaGo_to_Sarama", func(t *testing.T) {
|
|
testKafkaGoToSarama(t, addr, topics[2])
|
|
})
|
|
|
|
t.Run("Sarama_to_KafkaGo", func(t *testing.T) {
|
|
testSaramaToKafkaGo(t, addr, topics[2])
|
|
})
|
|
}
|
|
|
|
func testKafkaGoToKafkaGo(t *testing.T, addr, topic string) {
|
|
client := testutil.NewKafkaGoClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Generate test messages
|
|
messages := msgGen.GenerateKafkaGoMessages(2)
|
|
|
|
// Produce with kafka-go
|
|
err := client.ProduceMessages(topic, messages)
|
|
testutil.AssertNoError(t, err, "kafka-go produce failed")
|
|
|
|
// Consume with kafka-go
|
|
consumed, err := client.ConsumeMessages(topic, len(messages))
|
|
testutil.AssertNoError(t, err, "kafka-go consume failed")
|
|
|
|
// Validate message content
|
|
err = testutil.ValidateKafkaGoMessageContent(messages, consumed)
|
|
testutil.AssertNoError(t, err, "Message content validation failed")
|
|
|
|
t.Logf("kafka-go to kafka-go test PASSED")
|
|
}
|
|
|
|
func testSaramaToSarama(t *testing.T, addr, topic string) {
|
|
client := testutil.NewSaramaClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Generate test messages
|
|
messages := msgGen.GenerateStringMessages(2)
|
|
|
|
// Produce with Sarama
|
|
err := client.ProduceMessages(topic, messages)
|
|
testutil.AssertNoError(t, err, "Sarama produce failed")
|
|
|
|
// Consume with Sarama
|
|
consumed, err := client.ConsumeMessages(topic, 0, len(messages))
|
|
testutil.AssertNoError(t, err, "Sarama consume failed")
|
|
|
|
// Validate message content
|
|
err = testutil.ValidateMessageContent(messages, consumed)
|
|
testutil.AssertNoError(t, err, "Message content validation failed")
|
|
|
|
t.Logf("Sarama to Sarama test PASSED")
|
|
}
|
|
|
|
func testKafkaGoToSarama(t *testing.T, addr, topic string) {
|
|
kafkaGoClient := testutil.NewKafkaGoClient(t, addr)
|
|
saramaClient := testutil.NewSaramaClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Produce with kafka-go
|
|
messages := msgGen.GenerateKafkaGoMessages(2)
|
|
err := kafkaGoClient.ProduceMessages(topic, messages)
|
|
testutil.AssertNoError(t, err, "kafka-go produce failed")
|
|
|
|
// Consume with Sarama
|
|
consumed, err := saramaClient.ConsumeMessages(topic, 0, len(messages))
|
|
testutil.AssertNoError(t, err, "Sarama consume failed")
|
|
|
|
// Validate that we got the expected number of messages
|
|
testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch")
|
|
|
|
t.Logf("kafka-go to Sarama test PASSED")
|
|
}
|
|
|
|
func testSaramaToKafkaGo(t *testing.T, addr, topic string) {
|
|
kafkaGoClient := testutil.NewKafkaGoClient(t, addr)
|
|
saramaClient := testutil.NewSaramaClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Produce with Sarama
|
|
messages := msgGen.GenerateStringMessages(2)
|
|
err := saramaClient.ProduceMessages(topic, messages)
|
|
testutil.AssertNoError(t, err, "Sarama produce failed")
|
|
|
|
// Consume with kafka-go
|
|
consumed, err := kafkaGoClient.ConsumeMessages(topic, len(messages))
|
|
testutil.AssertNoError(t, err, "kafka-go consume failed")
|
|
|
|
// Validate that we got the expected number of messages
|
|
testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch")
|
|
|
|
t.Logf("Sarama to kafka-go test PASSED")
|
|
}
|