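// Package testutil provides thin wrappers around the kafka-go and Sarama
// clients for exercising a Kafka broker from Go integration tests.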
package testutil

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/IBM/sarama"
	"github.com/segmentio/kafka-go"
)

// KafkaGoClient wraps the kafka-go client with test utilities
type KafkaGoClient struct {
	brokerAddr string
	t          *testing.T
}

// SaramaClient wraps the Sarama client with test utilities
type SaramaClient struct {
	brokerAddr string
	config     *sarama.Config
	t          *testing.T
}

// NewKafkaGoClient creates a new kafka-go test client
func NewKafkaGoClient(t *testing.T, brokerAddr string) *KafkaGoClient {
	return &KafkaGoClient{
		brokerAddr: brokerAddr,
		t:          t,
	}
}

// NewSaramaClient creates a new Sarama test client with default config
func NewSaramaClient(t *testing.T, brokerAddr string) *SaramaClient {
	config := sarama.NewConfig()
	config.Version = sarama.V2_8_0_0
	config.Producer.Return.Successes = true
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // Start from earliest when no committed offset

	return &SaramaClient{
		brokerAddr: brokerAddr,
		config:     config,
		t:          t,
	}
}

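// Typical setup in a test looks roughly like this (a sketch; the address is a
// stand-in for whatever the test harness provides):
//
//	kgoClient := NewKafkaGoClient(t, "localhost:9092")
//	saramaClient := NewSaramaClient(t, "localhost:9092")
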
// CreateTopic creates a topic using kafka-go
func (k *KafkaGoClient) CreateTopic(topicName string, partitions int, replicationFactor int) error {
	k.t.Helper()

	conn, err := kafka.Dial("tcp", k.brokerAddr)
	if err != nil {
		return fmt.Errorf("dial broker: %w", err)
	}
	defer conn.Close()

	topicConfig := kafka.TopicConfig{
		Topic:             topicName,
		NumPartitions:     partitions,
		ReplicationFactor: replicationFactor,
	}

	// The request is sent on this connection directly; with a single test
	// broker, the dialed broker is also the cluster controller.
	err = conn.CreateTopics(topicConfig)
	if err != nil {
		return fmt.Errorf("create topic: %w", err)
	}

	k.t.Logf("Created topic %s with %d partitions", topicName, partitions)
	return nil
}

// ProduceMessages produces messages using kafka-go
func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Message) error {
	k.t.Helper()

	writer := &kafka.Writer{
		Addr:         kafka.TCP(k.brokerAddr),
		Topic:        topicName,
		Balancer:     &kafka.LeastBytes{},
		BatchTimeout: 50 * time.Millisecond,
		RequiredAcks: kafka.RequireOne,
	}
	defer writer.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err := writer.WriteMessages(ctx, messages...)
	if err != nil {
		return fmt.Errorf("write messages: %w", err)
	}

	k.t.Logf("Produced %d messages to topic %s", len(messages), topicName)
	return nil
}

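// Callers build the kafka.Message slice themselves, e.g. (a sketch with an
// arbitrary topic name; the writer supplies the topic, so the messages do not):
//
//	msgs := []kafka.Message{
//		{Key: []byte("k1"), Value: []byte("hello")},
//		{Key: []byte("k2"), Value: []byte("world")},
//	}
//	err := kgoClient.ProduceMessages("demo-topic", msgs)
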
// ConsumeMessages consumes messages using kafka-go
func (k *KafkaGoClient) ConsumeMessages(topicName string, expectedCount int) ([]kafka.Message, error) {
	k.t.Helper()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{k.brokerAddr},
		Topic:       topicName,
		StartOffset: kafka.FirstOffset,
		MinBytes:    1,
		MaxBytes:    10e6,
	})
	defer reader.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var messages []kafka.Message
	for i := 0; i < expectedCount; i++ {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			return messages, fmt.Errorf("read message %d: %w", i, err)
		}
		messages = append(messages, msg)
	}

	k.t.Logf("Consumed %d messages from topic %s", len(messages), topicName)
	return messages, nil
}

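// ConsumeMessages blocks until expectedCount messages arrive or the 30s context
// expires, so a test can assert directly on the returned slice, e.g. (sketch):
//
//	got, err := kgoClient.ConsumeMessages("demo-topic", 2)
//	if err != nil {
//		t.Fatalf("consume: %v", err)
//	}
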
// ConsumeWithGroup consumes messages using a consumer group
func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
	k.t.Helper()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{k.brokerAddr},
		Topic:          topicName,
		GroupID:        groupID,
		MinBytes:       1,
		MaxBytes:       10e6,
		CommitInterval: 500 * time.Millisecond,
	})
	defer reader.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var messages []kafka.Message
	for i := 0; i < expectedCount; i++ {
		// Fetch then explicitly commit to better control commit timing
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			return messages, fmt.Errorf("fetch message %d: %w", i, err)
		}
		messages = append(messages, msg)

		// Commit with a simple retry to handle transient connection churn
		var commitErr error
		for attempt := 0; attempt < 3; attempt++ {
			commitErr = reader.CommitMessages(ctx, msg)
			if commitErr == nil {
				break
			}
			// Brief exponential backoff: 50ms, 100ms, 200ms
			time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond)
		}
		if commitErr != nil {
			return messages, fmt.Errorf("committing message %d: %w", i, commitErr)
		}
	}

	k.t.Logf("Consumed %d messages from topic %s with group %s", len(messages), topicName, groupID)
	return messages, nil
}

// CreateTopic creates a topic using Sarama
func (s *SaramaClient) CreateTopic(topicName string, partitions int32, replicationFactor int16) error {
	s.t.Helper()

	admin, err := sarama.NewClusterAdmin([]string{s.brokerAddr}, s.config)
	if err != nil {
		return fmt.Errorf("create admin client: %w", err)
	}
	defer admin.Close()

	topicDetail := &sarama.TopicDetail{
		NumPartitions:     partitions,
		ReplicationFactor: replicationFactor,
	}

	err = admin.CreateTopic(topicName, topicDetail, false)
	if err != nil {
		return fmt.Errorf("create topic: %w", err)
	}

	s.t.Logf("Created topic %s with %d partitions", topicName, partitions)
	return nil
}

// ProduceMessages produces messages using Sarama
func (s *SaramaClient) ProduceMessages(topicName string, messages []string) error {
	s.t.Helper()

	producer, err := sarama.NewSyncProducer([]string{s.brokerAddr}, s.config)
	if err != nil {
		return fmt.Errorf("create producer: %w", err)
	}
	defer producer.Close()

	for i, msgText := range messages {
		msg := &sarama.ProducerMessage{
			Topic: topicName,
			Key:   sarama.StringEncoder(fmt.Sprintf("Test message %d", i)),
			Value: sarama.StringEncoder(msgText),
		}

		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			return fmt.Errorf("send message %d: %w", i, err)
		}

		s.t.Logf("Produced message %d: partition=%d, offset=%d", i, partition, offset)
	}

	return nil
}

// ProduceMessageToPartition produces a single message to a specific partition using Sarama
func (s *SaramaClient) ProduceMessageToPartition(topicName string, partition int32, message string) error {
	s.t.Helper()

	// Copy the shared config and switch to the manual partitioner so the
	// Partition field on the message is honored; the default hash partitioner
	// would pick the partition from the key instead.
	cfg := *s.config
	cfg.Producer.Partitioner = sarama.NewManualPartitioner

	producer, err := sarama.NewSyncProducer([]string{s.brokerAddr}, &cfg)
	if err != nil {
		return fmt.Errorf("create producer: %w", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic:     topicName,
		Partition: partition,
		Key:       sarama.StringEncoder(fmt.Sprintf("key-p%d", partition)),
		Value:     sarama.StringEncoder(message),
	}

	actualPartition, offset, err := producer.SendMessage(msg)
	if err != nil {
		return fmt.Errorf("send message to partition %d: %w", partition, err)
	}

	s.t.Logf("Produced message to partition %d: actualPartition=%d, offset=%d", partition, actualPartition, offset)
	return nil
}

// ConsumeMessages consumes messages using Sarama
func (s *SaramaClient) ConsumeMessages(topicName string, partition int32, expectedCount int) ([]string, error) {
	s.t.Helper()

	consumer, err := sarama.NewConsumer([]string{s.brokerAddr}, s.config)
	if err != nil {
		return nil, fmt.Errorf("create consumer: %w", err)
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition(topicName, partition, sarama.OffsetOldest)
	if err != nil {
		return nil, fmt.Errorf("create partition consumer: %w", err)
	}
	defer partitionConsumer.Close()

	var messages []string
	timeout := time.After(30 * time.Second)

	for len(messages) < expectedCount {
		select {
		case msg := <-partitionConsumer.Messages():
			messages = append(messages, string(msg.Value))
		case err := <-partitionConsumer.Errors():
			return messages, fmt.Errorf("consumer error: %w", err)
		case <-timeout:
			return messages, fmt.Errorf("timeout waiting for messages, got %d/%d", len(messages), expectedCount)
		}
	}

	s.t.Logf("Consumed %d messages from topic %s", len(messages), topicName)
	return messages, nil
}

// GetConfig returns the Sarama configuration
func (s *SaramaClient) GetConfig() *sarama.Config {
	return s.config
}

// SetConfig sets a custom Sarama configuration
func (s *SaramaClient) SetConfig(config *sarama.Config) {
	s.config = config
}
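
// A test that needs non-default Sarama behavior can tweak the config before
// using the client, e.g. (sketch):
//
//	cfg := saramaClient.GetConfig()
//	cfg.Producer.Partitioner = sarama.NewRandomPartitioner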