You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1002 lines
37 KiB

5 months ago
  1. """Test module for KeycloakOpenID."""
  2. from inspect import iscoroutinefunction, signature
  3. from typing import Tuple
  4. from unittest import mock
  5. import pytest
  6. from keycloak import KeycloakAdmin, KeycloakOpenID
  7. from keycloak.authorization import Authorization
  8. from keycloak.authorization.permission import Permission
  9. from keycloak.authorization.policy import Policy
  10. from keycloak.authorization.role import Role
  11. from keycloak.connection import ConnectionManager
  12. from keycloak.exceptions import (
  13. KeycloakAuthenticationError,
  14. KeycloakAuthorizationConfigError,
  15. KeycloakDeprecationError,
  16. KeycloakInvalidTokenError,
  17. KeycloakPostError,
  18. KeycloakRPTNotFound,
  19. )
  20. def test_keycloak_openid_init(env):
  21. """Test KeycloakOpenId's init method.
  22. :param env: Environment fixture
  23. :type env: KeycloakTestEnv
  24. """
  25. oid = KeycloakOpenID(
  26. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  27. realm_name="master",
  28. client_id="admin-cli",
  29. )
  30. assert oid.client_id == "admin-cli"
  31. assert oid.client_secret_key is None
  32. assert oid.realm_name == "master"
  33. assert isinstance(oid.connection, ConnectionManager)
  34. assert isinstance(oid.authorization, Authorization)
  35. def test_well_known(oid: KeycloakOpenID):
  36. """Test the well_known method.
  37. :param oid: Keycloak OpenID client
  38. :type oid: KeycloakOpenID
  39. """
  40. res = oid.well_known()
  41. assert res is not None
  42. assert res != dict()
  43. for key in [
  44. "acr_values_supported",
  45. "authorization_encryption_alg_values_supported",
  46. "authorization_encryption_enc_values_supported",
  47. "authorization_endpoint",
  48. "authorization_signing_alg_values_supported",
  49. "backchannel_authentication_endpoint",
  50. "backchannel_authentication_request_signing_alg_values_supported",
  51. "backchannel_logout_session_supported",
  52. "backchannel_logout_supported",
  53. "backchannel_token_delivery_modes_supported",
  54. "check_session_iframe",
  55. "claim_types_supported",
  56. "claims_parameter_supported",
  57. "claims_supported",
  58. "code_challenge_methods_supported",
  59. "device_authorization_endpoint",
  60. "end_session_endpoint",
  61. "frontchannel_logout_session_supported",
  62. "frontchannel_logout_supported",
  63. "grant_types_supported",
  64. "id_token_encryption_alg_values_supported",
  65. "id_token_encryption_enc_values_supported",
  66. "id_token_signing_alg_values_supported",
  67. "introspection_endpoint",
  68. "introspection_endpoint_auth_methods_supported",
  69. "introspection_endpoint_auth_signing_alg_values_supported",
  70. "issuer",
  71. "jwks_uri",
  72. "mtls_endpoint_aliases",
  73. "pushed_authorization_request_endpoint",
  74. "registration_endpoint",
  75. "request_object_encryption_alg_values_supported",
  76. "request_object_encryption_enc_values_supported",
  77. "request_object_signing_alg_values_supported",
  78. "request_parameter_supported",
  79. "request_uri_parameter_supported",
  80. "require_pushed_authorization_requests",
  81. "require_request_uri_registration",
  82. "response_modes_supported",
  83. "response_types_supported",
  84. "revocation_endpoint",
  85. "revocation_endpoint_auth_methods_supported",
  86. "revocation_endpoint_auth_signing_alg_values_supported",
  87. "scopes_supported",
  88. "subject_types_supported",
  89. "tls_client_certificate_bound_access_tokens",
  90. "token_endpoint",
  91. "token_endpoint_auth_methods_supported",
  92. "token_endpoint_auth_signing_alg_values_supported",
  93. "userinfo_encryption_alg_values_supported",
  94. "userinfo_encryption_enc_values_supported",
  95. "userinfo_endpoint",
  96. "userinfo_signing_alg_values_supported",
  97. ]:
  98. assert key in res
  99. def test_auth_url(env, oid: KeycloakOpenID):
  100. """Test the auth_url method.
  101. :param env: Environment fixture
  102. :type env: KeycloakTestEnv
  103. :param oid: Keycloak OpenID client
  104. :type oid: KeycloakOpenID
  105. """
  106. res = oid.auth_url(redirect_uri="http://test.test/*")
  107. assert (
  108. res
  109. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  110. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  111. + "&redirect_uri=http://test.test/*&scope=email&state="
  112. )
  113. def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  114. """Test the token method.
  115. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  116. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  117. """
  118. oid, username, password = oid_with_credentials
  119. token = oid.token(username=username, password=password)
  120. assert token == {
  121. "access_token": mock.ANY,
  122. "expires_in": mock.ANY,
  123. "id_token": mock.ANY,
  124. "not-before-policy": 0,
  125. "refresh_expires_in": mock.ANY,
  126. "refresh_token": mock.ANY,
  127. "scope": mock.ANY,
  128. "session_state": mock.ANY,
  129. "token_type": "Bearer",
  130. }
  131. # Test with dummy totp
  132. token = oid.token(username=username, password=password, totp="123456")
  133. assert token == {
  134. "access_token": mock.ANY,
  135. "expires_in": mock.ANY,
  136. "id_token": mock.ANY,
  137. "not-before-policy": 0,
  138. "refresh_expires_in": mock.ANY,
  139. "refresh_token": mock.ANY,
  140. "scope": mock.ANY,
  141. "session_state": mock.ANY,
  142. "token_type": "Bearer",
  143. }
  144. # Test with extra param
  145. token = oid.token(username=username, password=password, extra_param="foo")
  146. assert token == {
  147. "access_token": mock.ANY,
  148. "expires_in": mock.ANY,
  149. "id_token": mock.ANY,
  150. "not-before-policy": 0,
  151. "refresh_expires_in": mock.ANY,
  152. "refresh_token": mock.ANY,
  153. "scope": mock.ANY,
  154. "session_state": mock.ANY,
  155. "token_type": "Bearer",
  156. }
  157. def test_exchange_token(
  158. oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  159. ):
  160. """Test the exchange token method.
  161. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  162. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  163. :param admin: Keycloak Admin client
  164. :type admin: KeycloakAdmin
  165. """
  166. # Verify existing user
  167. oid, username, password = oid_with_credentials
  168. # Allow impersonation
  169. admin.change_current_realm(oid.realm_name)
  170. admin.assign_client_role(
  171. user_id=admin.get_user_id(username=username),
  172. client_id=admin.get_client_id(client_id="realm-management"),
  173. roles=[
  174. admin.get_client_role(
  175. client_id=admin.get_client_id(client_id="realm-management"),
  176. role_name="impersonation",
  177. )
  178. ],
  179. )
  180. token = oid.token(username=username, password=password)
  181. assert oid.userinfo(token=token["access_token"]) == {
  182. "email": f"{username}@test.test",
  183. "email_verified": True,
  184. "family_name": "last",
  185. "given_name": "first",
  186. "name": "first last",
  187. "preferred_username": username,
  188. "sub": mock.ANY,
  189. }
  190. # Exchange token with the new user
  191. new_token = oid.exchange_token(
  192. token=token["access_token"], audience=oid.client_id, subject=username
  193. )
  194. assert oid.userinfo(token=new_token["access_token"]) == {
  195. "email": f"{username}@test.test",
  196. "email_verified": True,
  197. "family_name": "last",
  198. "given_name": "first",
  199. "name": "first last",
  200. "preferred_username": username,
  201. "sub": mock.ANY,
  202. }
  203. assert token != new_token
  204. def test_logout(oid_with_credentials):
  205. """Test logout.
  206. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  207. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  208. """
  209. oid, username, password = oid_with_credentials
  210. token = oid.token(username=username, password=password)
  211. assert oid.userinfo(token=token["access_token"]) != dict()
  212. assert oid.logout(refresh_token=token["refresh_token"]) == dict()
  213. with pytest.raises(KeycloakAuthenticationError):
  214. oid.userinfo(token=token["access_token"])
  215. def test_certs(oid: KeycloakOpenID):
  216. """Test certificates.
  217. :param oid: Keycloak OpenID client
  218. :type oid: KeycloakOpenID
  219. """
  220. assert len(oid.certs()["keys"]) == 2
  221. def test_public_key(oid: KeycloakOpenID):
  222. """Test public key.
  223. :param oid: Keycloak OpenID client
  224. :type oid: KeycloakOpenID
  225. """
  226. assert oid.public_key() is not None
  227. def test_entitlement(
  228. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  229. ):
  230. """Test entitlement.
  231. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  232. server with client credentials
  233. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  234. :param admin: Keycloak Admin client
  235. :type admin: KeycloakAdmin
  236. """
  237. oid, username, password = oid_with_credentials_authz
  238. token = oid.token(username=username, password=password)
  239. resource_server_id = admin.get_client_authz_resources(
  240. client_id=admin.get_client_id(oid.client_id)
  241. )[0]["_id"]
  242. with pytest.raises(KeycloakDeprecationError):
  243. oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  244. def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  245. """Test introspect.
  246. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  247. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  248. """
  249. oid, username, password = oid_with_credentials
  250. token = oid.token(username=username, password=password)
  251. assert oid.introspect(token=token["access_token"])["active"]
  252. assert oid.introspect(
  253. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  254. ) == {"active": False}
  255. with pytest.raises(KeycloakRPTNotFound):
  256. oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
  257. def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  258. """Test decode token.
  259. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  260. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  261. """
  262. oid, username, password = oid_with_credentials
  263. token = oid.token(username=username, password=password)
  264. decoded_access_token = oid.decode_token(token=token["access_token"])
  265. decoded_access_token_2 = oid.decode_token(token=token["access_token"], validate=False)
  266. decoded_refresh_token = oid.decode_token(token=token["refresh_token"], validate=False)
  267. assert decoded_access_token == decoded_access_token_2
  268. assert decoded_access_token["preferred_username"] == username, decoded_access_token
  269. assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
  270. def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  271. """Test load authorization config.
  272. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  273. server with client credentials
  274. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  275. """
  276. oid, username, password = oid_with_credentials_authz
  277. oid.load_authorization_config(path="tests/data/authz_settings.json")
  278. assert "test-authz-rb-policy" in oid.authorization.policies
  279. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  280. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  281. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  282. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  283. assert isinstance(
  284. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  285. )
  286. def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  287. """Test get policies.
  288. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  289. server with client credentials
  290. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  291. """
  292. oid, username, password = oid_with_credentials_authz
  293. token = oid.token(username=username, password=password)
  294. with pytest.raises(KeycloakAuthorizationConfigError):
  295. oid.get_policies(token=token["access_token"])
  296. oid.load_authorization_config(path="tests/data/authz_settings.json")
  297. assert oid.get_policies(token=token["access_token"]) is None
  298. orig_client_id = oid.client_id
  299. oid.client_id = "account"
  300. assert oid.get_policies(token=token["access_token"], method_token_info="decode") == []
  301. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  302. policy.add_role(role="account/view-profile")
  303. oid.authorization.policies["test"] = policy
  304. assert [
  305. str(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
  306. ] == ["Policy: test (role)"]
  307. assert [
  308. repr(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
  309. ] == ["<Policy: test (role)>"]
  310. oid.client_id = orig_client_id
  311. oid.logout(refresh_token=token["refresh_token"])
  312. with pytest.raises(KeycloakInvalidTokenError):
  313. oid.get_policies(token=token["access_token"])
  314. def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  315. """Test get policies.
  316. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  317. server with client credentials
  318. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  319. """
  320. oid, username, password = oid_with_credentials_authz
  321. token = oid.token(username=username, password=password)
  322. with pytest.raises(KeycloakAuthorizationConfigError):
  323. oid.get_permissions(token=token["access_token"])
  324. oid.load_authorization_config(path="tests/data/authz_settings.json")
  325. assert oid.get_permissions(token=token["access_token"]) is None
  326. orig_client_id = oid.client_id
  327. oid.client_id = "account"
  328. assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == []
  329. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  330. policy.add_role(role="account/view-profile")
  331. policy.add_permission(
  332. permission=Permission(
  333. name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
  334. )
  335. )
  336. oid.authorization.policies["test"] = policy
  337. assert [
  338. str(x)
  339. for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
  340. ] == ["Permission: test-perm (resource)"]
  341. assert [
  342. repr(x)
  343. for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
  344. ] == ["<Permission: test-perm (resource)>"]
  345. oid.client_id = orig_client_id
  346. oid.logout(refresh_token=token["refresh_token"])
  347. with pytest.raises(KeycloakInvalidTokenError):
  348. oid.get_permissions(token=token["access_token"])
  349. def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  350. """Test UMA permissions.
  351. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  352. server with client credentials
  353. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  354. """
  355. oid, username, password = oid_with_credentials_authz
  356. token = oid.token(username=username, password=password)
  357. assert len(oid.uma_permissions(token=token["access_token"])) == 1
  358. assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource"
  359. def test_has_uma_access(
  360. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  361. ):
  362. """Test has UMA access.
  363. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  364. server with client credentials
  365. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  366. :param admin: Keycloak Admin client
  367. :type admin: KeycloakAdmin
  368. """
  369. oid, username, password = oid_with_credentials_authz
  370. token = oid.token(username=username, password=password)
  371. assert (
  372. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  373. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  374. )
  375. assert (
  376. str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
  377. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  378. )
  379. with pytest.raises(KeycloakPostError):
  380. oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
  381. oid.logout(refresh_token=token["refresh_token"])
  382. assert (
  383. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  384. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  385. )
  386. assert (
  387. str(
  388. oid.has_uma_access(
  389. token=admin.connection.token["access_token"], permissions="Default Resource"
  390. )
  391. )
  392. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  393. + "{'Default Resource'})"
  394. )
  395. def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
  396. """Test device authorization flow.
  397. :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
  398. credentials and device authorization flow enabled
  399. :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
  400. """
  401. oid, _, _ = oid_with_credentials_device
  402. res = oid.device()
  403. assert res == {
  404. "device_code": mock.ANY,
  405. "user_code": mock.ANY,
  406. "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
  407. "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
  408. + f"device?user_code={res['user_code']}",
  409. "expires_in": 600,
  410. "interval": 5,
  411. }
  412. # async function start
  413. @pytest.mark.asyncio
  414. async def test_a_well_known(oid: KeycloakOpenID):
  415. """Test the well_known method.
  416. :param oid: Keycloak OpenID client
  417. :type oid: KeycloakOpenID
  418. """
  419. res = await oid.a_well_known()
  420. assert res is not None
  421. assert res != dict()
  422. for key in [
  423. "acr_values_supported",
  424. "authorization_encryption_alg_values_supported",
  425. "authorization_encryption_enc_values_supported",
  426. "authorization_endpoint",
  427. "authorization_signing_alg_values_supported",
  428. "backchannel_authentication_endpoint",
  429. "backchannel_authentication_request_signing_alg_values_supported",
  430. "backchannel_logout_session_supported",
  431. "backchannel_logout_supported",
  432. "backchannel_token_delivery_modes_supported",
  433. "check_session_iframe",
  434. "claim_types_supported",
  435. "claims_parameter_supported",
  436. "claims_supported",
  437. "code_challenge_methods_supported",
  438. "device_authorization_endpoint",
  439. "end_session_endpoint",
  440. "frontchannel_logout_session_supported",
  441. "frontchannel_logout_supported",
  442. "grant_types_supported",
  443. "id_token_encryption_alg_values_supported",
  444. "id_token_encryption_enc_values_supported",
  445. "id_token_signing_alg_values_supported",
  446. "introspection_endpoint",
  447. "introspection_endpoint_auth_methods_supported",
  448. "introspection_endpoint_auth_signing_alg_values_supported",
  449. "issuer",
  450. "jwks_uri",
  451. "mtls_endpoint_aliases",
  452. "pushed_authorization_request_endpoint",
  453. "registration_endpoint",
  454. "request_object_encryption_alg_values_supported",
  455. "request_object_encryption_enc_values_supported",
  456. "request_object_signing_alg_values_supported",
  457. "request_parameter_supported",
  458. "request_uri_parameter_supported",
  459. "require_pushed_authorization_requests",
  460. "require_request_uri_registration",
  461. "response_modes_supported",
  462. "response_types_supported",
  463. "revocation_endpoint",
  464. "revocation_endpoint_auth_methods_supported",
  465. "revocation_endpoint_auth_signing_alg_values_supported",
  466. "scopes_supported",
  467. "subject_types_supported",
  468. "tls_client_certificate_bound_access_tokens",
  469. "token_endpoint",
  470. "token_endpoint_auth_methods_supported",
  471. "token_endpoint_auth_signing_alg_values_supported",
  472. "userinfo_encryption_alg_values_supported",
  473. "userinfo_encryption_enc_values_supported",
  474. "userinfo_endpoint",
  475. "userinfo_signing_alg_values_supported",
  476. ]:
  477. assert key in res
  478. @pytest.mark.asyncio
  479. async def test_a_auth_url(env, oid: KeycloakOpenID):
  480. """Test the auth_url method.
  481. :param env: Environment fixture
  482. :type env: KeycloakTestEnv
  483. :param oid: Keycloak OpenID client
  484. :type oid: KeycloakOpenID
  485. """
  486. res = await oid.a_auth_url(redirect_uri="http://test.test/*")
  487. assert (
  488. res
  489. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  490. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  491. + "&redirect_uri=http://test.test/*&scope=email&state="
  492. )
  493. @pytest.mark.asyncio
  494. async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  495. """Test the token method.
  496. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  497. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  498. """
  499. oid, username, password = oid_with_credentials
  500. token = await oid.a_token(username=username, password=password)
  501. assert token == {
  502. "access_token": mock.ANY,
  503. "expires_in": mock.ANY,
  504. "id_token": mock.ANY,
  505. "not-before-policy": 0,
  506. "refresh_expires_in": mock.ANY,
  507. "refresh_token": mock.ANY,
  508. "scope": mock.ANY,
  509. "session_state": mock.ANY,
  510. "token_type": "Bearer",
  511. }
  512. # Test with dummy totp
  513. token = await oid.a_token(username=username, password=password, totp="123456")
  514. assert token == {
  515. "access_token": mock.ANY,
  516. "expires_in": mock.ANY,
  517. "id_token": mock.ANY,
  518. "not-before-policy": 0,
  519. "refresh_expires_in": mock.ANY,
  520. "refresh_token": mock.ANY,
  521. "scope": mock.ANY,
  522. "session_state": mock.ANY,
  523. "token_type": "Bearer",
  524. }
  525. # Test with extra param
  526. token = await oid.a_token(username=username, password=password, extra_param="foo")
  527. assert token == {
  528. "access_token": mock.ANY,
  529. "expires_in": mock.ANY,
  530. "id_token": mock.ANY,
  531. "not-before-policy": 0,
  532. "refresh_expires_in": mock.ANY,
  533. "refresh_token": mock.ANY,
  534. "scope": mock.ANY,
  535. "session_state": mock.ANY,
  536. "token_type": "Bearer",
  537. }
  538. @pytest.mark.asyncio
  539. async def test_a_exchange_token(
  540. oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  541. ):
  542. """Test the exchange token method.
  543. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  544. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  545. :param admin: Keycloak Admin client
  546. :type admin: KeycloakAdmin
  547. """
  548. # Verify existing user
  549. oid, username, password = oid_with_credentials
  550. # Allow impersonation
  551. await admin.a_change_current_realm(oid.realm_name)
  552. await admin.a_assign_client_role(
  553. user_id=await admin.a_get_user_id(username=username),
  554. client_id=await admin.a_get_client_id(client_id="realm-management"),
  555. roles=[
  556. await admin.a_get_client_role(
  557. client_id=admin.get_client_id(client_id="realm-management"),
  558. role_name="impersonation",
  559. )
  560. ],
  561. )
  562. token = await oid.a_token(username=username, password=password)
  563. assert await oid.a_userinfo(token=token["access_token"]) == {
  564. "email": f"{username}@test.test",
  565. "email_verified": True,
  566. "family_name": "last",
  567. "given_name": "first",
  568. "name": "first last",
  569. "preferred_username": username,
  570. "sub": mock.ANY,
  571. }
  572. # Exchange token with the new user
  573. new_token = oid.exchange_token(
  574. token=token["access_token"], audience=oid.client_id, subject=username
  575. )
  576. assert await oid.a_userinfo(token=new_token["access_token"]) == {
  577. "email": f"{username}@test.test",
  578. "email_verified": True,
  579. "family_name": "last",
  580. "given_name": "first",
  581. "name": "first last",
  582. "preferred_username": username,
  583. "sub": mock.ANY,
  584. }
  585. assert token != new_token
  586. @pytest.mark.asyncio
  587. async def test_a_logout(oid_with_credentials):
  588. """Test logout.
  589. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  590. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  591. """
  592. oid, username, password = oid_with_credentials
  593. token = await oid.a_token(username=username, password=password)
  594. assert await oid.a_userinfo(token=token["access_token"]) != dict()
  595. assert await oid.a_logout(refresh_token=token["refresh_token"]) == dict()
  596. with pytest.raises(KeycloakAuthenticationError):
  597. await oid.a_userinfo(token=token["access_token"])
  598. @pytest.mark.asyncio
  599. async def test_a_certs(oid: KeycloakOpenID):
  600. """Test certificates.
  601. :param oid: Keycloak OpenID client
  602. :type oid: KeycloakOpenID
  603. """
  604. assert len((await oid.a_certs())["keys"]) == 2
  605. @pytest.mark.asyncio
  606. async def test_a_public_key(oid: KeycloakOpenID):
  607. """Test public key.
  608. :param oid: Keycloak OpenID client
  609. :type oid: KeycloakOpenID
  610. """
  611. assert await oid.a_public_key() is not None
  612. @pytest.mark.asyncio
  613. async def test_a_entitlement(
  614. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  615. ):
  616. """Test entitlement.
  617. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  618. server with client credentials
  619. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  620. :param admin: Keycloak Admin client
  621. :type admin: KeycloakAdmin
  622. """
  623. oid, username, password = oid_with_credentials_authz
  624. token = await oid.a_token(username=username, password=password)
  625. resource_server_id = admin.get_client_authz_resources(
  626. client_id=admin.get_client_id(oid.client_id)
  627. )[0]["_id"]
  628. with pytest.raises(KeycloakDeprecationError):
  629. await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  630. @pytest.mark.asyncio
  631. async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  632. """Test introspect.
  633. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  634. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  635. """
  636. oid, username, password = oid_with_credentials
  637. token = await oid.a_token(username=username, password=password)
  638. assert (await oid.a_introspect(token=token["access_token"]))["active"]
  639. assert await oid.a_introspect(
  640. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  641. ) == {"active": False}
  642. with pytest.raises(KeycloakRPTNotFound):
  643. await oid.a_introspect(
  644. token=token["access_token"], token_type_hint="requesting_party_token"
  645. )
  646. @pytest.mark.asyncio
  647. async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  648. """Test decode token.
  649. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  650. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  651. """
  652. oid, username, password = oid_with_credentials
  653. token = await oid.a_token(username=username, password=password)
  654. decoded_access_token = await oid.a_decode_token(token=token["access_token"])
  655. decoded_access_token_2 = await oid.a_decode_token(token=token["access_token"], validate=False)
  656. decoded_refresh_token = await oid.a_decode_token(token=token["refresh_token"], validate=False)
  657. assert decoded_access_token == decoded_access_token_2
  658. assert decoded_access_token["preferred_username"] == username, decoded_access_token
  659. assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
  660. @pytest.mark.asyncio
  661. async def test_a_load_authorization_config(
  662. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  663. ):
  664. """Test load authorization config.
  665. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  666. server with client credentials
  667. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  668. """
  669. oid, username, password = oid_with_credentials_authz
  670. await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
  671. assert "test-authz-rb-policy" in oid.authorization.policies
  672. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  673. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  674. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  675. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  676. assert isinstance(
  677. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  678. )
  @pytest.mark.asyncio
  async def test_a_has_uma_access(
      oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  ):
      """Exercise ``a_has_uma_access`` with a live, a logged-out and an admin token.

      :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
          server with client credentials
      :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
      :param admin: Keycloak Admin client
      :type admin: KeycloakAdmin
      """
      oid, username, password = oid_with_credentials_authz
      token = await oid.a_token(username=username, password=password)
      access_token = token["access_token"]

      # With a freshly issued token, both the empty permission request and
      # "Default Resource" are expected to be reported as authorized.
      authorized = "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
      for requested in ("", "Default Resource"):
          status = await oid.a_has_uma_access(token=access_token, permissions=requested)
          assert str(status) == authorized

      # Requesting a permission that does not exist is expected to raise.
      with pytest.raises(KeycloakPostError):
          await oid.a_has_uma_access(token=access_token, permissions="Does not exist")

      await oid.a_logout(refresh_token=token["refresh_token"])

      # The same access token is expected to be rejected once the session is logged out.
      status = await oid.a_has_uma_access(token=access_token, permissions="")
      assert str(status) == (
          "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
      )

      # The admin client's token is expected to be refused, with the requested
      # permission listed as missing.
      status = await oid.a_has_uma_access(
          token=admin.connection.token["access_token"], permissions="Default Resource"
      )
      assert str(status) == (
          "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
          "{'Default Resource'})"
      )
  @pytest.mark.asyncio
  async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
      """Test get policies.

      Walks ``a_get_policies`` through four stages: before the authorization config is
      loaded, after loading it, with the client id temporarily switched to ``"account"``,
      and after the token has been logged out.

      :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
          server with client credentials
      :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
      """
      oid, username, password = oid_with_credentials_authz
      token = await oid.a_token(username=username, password=password)

      # Before the authorization config is loaded the call is expected to raise.
      with pytest.raises(KeycloakAuthorizationConfigError):
          await oid.a_get_policies(token=token["access_token"])

      await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
      assert await oid.a_get_policies(token=token["access_token"]) is None

      orig_client_id = oid.client_id
      oid.client_id = "account"
      try:
          assert (
              await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
              == []
          )

          policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
          policy.add_role(role="account/view-profile")
          oid.authorization.policies["test"] = policy

          assert [
              str(x)
              for x in await oid.a_get_policies(
                  token=token["access_token"], method_token_info="decode"
              )
          ] == ["Policy: test (role)"]
          assert [
              repr(x)
              for x in await oid.a_get_policies(
                  token=token["access_token"], method_token_info="decode"
              )
          ] == ["<Policy: test (role)>"]
      finally:
          # Put the original client id back even if an assertion above fails, so the
          # override never outlives this section of the test.
          oid.client_id = orig_client_id

      await oid.a_logout(refresh_token=token["refresh_token"])

      # After logout the token is expected to be rejected as invalid.
      with pytest.raises(KeycloakInvalidTokenError):
          await oid.a_get_policies(token=token["access_token"])
  @pytest.mark.asyncio
  async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
      """Test get permissions.

      Walks ``a_get_permissions`` through four stages: before the authorization config is
      loaded, after loading it, with the client id temporarily switched to ``"account"``,
      and after the token has been logged out.

      :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
          server with client credentials
      :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
      """
      oid, username, password = oid_with_credentials_authz
      token = await oid.a_token(username=username, password=password)

      # Before the authorization config is loaded the call is expected to raise.
      with pytest.raises(KeycloakAuthorizationConfigError):
          await oid.a_get_permissions(token=token["access_token"])

      await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
      assert await oid.a_get_permissions(token=token["access_token"]) is None

      orig_client_id = oid.client_id
      oid.client_id = "account"
      try:
          assert (
              await oid.a_get_permissions(token=token["access_token"], method_token_info="decode")
              == []
          )

          policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
          policy.add_role(role="account/view-profile")
          policy.add_permission(
              permission=Permission(
                  name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
              )
          )
          oid.authorization.policies["test"] = policy

          assert [
              str(x)
              for x in await oid.a_get_permissions(
                  token=token["access_token"], method_token_info="decode"
              )
          ] == ["Permission: test-perm (resource)"]
          assert [
              repr(x)
              for x in await oid.a_get_permissions(
                  token=token["access_token"], method_token_info="decode"
              )
          ] == ["<Permission: test-perm (resource)>"]
      finally:
          # Put the original client id back even if an assertion above fails, so the
          # override never outlives this section of the test.
          oid.client_id = orig_client_id

      await oid.a_logout(refresh_token=token["refresh_token"])

      # After logout the token is expected to be rejected as invalid.
      with pytest.raises(KeycloakInvalidTokenError):
          await oid.a_get_permissions(token=token["access_token"])
  @pytest.mark.asyncio
  async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
      """Check that ``a_uma_permissions`` returns exactly the "Default Resource" entry.

      :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
          server with client credentials
      :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
      """
      oid, username, password = oid_with_credentials_authz
      token = await oid.a_token(username=username, password=password)

      # First request: only the number of returned permissions is checked.
      granted = await oid.a_uma_permissions(token=token["access_token"])
      assert len(granted) == 1

      # Second, independent request: the resource name of the first entry is checked.
      granted_again = await oid.a_uma_permissions(token=token["access_token"])
      first_entry = granted_again[0]
      assert first_entry["rsname"] == "Default Resource"
  @pytest.mark.asyncio
  async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
      """Check the payload returned by the async device authorization request.

      :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
          credentials and device authorization flow enabled
      :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
      """
      oid, _username, _password = oid_with_credentials_device
      res = await oid.a_device()

      # The codes are not known in advance, so any value is accepted for them; the
      # complete verification URI is rebuilt from the user code the response carries.
      verification_uri = f"http://localhost:8081/realms/{oid.realm_name}/device"
      verification_uri_complete = (
          f"http://localhost:8081/realms/{oid.realm_name}/"
          f"device?user_code={res['user_code']}"
      )
      expected = {
          "device_code": mock.ANY,
          "user_code": mock.ANY,
          "verification_uri": verification_uri,
          "verification_uri_complete": verification_uri_complete,
          "expires_in": 600,
          "interval": 5,
      }
      assert res == expected
  def test_counter_part():
      """Test that each function has its async counter part.

      Every callable attribute of ``KeycloakOpenID`` whose name starts with neither ``a_``
      nor ``_`` must have an ``a_``-prefixed sibling with identical signature parameters.
      In the other direction, every coroutine function on the class must map back to one of
      those synchronous names once its first two characters are stripped, unless the
      remainder starts with ``_``.
      """
      openid_methods = [
          name for name in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, name))
      ]
      # str.startswith accepts a tuple, so one call covers both excluded prefixes.
      sync_methods = [name for name in openid_methods if not name.startswith(("a_", "_"))]
      async_methods = [
          name for name in openid_methods if iscoroutinefunction(getattr(KeycloakOpenID, name))
      ]

      for method in sync_methods:
          async_method = f"a_{method}"
          assert async_method in openid_methods, f"{method} has no counterpart {async_method}"

          sync_sign = signature(getattr(KeycloakOpenID, method))
          async_sign = signature(getattr(KeycloakOpenID, async_method))
          assert (
              sync_sign.parameters == async_sign.parameters
          ), f"{method} and {async_method} take different parameters"

      for async_method in async_methods:
          sync_name = async_method[2:]
          # A remainder starting with "_" has no public synchronous twin to look for.
          if sync_name.startswith("_"):
              continue
          assert sync_name in sync_methods, f"{async_method} has no counterpart {sync_name}"