Browse Source

fix: commit offsets in Cleanup() before rebalancing

This commit adds explicit offset commit in the ConsumerGroupHandler.Cleanup()
method, which is called during consumer group rebalancing. This ensures all
marked offsets are committed BEFORE partitions are reassigned to other consumers,
significantly reducing duplicate message consumption during rebalancing.

Problem:
- Cleanup() was not committing offsets before rebalancing
- When partition reassigned to another consumer, it started from last committed offset
- Uncommitted messages (processed but not yet committed) were read again by new consumer
- This caused ~100-200% duplicate messages during rebalancing in tests

Solution:
- Add session.Commit() in Cleanup() method
- This runs after all ConsumeClaim goroutines have exited
- Ensures all MarkMessage() calls are committed before partition release
- New consumer starts from the last processed offset, not an older committed offset

Benefits:
- Dramatically reduces duplicate messages during rebalancing
- Improves at-least-once semantics (closer to exactly-once for normal cases)
- Better performance (less redundant processing)
- Cleaner test results (expected duplicates only from actual failures)

Kafka Rebalancing Lifecycle:
1. Rebalance triggered (consumer join/leave, timeout, etc.)
2. All ConsumeClaim goroutines cancelled
3. Cleanup() called ← WE COMMIT HERE NOW
4. Partitions reassigned to other consumers
5. New consumer starts from last committed offset ← NOW MORE UP-TO-DATE

Expected Results:
- Before: ~100-200% duplicates during rebalancing (2-3x reads)
- After: <10% duplicates (only from uncommitted in-flight messages)

This is a critical fix for production deployments where consumer churn
(scaling, restarts, failures) causes frequent rebalancing.
pull/7329/head
chrislu 5 days ago
parent
commit
2ffdda2661
  1. 12
      test/kafka/kafka-client-loadtest/internal/consumer/consumer.go

12
test/kafka/kafka-client-loadtest/internal/consumer/consumer.go

@ -571,8 +571,16 @@ func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id)
// CRITICAL: Commit all marked offsets before partition reassignment to minimize duplicates
func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
log.Printf("Consumer %d: Consumer group session cleanup - committing final offsets before rebalance", h.consumer.id)
// Commit all marked offsets before releasing partitions
// This ensures that when partitions are reassigned to other consumers,
// they start from the last processed offset, minimizing duplicate reads
session.Commit()
log.Printf("Consumer %d: Cleanup complete - offsets committed", h.consumer.id)
return nil
}

Loading…
Cancel
Save