
verify produced messages are consumed

chrislu, 7 days ago (pull/7329/head)
commit f4a018e731
1. test/kafka/kafka-client-loadtest/cmd/loadtest/main.go (40)
2. test/kafka/kafka-client-loadtest/internal/consumer/consumer.go (19)
3. test/kafka/kafka-client-loadtest/internal/producer/producer.go (21)
4. test/kafka/kafka-client-loadtest/internal/tracker/tracker.go (250)
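The commit threads one shared tracker through every producer and consumer goroutine and verifies the run at the end. A minimal sketch of that lifecycle, using only the API added in this commit (the paths and record values here are illustrative, not the test's real configuration):

package main

import (
	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
)

func main() {
	// One tracker shared by all producers and consumers.
	t := tracker.NewTracker("/tmp/produced.jsonl", "/tmp/consumed.jsonl")

	// Producer side: record each message under its (topic, partition, offset).
	t.TrackProduced(tracker.Record{Key: "k1", Topic: "loadtest-topic-0", Partition: 0, Offset: 42, ProducerID: 1})

	// Consumer side: record what was actually read back.
	t.TrackConsumed(tracker.Record{Key: "k1", Topic: "loadtest-topic-0", Partition: 0, Offset: 42, ConsumerID: 2})

	// End of run: persist both sides as JSONL, then diff them.
	_ = t.SaveProduced()
	_ = t.SaveConsumed()
	result := t.Compare()
	result.PrintSummary()
}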

test/kafka/kafka-client-loadtest/cmd/loadtest/main.go (40 changed lines)

@@ -22,6 +22,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/producer"
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema"
+	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
 )

 var (
@@ -143,6 +144,9 @@ func main() {
 func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error {
 	log.Printf("Starting producer-only test with %d producers", cfg.Producers.Count)

+	// Create record tracker (nil for now as producer-only test doesn't need comparison)
+	recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl")
+
 	errChan := make(chan error, cfg.Producers.Count)

 	for i := 0; i < cfg.Producers.Count; i++ {
@@ -150,7 +154,7 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics
 		go func(id int) {
 			defer wg.Done()

-			prod, err := producer.New(cfg, collector, id)
+			prod, err := producer.New(cfg, collector, id, recordTracker)
 			if err != nil {
 				log.Printf("Failed to create producer %d: %v", id, err)
 				errChan <- err
@@ -179,6 +183,9 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics
 func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error {
 	log.Printf("Starting consumer-only test with %d consumers", cfg.Consumers.Count)

+	// Create record tracker
+	recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl")
+
 	errChan := make(chan error, cfg.Consumers.Count)

 	for i := 0; i < cfg.Consumers.Count; i++ {
@@ -186,7 +193,7 @@ func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics
 		go func(id int) {
 			defer wg.Done()

-			cons, err := consumer.New(cfg, collector, id)
+			cons, err := consumer.New(cfg, collector, id, recordTracker)
 			if err != nil {
 				log.Printf("Failed to create consumer %d: %v", id, err)
 				errChan <- err
@@ -206,6 +213,9 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
 	log.Printf("Starting comprehensive test with %d producers and %d consumers",
 		cfg.Producers.Count, cfg.Consumers.Count)

+	// Create record tracker
+	recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl")
+
 	errChan := make(chan error, cfg.Producers.Count)

 	// Create separate contexts for producers and consumers
@@ -218,7 +228,7 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
 		go func(id int) {
 			defer wg.Done()

-			prod, err := producer.New(cfg, collector, id)
+			prod, err := producer.New(cfg, collector, id, recordTracker)
 			if err != nil {
 				log.Printf("Failed to create producer %d: %v", id, err)
 				errChan <- err
@@ -244,7 +254,7 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
 		go func(id int) {
 			defer wg.Done()

-			cons, err := consumer.New(cfg, collector, id)
+			cons, err := consumer.New(cfg, collector, id, recordTracker)
 			if err != nil {
 				log.Printf("Failed to create consumer %d: %v", id, err)
 				return
@@ -304,6 +314,28 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
 		}()
 	}

+	// Wait for all producer and consumer goroutines to complete
+	log.Printf("Waiting for all producers and consumers to complete...")
 	wg.Wait()
+	log.Printf("All producers and consumers completed, starting verification...")
+
+	// Save produced and consumed records
+	log.Printf("Saving produced records...")
+	if err := recordTracker.SaveProduced(); err != nil {
+		log.Printf("Failed to save produced records: %v", err)
+	}
+
+	log.Printf("Saving consumed records...")
+	if err := recordTracker.SaveConsumed(); err != nil {
+		log.Printf("Failed to save consumed records: %v", err)
+	}
+
+	// Compare records
+	log.Printf("Comparing produced vs consumed records...")
+	result := recordTracker.Compare()
+	result.PrintSummary()
+
+	log.Printf("Verification complete!")
+
 	return nil
 }

test/kafka/kafka-client-loadtest/internal/consumer/consumer.go (19 changed lines)

@@ -14,6 +14,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
 	pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
 	"google.golang.org/protobuf/proto"
 )
@@ -35,10 +36,13 @@ type Consumer struct {
 	messagesProcessed int64
 	lastOffset        map[string]map[int32]int64
 	offsetMutex       sync.RWMutex
+
+	// Record tracking
+	tracker *tracker.Tracker
 }

 // New creates a new consumer instance
-func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) {
+func New(cfg *config.Config, collector *metrics.Collector, id int, recordTracker *tracker.Tracker) (*Consumer, error) {
 	// All consumers share the same group for load balancing across partitions
 	consumerGroup := cfg.Consumers.GroupPrefix
@@ -51,6 +55,7 @@ func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, e
 		useConfluent:  false, // Use Sarama by default
 		lastOffset:    make(map[string]map[int32]int64),
 		schemaFormats: make(map[string]string),
+		tracker:       recordTracker,
 	}

 	// Initialize schema formats for each topic (must match producer logic)
@@ -600,6 +605,18 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 			// 	return nil
 			// }
 		} else {
+			// Track consumed message
+			if h.consumer.tracker != nil {
+				h.consumer.tracker.TrackConsumed(tracker.Record{
+					Key:        string(key),
+					Topic:      message.Topic,
+					Partition:  message.Partition,
+					Offset:     message.Offset,
+					Timestamp:  message.Timestamp.UnixNano(),
+					ConsumerID: h.consumer.id,
+				})
+			}
+
 			// Mark message as processed
 			session.MarkMessage(message, "")
 		}
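Every record tracked here is later written by the tracker as one JSON object per line (see SaveProduced/SaveConsumed in tracker.go below). Given the struct tags on tracker.Record, a consumed record serializes roughly as follows; the key format and values are illustrative, not taken from the test:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
)

func main() {
	rec := tracker.Record{
		Key:        "producer-1-key-000042", // hypothetical key format
		Topic:      "loadtest-topic-0",
		Partition:  3,
		Offset:     1057,
		Timestamp:  1730000000000000000, // UnixNano
		ConsumerID: 2,
	}
	b, _ := json.Marshal(rec)
	fmt.Println(string(b))
	// ProducerID is zero here, so omitempty drops it:
	// {"key":"producer-1-key-000042","topic":"loadtest-topic-0","partition":3,"offset":1057,"timestamp":1730000000000000000,"consumer_id":2}
}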

test/kafka/kafka-client-loadtest/internal/producer/producer.go (21 changed lines)

@@ -20,6 +20,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
 	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema"
 	pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
 	"google.golang.org/protobuf/proto"
 )
@@ -50,6 +51,9 @@ type Producer struct {
 	// Circuit breaker detection
 	consecutiveFailures int
+
+	// Record tracking
+	tracker *tracker.Tracker
 }

 // Message represents a test message
@@ -64,7 +68,7 @@ type Message struct {
 }

 // New creates a new producer instance
-func New(cfg *config.Config, collector *metrics.Collector, id int) (*Producer, error) {
+func New(cfg *config.Config, collector *metrics.Collector, id int, recordTracker *tracker.Tracker) (*Producer, error) {
 	p := &Producer{
 		id:     id,
 		config: cfg,
@@ -75,6 +79,7 @@ func New(cfg *config.Config, collector *metrics.Collector, id int) (*Producer, e
 		schemaIDs:     make(map[string]int),
 		schemaFormats: make(map[string]string),
 		startTime:     time.Now(), // Record test start time for unique key generation
+		tracker:       recordTracker,
 	}

 	// Initialize schema formats for each topic
@@ -375,11 +380,23 @@ func (p *Producer) produceSaramaMessage(topic string, startTime time.Time) error
 	}

 	// Produce message
-	_, _, err := p.saramaProducer.SendMessage(msg)
+	partition, offset, err := p.saramaProducer.SendMessage(msg)
 	if err != nil {
 		return err
 	}

+	// Track produced message
+	if p.tracker != nil {
+		p.tracker.TrackProduced(tracker.Record{
+			Key:        key,
+			Topic:      topic,
+			Partition:  partition,
+			Offset:     offset,
+			Timestamp:  startTime.UnixNano(),
+			ProducerID: p.id,
+		})
+	}
+
 	// Record metrics
 	latency := time.Since(startTime)
 	p.metricsCollector.RecordProducedMessage(len(messageValue), latency)
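The key change on the producer side is keeping the (partition, offset) pair that SendMessage returns instead of discarding it, since that pair is what the tracker matches on. For reference, a stand-alone sketch of Sarama's SyncProducer returning the broker's assignment (assuming the github.com/IBM/sarama import path; the broker address is a placeholder):

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by SyncProducer

	prod, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer prod.Close()

	partition, offset, err := prod.SendMessage(&sarama.ProducerMessage{
		Topic: "loadtest-topic-0",
		Key:   sarama.StringEncoder("k1"),
		Value: sarama.StringEncoder("payload"),
	})
	if err != nil {
		log.Fatal(err)
	}
	// The (partition, offset) assignment uniquely identifies the record
	// within the topic, which is exactly what the tracker keys on.
	log.Printf("written to partition %d at offset %d", partition, offset)
}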

test/kafka/kafka-client-loadtest/internal/tracker/tracker.go (250 added lines, new file)

@@ -0,0 +1,250 @@
package tracker

import (
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"strings"
	"sync"
)

// Record represents a tracked message
type Record struct {
	Key        string `json:"key"`
	Topic      string `json:"topic"`
	Partition  int32  `json:"partition"`
	Offset     int64  `json:"offset"`
	Timestamp  int64  `json:"timestamp"`
	ProducerID int    `json:"producer_id,omitempty"`
	ConsumerID int    `json:"consumer_id,omitempty"`
}

// Tracker tracks produced and consumed records
type Tracker struct {
	mu              sync.Mutex
	producedRecords []Record
	consumedRecords []Record
	producedFile    string
	consumedFile    string
}

// NewTracker creates a new record tracker
func NewTracker(producedFile, consumedFile string) *Tracker {
	return &Tracker{
		producedRecords: make([]Record, 0, 100000),
		consumedRecords: make([]Record, 0, 100000),
		producedFile:    producedFile,
		consumedFile:    consumedFile,
	}
}

// TrackProduced records a produced message
func (t *Tracker) TrackProduced(record Record) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.producedRecords = append(t.producedRecords, record)
}

// TrackConsumed records a consumed message
func (t *Tracker) TrackConsumed(record Record) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.consumedRecords = append(t.consumedRecords, record)
}

// SaveProduced writes produced records to file
func (t *Tracker) SaveProduced() error {
	t.mu.Lock()
	defer t.mu.Unlock()

	f, err := os.Create(t.producedFile)
	if err != nil {
		return fmt.Errorf("failed to create produced file: %v", err)
	}
	defer f.Close()

	encoder := json.NewEncoder(f)
	for _, record := range t.producedRecords {
		if err := encoder.Encode(record); err != nil {
			return fmt.Errorf("failed to encode produced record: %v", err)
		}
	}

	fmt.Printf("Saved %d produced records to %s\n", len(t.producedRecords), t.producedFile)
	return nil
}

// SaveConsumed writes consumed records to file
func (t *Tracker) SaveConsumed() error {
	t.mu.Lock()
	defer t.mu.Unlock()

	f, err := os.Create(t.consumedFile)
	if err != nil {
		return fmt.Errorf("failed to create consumed file: %v", err)
	}
	defer f.Close()

	encoder := json.NewEncoder(f)
	for _, record := range t.consumedRecords {
		if err := encoder.Encode(record); err != nil {
			return fmt.Errorf("failed to encode consumed record: %v", err)
		}
	}

	fmt.Printf("Saved %d consumed records to %s\n", len(t.consumedRecords), t.consumedFile)
	return nil
}

// Compare compares produced and consumed records
func (t *Tracker) Compare() ComparisonResult {
	t.mu.Lock()
	defer t.mu.Unlock()

	result := ComparisonResult{
		TotalProduced: len(t.producedRecords),
		TotalConsumed: len(t.consumedRecords),
	}

	// Build maps for efficient lookup
	producedMap := make(map[string]Record)
	for _, record := range t.producedRecords {
		key := fmt.Sprintf("%s-%d-%d", record.Topic, record.Partition, record.Offset)
		producedMap[key] = record
	}

	consumedMap := make(map[string]int)
	duplicateKeys := make(map[string][]Record)
	for _, record := range t.consumedRecords {
		key := fmt.Sprintf("%s-%d-%d", record.Topic, record.Partition, record.Offset)
		consumedMap[key]++
		if consumedMap[key] > 1 {
			duplicateKeys[key] = append(duplicateKeys[key], record)
		}
	}

	// Find missing records (produced but not consumed)
	for key, record := range producedMap {
		if _, found := consumedMap[key]; !found {
			result.Missing = append(result.Missing, record)
		}
	}

	// Find duplicate records (consumed multiple times)
	for key, records := range duplicateKeys {
		if len(records) > 0 {
			// Add first occurrence for context
			result.Duplicates = append(result.Duplicates, DuplicateRecord{
				Record: records[0],
				Count:  consumedMap[key],
			})
		}
	}

	result.MissingCount = len(result.Missing)
	result.DuplicateCount = len(result.Duplicates)
	result.UniqueConsumed = result.TotalConsumed - sumDuplicates(result.Duplicates)

	return result
}

// ComparisonResult holds the comparison results
type ComparisonResult struct {
	TotalProduced  int
	TotalConsumed  int
	UniqueConsumed int
	MissingCount   int
	DuplicateCount int
	Missing        []Record
	Duplicates     []DuplicateRecord
}

// DuplicateRecord represents a record consumed multiple times
type DuplicateRecord struct {
	Record Record
	Count  int
}

// PrintSummary prints a summary of the comparison
func (r *ComparisonResult) PrintSummary() {
	fmt.Println("\n" + strings.Repeat("=", 70))
	fmt.Println("                MESSAGE VERIFICATION RESULTS")
	fmt.Println(strings.Repeat("=", 70))

	fmt.Printf("\nProduction Summary:\n")
	fmt.Printf("  Total Produced: %d messages\n", r.TotalProduced)

	fmt.Printf("\nConsumption Summary:\n")
	fmt.Printf("  Total Consumed: %d messages\n", r.TotalConsumed)
	fmt.Printf("  Unique Consumed: %d messages\n", r.UniqueConsumed)
	fmt.Printf("  Duplicate Reads: %d messages\n", r.TotalConsumed-r.UniqueConsumed)

	fmt.Printf("\nVerification Results:\n")
	if r.MissingCount == 0 {
		fmt.Printf("  ✅ Missing Records: 0 (all messages delivered)\n")
	} else {
		fmt.Printf("  ❌ Missing Records: %d (data loss detected!)\n", r.MissingCount)
	}

	if r.DuplicateCount == 0 {
		fmt.Printf("  ✅ Duplicate Records: 0 (no duplicates)\n")
	} else {
		duplicatePercent := float64(r.TotalConsumed-r.UniqueConsumed) * 100.0 / float64(r.TotalProduced)
		fmt.Printf("  ⚠️  Duplicate Records: %d unique messages read multiple times (%.1f%%)\n",
			r.DuplicateCount, duplicatePercent)
	}

	fmt.Printf("\nDelivery Guarantee:\n")
	if r.MissingCount == 0 && r.DuplicateCount == 0 {
		fmt.Printf("  ✅ EXACTLY-ONCE: All messages delivered exactly once\n")
	} else if r.MissingCount == 0 {
		fmt.Printf("  ✅ AT-LEAST-ONCE: All messages delivered (some duplicates)\n")
	} else {
		fmt.Printf("  ❌ AT-MOST-ONCE: Some messages lost\n")
	}

	// Print sample of missing records (up to 10)
	if len(r.Missing) > 0 {
		fmt.Printf("\nSample Missing Records (first 10 of %d):\n", len(r.Missing))
		for i, record := range r.Missing {
			if i >= 10 {
				break
			}
			fmt.Printf("  - %s[%d]@%d (key=%s)\n",
				record.Topic, record.Partition, record.Offset, record.Key)
		}
	}

	// Print sample of duplicate records (up to 10)
	if len(r.Duplicates) > 0 {
		fmt.Printf("\nSample Duplicate Records (first 10 of %d):\n", len(r.Duplicates))
		// Sort by count descending
		sorted := make([]DuplicateRecord, len(r.Duplicates))
		copy(sorted, r.Duplicates)
		sort.Slice(sorted, func(i, j int) bool {
			return sorted[i].Count > sorted[j].Count
		})
		for i, dup := range sorted {
			if i >= 10 {
				break
			}
			fmt.Printf("  - %s[%d]@%d (key=%s, read %d times)\n",
				dup.Record.Topic, dup.Record.Partition, dup.Record.Offset,
				dup.Record.Key, dup.Count)
		}
	}

	fmt.Println(strings.Repeat("=", 70))
}

func sumDuplicates(duplicates []DuplicateRecord) int {
	sum := 0
	for _, dup := range duplicates {
		sum += dup.Count - 1 // Don't count the first occurrence
	}
	return sum
}
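Compare's classification is easy to pin down with a small unit test; this is a hypothetical sketch, not part of the commit (the file paths are unused because nothing is saved):

package tracker

import "testing"

func TestCompareClassification(t *testing.T) {
	tr := NewTracker("", "")

	delivered := Record{Key: "k1", Topic: "t", Partition: 0, Offset: 1}
	lost := Record{Key: "k2", Topic: "t", Partition: 0, Offset: 2}

	tr.TrackProduced(delivered)
	tr.TrackProduced(lost)
	tr.TrackConsumed(delivered) // read once...
	tr.TrackConsumed(delivered) // ...and again: counted as one duplicate

	result := tr.Compare()
	if result.MissingCount != 1 {
		t.Errorf("MissingCount = %d, want 1", result.MissingCount)
	}
	if result.DuplicateCount != 1 {
		t.Errorf("DuplicateCount = %d, want 1", result.DuplicateCount)
	}
	if result.UniqueConsumed != 1 { // 2 consumed minus 1 extra read
		t.Errorf("UniqueConsumed = %d, want 1", result.UniqueConsumed)
	}
}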