diff --git a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go index 33db85c28..a164ea13d 100644 --- a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go +++ b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go @@ -144,8 +144,9 @@ func main() { func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { log.Printf("Starting producer-only test with %d producers", cfg.Producers.Count) - // Create record tracker (nil for now as producer-only test doesn't need comparison) - recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl") + // Create record tracker with current timestamp to filter old messages + testStartTime := time.Now().UnixNano() + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime) errChan := make(chan error, cfg.Producers.Count) @@ -183,8 +184,9 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { log.Printf("Starting consumer-only test with %d consumers", cfg.Consumers.Count) - // Create record tracker - recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl") + // Create record tracker with current timestamp to filter old messages + testStartTime := time.Now().UnixNano() + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime) errChan := make(chan error, cfg.Consumers.Count) @@ -213,8 +215,10 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c log.Printf("Starting comprehensive test with %d producers and %d consumers", cfg.Producers.Count, cfg.Consumers.Count) - // Create record tracker - recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl") + // Create record tracker with current timestamp to filter old messages + testStartTime := time.Now().UnixNano() + log.Printf("Test run starting at %d - only tracking messages from this run", testStartTime) + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime) errChan := make(chan error, cfg.Producers.Count) diff --git a/test/kafka/kafka-client-loadtest/internal/tracker/tracker.go b/test/kafka/kafka-client-loadtest/internal/tracker/tracker.go index 2329ec691..1f67c7a65 100644 --- a/test/kafka/kafka-client-loadtest/internal/tracker/tracker.go +++ b/test/kafka/kafka-client-loadtest/internal/tracker/tracker.go @@ -7,6 +7,7 @@ import ( "sort" "strings" "sync" + "time" ) // Record represents a tracked message @@ -22,20 +23,34 @@ type Record struct { // Tracker tracks produced and consumed records type Tracker struct { - mu sync.Mutex - producedRecords []Record - consumedRecords []Record - producedFile string - consumedFile string + mu sync.Mutex + producedRecords []Record + consumedRecords []Record + producedFile string + consumedFile string + testStartTime int64 // Unix timestamp in nanoseconds - used to filter old messages + testRunPrefix string // Key prefix for this test run (e.g., "run-20251015-170150") + filteredOldCount int // Count of old messages consumed but not tracked } // NewTracker creates a new record tracker -func NewTracker(producedFile, consumedFile string) *Tracker { +func NewTracker(producedFile, consumedFile string, testStartTime int64) *Tracker { + // Generate test run prefix from start time using same format as producer + // Producer format: p.startTime.Format("20060102-150405") -> "20251015-170859" + startTime := time.Unix(0, testStartTime) + runID := startTime.Format("20060102-150405") + testRunPrefix := fmt.Sprintf("run-%s", runID) + + fmt.Printf("Tracker initialized with prefix: %s (filtering messages not matching this prefix)\n", testRunPrefix) + return &Tracker{ - producedRecords: make([]Record, 0, 100000), - consumedRecords: make([]Record, 0, 100000), - producedFile: producedFile, - consumedFile: consumedFile, + producedRecords: make([]Record, 0, 100000), + consumedRecords: make([]Record, 0, 100000), + producedFile: producedFile, + consumedFile: consumedFile, + testStartTime: testStartTime, + testRunPrefix: testRunPrefix, + filteredOldCount: 0, } } @@ -47,9 +62,20 @@ func (t *Tracker) TrackProduced(record Record) { } // TrackConsumed records a consumed message +// Only tracks messages from the current test run (filters out old messages from previous tests) func (t *Tracker) TrackConsumed(record Record) { t.mu.Lock() defer t.mu.Unlock() + + // Filter: Only track messages from current test run based on key prefix + // Producer keys look like: "run-20251015-170150-key-123" + // We only want messages that match our test run prefix + if !strings.HasPrefix(record.Key, t.testRunPrefix) { + // Count old messages consumed but not tracked + t.filteredOldCount++ + return + } + t.consumedRecords = append(t.consumedRecords, record) } @@ -103,8 +129,9 @@ func (t *Tracker) Compare() ComparisonResult { defer t.mu.Unlock() result := ComparisonResult{ - TotalProduced: len(t.producedRecords), - TotalConsumed: len(t.consumedRecords), + TotalProduced: len(t.producedRecords), + TotalConsumed: len(t.consumedRecords), + FilteredOldCount: t.filteredOldCount, } // Build maps for efficient lookup @@ -153,13 +180,14 @@ func (t *Tracker) Compare() ComparisonResult { // ComparisonResult holds the comparison results type ComparisonResult struct { - TotalProduced int - TotalConsumed int - UniqueConsumed int - MissingCount int - DuplicateCount int - Missing []Record - Duplicates []DuplicateRecord + TotalProduced int + TotalConsumed int + UniqueConsumed int + MissingCount int + DuplicateCount int + FilteredOldCount int // Old messages consumed but filtered out + Missing []Record + Duplicates []DuplicateRecord } // DuplicateRecord represents a record consumed multiple times @@ -178,9 +206,12 @@ func (r *ComparisonResult) PrintSummary() { fmt.Printf(" Total Produced: %d messages\n", r.TotalProduced) fmt.Printf("\nConsumption Summary:\n") - fmt.Printf(" Total Consumed: %d messages\n", r.TotalConsumed) + fmt.Printf(" Total Consumed: %d messages (from current test)\n", r.TotalConsumed) fmt.Printf(" Unique Consumed: %d messages\n", r.UniqueConsumed) fmt.Printf(" Duplicate Reads: %d messages\n", r.TotalConsumed-r.UniqueConsumed) + if r.FilteredOldCount > 0 { + fmt.Printf(" Filtered Old: %d messages (from previous tests, not tracked)\n", r.FilteredOldCount) + } fmt.Printf("\nVerification Results:\n") if r.MissingCount == 0 {