diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index 273dc9c77..ed7b5d9b0 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -571,8 +571,16 @@ func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited -func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { - log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id) +// CRITICAL: Commit all marked offsets before partition reassignment to minimize duplicates +func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { + log.Printf("Consumer %d: Consumer group session cleanup - committing final offsets before rebalance", h.consumer.id) + + // Commit all marked offsets before releasing partitions + // This ensures that when partitions are reassigned to other consumers, + // they start from the last processed offset, minimizing duplicate reads + session.Commit() + + log.Printf("Consumer %d: Cleanup complete - offsets committed", h.consumer.id) return nil }