You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

489 lines
18 KiB

11 months ago
  1. """Test module for KeycloakOpenID."""
  2. from typing import Tuple
  3. from unittest import mock
  4. import pytest
  5. from keycloak import KeycloakAdmin, KeycloakOpenID
  6. from keycloak.authorization import Authorization
  7. from keycloak.authorization.permission import Permission
  8. from keycloak.authorization.policy import Policy
  9. from keycloak.authorization.role import Role
  10. from keycloak.connection import ConnectionManager
  11. from keycloak.exceptions import (
  12. KeycloakAuthenticationError,
  13. KeycloakAuthorizationConfigError,
  14. KeycloakDeprecationError,
  15. KeycloakInvalidTokenError,
  16. KeycloakPostError,
  17. KeycloakRPTNotFound,
  18. )
  19. def test_keycloak_openid_init(env):
  20. """Test KeycloakOpenId's init method.
  21. :param env: Environment fixture
  22. :type env: KeycloakTestEnv
  23. """
  24. oid = KeycloakOpenID(
  25. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  26. realm_name="master",
  27. client_id="admin-cli",
  28. )
  29. assert oid.client_id == "admin-cli"
  30. assert oid.client_secret_key is None
  31. assert oid.realm_name == "master"
  32. assert isinstance(oid.connection, ConnectionManager)
  33. assert isinstance(oid.authorization, Authorization)
  34. def test_well_known(oid: KeycloakOpenID):
  35. """Test the well_known method.
  36. :param oid: Keycloak OpenID client
  37. :type oid: KeycloakOpenID
  38. """
  39. res = oid.well_known()
  40. assert res is not None
  41. assert res != dict()
  42. for key in [
  43. "acr_values_supported",
  44. "authorization_encryption_alg_values_supported",
  45. "authorization_encryption_enc_values_supported",
  46. "authorization_endpoint",
  47. "authorization_signing_alg_values_supported",
  48. "backchannel_authentication_endpoint",
  49. "backchannel_authentication_request_signing_alg_values_supported",
  50. "backchannel_logout_session_supported",
  51. "backchannel_logout_supported",
  52. "backchannel_token_delivery_modes_supported",
  53. "check_session_iframe",
  54. "claim_types_supported",
  55. "claims_parameter_supported",
  56. "claims_supported",
  57. "code_challenge_methods_supported",
  58. "device_authorization_endpoint",
  59. "end_session_endpoint",
  60. "frontchannel_logout_session_supported",
  61. "frontchannel_logout_supported",
  62. "grant_types_supported",
  63. "id_token_encryption_alg_values_supported",
  64. "id_token_encryption_enc_values_supported",
  65. "id_token_signing_alg_values_supported",
  66. "introspection_endpoint",
  67. "introspection_endpoint_auth_methods_supported",
  68. "introspection_endpoint_auth_signing_alg_values_supported",
  69. "issuer",
  70. "jwks_uri",
  71. "mtls_endpoint_aliases",
  72. "pushed_authorization_request_endpoint",
  73. "registration_endpoint",
  74. "request_object_encryption_alg_values_supported",
  75. "request_object_encryption_enc_values_supported",
  76. "request_object_signing_alg_values_supported",
  77. "request_parameter_supported",
  78. "request_uri_parameter_supported",
  79. "require_pushed_authorization_requests",
  80. "require_request_uri_registration",
  81. "response_modes_supported",
  82. "response_types_supported",
  83. "revocation_endpoint",
  84. "revocation_endpoint_auth_methods_supported",
  85. "revocation_endpoint_auth_signing_alg_values_supported",
  86. "scopes_supported",
  87. "subject_types_supported",
  88. "tls_client_certificate_bound_access_tokens",
  89. "token_endpoint",
  90. "token_endpoint_auth_methods_supported",
  91. "token_endpoint_auth_signing_alg_values_supported",
  92. "userinfo_encryption_alg_values_supported",
  93. "userinfo_encryption_enc_values_supported",
  94. "userinfo_endpoint",
  95. "userinfo_signing_alg_values_supported",
  96. ]:
  97. assert key in res
  98. def test_auth_url(env, oid: KeycloakOpenID):
  99. """Test the auth_url method.
  100. :param env: Environment fixture
  101. :type env: KeycloakTestEnv
  102. :param oid: Keycloak OpenID client
  103. :type oid: KeycloakOpenID
  104. """
  105. res = oid.auth_url(redirect_uri="http://test.test/*")
  106. assert (
  107. res
  108. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  109. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  110. + "&redirect_uri=http://test.test/*&scope=email&state="
  111. )
  112. def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  113. """Test the token method.
  114. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  115. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  116. """
  117. oid, username, password = oid_with_credentials
  118. token = oid.token(username=username, password=password)
  119. assert token == {
  120. "access_token": mock.ANY,
  121. "expires_in": mock.ANY,
  122. "id_token": mock.ANY,
  123. "not-before-policy": 0,
  124. "refresh_expires_in": mock.ANY,
  125. "refresh_token": mock.ANY,
  126. "scope": mock.ANY,
  127. "session_state": mock.ANY,
  128. "token_type": "Bearer",
  129. }
  130. # Test with dummy totp
  131. token = oid.token(username=username, password=password, totp="123456")
  132. assert token == {
  133. "access_token": mock.ANY,
  134. "expires_in": mock.ANY,
  135. "id_token": mock.ANY,
  136. "not-before-policy": 0,
  137. "refresh_expires_in": mock.ANY,
  138. "refresh_token": mock.ANY,
  139. "scope": mock.ANY,
  140. "session_state": mock.ANY,
  141. "token_type": "Bearer",
  142. }
  143. # Test with extra param
  144. token = oid.token(username=username, password=password, extra_param="foo")
  145. assert token == {
  146. "access_token": mock.ANY,
  147. "expires_in": mock.ANY,
  148. "id_token": mock.ANY,
  149. "not-before-policy": 0,
  150. "refresh_expires_in": mock.ANY,
  151. "refresh_token": mock.ANY,
  152. "scope": mock.ANY,
  153. "session_state": mock.ANY,
  154. "token_type": "Bearer",
  155. }
  156. def test_exchange_token(
  157. oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  158. ):
  159. """Test the exchange token method.
  160. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  161. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  162. :param admin: Keycloak Admin client
  163. :type admin: KeycloakAdmin
  164. """
  165. # Verify existing user
  166. oid, username, password = oid_with_credentials
  167. # Allow impersonation
  168. admin.change_current_realm(oid.realm_name)
  169. admin.assign_client_role(
  170. user_id=admin.get_user_id(username=username),
  171. client_id=admin.get_client_id(client_id="realm-management"),
  172. roles=[
  173. admin.get_client_role(
  174. client_id=admin.get_client_id(client_id="realm-management"),
  175. role_name="impersonation",
  176. )
  177. ],
  178. )
  179. token = oid.token(username=username, password=password)
  180. assert oid.userinfo(token=token["access_token"]) == {
  181. "email": f"{username}@test.test",
  182. "email_verified": True,
  183. "family_name": "last",
  184. "given_name": "first",
  185. "name": "first last",
  186. "preferred_username": username,
  187. "sub": mock.ANY,
  188. }
  189. # Exchange token with the new user
  190. new_token = oid.exchange_token(
  191. token=token["access_token"], audience=oid.client_id, subject=username
  192. )
  193. assert oid.userinfo(token=new_token["access_token"]) == {
  194. "email": f"{username}@test.test",
  195. "email_verified": True,
  196. "family_name": "last",
  197. "given_name": "first",
  198. "name": "first last",
  199. "preferred_username": username,
  200. "sub": mock.ANY,
  201. }
  202. assert token != new_token
  203. def test_logout(oid_with_credentials):
  204. """Test logout.
  205. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  206. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  207. """
  208. oid, username, password = oid_with_credentials
  209. token = oid.token(username=username, password=password)
  210. assert oid.userinfo(token=token["access_token"]) != dict()
  211. assert oid.logout(refresh_token=token["refresh_token"]) == dict()
  212. with pytest.raises(KeycloakAuthenticationError):
  213. oid.userinfo(token=token["access_token"])
  214. def test_certs(oid: KeycloakOpenID):
  215. """Test certificates.
  216. :param oid: Keycloak OpenID client
  217. :type oid: KeycloakOpenID
  218. """
  219. assert len(oid.certs()["keys"]) == 2
  220. def test_public_key(oid: KeycloakOpenID):
  221. """Test public key.
  222. :param oid: Keycloak OpenID client
  223. :type oid: KeycloakOpenID
  224. """
  225. assert oid.public_key() is not None
  226. def test_entitlement(
  227. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  228. ):
  229. """Test entitlement.
  230. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  231. server with client credentials
  232. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  233. :param admin: Keycloak Admin client
  234. :type admin: KeycloakAdmin
  235. """
  236. oid, username, password = oid_with_credentials_authz
  237. token = oid.token(username=username, password=password)
  238. resource_server_id = admin.get_client_authz_resources(
  239. client_id=admin.get_client_id(oid.client_id)
  240. )[0]["_id"]
  241. with pytest.raises(KeycloakDeprecationError):
  242. oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  243. def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  244. """Test introspect.
  245. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  246. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  247. """
  248. oid, username, password = oid_with_credentials
  249. token = oid.token(username=username, password=password)
  250. assert oid.introspect(token=token["access_token"])["active"]
  251. assert oid.introspect(
  252. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  253. ) == {"active": False}
  254. with pytest.raises(KeycloakRPTNotFound):
  255. oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
  256. def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  257. """Test decode token.
  258. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  259. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  260. """
  261. oid, username, password = oid_with_credentials
  262. token = oid.token(username=username, password=password)
  263. decoded_access_token = oid.decode_token(token=token["access_token"])
  264. decoded_access_token_2 = oid.decode_token(token=token["access_token"], validate=False)
  265. decoded_refresh_token = oid.decode_token(token=token["refresh_token"], validate=False)
  266. assert decoded_access_token == decoded_access_token_2
  267. assert decoded_access_token["preferred_username"] == username, decoded_access_token
  268. assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
  269. def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  270. """Test load authorization config.
  271. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  272. server with client credentials
  273. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  274. """
  275. oid, username, password = oid_with_credentials_authz
  276. oid.load_authorization_config(path="tests/data/authz_settings.json")
  277. assert "test-authz-rb-policy" in oid.authorization.policies
  278. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  279. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  280. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  281. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  282. assert isinstance(
  283. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  284. )
  285. def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  286. """Test get policies.
  287. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  288. server with client credentials
  289. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  290. """
  291. oid, username, password = oid_with_credentials_authz
  292. token = oid.token(username=username, password=password)
  293. with pytest.raises(KeycloakAuthorizationConfigError):
  294. oid.get_policies(token=token["access_token"])
  295. oid.load_authorization_config(path="tests/data/authz_settings.json")
  296. assert oid.get_policies(token=token["access_token"]) is None
  297. orig_client_id = oid.client_id
  298. oid.client_id = "account"
  299. assert oid.get_policies(token=token["access_token"], method_token_info="decode") == []
  300. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  301. policy.add_role(role="account/view-profile")
  302. oid.authorization.policies["test"] = policy
  303. assert [
  304. str(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
  305. ] == ["Policy: test (role)"]
  306. assert [
  307. repr(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
  308. ] == ["<Policy: test (role)>"]
  309. oid.client_id = orig_client_id
  310. oid.logout(refresh_token=token["refresh_token"])
  311. with pytest.raises(KeycloakInvalidTokenError):
  312. oid.get_policies(token=token["access_token"])
  313. def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  314. """Test get policies.
  315. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  316. server with client credentials
  317. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  318. """
  319. oid, username, password = oid_with_credentials_authz
  320. token = oid.token(username=username, password=password)
  321. with pytest.raises(KeycloakAuthorizationConfigError):
  322. oid.get_permissions(token=token["access_token"])
  323. oid.load_authorization_config(path="tests/data/authz_settings.json")
  324. assert oid.get_permissions(token=token["access_token"]) is None
  325. orig_client_id = oid.client_id
  326. oid.client_id = "account"
  327. assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == []
  328. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  329. policy.add_role(role="account/view-profile")
  330. policy.add_permission(
  331. permission=Permission(
  332. name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
  333. )
  334. )
  335. oid.authorization.policies["test"] = policy
  336. assert [
  337. str(x)
  338. for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
  339. ] == ["Permission: test-perm (resource)"]
  340. assert [
  341. repr(x)
  342. for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
  343. ] == ["<Permission: test-perm (resource)>"]
  344. oid.client_id = orig_client_id
  345. oid.logout(refresh_token=token["refresh_token"])
  346. with pytest.raises(KeycloakInvalidTokenError):
  347. oid.get_permissions(token=token["access_token"])
  348. def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  349. """Test UMA permissions.
  350. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  351. server with client credentials
  352. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  353. """
  354. oid, username, password = oid_with_credentials_authz
  355. token = oid.token(username=username, password=password)
  356. assert len(oid.uma_permissions(token=token["access_token"])) == 1
  357. assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource"
  358. def test_has_uma_access(
  359. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  360. ):
  361. """Test has UMA access.
  362. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  363. server with client credentials
  364. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  365. :param admin: Keycloak Admin client
  366. :type admin: KeycloakAdmin
  367. """
  368. oid, username, password = oid_with_credentials_authz
  369. token = oid.token(username=username, password=password)
  370. assert (
  371. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  372. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  373. )
  374. assert (
  375. str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
  376. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  377. )
  378. with pytest.raises(KeycloakPostError):
  379. oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
  380. oid.logout(refresh_token=token["refresh_token"])
  381. assert (
  382. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  383. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  384. )
  385. assert (
  386. str(
  387. oid.has_uma_access(
  388. token=admin.connection.token["access_token"], permissions="Default Resource"
  389. )
  390. )
  391. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  392. + "{'Default Resource'})"
  393. )
  394. def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
  395. """Test device authorization flow.
  396. :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
  397. credentials and device authorization flow enabled
  398. :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
  399. """
  400. oid, _, _ = oid_with_credentials_device
  401. res = oid.device()
  402. assert res == {
  403. "device_code": mock.ANY,
  404. "user_code": mock.ANY,
  405. "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
  406. "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
  407. + f"device?user_code={res['user_code']}",
  408. "expires_in": 600,
  409. "interval": 5,
  410. }