You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
135 lines
4.1 KiB
135 lines
4.1 KiB
package testutil
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
// MessageGenerator provides utilities for generating test messages
|
|
type MessageGenerator struct {
|
|
counter int
|
|
}
|
|
|
|
// NewMessageGenerator creates a new message generator
|
|
func NewMessageGenerator() *MessageGenerator {
|
|
return &MessageGenerator{counter: 0}
|
|
}
|
|
|
|
// GenerateKafkaGoMessages generates kafka-go messages for testing
|
|
func (m *MessageGenerator) GenerateKafkaGoMessages(count int) []kafka.Message {
|
|
messages := make([]kafka.Message, count)
|
|
|
|
for i := 0; i < count; i++ {
|
|
m.counter++
|
|
key := []byte(fmt.Sprintf("test-key-%d", m.counter))
|
|
val := []byte(fmt.Sprintf("{\"value\":\"test-message-%d-generated-at-%d\"}", m.counter, time.Now().Unix()))
|
|
|
|
// If schema mode is requested, ensure a test schema exists and wrap with Confluent envelope
|
|
if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
|
|
subject := "offset-management-value"
|
|
schemaJSON := `{"type":"record","name":"TestRecord","fields":[{"name":"value","type":"string"}]}`
|
|
rc := schema.NewRegistryClient(schema.RegistryConfig{URL: url})
|
|
if _, err := rc.GetLatestSchema(subject); err != nil {
|
|
// Best-effort register schema
|
|
_, _ = rc.RegisterSchema(subject, schemaJSON)
|
|
}
|
|
if latest, err := rc.GetLatestSchema(subject); err == nil {
|
|
val = schema.CreateConfluentEnvelope(schema.FormatAvro, latest.LatestID, nil, val)
|
|
} else {
|
|
// fallback to schema id 1
|
|
val = schema.CreateConfluentEnvelope(schema.FormatAvro, 1, nil, val)
|
|
}
|
|
}
|
|
|
|
messages[i] = kafka.Message{Key: key, Value: val}
|
|
}
|
|
|
|
return messages
|
|
}
|
|
|
|
// GenerateStringMessages generates string messages for Sarama
|
|
func (m *MessageGenerator) GenerateStringMessages(count int) []string {
|
|
messages := make([]string, count)
|
|
|
|
for i := 0; i < count; i++ {
|
|
m.counter++
|
|
messages[i] = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix())
|
|
}
|
|
|
|
return messages
|
|
}
|
|
|
|
// GenerateKafkaGoMessage generates a single kafka-go message
|
|
func (m *MessageGenerator) GenerateKafkaGoMessage(key, value string) kafka.Message {
|
|
if key == "" {
|
|
m.counter++
|
|
key = fmt.Sprintf("test-key-%d", m.counter)
|
|
}
|
|
if value == "" {
|
|
value = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix())
|
|
}
|
|
|
|
return kafka.Message{
|
|
Key: []byte(key),
|
|
Value: []byte(value),
|
|
}
|
|
}
|
|
|
|
// GenerateUniqueTopicName generates a unique topic name for testing
|
|
func GenerateUniqueTopicName(prefix string) string {
|
|
if prefix == "" {
|
|
prefix = "test-topic"
|
|
}
|
|
return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
|
|
}
|
|
|
|
// GenerateUniqueGroupID generates a unique consumer group ID for testing
|
|
func GenerateUniqueGroupID(prefix string) string {
|
|
if prefix == "" {
|
|
prefix = "test-group"
|
|
}
|
|
return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
|
|
}
|
|
|
|
// ValidateMessageContent validates that consumed messages match expected content
|
|
func ValidateMessageContent(expected, actual []string) error {
|
|
if len(expected) != len(actual) {
|
|
return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
|
|
}
|
|
|
|
for i, expectedMsg := range expected {
|
|
if i >= len(actual) {
|
|
return fmt.Errorf("missing message at index %d", i)
|
|
}
|
|
if actual[i] != expectedMsg {
|
|
return fmt.Errorf("message mismatch at index %d: expected %q, got %q", i, expectedMsg, actual[i])
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ValidateKafkaGoMessageContent validates kafka-go messages
|
|
func ValidateKafkaGoMessageContent(expected, actual []kafka.Message) error {
|
|
if len(expected) != len(actual) {
|
|
return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
|
|
}
|
|
|
|
for i, expectedMsg := range expected {
|
|
if i >= len(actual) {
|
|
return fmt.Errorf("missing message at index %d", i)
|
|
}
|
|
if string(actual[i].Key) != string(expectedMsg.Key) {
|
|
return fmt.Errorf("key mismatch at index %d: expected %q, got %q", i, string(expectedMsg.Key), string(actual[i].Key))
|
|
}
|
|
if string(actual[i].Value) != string(expectedMsg.Value) {
|
|
return fmt.Errorf("value mismatch at index %d: expected %q, got %q", i, string(expectedMsg.Value), string(actual[i].Value))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|