From f4a018e7312d993c0656dcee77a01a490bf84a74 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 15 Oct 2025 09:58:31 -0700 Subject: [PATCH] verify produced messages are consumed --- .../cmd/loadtest/main.go | 40 ++- .../internal/consumer/consumer.go | 19 +- .../internal/producer/producer.go | 21 +- .../internal/tracker/tracker.go | 250 ++++++++++++++++++ 4 files changed, 323 insertions(+), 7 deletions(-) create mode 100644 test/kafka/kafka-client-loadtest/internal/tracker/tracker.go diff --git a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go index 2f435e600..33db85c28 100644 --- a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go +++ b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go @@ -22,6 +22,7 @@ import ( "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/producer" "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker" ) var ( @@ -143,6 +144,9 @@ func main() { func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { log.Printf("Starting producer-only test with %d producers", cfg.Producers.Count) + // Create record tracker (nil for now as producer-only test doesn't need comparison) + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl") + errChan := make(chan error, cfg.Producers.Count) for i := 0; i < cfg.Producers.Count; i++ { @@ -150,7 +154,7 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics go func(id int) { defer wg.Done() - prod, err := producer.New(cfg, collector, id) + prod, err := producer.New(cfg, collector, id, recordTracker) if err != nil { log.Printf("Failed to create producer %d: %v", id, err) errChan <- err @@ -179,6 +183,9 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { log.Printf("Starting consumer-only test with %d consumers", cfg.Consumers.Count) + // Create record tracker + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl") + errChan := make(chan error, cfg.Consumers.Count) for i := 0; i < cfg.Consumers.Count; i++ { @@ -186,7 +193,7 @@ func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics go func(id int) { defer wg.Done() - cons, err := consumer.New(cfg, collector, id) + cons, err := consumer.New(cfg, collector, id, recordTracker) if err != nil { log.Printf("Failed to create consumer %d: %v", id, err) errChan <- err @@ -206,6 +213,9 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c log.Printf("Starting comprehensive test with %d producers and %d consumers", cfg.Producers.Count, cfg.Consumers.Count) + // Create record tracker + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl") + errChan := make(chan error, cfg.Producers.Count) // Create separate contexts for producers and consumers @@ -218,7 +228,7 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c go func(id int) { defer wg.Done() - prod, err := producer.New(cfg, collector, id) + prod, err := producer.New(cfg, collector, id, recordTracker) if err != nil { log.Printf("Failed to create producer %d: %v", id, err) errChan <- err @@ -244,7 +254,7 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c go func(id int) { defer wg.Done() - cons, err := consumer.New(cfg, collector, id) + cons, err := consumer.New(cfg, collector, id, recordTracker) if err != nil { log.Printf("Failed to create consumer %d: %v", id, err) return @@ -304,6 +314,28 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c }() } + // Wait for all producer and consumer goroutines to complete + log.Printf("Waiting for all producers and consumers to complete...") + wg.Wait() + log.Printf("All producers and consumers completed, starting verification...") + + // Save produced and consumed records + log.Printf("Saving produced records...") + if err := recordTracker.SaveProduced(); err != nil { + log.Printf("Failed to save produced records: %v", err) + } + + log.Printf("Saving consumed records...") + if err := recordTracker.SaveConsumed(); err != nil { + log.Printf("Failed to save consumed records: %v", err) + } + + // Compare records + log.Printf("Comparing produced vs consumed records...") + result := recordTracker.Compare() + result.PrintSummary() + + log.Printf("Verification complete!") return nil } diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index 1171bd5c0..6c03288b9 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config" "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker" "google.golang.org/protobuf/proto" ) @@ -35,10 +36,13 @@ type Consumer struct { messagesProcessed int64 lastOffset map[string]map[int32]int64 offsetMutex sync.RWMutex + + // Record tracking + tracker *tracker.Tracker } // New creates a new consumer instance -func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) { +func New(cfg *config.Config, collector *metrics.Collector, id int, recordTracker *tracker.Tracker) (*Consumer, error) { // All consumers share the same group for load balancing across partitions consumerGroup := cfg.Consumers.GroupPrefix @@ -51,6 +55,7 @@ func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, e useConfluent: false, // Use Sarama by default lastOffset: make(map[string]map[int32]int64), schemaFormats: make(map[string]string), + tracker: recordTracker, } // Initialize schema formats for each topic (must match producer logic) @@ -600,6 +605,18 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, // return nil // } } else { + // Track consumed message + if h.consumer.tracker != nil { + h.consumer.tracker.TrackConsumed(tracker.Record{ + Key: string(key), + Topic: message.Topic, + Partition: message.Partition, + Offset: message.Offset, + Timestamp: message.Timestamp.UnixNano(), + ConsumerID: h.consumer.id, + }) + } + // Mark message as processed session.MarkMessage(message, "") } diff --git a/test/kafka/kafka-client-loadtest/internal/producer/producer.go b/test/kafka/kafka-client-loadtest/internal/producer/producer.go index 167bfeac6..f8b8db7f7 100644 --- a/test/kafka/kafka-client-loadtest/internal/producer/producer.go +++ b/test/kafka/kafka-client-loadtest/internal/producer/producer.go @@ -20,6 +20,7 @@ import ( "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema" pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker" "google.golang.org/protobuf/proto" ) @@ -50,6 +51,9 @@ type Producer struct { // Circuit breaker detection consecutiveFailures int + + // Record tracking + tracker *tracker.Tracker } // Message represents a test message @@ -64,7 +68,7 @@ type Message struct { } // New creates a new producer instance -func New(cfg *config.Config, collector *metrics.Collector, id int) (*Producer, error) { +func New(cfg *config.Config, collector *metrics.Collector, id int, recordTracker *tracker.Tracker) (*Producer, error) { p := &Producer{ id: id, config: cfg, @@ -75,6 +79,7 @@ func New(cfg *config.Config, collector *metrics.Collector, id int) (*Producer, e schemaIDs: make(map[string]int), schemaFormats: make(map[string]string), startTime: time.Now(), // Record test start time for unique key generation + tracker: recordTracker, } // Initialize schema formats for each topic @@ -375,11 +380,23 @@ func (p *Producer) produceSaramaMessage(topic string, startTime time.Time) error } // Produce message - _, _, err := p.saramaProducer.SendMessage(msg) + partition, offset, err := p.saramaProducer.SendMessage(msg) if err != nil { return err } + // Track produced message + if p.tracker != nil { + p.tracker.TrackProduced(tracker.Record{ + Key: key, + Topic: topic, + Partition: partition, + Offset: offset, + Timestamp: startTime.UnixNano(), + ProducerID: p.id, + }) + } + // Record metrics latency := time.Since(startTime) p.metricsCollector.RecordProducedMessage(len(messageValue), latency) diff --git a/test/kafka/kafka-client-loadtest/internal/tracker/tracker.go b/test/kafka/kafka-client-loadtest/internal/tracker/tracker.go new file mode 100644 index 000000000..2329ec691 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/tracker/tracker.go @@ -0,0 +1,250 @@ +package tracker + +import ( + "encoding/json" + "fmt" + "os" + "sort" + "strings" + "sync" +) + +// Record represents a tracked message +type Record struct { + Key string `json:"key"` + Topic string `json:"topic"` + Partition int32 `json:"partition"` + Offset int64 `json:"offset"` + Timestamp int64 `json:"timestamp"` + ProducerID int `json:"producer_id,omitempty"` + ConsumerID int `json:"consumer_id,omitempty"` +} + +// Tracker tracks produced and consumed records +type Tracker struct { + mu sync.Mutex + producedRecords []Record + consumedRecords []Record + producedFile string + consumedFile string +} + +// NewTracker creates a new record tracker +func NewTracker(producedFile, consumedFile string) *Tracker { + return &Tracker{ + producedRecords: make([]Record, 0, 100000), + consumedRecords: make([]Record, 0, 100000), + producedFile: producedFile, + consumedFile: consumedFile, + } +} + +// TrackProduced records a produced message +func (t *Tracker) TrackProduced(record Record) { + t.mu.Lock() + defer t.mu.Unlock() + t.producedRecords = append(t.producedRecords, record) +} + +// TrackConsumed records a consumed message +func (t *Tracker) TrackConsumed(record Record) { + t.mu.Lock() + defer t.mu.Unlock() + t.consumedRecords = append(t.consumedRecords, record) +} + +// SaveProduced writes produced records to file +func (t *Tracker) SaveProduced() error { + t.mu.Lock() + defer t.mu.Unlock() + + f, err := os.Create(t.producedFile) + if err != nil { + return fmt.Errorf("failed to create produced file: %v", err) + } + defer f.Close() + + encoder := json.NewEncoder(f) + for _, record := range t.producedRecords { + if err := encoder.Encode(record); err != nil { + return fmt.Errorf("failed to encode produced record: %v", err) + } + } + + fmt.Printf("Saved %d produced records to %s\n", len(t.producedRecords), t.producedFile) + return nil +} + +// SaveConsumed writes consumed records to file +func (t *Tracker) SaveConsumed() error { + t.mu.Lock() + defer t.mu.Unlock() + + f, err := os.Create(t.consumedFile) + if err != nil { + return fmt.Errorf("failed to create consumed file: %v", err) + } + defer f.Close() + + encoder := json.NewEncoder(f) + for _, record := range t.consumedRecords { + if err := encoder.Encode(record); err != nil { + return fmt.Errorf("failed to encode consumed record: %v", err) + } + } + + fmt.Printf("Saved %d consumed records to %s\n", len(t.consumedRecords), t.consumedFile) + return nil +} + +// Compare compares produced and consumed records +func (t *Tracker) Compare() ComparisonResult { + t.mu.Lock() + defer t.mu.Unlock() + + result := ComparisonResult{ + TotalProduced: len(t.producedRecords), + TotalConsumed: len(t.consumedRecords), + } + + // Build maps for efficient lookup + producedMap := make(map[string]Record) + for _, record := range t.producedRecords { + key := fmt.Sprintf("%s-%d-%d", record.Topic, record.Partition, record.Offset) + producedMap[key] = record + } + + consumedMap := make(map[string]int) + duplicateKeys := make(map[string][]Record) + + for _, record := range t.consumedRecords { + key := fmt.Sprintf("%s-%d-%d", record.Topic, record.Partition, record.Offset) + consumedMap[key]++ + + if consumedMap[key] > 1 { + duplicateKeys[key] = append(duplicateKeys[key], record) + } + } + + // Find missing records (produced but not consumed) + for key, record := range producedMap { + if _, found := consumedMap[key]; !found { + result.Missing = append(result.Missing, record) + } + } + + // Find duplicate records (consumed multiple times) + for key, records := range duplicateKeys { + if len(records) > 0 { + // Add first occurrence for context + result.Duplicates = append(result.Duplicates, DuplicateRecord{ + Record: records[0], + Count: consumedMap[key], + }) + } + } + + result.MissingCount = len(result.Missing) + result.DuplicateCount = len(result.Duplicates) + result.UniqueConsumed = result.TotalConsumed - sumDuplicates(result.Duplicates) + + return result +} + +// ComparisonResult holds the comparison results +type ComparisonResult struct { + TotalProduced int + TotalConsumed int + UniqueConsumed int + MissingCount int + DuplicateCount int + Missing []Record + Duplicates []DuplicateRecord +} + +// DuplicateRecord represents a record consumed multiple times +type DuplicateRecord struct { + Record Record + Count int +} + +// PrintSummary prints a summary of the comparison +func (r *ComparisonResult) PrintSummary() { + fmt.Println("\n" + strings.Repeat("=", 70)) + fmt.Println(" MESSAGE VERIFICATION RESULTS") + fmt.Println(strings.Repeat("=", 70)) + + fmt.Printf("\nProduction Summary:\n") + fmt.Printf(" Total Produced: %d messages\n", r.TotalProduced) + + fmt.Printf("\nConsumption Summary:\n") + fmt.Printf(" Total Consumed: %d messages\n", r.TotalConsumed) + fmt.Printf(" Unique Consumed: %d messages\n", r.UniqueConsumed) + fmt.Printf(" Duplicate Reads: %d messages\n", r.TotalConsumed-r.UniqueConsumed) + + fmt.Printf("\nVerification Results:\n") + if r.MissingCount == 0 { + fmt.Printf(" ✅ Missing Records: 0 (all messages delivered)\n") + } else { + fmt.Printf(" ❌ Missing Records: %d (data loss detected!)\n", r.MissingCount) + } + + if r.DuplicateCount == 0 { + fmt.Printf(" ✅ Duplicate Records: 0 (no duplicates)\n") + } else { + duplicatePercent := float64(r.TotalConsumed-r.UniqueConsumed) * 100.0 / float64(r.TotalProduced) + fmt.Printf(" ⚠️ Duplicate Records: %d unique messages read multiple times (%.1f%%)\n", + r.DuplicateCount, duplicatePercent) + } + + fmt.Printf("\nDelivery Guarantee:\n") + if r.MissingCount == 0 && r.DuplicateCount == 0 { + fmt.Printf(" ✅ EXACTLY-ONCE: All messages delivered exactly once\n") + } else if r.MissingCount == 0 { + fmt.Printf(" ✅ AT-LEAST-ONCE: All messages delivered (some duplicates)\n") + } else { + fmt.Printf(" ❌ AT-MOST-ONCE: Some messages lost\n") + } + + // Print sample of missing records (up to 10) + if len(r.Missing) > 0 { + fmt.Printf("\nSample Missing Records (first 10 of %d):\n", len(r.Missing)) + for i, record := range r.Missing { + if i >= 10 { + break + } + fmt.Printf(" - %s[%d]@%d (key=%s)\n", + record.Topic, record.Partition, record.Offset, record.Key) + } + } + + // Print sample of duplicate records (up to 10) + if len(r.Duplicates) > 0 { + fmt.Printf("\nSample Duplicate Records (first 10 of %d):\n", len(r.Duplicates)) + // Sort by count descending + sorted := make([]DuplicateRecord, len(r.Duplicates)) + copy(sorted, r.Duplicates) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Count > sorted[j].Count + }) + + for i, dup := range sorted { + if i >= 10 { + break + } + fmt.Printf(" - %s[%d]@%d (key=%s, read %d times)\n", + dup.Record.Topic, dup.Record.Partition, dup.Record.Offset, + dup.Record.Key, dup.Count) + } + } + + fmt.Println(strings.Repeat("=", 70)) +} + +func sumDuplicates(duplicates []DuplicateRecord) int { + sum := 0 + for _, dup := range duplicates { + sum += dup.Count - 1 // Don't count the first occurrence + } + return sum +}