From 66d87659e5f281861a69766cd5d0e3bf772eeae1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 15 Oct 2025 20:15:46 -0700 Subject: [PATCH] test: increase timeouts for consumer group operations in E2E tests Consumer group operations (coordinator discovery, offset fetch/commit) are slower in CI environments with limited resources. This increases timeouts to: - ProduceMessages: 10s -> 30s (for when consumer groups are active) - ConsumeWithGroup: 30s -> 60s (for offset fetch/commit operations) Fixes the TestOffsetManagement timeout failures in GitHub Actions CI. --- test/kafka/internal/testutil/clients.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/kafka/internal/testutil/clients.go b/test/kafka/internal/testutil/clients.go index 65f581de3..0d4923e35 100644 --- a/test/kafka/internal/testutil/clients.go +++ b/test/kafka/internal/testutil/clients.go @@ -84,7 +84,9 @@ func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Messa } defer writer.Close() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + // Increased timeout to handle slow CI environments, especially when consumer groups + // are active and holding locks or requiring offset commits + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() err := writer.WriteMessages(ctx, messages...) @@ -144,7 +146,9 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun offset := reader.Offset() k.t.Logf("Consumer group reader created for group %s, initial offset: %d", groupID, offset) - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + // Increased timeout for consumer groups - they require coordinator discovery, + // offset fetching, and offset commits which can be slow in CI environments + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() var messages []kafka.Message