diff --git a/test/kafka/internal/testutil/clients.go b/test/kafka/internal/testutil/clients.go index 65f581de3..0d4923e35 100644 --- a/test/kafka/internal/testutil/clients.go +++ b/test/kafka/internal/testutil/clients.go @@ -84,7 +84,9 @@ func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Messa } defer writer.Close() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + // Increased timeout to handle slow CI environments, especially when consumer groups + // are active and holding locks or requiring offset commits + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() err := writer.WriteMessages(ctx, messages...) @@ -144,7 +146,9 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun offset := reader.Offset() k.t.Logf("Consumer group reader created for group %s, initial offset: %d", groupID, offset) - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + // Increased timeout for consumer groups - they require coordinator discovery, + // offset fetching, and offset commits which can be slow in CI environments + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() var messages []kafka.Message