You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1075 lines
40 KiB

10 months ago
  1. """Test module for KeycloakOpenID."""
  2. from inspect import iscoroutinefunction, signature
  3. from typing import Tuple
  4. from unittest import mock
  5. import jwcrypto.jwk
  6. import jwcrypto.jws
  7. import pytest
  8. from keycloak import KeycloakAdmin, KeycloakOpenID
  9. from keycloak.authorization import Authorization
  10. from keycloak.authorization.permission import Permission
  11. from keycloak.authorization.policy import Policy
  12. from keycloak.authorization.role import Role
  13. from keycloak.connection import ConnectionManager
  14. from keycloak.exceptions import (
  15. KeycloakAuthenticationError,
  16. KeycloakAuthorizationConfigError,
  17. KeycloakDeprecationError,
  18. KeycloakInvalidTokenError,
  19. KeycloakPostError,
  20. KeycloakRPTNotFound,
  21. )
  22. def test_keycloak_openid_init(env):
  23. """Test KeycloakOpenId's init method.
  24. :param env: Environment fixture
  25. :type env: KeycloakTestEnv
  26. """
  27. oid = KeycloakOpenID(
  28. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  29. realm_name="master",
  30. client_id="admin-cli",
  31. )
  32. assert oid.client_id == "admin-cli"
  33. assert oid.client_secret_key is None
  34. assert oid.realm_name == "master"
  35. assert isinstance(oid.connection, ConnectionManager)
  36. assert isinstance(oid.authorization, Authorization)
  37. def test_well_known(oid: KeycloakOpenID):
  38. """Test the well_known method.
  39. :param oid: Keycloak OpenID client
  40. :type oid: KeycloakOpenID
  41. """
  42. res = oid.well_known()
  43. assert res is not None
  44. assert res != dict()
  45. for key in [
  46. "acr_values_supported",
  47. "authorization_encryption_alg_values_supported",
  48. "authorization_encryption_enc_values_supported",
  49. "authorization_endpoint",
  50. "authorization_signing_alg_values_supported",
  51. "backchannel_authentication_endpoint",
  52. "backchannel_authentication_request_signing_alg_values_supported",
  53. "backchannel_logout_session_supported",
  54. "backchannel_logout_supported",
  55. "backchannel_token_delivery_modes_supported",
  56. "check_session_iframe",
  57. "claim_types_supported",
  58. "claims_parameter_supported",
  59. "claims_supported",
  60. "code_challenge_methods_supported",
  61. "device_authorization_endpoint",
  62. "end_session_endpoint",
  63. "frontchannel_logout_session_supported",
  64. "frontchannel_logout_supported",
  65. "grant_types_supported",
  66. "id_token_encryption_alg_values_supported",
  67. "id_token_encryption_enc_values_supported",
  68. "id_token_signing_alg_values_supported",
  69. "introspection_endpoint",
  70. "introspection_endpoint_auth_methods_supported",
  71. "introspection_endpoint_auth_signing_alg_values_supported",
  72. "issuer",
  73. "jwks_uri",
  74. "mtls_endpoint_aliases",
  75. "pushed_authorization_request_endpoint",
  76. "registration_endpoint",
  77. "request_object_encryption_alg_values_supported",
  78. "request_object_encryption_enc_values_supported",
  79. "request_object_signing_alg_values_supported",
  80. "request_parameter_supported",
  81. "request_uri_parameter_supported",
  82. "require_pushed_authorization_requests",
  83. "require_request_uri_registration",
  84. "response_modes_supported",
  85. "response_types_supported",
  86. "revocation_endpoint",
  87. "revocation_endpoint_auth_methods_supported",
  88. "revocation_endpoint_auth_signing_alg_values_supported",
  89. "scopes_supported",
  90. "subject_types_supported",
  91. "tls_client_certificate_bound_access_tokens",
  92. "token_endpoint",
  93. "token_endpoint_auth_methods_supported",
  94. "token_endpoint_auth_signing_alg_values_supported",
  95. "userinfo_encryption_alg_values_supported",
  96. "userinfo_encryption_enc_values_supported",
  97. "userinfo_endpoint",
  98. "userinfo_signing_alg_values_supported",
  99. ]:
  100. assert key in res
  101. def test_auth_url(env, oid: KeycloakOpenID):
  102. """Test the auth_url method.
  103. :param env: Environment fixture
  104. :type env: KeycloakTestEnv
  105. :param oid: Keycloak OpenID client
  106. :type oid: KeycloakOpenID
  107. """
  108. res = oid.auth_url(redirect_uri="http://test.test/*")
  109. assert (
  110. res
  111. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  112. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  113. + "&redirect_uri=http://test.test/*&scope=email&state=&nonce="
  114. )
  115. def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  116. """Test the token method.
  117. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  118. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  119. """
  120. oid, username, password = oid_with_credentials
  121. token = oid.token(username=username, password=password)
  122. assert token == {
  123. "access_token": mock.ANY,
  124. "expires_in": mock.ANY,
  125. "id_token": mock.ANY,
  126. "not-before-policy": 0,
  127. "refresh_expires_in": mock.ANY,
  128. "refresh_token": mock.ANY,
  129. "scope": mock.ANY,
  130. "session_state": mock.ANY,
  131. "token_type": "Bearer",
  132. }
  133. # Test with dummy totp
  134. token = oid.token(username=username, password=password, totp="123456")
  135. assert token == {
  136. "access_token": mock.ANY,
  137. "expires_in": mock.ANY,
  138. "id_token": mock.ANY,
  139. "not-before-policy": 0,
  140. "refresh_expires_in": mock.ANY,
  141. "refresh_token": mock.ANY,
  142. "scope": mock.ANY,
  143. "session_state": mock.ANY,
  144. "token_type": "Bearer",
  145. }
  146. # Test with extra param
  147. token = oid.token(username=username, password=password, extra_param="foo")
  148. assert token == {
  149. "access_token": mock.ANY,
  150. "expires_in": mock.ANY,
  151. "id_token": mock.ANY,
  152. "not-before-policy": 0,
  153. "refresh_expires_in": mock.ANY,
  154. "refresh_token": mock.ANY,
  155. "scope": mock.ANY,
  156. "session_state": mock.ANY,
  157. "token_type": "Bearer",
  158. }
  159. def test_exchange_token(
  160. oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  161. ):
  162. """Test the exchange token method.
  163. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  164. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  165. :param admin: Keycloak Admin client
  166. :type admin: KeycloakAdmin
  167. """
  168. # Verify existing user
  169. oid, username, password = oid_with_credentials
  170. # Allow impersonation
  171. admin.change_current_realm(oid.realm_name)
  172. admin.assign_client_role(
  173. user_id=admin.get_user_id(username=username),
  174. client_id=admin.get_client_id(client_id="realm-management"),
  175. roles=[
  176. admin.get_client_role(
  177. client_id=admin.get_client_id(client_id="realm-management"),
  178. role_name="impersonation",
  179. )
  180. ],
  181. )
  182. token = oid.token(username=username, password=password)
  183. assert oid.userinfo(token=token["access_token"]) == {
  184. "email": f"{username}@test.test",
  185. "email_verified": True,
  186. "family_name": "last",
  187. "given_name": "first",
  188. "name": "first last",
  189. "preferred_username": username,
  190. "sub": mock.ANY,
  191. }
  192. # Exchange token with the new user
  193. new_token = oid.exchange_token(
  194. token=token["access_token"], audience=oid.client_id, subject=username
  195. )
  196. assert oid.userinfo(token=new_token["access_token"]) == {
  197. "email": f"{username}@test.test",
  198. "email_verified": True,
  199. "family_name": "last",
  200. "given_name": "first",
  201. "name": "first last",
  202. "preferred_username": username,
  203. "sub": mock.ANY,
  204. }
  205. assert token != new_token
  206. def test_logout(oid_with_credentials):
  207. """Test logout.
  208. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  209. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  210. """
  211. oid, username, password = oid_with_credentials
  212. token = oid.token(username=username, password=password)
  213. assert oid.userinfo(token=token["access_token"]) != dict()
  214. assert oid.logout(refresh_token=token["refresh_token"]) == dict()
  215. with pytest.raises(KeycloakAuthenticationError):
  216. oid.userinfo(token=token["access_token"])
  217. def test_certs(oid: KeycloakOpenID):
  218. """Test certificates.
  219. :param oid: Keycloak OpenID client
  220. :type oid: KeycloakOpenID
  221. """
  222. assert len(oid.certs()["keys"]) == 2
  223. def test_public_key(oid: KeycloakOpenID):
  224. """Test public key.
  225. :param oid: Keycloak OpenID client
  226. :type oid: KeycloakOpenID
  227. """
  228. assert oid.public_key() is not None
  229. def test_entitlement(
  230. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  231. ):
  232. """Test entitlement.
  233. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  234. server with client credentials
  235. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  236. :param admin: Keycloak Admin client
  237. :type admin: KeycloakAdmin
  238. """
  239. oid, username, password = oid_with_credentials_authz
  240. token = oid.token(username=username, password=password)
  241. resource_server_id = admin.get_client_authz_resources(
  242. client_id=admin.get_client_id(oid.client_id)
  243. )[0]["_id"]
  244. with pytest.raises(KeycloakDeprecationError):
  245. oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  246. def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  247. """Test introspect.
  248. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  249. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  250. """
  251. oid, username, password = oid_with_credentials
  252. token = oid.token(username=username, password=password)
  253. assert oid.introspect(token=token["access_token"])["active"]
  254. assert oid.introspect(
  255. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  256. ) == {"active": False}
  257. with pytest.raises(KeycloakRPTNotFound):
  258. oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
  259. def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  260. """Test decode token.
  261. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  262. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  263. """
  264. oid, username, password = oid_with_credentials
  265. token = oid.token(username=username, password=password)
  266. decoded_access_token = oid.decode_token(token=token["access_token"])
  267. decoded_access_token_2 = oid.decode_token(token=token["access_token"], validate=False)
  268. decoded_refresh_token = oid.decode_token(token=token["refresh_token"], validate=False)
  269. assert decoded_access_token == decoded_access_token_2
  270. assert decoded_access_token["preferred_username"] == username, decoded_access_token
  271. assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
  272. def test_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  273. """Test decode token with an invalid token.
  274. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  275. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  276. """
  277. oid, username, password = oid_with_credentials
  278. token = oid.token(username=username, password=password)
  279. access_token = token["access_token"]
  280. decoded_access_token = oid.decode_token(token=access_token)
  281. key = oid.public_key()
  282. key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----"
  283. key = jwcrypto.jwk.JWK.from_pem(key.encode("utf-8"))
  284. invalid_access_token = access_token + "a"
  285. with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
  286. decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=True)
  287. with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
  288. decoded_invalid_access_token = oid.decode_token(
  289. token=invalid_access_token, validate=True, key=key
  290. )
  291. decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False)
  292. assert decoded_access_token == decoded_invalid_access_token
  293. decoded_invalid_access_token = oid.decode_token(
  294. token=invalid_access_token, validate=False, key=key
  295. )
  296. assert decoded_access_token == decoded_invalid_access_token
  297. def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  298. """Test load authorization config.
  299. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  300. server with client credentials
  301. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  302. """
  303. oid, username, password = oid_with_credentials_authz
  304. oid.load_authorization_config(path="tests/data/authz_settings.json")
  305. assert "test-authz-rb-policy" in oid.authorization.policies
  306. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  307. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  308. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  309. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  310. assert isinstance(
  311. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  312. )
  313. def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  314. """Test get policies.
  315. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  316. server with client credentials
  317. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  318. """
  319. oid, username, password = oid_with_credentials_authz
  320. token = oid.token(username=username, password=password)
  321. with pytest.raises(KeycloakAuthorizationConfigError):
  322. oid.get_policies(token=token["access_token"])
  323. oid.load_authorization_config(path="tests/data/authz_settings.json")
  324. assert oid.get_policies(token=token["access_token"]) is None
  325. orig_client_id = oid.client_id
  326. oid.client_id = "account"
  327. assert oid.get_policies(token=token["access_token"], method_token_info="decode") == []
  328. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  329. policy.add_role(role="account/view-profile")
  330. oid.authorization.policies["test"] = policy
  331. assert [
  332. str(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
  333. ] == ["Policy: test (role)"]
  334. assert [
  335. repr(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
  336. ] == ["<Policy: test (role)>"]
  337. oid.client_id = orig_client_id
  338. oid.logout(refresh_token=token["refresh_token"])
  339. with pytest.raises(KeycloakInvalidTokenError):
  340. oid.get_policies(token=token["access_token"])
  341. def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  342. """Test get policies.
  343. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  344. server with client credentials
  345. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  346. """
  347. oid, username, password = oid_with_credentials_authz
  348. token = oid.token(username=username, password=password)
  349. with pytest.raises(KeycloakAuthorizationConfigError):
  350. oid.get_permissions(token=token["access_token"])
  351. oid.load_authorization_config(path="tests/data/authz_settings.json")
  352. assert oid.get_permissions(token=token["access_token"]) is None
  353. orig_client_id = oid.client_id
  354. oid.client_id = "account"
  355. assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == []
  356. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  357. policy.add_role(role="account/view-profile")
  358. policy.add_permission(
  359. permission=Permission(
  360. name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
  361. )
  362. )
  363. oid.authorization.policies["test"] = policy
  364. assert [
  365. str(x)
  366. for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
  367. ] == ["Permission: test-perm (resource)"]
  368. assert [
  369. repr(x)
  370. for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
  371. ] == ["<Permission: test-perm (resource)>"]
  372. oid.client_id = orig_client_id
  373. oid.logout(refresh_token=token["refresh_token"])
  374. with pytest.raises(KeycloakInvalidTokenError):
  375. oid.get_permissions(token=token["access_token"])
  376. def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  377. """Test UMA permissions.
  378. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  379. server with client credentials
  380. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  381. """
  382. oid, username, password = oid_with_credentials_authz
  383. token = oid.token(username=username, password=password)
  384. assert len(oid.uma_permissions(token=token["access_token"])) == 1
  385. assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource"
  386. def test_has_uma_access(
  387. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  388. ):
  389. """Test has UMA access.
  390. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  391. server with client credentials
  392. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  393. :param admin: Keycloak Admin client
  394. :type admin: KeycloakAdmin
  395. """
  396. oid, username, password = oid_with_credentials_authz
  397. token = oid.token(username=username, password=password)
  398. assert (
  399. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  400. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  401. )
  402. assert (
  403. str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
  404. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  405. )
  406. with pytest.raises(KeycloakPostError):
  407. oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
  408. oid.logout(refresh_token=token["refresh_token"])
  409. assert (
  410. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  411. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  412. )
  413. assert (
  414. str(
  415. oid.has_uma_access(
  416. token=admin.connection.token["access_token"], permissions="Default Resource"
  417. )
  418. )
  419. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  420. + "{'Default Resource'})"
  421. )
  422. def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
  423. """Test device authorization flow.
  424. :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
  425. credentials and device authorization flow enabled
  426. :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
  427. """
  428. oid, _, _ = oid_with_credentials_device
  429. res = oid.device()
  430. assert res == {
  431. "device_code": mock.ANY,
  432. "user_code": mock.ANY,
  433. "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
  434. "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
  435. + f"device?user_code={res['user_code']}",
  436. "expires_in": 600,
  437. "interval": 5,
  438. }
# Asynchronous (``a_``-prefixed) variants of the tests start here.
  440. @pytest.mark.asyncio
  441. async def test_a_well_known(oid: KeycloakOpenID):
  442. """Test the well_known method.
  443. :param oid: Keycloak OpenID client
  444. :type oid: KeycloakOpenID
  445. """
  446. res = await oid.a_well_known()
  447. assert res is not None
  448. assert res != dict()
  449. for key in [
  450. "acr_values_supported",
  451. "authorization_encryption_alg_values_supported",
  452. "authorization_encryption_enc_values_supported",
  453. "authorization_endpoint",
  454. "authorization_signing_alg_values_supported",
  455. "backchannel_authentication_endpoint",
  456. "backchannel_authentication_request_signing_alg_values_supported",
  457. "backchannel_logout_session_supported",
  458. "backchannel_logout_supported",
  459. "backchannel_token_delivery_modes_supported",
  460. "check_session_iframe",
  461. "claim_types_supported",
  462. "claims_parameter_supported",
  463. "claims_supported",
  464. "code_challenge_methods_supported",
  465. "device_authorization_endpoint",
  466. "end_session_endpoint",
  467. "frontchannel_logout_session_supported",
  468. "frontchannel_logout_supported",
  469. "grant_types_supported",
  470. "id_token_encryption_alg_values_supported",
  471. "id_token_encryption_enc_values_supported",
  472. "id_token_signing_alg_values_supported",
  473. "introspection_endpoint",
  474. "introspection_endpoint_auth_methods_supported",
  475. "introspection_endpoint_auth_signing_alg_values_supported",
  476. "issuer",
  477. "jwks_uri",
  478. "mtls_endpoint_aliases",
  479. "pushed_authorization_request_endpoint",
  480. "registration_endpoint",
  481. "request_object_encryption_alg_values_supported",
  482. "request_object_encryption_enc_values_supported",
  483. "request_object_signing_alg_values_supported",
  484. "request_parameter_supported",
  485. "request_uri_parameter_supported",
  486. "require_pushed_authorization_requests",
  487. "require_request_uri_registration",
  488. "response_modes_supported",
  489. "response_types_supported",
  490. "revocation_endpoint",
  491. "revocation_endpoint_auth_methods_supported",
  492. "revocation_endpoint_auth_signing_alg_values_supported",
  493. "scopes_supported",
  494. "subject_types_supported",
  495. "tls_client_certificate_bound_access_tokens",
  496. "token_endpoint",
  497. "token_endpoint_auth_methods_supported",
  498. "token_endpoint_auth_signing_alg_values_supported",
  499. "userinfo_encryption_alg_values_supported",
  500. "userinfo_encryption_enc_values_supported",
  501. "userinfo_endpoint",
  502. "userinfo_signing_alg_values_supported",
  503. ]:
  504. assert key in res
  505. @pytest.mark.asyncio
  506. async def test_a_auth_url(env, oid: KeycloakOpenID):
  507. """Test the auth_url method.
  508. :param env: Environment fixture
  509. :type env: KeycloakTestEnv
  510. :param oid: Keycloak OpenID client
  511. :type oid: KeycloakOpenID
  512. """
  513. res = await oid.a_auth_url(redirect_uri="http://test.test/*")
  514. assert (
  515. res
  516. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  517. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  518. + "&redirect_uri=http://test.test/*&scope=email&state=&nonce="
  519. )
  520. @pytest.mark.asyncio
  521. async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  522. """Test the token method.
  523. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  524. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  525. """
  526. oid, username, password = oid_with_credentials
  527. token = await oid.a_token(username=username, password=password)
  528. assert token == {
  529. "access_token": mock.ANY,
  530. "expires_in": mock.ANY,
  531. "id_token": mock.ANY,
  532. "not-before-policy": 0,
  533. "refresh_expires_in": mock.ANY,
  534. "refresh_token": mock.ANY,
  535. "scope": mock.ANY,
  536. "session_state": mock.ANY,
  537. "token_type": "Bearer",
  538. }
  539. # Test with dummy totp
  540. token = await oid.a_token(username=username, password=password, totp="123456")
  541. assert token == {
  542. "access_token": mock.ANY,
  543. "expires_in": mock.ANY,
  544. "id_token": mock.ANY,
  545. "not-before-policy": 0,
  546. "refresh_expires_in": mock.ANY,
  547. "refresh_token": mock.ANY,
  548. "scope": mock.ANY,
  549. "session_state": mock.ANY,
  550. "token_type": "Bearer",
  551. }
  552. # Test with extra param
  553. token = await oid.a_token(username=username, password=password, extra_param="foo")
  554. assert token == {
  555. "access_token": mock.ANY,
  556. "expires_in": mock.ANY,
  557. "id_token": mock.ANY,
  558. "not-before-policy": 0,
  559. "refresh_expires_in": mock.ANY,
  560. "refresh_token": mock.ANY,
  561. "scope": mock.ANY,
  562. "session_state": mock.ANY,
  563. "token_type": "Bearer",
  564. }
  565. @pytest.mark.asyncio
  566. async def test_a_exchange_token(
  567. oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  568. ):
  569. """Test the exchange token method.
  570. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  571. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  572. :param admin: Keycloak Admin client
  573. :type admin: KeycloakAdmin
  574. """
  575. # Verify existing user
  576. oid, username, password = oid_with_credentials
  577. # Allow impersonation
  578. await admin.a_change_current_realm(oid.realm_name)
  579. await admin.a_assign_client_role(
  580. user_id=await admin.a_get_user_id(username=username),
  581. client_id=await admin.a_get_client_id(client_id="realm-management"),
  582. roles=[
  583. await admin.a_get_client_role(
  584. client_id=admin.get_client_id(client_id="realm-management"),
  585. role_name="impersonation",
  586. )
  587. ],
  588. )
  589. token = await oid.a_token(username=username, password=password)
  590. assert await oid.a_userinfo(token=token["access_token"]) == {
  591. "email": f"{username}@test.test",
  592. "email_verified": True,
  593. "family_name": "last",
  594. "given_name": "first",
  595. "name": "first last",
  596. "preferred_username": username,
  597. "sub": mock.ANY,
  598. }
  599. # Exchange token with the new user
  600. new_token = oid.exchange_token(
  601. token=token["access_token"], audience=oid.client_id, subject=username
  602. )
  603. assert await oid.a_userinfo(token=new_token["access_token"]) == {
  604. "email": f"{username}@test.test",
  605. "email_verified": True,
  606. "family_name": "last",
  607. "given_name": "first",
  608. "name": "first last",
  609. "preferred_username": username,
  610. "sub": mock.ANY,
  611. }
  612. assert token != new_token
  613. @pytest.mark.asyncio
  614. async def test_a_logout(oid_with_credentials):
  615. """Test logout.
  616. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  617. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  618. """
  619. oid, username, password = oid_with_credentials
  620. token = await oid.a_token(username=username, password=password)
  621. assert await oid.a_userinfo(token=token["access_token"]) != dict()
  622. assert await oid.a_logout(refresh_token=token["refresh_token"]) == dict()
  623. with pytest.raises(KeycloakAuthenticationError):
  624. await oid.a_userinfo(token=token["access_token"])
  625. @pytest.mark.asyncio
  626. async def test_a_certs(oid: KeycloakOpenID):
  627. """Test certificates.
  628. :param oid: Keycloak OpenID client
  629. :type oid: KeycloakOpenID
  630. """
  631. assert len((await oid.a_certs())["keys"]) == 2
  632. @pytest.mark.asyncio
  633. async def test_a_public_key(oid: KeycloakOpenID):
  634. """Test public key.
  635. :param oid: Keycloak OpenID client
  636. :type oid: KeycloakOpenID
  637. """
  638. assert await oid.a_public_key() is not None
  639. @pytest.mark.asyncio
  640. async def test_a_entitlement(
  641. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  642. ):
  643. """Test entitlement.
  644. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  645. server with client credentials
  646. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  647. :param admin: Keycloak Admin client
  648. :type admin: KeycloakAdmin
  649. """
  650. oid, username, password = oid_with_credentials_authz
  651. token = await oid.a_token(username=username, password=password)
  652. resource_server_id = admin.get_client_authz_resources(
  653. client_id=admin.get_client_id(oid.client_id)
  654. )[0]["_id"]
  655. with pytest.raises(KeycloakDeprecationError):
  656. await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  657. @pytest.mark.asyncio
  658. async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  659. """Test introspect.
  660. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  661. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  662. """
  663. oid, username, password = oid_with_credentials
  664. token = await oid.a_token(username=username, password=password)
  665. assert (await oid.a_introspect(token=token["access_token"]))["active"]
  666. assert await oid.a_introspect(
  667. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  668. ) == {"active": False}
  669. with pytest.raises(KeycloakRPTNotFound):
  670. await oid.a_introspect(
  671. token=token["access_token"], token_type_hint="requesting_party_token"
  672. )
  673. @pytest.mark.asyncio
  674. async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  675. """Test decode token asynchronously.
  676. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  677. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  678. """
  679. oid, username, password = oid_with_credentials
  680. token = await oid.a_token(username=username, password=password)
  681. decoded_access_token = await oid.a_decode_token(token=token["access_token"])
  682. decoded_access_token_2 = await oid.a_decode_token(token=token["access_token"], validate=False)
  683. decoded_refresh_token = await oid.a_decode_token(token=token["refresh_token"], validate=False)
  684. assert decoded_access_token == decoded_access_token_2
  685. assert decoded_access_token["preferred_username"] == username, decoded_access_token
  686. assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
  687. @pytest.mark.asyncio
  688. async def test_a_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  689. """Test decode token asynchronously an invalid token.
  690. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  691. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  692. """
  693. oid, username, password = oid_with_credentials
  694. token = await oid.a_token(username=username, password=password)
  695. access_token = token["access_token"]
  696. decoded_access_token = await oid.a_decode_token(token=access_token)
  697. key = await oid.a_public_key()
  698. key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----"
  699. key = jwcrypto.jwk.JWK.from_pem(key.encode("utf-8"))
  700. invalid_access_token = access_token + "a"
  701. with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
  702. decoded_invalid_access_token = await oid.a_decode_token(
  703. token=invalid_access_token, validate=True
  704. )
  705. with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
  706. decoded_invalid_access_token = await oid.a_decode_token(
  707. token=invalid_access_token, validate=True, key=key
  708. )
  709. decoded_invalid_access_token = await oid.a_decode_token(
  710. token=invalid_access_token, validate=False
  711. )
  712. assert decoded_access_token == decoded_invalid_access_token
  713. decoded_invalid_access_token = await oid.a_decode_token(
  714. token=invalid_access_token, validate=False, key=key
  715. )
  716. assert decoded_access_token == decoded_invalid_access_token
  717. @pytest.mark.asyncio
  718. async def test_a_load_authorization_config(
  719. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  720. ):
  721. """Test load authorization config.
  722. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  723. server with client credentials
  724. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  725. """
  726. oid, username, password = oid_with_credentials_authz
  727. await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
  728. assert "test-authz-rb-policy" in oid.authorization.policies
  729. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  730. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  731. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  732. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  733. assert isinstance(
  734. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  735. )
  736. @pytest.mark.asyncio
  737. async def test_a_has_uma_access(
  738. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  739. ):
  740. """Test has UMA access.
  741. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  742. server with client credentials
  743. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  744. :param admin: Keycloak Admin client
  745. :type admin: KeycloakAdmin
  746. """
  747. oid, username, password = oid_with_credentials_authz
  748. token = await oid.a_token(username=username, password=password)
  749. assert (
  750. str(await oid.a_has_uma_access(token=token["access_token"], permissions=""))
  751. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  752. )
  753. assert (
  754. str(
  755. await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource")
  756. )
  757. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  758. )
  759. with pytest.raises(KeycloakPostError):
  760. await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist")
  761. await oid.a_logout(refresh_token=token["refresh_token"])
  762. assert (
  763. str(await oid.a_has_uma_access(token=token["access_token"], permissions=""))
  764. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  765. )
  766. assert (
  767. str(
  768. await oid.a_has_uma_access(
  769. token=admin.connection.token["access_token"], permissions="Default Resource"
  770. )
  771. )
  772. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  773. + "{'Default Resource'})"
  774. )
  775. @pytest.mark.asyncio
  776. async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  777. """Test get policies.
  778. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  779. server with client credentials
  780. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  781. """
  782. oid, username, password = oid_with_credentials_authz
  783. token = await oid.a_token(username=username, password=password)
  784. with pytest.raises(KeycloakAuthorizationConfigError):
  785. await oid.a_get_policies(token=token["access_token"])
  786. await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
  787. assert await oid.a_get_policies(token=token["access_token"]) is None
  788. orig_client_id = oid.client_id
  789. oid.client_id = "account"
  790. assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == []
  791. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  792. policy.add_role(role="account/view-profile")
  793. oid.authorization.policies["test"] = policy
  794. assert [
  795. str(x)
  796. for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
  797. ] == ["Policy: test (role)"]
  798. assert [
  799. repr(x)
  800. for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
  801. ] == ["<Policy: test (role)>"]
  802. oid.client_id = orig_client_id
  803. await oid.a_logout(refresh_token=token["refresh_token"])
  804. with pytest.raises(KeycloakInvalidTokenError):
  805. await oid.a_get_policies(token=token["access_token"])
  806. @pytest.mark.asyncio
  807. async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  808. """Test get policies.
  809. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  810. server with client credentials
  811. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  812. """
  813. oid, username, password = oid_with_credentials_authz
  814. token = await oid.a_token(username=username, password=password)
  815. with pytest.raises(KeycloakAuthorizationConfigError):
  816. await oid.a_get_permissions(token=token["access_token"])
  817. await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
  818. assert await oid.a_get_permissions(token=token["access_token"]) is None
  819. orig_client_id = oid.client_id
  820. oid.client_id = "account"
  821. assert (
  822. await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == []
  823. )
  824. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  825. policy.add_role(role="account/view-profile")
  826. policy.add_permission(
  827. permission=Permission(
  828. name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
  829. )
  830. )
  831. oid.authorization.policies["test"] = policy
  832. assert [
  833. str(x)
  834. for x in await oid.a_get_permissions(
  835. token=token["access_token"], method_token_info="decode"
  836. )
  837. ] == ["Permission: test-perm (resource)"]
  838. assert [
  839. repr(x)
  840. for x in await oid.a_get_permissions(
  841. token=token["access_token"], method_token_info="decode"
  842. )
  843. ] == ["<Permission: test-perm (resource)>"]
  844. oid.client_id = orig_client_id
  845. await oid.a_logout(refresh_token=token["refresh_token"])
  846. with pytest.raises(KeycloakInvalidTokenError):
  847. await oid.a_get_permissions(token=token["access_token"])
  848. @pytest.mark.asyncio
  849. async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  850. """Test UMA permissions.
  851. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  852. server with client credentials
  853. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  854. """
  855. oid, username, password = oid_with_credentials_authz
  856. token = await oid.a_token(username=username, password=password)
  857. assert len(await oid.a_uma_permissions(token=token["access_token"])) == 1
  858. assert (await oid.a_uma_permissions(token=token["access_token"]))[0][
  859. "rsname"
  860. ] == "Default Resource"
  861. @pytest.mark.asyncio
  862. async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
  863. """Test device authorization flow.
  864. :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
  865. credentials and device authorization flow enabled
  866. :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
  867. """
  868. oid, _, _ = oid_with_credentials_device
  869. res = await oid.a_device()
  870. assert res == {
  871. "device_code": mock.ANY,
  872. "user_code": mock.ANY,
  873. "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
  874. "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
  875. + f"device?user_code={res['user_code']}",
  876. "expires_in": 600,
  877. "interval": 5,
  878. }
  879. def test_counter_part():
  880. """Test that each function has its async counter part."""
  881. openid_methods = [
  882. func for func in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, func))
  883. ]
  884. sync_methods = [
  885. method
  886. for method in openid_methods
  887. if not method.startswith("a_") and not method.startswith("_")
  888. ]
  889. async_methods = [
  890. method for method in openid_methods if iscoroutinefunction(getattr(KeycloakOpenID, method))
  891. ]
  892. for method in sync_methods:
  893. async_method = f"a_{method}"
  894. assert (async_method in openid_methods) is True
  895. sync_sign = signature(getattr(KeycloakOpenID, method))
  896. async_sign = signature(getattr(KeycloakOpenID, async_method))
  897. assert sync_sign.parameters == async_sign.parameters
  898. for async_method in async_methods:
  899. if async_method[2:].startswith("_"):
  900. continue
  901. assert async_method[2:] in sync_methods