You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

339 lines
10 KiB

2 years ago
2 years ago
  1. """Fixtures for tests."""
  2. import ipaddress
  3. import os
  4. import uuid
  5. from datetime import datetime, timedelta
  6. import pytest
  7. from cryptography import x509
  8. from cryptography.hazmat.backends import default_backend
  9. from cryptography.hazmat.primitives import hashes, serialization
  10. from cryptography.hazmat.primitives.asymmetric import rsa
  11. from cryptography.x509.oid import NameOID
  12. from keycloak import KeycloakAdmin, KeycloakOpenID
  13. class KeycloakTestEnv(object):
  14. """Wrapper for test Keycloak connection configuration.
  15. :param host: Hostname
  16. :type host: str
  17. :param port: Port
  18. :type port: str
  19. :param username: Admin username
  20. :type username: str
  21. :param password: Admin password
  22. :type password: str
  23. """
  24. def __init__(
  25. self,
  26. host: str = os.environ["KEYCLOAK_HOST"],
  27. port: str = os.environ["KEYCLOAK_PORT"],
  28. username: str = os.environ["KEYCLOAK_ADMIN"],
  29. password: str = os.environ["KEYCLOAK_ADMIN_PASSWORD"],
  30. ):
  31. """Init method."""
  32. self.KEYCLOAK_HOST = host
  33. self.KEYCLOAK_PORT = port
  34. self.KEYCLOAK_ADMIN = username
  35. self.KEYCLOAK_ADMIN_PASSWORD = password
  36. @property
  37. def KEYCLOAK_HOST(self):
  38. """Hostname getter."""
  39. return self._KEYCLOAK_HOST
  40. @KEYCLOAK_HOST.setter
  41. def KEYCLOAK_HOST(self, value: str):
  42. """Hostname setter."""
  43. self._KEYCLOAK_HOST = value
  44. @property
  45. def KEYCLOAK_PORT(self):
  46. """Port getter."""
  47. return self._KEYCLOAK_PORT
  48. @KEYCLOAK_PORT.setter
  49. def KEYCLOAK_PORT(self, value: str):
  50. """Port setter."""
  51. self._KEYCLOAK_PORT = value
  52. @property
  53. def KEYCLOAK_ADMIN(self):
  54. """Admin username getter."""
  55. return self._KEYCLOAK_ADMIN
  56. @KEYCLOAK_ADMIN.setter
  57. def KEYCLOAK_ADMIN(self, value: str):
  58. """Admin username setter."""
  59. self._KEYCLOAK_ADMIN = value
  60. @property
  61. def KEYCLOAK_ADMIN_PASSWORD(self):
  62. """Admin password getter."""
  63. return self._KEYCLOAK_ADMIN_PASSWORD
  64. @KEYCLOAK_ADMIN_PASSWORD.setter
  65. def KEYCLOAK_ADMIN_PASSWORD(self, value: str):
  66. """Admin password setter."""
  67. self._KEYCLOAK_ADMIN_PASSWORD = value
  68. @pytest.fixture
  69. def env():
  70. """Fixture for getting the test environment configuration object."""
  71. return KeycloakTestEnv()
  72. @pytest.fixture
  73. def admin(env: KeycloakTestEnv):
  74. """Fixture for initialized KeycloakAdmin class."""
  75. return KeycloakAdmin(
  76. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  77. username=env.KEYCLOAK_ADMIN,
  78. password=env.KEYCLOAK_ADMIN_PASSWORD,
  79. )
  80. @pytest.fixture
  81. def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
  82. """Fixture for initialized KeycloakOpenID class."""
  83. # Set the realm
  84. admin.realm_name = realm
  85. # Create client
  86. client = str(uuid.uuid4())
  87. client_id = admin.create_client(
  88. payload={
  89. "name": client,
  90. "clientId": client,
  91. "enabled": True,
  92. "publicClient": True,
  93. "protocol": "openid-connect",
  94. }
  95. )
  96. # Return OID
  97. yield KeycloakOpenID(
  98. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  99. realm_name=realm,
  100. client_id=client,
  101. )
  102. # Cleanup
  103. admin.delete_client(client_id=client_id)
  104. @pytest.fixture
  105. def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
  106. """Fixture for an initialized KeycloakOpenID class and a random user credentials."""
  107. # Set the realm
  108. admin.realm_name = realm
  109. # Create client
  110. client = str(uuid.uuid4())
  111. secret = str(uuid.uuid4())
  112. client_id = admin.create_client(
  113. payload={
  114. "name": client,
  115. "clientId": client,
  116. "enabled": True,
  117. "publicClient": False,
  118. "protocol": "openid-connect",
  119. "secret": secret,
  120. "clientAuthenticatorType": "client-secret",
  121. }
  122. )
  123. # Create user
  124. username = str(uuid.uuid4())
  125. password = str(uuid.uuid4())
  126. user_id = admin.create_user(
  127. payload={
  128. "username": username,
  129. "email": f"{username}@test.test",
  130. "enabled": True,
  131. "credentials": [{"type": "password", "value": password}],
  132. }
  133. )
  134. yield (
  135. KeycloakOpenID(
  136. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  137. realm_name=realm,
  138. client_id=client,
  139. client_secret_key=secret,
  140. ),
  141. username,
  142. password,
  143. )
  144. # Cleanup
  145. admin.delete_client(client_id=client_id)
  146. admin.delete_user(user_id=user_id)
  147. @pytest.fixture
  148. def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
  149. """Fixture for an initialized KeycloakOpenID class and a random user credentials."""
  150. # Set the realm
  151. admin.realm_name = realm
  152. # Create client
  153. client = str(uuid.uuid4())
  154. secret = str(uuid.uuid4())
  155. client_id = admin.create_client(
  156. payload={
  157. "name": client,
  158. "clientId": client,
  159. "enabled": True,
  160. "publicClient": False,
  161. "protocol": "openid-connect",
  162. "secret": secret,
  163. "clientAuthenticatorType": "client-secret",
  164. "authorizationServicesEnabled": True,
  165. "serviceAccountsEnabled": True,
  166. }
  167. )
  168. admin.create_client_authz_role_based_policy(
  169. client_id=client_id,
  170. payload={
  171. "name": "test-authz-rb-policy",
  172. "roles": [{"id": admin.get_realm_role(role_name="offline_access")["id"]}],
  173. },
  174. )
  175. # Create user
  176. username = str(uuid.uuid4())
  177. password = str(uuid.uuid4())
  178. user_id = admin.create_user(
  179. payload={
  180. "username": username,
  181. "email": f"{username}@test.test",
  182. "enabled": True,
  183. "credentials": [{"type": "password", "value": password}],
  184. }
  185. )
  186. yield (
  187. KeycloakOpenID(
  188. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  189. realm_name=realm,
  190. client_id=client,
  191. client_secret_key=secret,
  192. ),
  193. username,
  194. password,
  195. )
  196. # Cleanup
  197. admin.delete_client(client_id=client_id)
  198. admin.delete_user(user_id=user_id)
  199. @pytest.fixture
  200. def realm(admin: KeycloakAdmin) -> str:
  201. """Fixture for a new random realm."""
  202. realm_name = str(uuid.uuid4())
  203. admin.create_realm(payload={"realm": realm_name, "enabled": True})
  204. yield realm_name
  205. admin.delete_realm(realm_name=realm_name)
  206. @pytest.fixture
  207. def user(admin: KeycloakAdmin, realm: str) -> str:
  208. """Fixture for a new random user."""
  209. admin.realm_name = realm
  210. username = str(uuid.uuid4())
  211. user_id = admin.create_user(payload={"username": username, "email": f"{username}@test.test"})
  212. yield user_id
  213. admin.delete_user(user_id=user_id)
  214. @pytest.fixture
  215. def group(admin: KeycloakAdmin, realm: str) -> str:
  216. """Fixture for a new random group."""
  217. admin.realm_name = realm
  218. group_name = str(uuid.uuid4())
  219. group_id = admin.create_group(payload={"name": group_name})
  220. yield group_id
  221. admin.delete_group(group_id=group_id)
  222. @pytest.fixture
  223. def client(admin: KeycloakAdmin, realm: str) -> str:
  224. """Fixture for a new random client."""
  225. admin.realm_name = realm
  226. client = str(uuid.uuid4())
  227. client_id = admin.create_client(payload={"name": client, "clientId": client})
  228. yield client_id
  229. admin.delete_client(client_id=client_id)
  230. @pytest.fixture
  231. def client_role(admin: KeycloakAdmin, realm: str, client: str) -> str:
  232. """Fixture for a new random client role."""
  233. admin.realm_name = realm
  234. role = str(uuid.uuid4())
  235. admin.create_client_role(client, {"name": role, "composite": False})
  236. yield role
  237. admin.delete_client_role(client, role)
  238. @pytest.fixture
  239. def composite_client_role(admin: KeycloakAdmin, realm: str, client: str, client_role: str) -> str:
  240. """Fixture for a new random composite client role."""
  241. admin.realm_name = realm
  242. role = str(uuid.uuid4())
  243. admin.create_client_role(client, {"name": role, "composite": True})
  244. role_repr = admin.get_client_role(client, client_role)
  245. admin.add_composite_client_roles_to_role(client, role, roles=[role_repr])
  246. yield role
  247. admin.delete_client_role(client, role)
  248. @pytest.fixture
  249. def selfsigned_cert():
  250. """Generate self signed certificate for a hostname, and optional IP addresses."""
  251. hostname = "testcert"
  252. ip_addresses = None
  253. key = None
  254. # Generate our key
  255. if key is None:
  256. key = rsa.generate_private_key(
  257. public_exponent=65537, key_size=2048, backend=default_backend()
  258. )
  259. name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
  260. alt_names = [x509.DNSName(hostname)]
  261. # allow addressing by IP, for when you don't have real DNS (common in most testing scenarios
  262. if ip_addresses:
  263. for addr in ip_addresses:
  264. # openssl wants DNSnames for ips...
  265. alt_names.append(x509.DNSName(addr))
  266. # ... whereas golang's crypto/tls is stricter, and needs IPAddresses
  267. # note: older versions of cryptography do not understand ip_address objects
  268. alt_names.append(x509.IPAddress(ipaddress.ip_address(addr)))
  269. san = x509.SubjectAlternativeName(alt_names)
  270. # path_len=0 means this cert can only sign itself, not other certs.
  271. basic_contraints = x509.BasicConstraints(ca=True, path_length=0)
  272. now = datetime.utcnow()
  273. cert = (
  274. x509.CertificateBuilder()
  275. .subject_name(name)
  276. .issuer_name(name)
  277. .public_key(key.public_key())
  278. .serial_number(1000)
  279. .not_valid_before(now)
  280. .not_valid_after(now + timedelta(days=10 * 365))
  281. .add_extension(basic_contraints, False)
  282. .add_extension(san, False)
  283. .sign(key, hashes.SHA256(), default_backend())
  284. )
  285. cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
  286. key_pem = key.private_bytes(
  287. encoding=serialization.Encoding.PEM,
  288. format=serialization.PrivateFormat.TraditionalOpenSSL,
  289. encryption_algorithm=serialization.NoEncryption(),
  290. )
  291. return cert_pem, key_pem