diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index e1c4caa41..1171bd5c0 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -39,7 +39,8 @@ type Consumer struct { // New creates a new consumer instance func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) { - consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id) + // All consumers share the same group for load balancing across partitions + consumerGroup := cfg.Consumers.GroupPrefix c := &Consumer{ id: id, @@ -226,9 +227,9 @@ func (c *Consumer) runSaramaConsumer(ctx context.Context) { log.Printf("Consumer %d: Error consuming: %v", c.id, err) c.metricsCollector.RecordConsumerError() - // Wait before retrying + // Wait briefly before retrying (reduced from 5s to 1s for faster recovery) select { - case <-time.After(5 * time.Second): + case <-time.After(1 * time.Second): case <-ctx.Done(): return }