You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1158 lines
41 KiB

3 weeks ago
3 weeks ago
3 weeks ago
3 weeks ago
3 weeks ago
3 weeks ago
3 weeks ago
3 weeks ago
  1. """Test module for KeycloakOpenID."""
  2. from inspect import iscoroutinefunction, signature
  3. from unittest import mock
  4. import jwcrypto.jwk
  5. import jwcrypto.jws
  6. import pytest
  7. from keycloak import KeycloakAdmin, KeycloakOpenID
  8. from keycloak.authorization import Authorization
  9. from keycloak.authorization.permission import Permission
  10. from keycloak.authorization.policy import Policy
  11. from keycloak.authorization.role import Role
  12. from keycloak.connection import ConnectionManager
  13. from keycloak.exceptions import (
  14. KeycloakAuthenticationError,
  15. KeycloakAuthorizationConfigError,
  16. KeycloakDeprecationError,
  17. KeycloakInvalidTokenError,
  18. KeycloakPostError,
  19. KeycloakRPTNotFound,
  20. )
  21. from tests.conftest import KeycloakTestEnv
  22. def test_keycloak_openid_init(env: KeycloakTestEnv) -> None:
  23. """
  24. Test KeycloakOpenId's init method.
  25. :param env: Environment fixture
  26. :type env: KeycloakTestEnv
  27. """
  28. oid = KeycloakOpenID(
  29. server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
  30. realm_name="master",
  31. client_id="admin-cli",
  32. )
  33. assert oid.client_id == "admin-cli"
  34. assert oid.client_secret_key is None
  35. assert oid.realm_name == "master"
  36. assert isinstance(oid.connection, ConnectionManager)
  37. assert isinstance(oid.authorization, Authorization)
  38. def test_well_known(oid: KeycloakOpenID) -> None:
  39. """
  40. Test the well_known method.
  41. :param oid: Keycloak OpenID client
  42. :type oid: KeycloakOpenID
  43. """
  44. res = oid.well_known()
  45. assert res is not None
  46. assert res != {}
  47. for key in [
  48. "acr_values_supported",
  49. "authorization_encryption_alg_values_supported",
  50. "authorization_encryption_enc_values_supported",
  51. "authorization_endpoint",
  52. "authorization_signing_alg_values_supported",
  53. "backchannel_authentication_endpoint",
  54. "backchannel_authentication_request_signing_alg_values_supported",
  55. "backchannel_logout_session_supported",
  56. "backchannel_logout_supported",
  57. "backchannel_token_delivery_modes_supported",
  58. "check_session_iframe",
  59. "claim_types_supported",
  60. "claims_parameter_supported",
  61. "claims_supported",
  62. "code_challenge_methods_supported",
  63. "device_authorization_endpoint",
  64. "end_session_endpoint",
  65. "frontchannel_logout_session_supported",
  66. "frontchannel_logout_supported",
  67. "grant_types_supported",
  68. "id_token_encryption_alg_values_supported",
  69. "id_token_encryption_enc_values_supported",
  70. "id_token_signing_alg_values_supported",
  71. "introspection_endpoint",
  72. "introspection_endpoint_auth_methods_supported",
  73. "introspection_endpoint_auth_signing_alg_values_supported",
  74. "issuer",
  75. "jwks_uri",
  76. "mtls_endpoint_aliases",
  77. "pushed_authorization_request_endpoint",
  78. "registration_endpoint",
  79. "request_object_encryption_alg_values_supported",
  80. "request_object_encryption_enc_values_supported",
  81. "request_object_signing_alg_values_supported",
  82. "request_parameter_supported",
  83. "request_uri_parameter_supported",
  84. "require_pushed_authorization_requests",
  85. "require_request_uri_registration",
  86. "response_modes_supported",
  87. "response_types_supported",
  88. "revocation_endpoint",
  89. "revocation_endpoint_auth_methods_supported",
  90. "revocation_endpoint_auth_signing_alg_values_supported",
  91. "scopes_supported",
  92. "subject_types_supported",
  93. "tls_client_certificate_bound_access_tokens",
  94. "token_endpoint",
  95. "token_endpoint_auth_methods_supported",
  96. "token_endpoint_auth_signing_alg_values_supported",
  97. "userinfo_encryption_alg_values_supported",
  98. "userinfo_encryption_enc_values_supported",
  99. "userinfo_endpoint",
  100. "userinfo_signing_alg_values_supported",
  101. ]:
  102. assert key in res
  103. def test_auth_url(env: KeycloakTestEnv, oid: KeycloakOpenID) -> None:
  104. """
  105. Test the auth_url method.
  106. :param env: Environment fixture
  107. :type env: KeycloakTestEnv
  108. :param oid: Keycloak OpenID client
  109. :type oid: KeycloakOpenID
  110. """
  111. res = oid.auth_url(redirect_uri="http://test.test/*")
  112. assert (
  113. res == f"http://{env.keycloak_host}:{env.keycloak_port}/realms/{oid.realm_name}"
  114. f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  115. "&redirect_uri=http://test.test/*&scope=email&state=&nonce="
  116. )
  117. def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  118. """
  119. Test the token method.
  120. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  121. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  122. """
  123. oid, username, password = oid_with_credentials
  124. token = oid.token(username=username, password=password)
  125. assert token == {
  126. "access_token": mock.ANY,
  127. "expires_in": mock.ANY,
  128. "id_token": mock.ANY,
  129. "not-before-policy": 0,
  130. "refresh_expires_in": mock.ANY,
  131. "refresh_token": mock.ANY,
  132. "scope": mock.ANY,
  133. "session_state": mock.ANY,
  134. "token_type": "Bearer",
  135. }
  136. # Test with dummy totp
  137. token = oid.token(username=username, password=password, totp="123456")
  138. assert token == {
  139. "access_token": mock.ANY,
  140. "expires_in": mock.ANY,
  141. "id_token": mock.ANY,
  142. "not-before-policy": 0,
  143. "refresh_expires_in": mock.ANY,
  144. "refresh_token": mock.ANY,
  145. "scope": mock.ANY,
  146. "session_state": mock.ANY,
  147. "token_type": "Bearer",
  148. }
  149. # Test with extra param
  150. token = oid.token(username=username, password=password, extra_param="foo")
  151. assert token == {
  152. "access_token": mock.ANY,
  153. "expires_in": mock.ANY,
  154. "id_token": mock.ANY,
  155. "not-before-policy": 0,
  156. "refresh_expires_in": mock.ANY,
  157. "refresh_token": mock.ANY,
  158. "scope": mock.ANY,
  159. "session_state": mock.ANY,
  160. "token_type": "Bearer",
  161. }
  162. def test_exchange_token(
  163. oid_with_credentials: tuple[KeycloakOpenID, str, str],
  164. admin: KeycloakAdmin,
  165. ) -> None:
  166. """
  167. Test the exchange token method.
  168. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  169. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  170. :param admin: Keycloak Admin client
  171. :type admin: KeycloakAdmin
  172. """
  173. # Verify existing user
  174. oid, username, password = oid_with_credentials
  175. # Allow impersonation
  176. admin.change_current_realm(oid.realm_name)
  177. admin.assign_client_role(
  178. user_id=admin.get_user_id(username=username),
  179. client_id=admin.get_client_id(client_id="realm-management"),
  180. roles=[
  181. admin.get_client_role(
  182. client_id=admin.get_client_id(client_id="realm-management"),
  183. role_name="impersonation",
  184. ),
  185. ],
  186. )
  187. token = oid.token(username=username, password=password)
  188. assert oid.userinfo(token=token["access_token"]) == {
  189. "email": f"{username}@test.test",
  190. "email_verified": True,
  191. "family_name": "last",
  192. "given_name": "first",
  193. "name": "first last",
  194. "preferred_username": username,
  195. "sub": mock.ANY,
  196. }
  197. # Exchange token with the new user
  198. new_token = oid.exchange_token(
  199. token=token["access_token"],
  200. audience=oid.client_id,
  201. subject=username,
  202. )
  203. assert oid.userinfo(token=new_token["access_token"]) == {
  204. "email": f"{username}@test.test",
  205. "email_verified": True,
  206. "family_name": "last",
  207. "given_name": "first",
  208. "name": "first last",
  209. "preferred_username": username,
  210. "sub": mock.ANY,
  211. }
  212. assert token != new_token
  213. def test_logout(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  214. """
  215. Test logout.
  216. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  217. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  218. """
  219. oid, username, password = oid_with_credentials
  220. token = oid.token(username=username, password=password)
  221. assert oid.userinfo(token=token["access_token"]) != {}
  222. assert oid.logout(refresh_token=token["refresh_token"]) == {}
  223. with pytest.raises(KeycloakAuthenticationError):
  224. oid.userinfo(token=token["access_token"])
  225. def test_certs(oid: KeycloakOpenID) -> None:
  226. """
  227. Test certificates.
  228. :param oid: Keycloak OpenID client
  229. :type oid: KeycloakOpenID
  230. """
  231. assert len(oid.certs()["keys"]) == 2
  232. def test_public_key(oid: KeycloakOpenID) -> None:
  233. """
  234. Test public key.
  235. :param oid: Keycloak OpenID client
  236. :type oid: KeycloakOpenID
  237. """
  238. assert oid.public_key() is not None
  239. def test_entitlement(
  240. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
  241. admin: KeycloakAdmin,
  242. ) -> None:
  243. """
  244. Test entitlement.
  245. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  246. server with client credentials
  247. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  248. :param admin: Keycloak Admin client
  249. :type admin: KeycloakAdmin
  250. """
  251. oid, username, password = oid_with_credentials_authz
  252. token = oid.token(username=username, password=password)
  253. resource_server_id = admin.get_client_authz_resources(
  254. client_id=admin.get_client_id(oid.client_id),
  255. )[0]["_id"]
  256. with pytest.raises(KeycloakDeprecationError):
  257. oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  258. def test_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  259. """
  260. Test introspect.
  261. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  262. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  263. """
  264. oid, username, password = oid_with_credentials
  265. token = oid.token(username=username, password=password)
  266. assert oid.introspect(token=token["access_token"])["active"]
  267. assert oid.introspect(
  268. token=token["access_token"],
  269. rpt="some",
  270. token_type_hint="requesting_party_token", # noqa: S106
  271. ) == {"active": False}
  272. with pytest.raises(KeycloakRPTNotFound):
  273. oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token") # noqa: S106
  274. def test_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  275. """
  276. Test decode token.
  277. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  278. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  279. """
  280. oid, username, password = oid_with_credentials
  281. token = oid.token(username=username, password=password)
  282. decoded_access_token = oid.decode_token(token=token["access_token"])
  283. decoded_access_token_2 = oid.decode_token(token=token["access_token"], validate=False)
  284. decoded_refresh_token = oid.decode_token(token=token["refresh_token"], validate=False)
  285. assert decoded_access_token == decoded_access_token_2
  286. assert decoded_access_token["preferred_username"] == username, decoded_access_token
  287. assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
  288. def test_decode_token_invalid_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  289. """
  290. Test decode token with an invalid token.
  291. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  292. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  293. """
  294. oid, username, password = oid_with_credentials
  295. token = oid.token(username=username, password=password)
  296. access_token = token["access_token"]
  297. decoded_access_token = oid.decode_token(token=access_token)
  298. key = oid.public_key()
  299. key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----"
  300. key = jwcrypto.jwk.JWK.from_pem(key.encode("utf-8"))
  301. invalid_access_token = access_token + "a"
  302. with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
  303. decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=True)
  304. with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
  305. decoded_invalid_access_token = oid.decode_token(
  306. token=invalid_access_token,
  307. validate=True,
  308. key=key,
  309. )
  310. decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False)
  311. assert decoded_access_token == decoded_invalid_access_token
  312. decoded_invalid_access_token = oid.decode_token(
  313. token=invalid_access_token,
  314. validate=False,
  315. key=key,
  316. )
  317. assert decoded_access_token == decoded_invalid_access_token
  318. def test_load_authorization_config(
  319. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
  320. ) -> None:
  321. """
  322. Test load authorization config.
  323. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  324. server with client credentials
  325. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  326. """
  327. oid, username, password = oid_with_credentials_authz
  328. oid.load_authorization_config(path="tests/data/authz_settings.json")
  329. assert "test-authz-rb-policy" in oid.authorization.policies
  330. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  331. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  332. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  333. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  334. assert isinstance(
  335. oid.authorization.policies["test-authz-rb-policy"].permissions[0],
  336. Permission,
  337. )
  338. def test_get_policies(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
  339. """
  340. Test get policies.
  341. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  342. server with client credentials
  343. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  344. """
  345. oid, username, password = oid_with_credentials_authz
  346. token = oid.token(username=username, password=password)
  347. with pytest.raises(KeycloakAuthorizationConfigError):
  348. oid.get_policies(token=token["access_token"])
  349. oid.load_authorization_config(path="tests/data/authz_settings.json")
  350. assert oid.get_policies(token=token["access_token"]) is None
  351. orig_client_id = oid.client_id
  352. oid.client_id = "account"
  353. assert oid.get_policies(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
  354. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  355. policy.add_role(role="account/view-profile")
  356. oid.authorization.policies["test"] = policy
  357. assert [
  358. str(x)
  359. for x in oid.get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
  360. ] == ["Policy: test (role)"]
  361. assert [
  362. repr(x)
  363. for x in oid.get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
  364. ] == ["<Policy: test (role)>"]
  365. oid.client_id = orig_client_id
  366. oid.logout(refresh_token=token["refresh_token"])
  367. with pytest.raises(KeycloakInvalidTokenError):
  368. oid.get_policies(token=token["access_token"])
  369. def test_get_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
  370. """
  371. Test get policies.
  372. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  373. server with client credentials
  374. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  375. """
  376. oid, username, password = oid_with_credentials_authz
  377. token = oid.token(username=username, password=password)
  378. with pytest.raises(KeycloakAuthorizationConfigError):
  379. oid.get_permissions(token=token["access_token"])
  380. oid.load_authorization_config(path="tests/data/authz_settings.json")
  381. assert oid.get_permissions(token=token["access_token"]) is None
  382. orig_client_id = oid.client_id
  383. oid.client_id = "account"
  384. assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
  385. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  386. policy.add_role(role="account/view-profile")
  387. policy.add_permission(
  388. permission=Permission(
  389. name="test-perm",
  390. type="resource",
  391. logic="POSITIVE",
  392. decision_strategy="UNANIMOUS",
  393. ),
  394. )
  395. oid.authorization.policies["test"] = policy
  396. assert [
  397. str(x)
  398. for x in oid.get_permissions(token=token["access_token"], method_token_info="decode") # noqa: S106
  399. ] == ["Permission: test-perm (resource)"]
  400. assert [
  401. repr(x)
  402. for x in oid.get_permissions(token=token["access_token"], method_token_info="decode") # noqa: S106
  403. ] == ["<Permission: test-perm (resource)>"]
  404. oid.client_id = orig_client_id
  405. oid.logout(refresh_token=token["refresh_token"])
  406. with pytest.raises(KeycloakInvalidTokenError):
  407. oid.get_permissions(token=token["access_token"])
  408. def test_uma_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
  409. """
  410. Test UMA permissions.
  411. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  412. server with client credentials
  413. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  414. """
  415. oid, username, password = oid_with_credentials_authz
  416. token = oid.token(username=username, password=password)
  417. assert len(oid.uma_permissions(token=token["access_token"])) == 1
  418. assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource"
  419. def test_has_uma_access(
  420. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
  421. admin: KeycloakAdmin,
  422. ) -> None:
  423. """
  424. Test has UMA access.
  425. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  426. server with client credentials
  427. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  428. :param admin: Keycloak Admin client
  429. :type admin: KeycloakAdmin
  430. """
  431. oid, username, password = oid_with_credentials_authz
  432. token = oid.token(username=username, password=password)
  433. assert (
  434. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  435. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  436. )
  437. assert (
  438. str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
  439. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  440. )
  441. with pytest.raises(KeycloakPostError):
  442. oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
  443. oid.logout(refresh_token=token["refresh_token"])
  444. assert (
  445. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  446. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  447. )
  448. assert (
  449. str(
  450. oid.has_uma_access(
  451. token=admin.connection.token["access_token"],
  452. permissions="Default Resource",
  453. ),
  454. )
  455. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  456. "{'Default Resource'})"
  457. )
  458. def test_device(oid_with_credentials_device: tuple[KeycloakOpenID, str, str]) -> None:
  459. """
  460. Test device authorization flow.
  461. :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
  462. credentials and device authorization flow enabled
  463. :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
  464. """
  465. oid, _, _ = oid_with_credentials_device
  466. res = oid.device()
  467. assert res == {
  468. "device_code": mock.ANY,
  469. "user_code": mock.ANY,
  470. "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
  471. "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
  472. f"device?user_code={res['user_code']}",
  473. "expires_in": 600,
  474. "interval": 5,
  475. }
  476. # async function start
  477. @pytest.mark.asyncio
  478. async def test_a_well_known(oid: KeycloakOpenID) -> None:
  479. """
  480. Test the well_known method.
  481. :param oid: Keycloak OpenID client
  482. :type oid: KeycloakOpenID
  483. """
  484. res = await oid.a_well_known()
  485. assert res is not None
  486. assert res != {}
  487. for key in [
  488. "acr_values_supported",
  489. "authorization_encryption_alg_values_supported",
  490. "authorization_encryption_enc_values_supported",
  491. "authorization_endpoint",
  492. "authorization_signing_alg_values_supported",
  493. "backchannel_authentication_endpoint",
  494. "backchannel_authentication_request_signing_alg_values_supported",
  495. "backchannel_logout_session_supported",
  496. "backchannel_logout_supported",
  497. "backchannel_token_delivery_modes_supported",
  498. "check_session_iframe",
  499. "claim_types_supported",
  500. "claims_parameter_supported",
  501. "claims_supported",
  502. "code_challenge_methods_supported",
  503. "device_authorization_endpoint",
  504. "end_session_endpoint",
  505. "frontchannel_logout_session_supported",
  506. "frontchannel_logout_supported",
  507. "grant_types_supported",
  508. "id_token_encryption_alg_values_supported",
  509. "id_token_encryption_enc_values_supported",
  510. "id_token_signing_alg_values_supported",
  511. "introspection_endpoint",
  512. "introspection_endpoint_auth_methods_supported",
  513. "introspection_endpoint_auth_signing_alg_values_supported",
  514. "issuer",
  515. "jwks_uri",
  516. "mtls_endpoint_aliases",
  517. "pushed_authorization_request_endpoint",
  518. "registration_endpoint",
  519. "request_object_encryption_alg_values_supported",
  520. "request_object_encryption_enc_values_supported",
  521. "request_object_signing_alg_values_supported",
  522. "request_parameter_supported",
  523. "request_uri_parameter_supported",
  524. "require_pushed_authorization_requests",
  525. "require_request_uri_registration",
  526. "response_modes_supported",
  527. "response_types_supported",
  528. "revocation_endpoint",
  529. "revocation_endpoint_auth_methods_supported",
  530. "revocation_endpoint_auth_signing_alg_values_supported",
  531. "scopes_supported",
  532. "subject_types_supported",
  533. "tls_client_certificate_bound_access_tokens",
  534. "token_endpoint",
  535. "token_endpoint_auth_methods_supported",
  536. "token_endpoint_auth_signing_alg_values_supported",
  537. "userinfo_encryption_alg_values_supported",
  538. "userinfo_encryption_enc_values_supported",
  539. "userinfo_endpoint",
  540. "userinfo_signing_alg_values_supported",
  541. ]:
  542. assert key in res
  543. @pytest.mark.asyncio
  544. async def test_a_auth_url(env: KeycloakTestEnv, oid: KeycloakOpenID) -> None:
  545. """
  546. Test the auth_url method.
  547. :param env: Environment fixture
  548. :type env: KeycloakTestEnv
  549. :param oid: Keycloak OpenID client
  550. :type oid: KeycloakOpenID
  551. """
  552. res = await oid.a_auth_url(redirect_uri="http://test.test/*")
  553. assert (
  554. res == f"http://{env.keycloak_host}:{env.keycloak_port}/realms/{oid.realm_name}"
  555. f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  556. "&redirect_uri=http://test.test/*&scope=email&state=&nonce="
  557. )
  558. @pytest.mark.asyncio
  559. async def test_a_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  560. """
  561. Test the token method.
  562. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  563. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  564. """
  565. oid, username, password = oid_with_credentials
  566. token = await oid.a_token(username=username, password=password)
  567. assert token == {
  568. "access_token": mock.ANY,
  569. "expires_in": mock.ANY,
  570. "id_token": mock.ANY,
  571. "not-before-policy": 0,
  572. "refresh_expires_in": mock.ANY,
  573. "refresh_token": mock.ANY,
  574. "scope": mock.ANY,
  575. "session_state": mock.ANY,
  576. "token_type": "Bearer",
  577. }
  578. # Test with dummy totp
  579. token = await oid.a_token(username=username, password=password, totp="123456")
  580. assert token == {
  581. "access_token": mock.ANY,
  582. "expires_in": mock.ANY,
  583. "id_token": mock.ANY,
  584. "not-before-policy": 0,
  585. "refresh_expires_in": mock.ANY,
  586. "refresh_token": mock.ANY,
  587. "scope": mock.ANY,
  588. "session_state": mock.ANY,
  589. "token_type": "Bearer",
  590. }
  591. # Test with extra param
  592. token = await oid.a_token(username=username, password=password, extra_param="foo")
  593. assert token == {
  594. "access_token": mock.ANY,
  595. "expires_in": mock.ANY,
  596. "id_token": mock.ANY,
  597. "not-before-policy": 0,
  598. "refresh_expires_in": mock.ANY,
  599. "refresh_token": mock.ANY,
  600. "scope": mock.ANY,
  601. "session_state": mock.ANY,
  602. "token_type": "Bearer",
  603. }
  604. @pytest.mark.asyncio
  605. async def test_a_exchange_token(
  606. oid_with_credentials: tuple[KeycloakOpenID, str, str],
  607. admin: KeycloakAdmin,
  608. ) -> None:
  609. """
  610. Test the exchange token method.
  611. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  612. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  613. :param admin: Keycloak Admin client
  614. :type admin: KeycloakAdmin
  615. """
  616. # Verify existing user
  617. oid, username, password = oid_with_credentials
  618. # Allow impersonation
  619. await admin.a_change_current_realm(oid.realm_name)
  620. await admin.a_assign_client_role(
  621. user_id=await admin.a_get_user_id(username=username),
  622. client_id=await admin.a_get_client_id(client_id="realm-management"),
  623. roles=[
  624. await admin.a_get_client_role(
  625. client_id=admin.get_client_id(client_id="realm-management"),
  626. role_name="impersonation",
  627. ),
  628. ],
  629. )
  630. token = await oid.a_token(username=username, password=password)
  631. assert await oid.a_userinfo(token=token["access_token"]) == {
  632. "email": f"{username}@test.test",
  633. "email_verified": True,
  634. "family_name": "last",
  635. "given_name": "first",
  636. "name": "first last",
  637. "preferred_username": username,
  638. "sub": mock.ANY,
  639. }
  640. # Exchange token with the new user
  641. new_token = oid.exchange_token(
  642. token=token["access_token"],
  643. audience=oid.client_id,
  644. subject=username,
  645. )
  646. assert await oid.a_userinfo(token=new_token["access_token"]) == {
  647. "email": f"{username}@test.test",
  648. "email_verified": True,
  649. "family_name": "last",
  650. "given_name": "first",
  651. "name": "first last",
  652. "preferred_username": username,
  653. "sub": mock.ANY,
  654. }
  655. assert token != new_token
  656. @pytest.mark.asyncio
  657. async def test_a_logout(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  658. """
  659. Test logout.
  660. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  661. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  662. """
  663. oid, username, password = oid_with_credentials
  664. token = await oid.a_token(username=username, password=password)
  665. assert await oid.a_userinfo(token=token["access_token"]) != {}
  666. assert await oid.a_logout(refresh_token=token["refresh_token"]) == {}
  667. with pytest.raises(KeycloakAuthenticationError):
  668. await oid.a_userinfo(token=token["access_token"])
  669. @pytest.mark.asyncio
  670. async def test_a_certs(oid: KeycloakOpenID) -> None:
  671. """
  672. Test certificates.
  673. :param oid: Keycloak OpenID client
  674. :type oid: KeycloakOpenID
  675. """
  676. assert len((await oid.a_certs())["keys"]) == 2
  677. @pytest.mark.asyncio
  678. async def test_a_public_key(oid: KeycloakOpenID) -> None:
  679. """
  680. Test public key.
  681. :param oid: Keycloak OpenID client
  682. :type oid: KeycloakOpenID
  683. """
  684. assert await oid.a_public_key() is not None
  685. @pytest.mark.asyncio
  686. async def test_a_entitlement(
  687. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
  688. admin: KeycloakAdmin,
  689. ) -> None:
  690. """
  691. Test entitlement.
  692. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  693. server with client credentials
  694. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  695. :param admin: Keycloak Admin client
  696. :type admin: KeycloakAdmin
  697. """
  698. oid, username, password = oid_with_credentials_authz
  699. token = await oid.a_token(username=username, password=password)
  700. resource_server_id = admin.get_client_authz_resources(
  701. client_id=admin.get_client_id(oid.client_id),
  702. )[0]["_id"]
  703. with pytest.raises(KeycloakDeprecationError):
  704. await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  705. @pytest.mark.asyncio
  706. async def test_a_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  707. """
  708. Test introspect.
  709. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  710. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  711. """
  712. oid, username, password = oid_with_credentials
  713. token = await oid.a_token(username=username, password=password)
  714. assert (await oid.a_introspect(token=token["access_token"]))["active"]
  715. assert await oid.a_introspect(
  716. token=token["access_token"],
  717. rpt="some",
  718. token_type_hint="requesting_party_token", # noqa: S106
  719. ) == {"active": False}
  720. with pytest.raises(KeycloakRPTNotFound):
  721. await oid.a_introspect(
  722. token=token["access_token"],
  723. token_type_hint="requesting_party_token", # noqa: S106
  724. )
  725. @pytest.mark.asyncio
  726. async def test_a_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
  727. """
  728. Test decode token asynchronously.
  729. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  730. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  731. """
  732. oid, username, password = oid_with_credentials
  733. token = await oid.a_token(username=username, password=password)
  734. decoded_access_token = await oid.a_decode_token(token=token["access_token"])
  735. decoded_access_token_2 = await oid.a_decode_token(token=token["access_token"], validate=False)
  736. decoded_refresh_token = await oid.a_decode_token(token=token["refresh_token"], validate=False)
  737. assert decoded_access_token == decoded_access_token_2
  738. assert decoded_access_token["preferred_username"] == username, decoded_access_token
  739. assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
  740. @pytest.mark.asyncio
  741. async def test_a_decode_token_invalid_token(
  742. oid_with_credentials: tuple[KeycloakOpenID, str, str],
  743. ) -> None:
  744. """
  745. Test decode token asynchronously an invalid token.
  746. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  747. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  748. """
  749. oid, username, password = oid_with_credentials
  750. token = await oid.a_token(username=username, password=password)
  751. access_token = token["access_token"]
  752. decoded_access_token = await oid.a_decode_token(token=access_token)
  753. key = await oid.a_public_key()
  754. key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----"
  755. key = jwcrypto.jwk.JWK.from_pem(key.encode("utf-8"))
  756. invalid_access_token = access_token + "a"
  757. with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
  758. decoded_invalid_access_token = await oid.a_decode_token(
  759. token=invalid_access_token,
  760. validate=True,
  761. )
  762. with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
  763. decoded_invalid_access_token = await oid.a_decode_token(
  764. token=invalid_access_token,
  765. validate=True,
  766. key=key,
  767. )
  768. decoded_invalid_access_token = await oid.a_decode_token(
  769. token=invalid_access_token,
  770. validate=False,
  771. )
  772. assert decoded_access_token == decoded_invalid_access_token
  773. decoded_invalid_access_token = await oid.a_decode_token(
  774. token=invalid_access_token,
  775. validate=False,
  776. key=key,
  777. )
  778. assert decoded_access_token == decoded_invalid_access_token
  779. @pytest.mark.asyncio
  780. async def test_a_load_authorization_config(
  781. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
  782. ) -> None:
  783. """
  784. Test load authorization config.
  785. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  786. server with client credentials
  787. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  788. """
  789. oid, username, password = oid_with_credentials_authz
  790. await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
  791. assert "test-authz-rb-policy" in oid.authorization.policies
  792. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  793. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  794. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  795. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  796. assert isinstance(
  797. oid.authorization.policies["test-authz-rb-policy"].permissions[0],
  798. Permission,
  799. )
  800. @pytest.mark.asyncio
  801. async def test_a_has_uma_access(
  802. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
  803. admin: KeycloakAdmin,
  804. ) -> None:
  805. """
  806. Test has UMA access.
  807. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  808. server with client credentials
  809. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  810. :param admin: Keycloak Admin client
  811. :type admin: KeycloakAdmin
  812. """
  813. oid, username, password = oid_with_credentials_authz
  814. token = await oid.a_token(username=username, password=password)
  815. assert (
  816. str(await oid.a_has_uma_access(token=token["access_token"], permissions=""))
  817. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  818. )
  819. assert (
  820. str(
  821. await oid.a_has_uma_access(
  822. token=token["access_token"],
  823. permissions="Default Resource",
  824. ),
  825. )
  826. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  827. )
  828. with pytest.raises(KeycloakPostError):
  829. await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist")
  830. await oid.a_logout(refresh_token=token["refresh_token"])
  831. assert (
  832. str(await oid.a_has_uma_access(token=token["access_token"], permissions=""))
  833. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  834. )
  835. assert (
  836. str(
  837. await oid.a_has_uma_access(
  838. token=admin.connection.token["access_token"],
  839. permissions="Default Resource",
  840. ),
  841. )
  842. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  843. "{'Default Resource'})"
  844. )
  845. @pytest.mark.asyncio
  846. async def test_a_get_policies(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
  847. """
  848. Test get policies.
  849. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  850. server with client credentials
  851. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  852. """
  853. oid, username, password = oid_with_credentials_authz
  854. token = await oid.a_token(username=username, password=password)
  855. with pytest.raises(KeycloakAuthorizationConfigError):
  856. await oid.a_get_policies(token=token["access_token"])
  857. await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
  858. assert await oid.a_get_policies(token=token["access_token"]) is None
  859. orig_client_id = oid.client_id
  860. oid.client_id = "account"
  861. assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
  862. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  863. policy.add_role(role="account/view-profile")
  864. oid.authorization.policies["test"] = policy
  865. assert [
  866. str(x)
  867. for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
  868. ] == ["Policy: test (role)"]
  869. assert [
  870. repr(x)
  871. for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
  872. ] == ["<Policy: test (role)>"]
  873. oid.client_id = orig_client_id
  874. await oid.a_logout(refresh_token=token["refresh_token"])
  875. with pytest.raises(KeycloakInvalidTokenError):
  876. await oid.a_get_policies(token=token["access_token"])
  877. @pytest.mark.asyncio
  878. async def test_a_get_permissions(
  879. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
  880. ) -> None:
  881. """
  882. Test get policies.
  883. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  884. server with client credentials
  885. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  886. """
  887. oid, username, password = oid_with_credentials_authz
  888. token = await oid.a_token(username=username, password=password)
  889. with pytest.raises(KeycloakAuthorizationConfigError):
  890. await oid.a_get_permissions(token=token["access_token"])
  891. await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
  892. assert await oid.a_get_permissions(token=token["access_token"]) is None
  893. orig_client_id = oid.client_id
  894. oid.client_id = "account"
  895. assert (
  896. await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
  897. )
  898. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  899. policy.add_role(role="account/view-profile")
  900. policy.add_permission(
  901. permission=Permission(
  902. name="test-perm",
  903. type="resource",
  904. logic="POSITIVE",
  905. decision_strategy="UNANIMOUS",
  906. ),
  907. )
  908. oid.authorization.policies["test"] = policy
  909. assert [
  910. str(x)
  911. for x in await oid.a_get_permissions(
  912. token=token["access_token"],
  913. method_token_info="decode", # noqa: S106
  914. )
  915. ] == ["Permission: test-perm (resource)"]
  916. assert [
  917. repr(x)
  918. for x in await oid.a_get_permissions(
  919. token=token["access_token"],
  920. method_token_info="decode", # noqa: S106
  921. )
  922. ] == ["<Permission: test-perm (resource)>"]
  923. oid.client_id = orig_client_id
  924. await oid.a_logout(refresh_token=token["refresh_token"])
  925. with pytest.raises(KeycloakInvalidTokenError):
  926. await oid.a_get_permissions(token=token["access_token"])
  927. @pytest.mark.asyncio
  928. async def test_a_uma_permissions(
  929. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
  930. ) -> None:
  931. """
  932. Test UMA permissions.
  933. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  934. server with client credentials
  935. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  936. """
  937. oid, username, password = oid_with_credentials_authz
  938. token = await oid.a_token(username=username, password=password)
  939. assert len(await oid.a_uma_permissions(token=token["access_token"])) == 1
  940. assert (await oid.a_uma_permissions(token=token["access_token"]))[0][
  941. "rsname"
  942. ] == "Default Resource"
  943. @pytest.mark.asyncio
  944. async def test_a_device(oid_with_credentials_device: tuple[KeycloakOpenID, str, str]) -> None:
  945. """
  946. Test device authorization flow.
  947. :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
  948. credentials and device authorization flow enabled
  949. :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
  950. """
  951. oid, _, _ = oid_with_credentials_device
  952. res = await oid.a_device()
  953. assert res == {
  954. "device_code": mock.ANY,
  955. "user_code": mock.ANY,
  956. "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
  957. "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
  958. f"device?user_code={res['user_code']}",
  959. "expires_in": 600,
  960. "interval": 5,
  961. }
  962. def test_counter_part() -> None:
  963. """Test that each function has its async counter part."""
  964. openid_methods = [
  965. func for func in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, func))
  966. ]
  967. sync_methods = [
  968. method
  969. for method in openid_methods
  970. if not method.startswith("a_") and not method.startswith("_")
  971. ]
  972. async_methods = [
  973. method for method in openid_methods if iscoroutinefunction(getattr(KeycloakOpenID, method))
  974. ]
  975. for method in sync_methods:
  976. async_method = f"a_{method}"
  977. assert (async_method in openid_methods) is True
  978. sync_sign = signature(getattr(KeycloakOpenID, method))
  979. async_sign = signature(getattr(KeycloakOpenID, async_method))
  980. assert sync_sign.parameters == async_sign.parameters
  981. for async_method in async_methods:
  982. if async_method[2:].startswith("_"):
  983. continue
  984. assert async_method[2:] in sync_methods