// Package testutil provides Kafka test helpers built on top of the kafka-go
// and IBM Sarama client libraries.
package testutil

import (
	"context"
	"fmt"
	"net"
	"strconv"
	"testing"
	"time"

	"github.com/IBM/sarama"
	"github.com/segmentio/kafka-go"
)

// KafkaGoClient wraps kafka-go client with test utilities
type KafkaGoClient struct {
	brokerAddr string
	t          *testing.T
}

// SaramaClient wraps Sarama client with test utilities
type SaramaClient struct {
	brokerAddr string
	config     *sarama.Config
	t          *testing.T
}

// NewKafkaGoClient creates a new kafka-go test client
func NewKafkaGoClient(t *testing.T, brokerAddr string) *KafkaGoClient {
	return &KafkaGoClient{
		brokerAddr: brokerAddr,
		t:          t,
	}
}

// NewSaramaClient creates a new Sarama test client with default config
func NewSaramaClient(t *testing.T, brokerAddr string) *SaramaClient {
	config := sarama.NewConfig()
	config.Version = sarama.V2_8_0_0
	config.Producer.Return.Successes = true
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // Start from earliest when no committed offset
	return &SaramaClient{
		brokerAddr: brokerAddr,
		config:     config,
		t:          t,
	}
}

// CreateTopic creates a topic using kafka-go.
//
// Topic creation is only accepted by the cluster controller: issuing
// CreateTopics against an arbitrary broker fails with NOT_CONTROLLER in
// multi-broker clusters. The first connection is therefore used only to
// discover the controller, which is then dialed directly.
func (k *KafkaGoClient) CreateTopic(topicName string, partitions int, replicationFactor int) error {
	k.t.Helper()

	conn, err := kafka.Dial("tcp", k.brokerAddr)
	if err != nil {
		return fmt.Errorf("dial broker: %w", err)
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		return fmt.Errorf("get controller: %w", err)
	}
	controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		return fmt.Errorf("dial controller: %w", err)
	}
	defer controllerConn.Close()

	topicConfig := kafka.TopicConfig{
		Topic:             topicName,
		NumPartitions:     partitions,
		ReplicationFactor: replicationFactor,
	}
	if err := controllerConn.CreateTopics(topicConfig); err != nil {
		return fmt.Errorf("create topic: %w", err)
	}
	k.t.Logf("Created topic %s with %d partitions", topicName, partitions)
	return nil
}

// ProduceMessages produces messages using kafka-go
func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Message) error {
	k.t.Helper()

	writer := &kafka.Writer{
		Addr:         kafka.TCP(k.brokerAddr),
		Topic:        topicName,
		Balancer:     &kafka.LeastBytes{},
		BatchTimeout: 50 * time.Millisecond,
		RequiredAcks: kafka.RequireOne,
	}
	defer writer.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err := writer.WriteMessages(ctx, messages...)
	if err != nil {
		return fmt.Errorf("write messages: %w", err)
	}
	k.t.Logf("Produced %d messages to topic %s", len(messages), topicName)
	return nil
}

// ConsumeMessages consumes messages using kafka-go
func (k *KafkaGoClient) ConsumeMessages(topicName string, expectedCount int) ([]kafka.Message, error) {
	k.t.Helper()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{k.brokerAddr},
		Topic:       topicName,
		StartOffset: kafka.FirstOffset,
		MinBytes:    1,
		MaxBytes:    10e6,
	})
	defer reader.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var messages []kafka.Message
	for i := 0; i < expectedCount; i++ {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			return messages, fmt.Errorf("read message %d: %w", i, err)
		}
		messages = append(messages, msg)
	}
	k.t.Logf("Consumed %d messages from topic %s", len(messages), topicName)
	return messages, nil
}

// ConsumeWithGroup consumes messages using consumer group
func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
	k.t.Helper()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{k.brokerAddr},
		Topic:          topicName,
		GroupID:        groupID,
		MinBytes:       1,
		MaxBytes:       10e6,
		CommitInterval: 500 * time.Millisecond,
	})
	defer reader.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var messages []kafka.Message
	for i := 0; i < expectedCount; i++ {
		// Fetch then explicitly commit to better control commit timing
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			return messages, fmt.Errorf("read message %d: %w", i, err)
		}
		messages = append(messages, msg)
		// Commit with simple retry to handle transient connection churn
		var commitErr error
		for attempt := 0; attempt < 3; attempt++ {
			commitErr = reader.CommitMessages(ctx, msg)
			if commitErr == nil {
				break
			}
			// brief backoff
time.Sleep(time.Duration(50*(1<