You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
247 lines
7.5 KiB
247 lines
7.5 KiB
package integration
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/IBM/sarama"
|
|
"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
|
|
)
|
|
|
|
// TestSMQIntegration tests that the Kafka gateway properly integrates with SeaweedMQ
|
|
// This test REQUIRES SeaweedFS masters to be running and will skip if not available
|
|
func TestSMQIntegration(t *testing.T) {
|
|
// This test requires SMQ to be available
|
|
gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired)
|
|
defer gateway.CleanupAndClose()
|
|
|
|
addr := gateway.StartAndWait()
|
|
|
|
t.Logf("Running SMQ integration test with SeaweedFS backend")
|
|
|
|
t.Run("ProduceConsumeWithPersistence", func(t *testing.T) {
|
|
testProduceConsumeWithPersistence(t, addr)
|
|
})
|
|
|
|
t.Run("ConsumerGroupOffsetPersistence", func(t *testing.T) {
|
|
testConsumerGroupOffsetPersistence(t, addr)
|
|
})
|
|
|
|
t.Run("TopicPersistence", func(t *testing.T) {
|
|
testTopicPersistence(t, addr)
|
|
})
|
|
}
|
|
|
|
func testProduceConsumeWithPersistence(t *testing.T, addr string) {
|
|
topicName := testutil.GenerateUniqueTopicName("smq-integration-produce-consume")
|
|
|
|
client := testutil.NewSaramaClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Create topic
|
|
err := client.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic")
|
|
|
|
// Produce messages
|
|
messages := msgGen.GenerateStringMessages(5)
|
|
err = client.ProduceMessages(topicName, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce messages")
|
|
|
|
t.Logf("Produced %d messages to topic %s", len(messages), topicName)
|
|
|
|
// Consume messages
|
|
consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
|
|
testutil.AssertNoError(t, err, "Failed to consume messages")
|
|
|
|
// Verify all messages were consumed
|
|
testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch")
|
|
|
|
t.Logf("Successfully consumed %d messages from SMQ backend", len(consumed))
|
|
}
|
|
|
|
func testConsumerGroupOffsetPersistence(t *testing.T, addr string) {
|
|
topicName := testutil.GenerateUniqueTopicName("smq-integration-offset-persistence")
|
|
groupID := testutil.GenerateUniqueGroupID("smq-offset-group")
|
|
|
|
client := testutil.NewSaramaClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Create topic and produce messages
|
|
err := client.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic")
|
|
|
|
messages := msgGen.GenerateStringMessages(10)
|
|
err = client.ProduceMessages(topicName, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce messages")
|
|
|
|
// Phase 1: Consume first 5 messages with consumer group and commit offsets
|
|
t.Logf("Phase 1: Consuming first 5 messages and committing offsets")
|
|
|
|
config := client.GetConfig()
|
|
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
|
|
|
consumerGroup1, err := sarama.NewConsumerGroup([]string{addr}, groupID, config)
|
|
testutil.AssertNoError(t, err, "Failed to create first consumer group")
|
|
|
|
handler := &SMQOffsetTestHandler{
|
|
messages: make(chan *sarama.ConsumerMessage, len(messages)),
|
|
ready: make(chan bool),
|
|
stopAfter: 5,
|
|
t: t,
|
|
}
|
|
|
|
ctx1, cancel1 := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel1()
|
|
|
|
go func() {
|
|
err := consumerGroup1.Consume(ctx1, []string{topicName}, handler)
|
|
if err != nil && err != context.DeadlineExceeded {
|
|
t.Logf("First consumer error: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Wait for consumer to be ready and consume messages
|
|
<-handler.ready
|
|
consumedCount := 0
|
|
for consumedCount < 5 {
|
|
select {
|
|
case <-handler.messages:
|
|
consumedCount++
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatalf("Timeout waiting for first batch of messages")
|
|
}
|
|
}
|
|
|
|
consumerGroup1.Close()
|
|
cancel1()
|
|
time.Sleep(2 * time.Second) // Allow offset commits to be processed by SMQ
|
|
|
|
t.Logf("Consumed %d messages in first phase", consumedCount)
|
|
|
|
// Phase 2: Start new consumer group with same ID - should resume from committed offset
|
|
t.Logf("Phase 2: Starting new consumer group to test offset persistence")
|
|
|
|
consumerGroup2, err := sarama.NewConsumerGroup([]string{addr}, groupID, config)
|
|
testutil.AssertNoError(t, err, "Failed to create second consumer group")
|
|
defer consumerGroup2.Close()
|
|
|
|
handler2 := &SMQOffsetTestHandler{
|
|
messages: make(chan *sarama.ConsumerMessage, len(messages)),
|
|
ready: make(chan bool),
|
|
stopAfter: 5, // Should consume remaining 5 messages
|
|
t: t,
|
|
}
|
|
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel2()
|
|
|
|
go func() {
|
|
err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
|
|
if err != nil && err != context.DeadlineExceeded {
|
|
t.Logf("Second consumer error: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Wait for second consumer and collect remaining messages
|
|
<-handler2.ready
|
|
secondConsumerMessages := make([]*sarama.ConsumerMessage, 0)
|
|
consumedCount = 0
|
|
for consumedCount < 5 {
|
|
select {
|
|
case msg := <-handler2.messages:
|
|
consumedCount++
|
|
secondConsumerMessages = append(secondConsumerMessages, msg)
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatalf("Timeout waiting for second batch of messages. Got %d/5", consumedCount)
|
|
}
|
|
}
|
|
|
|
// Verify second consumer started from correct offset (should be >= 5)
|
|
if len(secondConsumerMessages) > 0 {
|
|
firstMessageOffset := secondConsumerMessages[0].Offset
|
|
if firstMessageOffset < 5 {
|
|
t.Fatalf("Second consumer should start from offset >= 5: got %d", firstMessageOffset)
|
|
}
|
|
t.Logf("Second consumer correctly resumed from offset %d", firstMessageOffset)
|
|
}
|
|
|
|
t.Logf("Successfully verified SMQ offset persistence")
|
|
}
|
|
|
|
func testTopicPersistence(t *testing.T, addr string) {
|
|
topicName := testutil.GenerateUniqueTopicName("smq-integration-topic-persistence")
|
|
|
|
client := testutil.NewSaramaClient(t, addr)
|
|
|
|
// Create topic
|
|
err := client.CreateTopic(topicName, 2, 1) // 2 partitions
|
|
testutil.AssertNoError(t, err, "Failed to create topic")
|
|
|
|
// Verify topic exists by listing topics using admin client
|
|
admin, err := sarama.NewClusterAdmin([]string{addr}, client.GetConfig())
|
|
testutil.AssertNoError(t, err, "Failed to create admin client")
|
|
defer admin.Close()
|
|
|
|
topics, err := admin.ListTopics()
|
|
testutil.AssertNoError(t, err, "Failed to list topics")
|
|
|
|
topicDetails, exists := topics[topicName]
|
|
if !exists {
|
|
t.Fatalf("Topic %s not found in topic list", topicName)
|
|
}
|
|
|
|
if topicDetails.NumPartitions != 2 {
|
|
t.Errorf("Expected 2 partitions, got %d", topicDetails.NumPartitions)
|
|
}
|
|
|
|
t.Logf("Successfully verified topic persistence with %d partitions", topicDetails.NumPartitions)
|
|
}
|
|
|
|
// SMQOffsetTestHandler implements sarama.ConsumerGroupHandler for SMQ offset testing
|
|
type SMQOffsetTestHandler struct {
|
|
messages chan *sarama.ConsumerMessage
|
|
ready chan bool
|
|
readyOnce bool
|
|
stopAfter int
|
|
consumed int
|
|
t *testing.T
|
|
}
|
|
|
|
func (h *SMQOffsetTestHandler) Setup(sarama.ConsumerGroupSession) error {
|
|
h.t.Logf("SMQ offset test consumer setup")
|
|
if !h.readyOnce {
|
|
close(h.ready)
|
|
h.readyOnce = true
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *SMQOffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
|
|
h.t.Logf("SMQ offset test consumer cleanup")
|
|
return nil
|
|
}
|
|
|
|
func (h *SMQOffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
for {
|
|
select {
|
|
case message := <-claim.Messages():
|
|
if message == nil {
|
|
return nil
|
|
}
|
|
h.consumed++
|
|
h.messages <- message
|
|
session.MarkMessage(message, "")
|
|
|
|
// Stop after consuming the specified number of messages
|
|
if h.consumed >= h.stopAfter {
|
|
h.t.Logf("Stopping SMQ consumer after %d messages", h.consumed)
|
|
// Ensure commits are flushed before exiting the claim
|
|
session.Commit()
|
|
return nil
|
|
}
|
|
case <-session.Context().Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|