From fba4fc3a7dc54576b735110739093a37f184b0f9 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 13 Oct 2025 19:43:09 -0700 Subject: [PATCH] All consumers share the same group for load balancing across partitions --- .../kafka-client-loadtest/internal/consumer/consumer.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index e1c4caa41..1171bd5c0 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -39,7 +39,8 @@ type Consumer struct { // New creates a new consumer instance func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) { - consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id) + // All consumers share the same group for load balancing across partitions + consumerGroup := cfg.Consumers.GroupPrefix c := &Consumer{ id: id, @@ -226,9 +227,9 @@ func (c *Consumer) runSaramaConsumer(ctx context.Context) { log.Printf("Consumer %d: Error consuming: %v", c.id, err) c.metricsCollector.RecordConsumerError() - // Wait before retrying + // Wait briefly before retrying (reduced from 5s to 1s for faster recovery) select { - case <-time.After(5 * time.Second): + case <-time.After(1 * time.Second): case <-ctx.Done(): return }