You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

474 lines
17 KiB

  1. """Test module for KeycloakOpenID."""
  2. from typing import Tuple
  3. from unittest import mock
  4. import pytest
  5. from keycloak.authorization import Authorization
  6. from keycloak.authorization.permission import Permission
  7. from keycloak.authorization.policy import Policy
  8. from keycloak.authorization.role import Role
  9. from keycloak.connection import ConnectionManager
  10. from keycloak.exceptions import (
  11. KeycloakAuthenticationError,
  12. KeycloakAuthorizationConfigError,
  13. KeycloakDeprecationError,
  14. KeycloakInvalidTokenError,
  15. KeycloakPostError,
  16. KeycloakRPTNotFound,
  17. )
  18. from keycloak.keycloak_admin import KeycloakAdmin
  19. from keycloak.keycloak_openid import KeycloakOpenID
  20. def test_keycloak_openid_init(env):
  21. """Test KeycloakOpenId's init method.
  22. :param env: Environment fixture
  23. :type env: KeycloakTestEnv
  24. """
  25. oid = KeycloakOpenID(
  26. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  27. realm_name="master",
  28. client_id="admin-cli",
  29. )
  30. assert oid.client_id == "admin-cli"
  31. assert oid.client_secret_key is None
  32. assert oid.realm_name == "master"
  33. assert isinstance(oid.connection, ConnectionManager)
  34. assert isinstance(oid.authorization, Authorization)
  35. def test_well_known(oid: KeycloakOpenID):
  36. """Test the well_known method.
  37. :param oid: Keycloak OpenID client
  38. :type oid: KeycloakOpenID
  39. """
  40. res = oid.well_known()
  41. assert res is not None
  42. assert res != dict()
  43. for key in [
  44. "acr_values_supported",
  45. "authorization_encryption_alg_values_supported",
  46. "authorization_encryption_enc_values_supported",
  47. "authorization_endpoint",
  48. "authorization_signing_alg_values_supported",
  49. "backchannel_authentication_endpoint",
  50. "backchannel_authentication_request_signing_alg_values_supported",
  51. "backchannel_logout_session_supported",
  52. "backchannel_logout_supported",
  53. "backchannel_token_delivery_modes_supported",
  54. "check_session_iframe",
  55. "claim_types_supported",
  56. "claims_parameter_supported",
  57. "claims_supported",
  58. "code_challenge_methods_supported",
  59. "device_authorization_endpoint",
  60. "end_session_endpoint",
  61. "frontchannel_logout_session_supported",
  62. "frontchannel_logout_supported",
  63. "grant_types_supported",
  64. "id_token_encryption_alg_values_supported",
  65. "id_token_encryption_enc_values_supported",
  66. "id_token_signing_alg_values_supported",
  67. "introspection_endpoint",
  68. "introspection_endpoint_auth_methods_supported",
  69. "introspection_endpoint_auth_signing_alg_values_supported",
  70. "issuer",
  71. "jwks_uri",
  72. "mtls_endpoint_aliases",
  73. "pushed_authorization_request_endpoint",
  74. "registration_endpoint",
  75. "request_object_encryption_alg_values_supported",
  76. "request_object_encryption_enc_values_supported",
  77. "request_object_signing_alg_values_supported",
  78. "request_parameter_supported",
  79. "request_uri_parameter_supported",
  80. "require_pushed_authorization_requests",
  81. "require_request_uri_registration",
  82. "response_modes_supported",
  83. "response_types_supported",
  84. "revocation_endpoint",
  85. "revocation_endpoint_auth_methods_supported",
  86. "revocation_endpoint_auth_signing_alg_values_supported",
  87. "scopes_supported",
  88. "subject_types_supported",
  89. "tls_client_certificate_bound_access_tokens",
  90. "token_endpoint",
  91. "token_endpoint_auth_methods_supported",
  92. "token_endpoint_auth_signing_alg_values_supported",
  93. "userinfo_encryption_alg_values_supported",
  94. "userinfo_encryption_enc_values_supported",
  95. "userinfo_endpoint",
  96. "userinfo_signing_alg_values_supported",
  97. ]:
  98. assert key in res
  99. def test_auth_url(env, oid: KeycloakOpenID):
  100. """Test the auth_url method.
  101. :param env: Environment fixture
  102. :type env: KeycloakTestEnv
  103. :param oid: Keycloak OpenID client
  104. :type oid: KeycloakOpenID
  105. """
  106. res = oid.auth_url(redirect_uri="http://test.test/*")
  107. assert (
  108. res
  109. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  110. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  111. + "&redirect_uri=http://test.test/*&scope=email&state="
  112. )
  113. def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  114. """Test the token method.
  115. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  116. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  117. """
  118. oid, username, password = oid_with_credentials
  119. token = oid.token(username=username, password=password)
  120. assert token == {
  121. "access_token": mock.ANY,
  122. "expires_in": 300,
  123. "id_token": mock.ANY,
  124. "not-before-policy": 0,
  125. "refresh_expires_in": 1800,
  126. "refresh_token": mock.ANY,
  127. "scope": mock.ANY,
  128. "session_state": mock.ANY,
  129. "token_type": "Bearer",
  130. }
  131. # Test with dummy totp
  132. token = oid.token(username=username, password=password, totp="123456")
  133. assert token == {
  134. "access_token": mock.ANY,
  135. "expires_in": 300,
  136. "id_token": mock.ANY,
  137. "not-before-policy": 0,
  138. "refresh_expires_in": 1800,
  139. "refresh_token": mock.ANY,
  140. "scope": mock.ANY,
  141. "session_state": mock.ANY,
  142. "token_type": "Bearer",
  143. }
  144. # Test with extra param
  145. token = oid.token(username=username, password=password, extra_param="foo")
  146. assert token == {
  147. "access_token": mock.ANY,
  148. "expires_in": 300,
  149. "id_token": mock.ANY,
  150. "not-before-policy": 0,
  151. "refresh_expires_in": 1800,
  152. "refresh_token": mock.ANY,
  153. "scope": mock.ANY,
  154. "session_state": mock.ANY,
  155. "token_type": "Bearer",
  156. }
  157. def test_exchange_token(
  158. oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  159. ):
  160. """Test the exchange token method.
  161. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  162. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  163. :param admin: Keycloak Admin client
  164. :type admin: KeycloakAdmin
  165. """
  166. # Verify existing user
  167. oid, username, password = oid_with_credentials
  168. # Allow impersonation
  169. admin.realm_name = oid.realm_name
  170. admin.assign_client_role(
  171. user_id=admin.get_user_id(username=username),
  172. client_id=admin.get_client_id(client_id="realm-management"),
  173. roles=[
  174. admin.get_client_role(
  175. client_id=admin.get_client_id(client_id="realm-management"),
  176. role_name="impersonation",
  177. )
  178. ],
  179. )
  180. token = oid.token(username=username, password=password)
  181. assert oid.userinfo(token=token["access_token"]) == {
  182. "email": f"{username}@test.test",
  183. "email_verified": False,
  184. "preferred_username": username,
  185. "sub": mock.ANY,
  186. }
  187. # Exchange token with the new user
  188. new_token = oid.exchange_token(
  189. token=token["access_token"],
  190. client_id=oid.client_id,
  191. audience=oid.client_id,
  192. subject=username,
  193. )
  194. assert oid.userinfo(token=new_token["access_token"]) == {
  195. "email": f"{username}@test.test",
  196. "email_verified": False,
  197. "preferred_username": username,
  198. "sub": mock.ANY,
  199. }
  200. assert token != new_token
  201. def test_logout(oid_with_credentials):
  202. """Test logout.
  203. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  204. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  205. """
  206. oid, username, password = oid_with_credentials
  207. token = oid.token(username=username, password=password)
  208. assert oid.userinfo(token=token["access_token"]) != dict()
  209. assert oid.logout(refresh_token=token["refresh_token"]) == dict()
  210. with pytest.raises(KeycloakAuthenticationError):
  211. oid.userinfo(token=token["access_token"])
  212. def test_certs(oid: KeycloakOpenID):
  213. """Test certificates.
  214. :param oid: Keycloak OpenID client
  215. :type oid: KeycloakOpenID
  216. """
  217. assert len(oid.certs()["keys"]) == 2
  218. def test_public_key(oid: KeycloakOpenID):
  219. """Test public key.
  220. :param oid: Keycloak OpenID client
  221. :type oid: KeycloakOpenID
  222. """
  223. assert oid.public_key() is not None
  224. def test_entitlement(
  225. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  226. ):
  227. """Test entitlement.
  228. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  229. server with client credentials
  230. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  231. :param admin: Keycloak Admin client
  232. :type admin: KeycloakAdmin
  233. """
  234. oid, username, password = oid_with_credentials_authz
  235. token = oid.token(username=username, password=password)
  236. resource_server_id = admin.get_client_authz_resources(
  237. client_id=admin.get_client_id(oid.client_id)
  238. )[0]["_id"]
  239. with pytest.raises(KeycloakDeprecationError):
  240. oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  241. def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  242. """Test introspect.
  243. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  244. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  245. """
  246. oid, username, password = oid_with_credentials
  247. token = oid.token(username=username, password=password)
  248. assert oid.introspect(token=token["access_token"])["active"]
  249. assert oid.introspect(
  250. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  251. ) == {"active": False}
  252. with pytest.raises(KeycloakRPTNotFound):
  253. oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
  254. def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  255. """Test decode token.
  256. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
  257. :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
  258. """
  259. oid, username, password = oid_with_credentials
  260. token = oid.token(username=username, password=password)
  261. assert (
  262. oid.decode_token(
  263. token=token["access_token"],
  264. key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----",
  265. options={"verify_aud": False},
  266. )["preferred_username"]
  267. == username
  268. )
  269. def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  270. """Test load authorization config.
  271. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  272. server with client credentials
  273. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  274. """
  275. oid, username, password = oid_with_credentials_authz
  276. oid.load_authorization_config(path="tests/data/authz_settings.json")
  277. assert "test-authz-rb-policy" in oid.authorization.policies
  278. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  279. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  280. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  281. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  282. assert isinstance(
  283. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  284. )
  285. def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  286. """Test get policies.
  287. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  288. server with client credentials
  289. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  290. """
  291. oid, username, password = oid_with_credentials_authz
  292. token = oid.token(username=username, password=password)
  293. with pytest.raises(KeycloakAuthorizationConfigError):
  294. oid.get_policies(token=token["access_token"])
  295. oid.load_authorization_config(path="tests/data/authz_settings.json")
  296. assert oid.get_policies(token=token["access_token"]) is None
  297. key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
  298. orig_client_id = oid.client_id
  299. oid.client_id = "account"
  300. assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
  301. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  302. policy.add_role(role="account/view-profile")
  303. oid.authorization.policies["test"] = policy
  304. assert [
  305. str(x)
  306. for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
  307. ] == ["Policy: test (role)"]
  308. assert [
  309. repr(x)
  310. for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
  311. ] == ["<Policy: test (role)>"]
  312. oid.client_id = orig_client_id
  313. oid.logout(refresh_token=token["refresh_token"])
  314. with pytest.raises(KeycloakInvalidTokenError):
  315. oid.get_policies(token=token["access_token"])
  316. def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  317. """Test get policies.
  318. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  319. server with client credentials
  320. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  321. """
  322. oid, username, password = oid_with_credentials_authz
  323. token = oid.token(username=username, password=password)
  324. with pytest.raises(KeycloakAuthorizationConfigError):
  325. oid.get_permissions(token=token["access_token"])
  326. oid.load_authorization_config(path="tests/data/authz_settings.json")
  327. assert oid.get_permissions(token=token["access_token"]) is None
  328. key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
  329. orig_client_id = oid.client_id
  330. oid.client_id = "account"
  331. assert (
  332. oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
  333. )
  334. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  335. policy.add_role(role="account/view-profile")
  336. policy.add_permission(
  337. permission=Permission(
  338. name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
  339. )
  340. )
  341. oid.authorization.policies["test"] = policy
  342. assert [
  343. str(x)
  344. for x in oid.get_permissions(
  345. token=token["access_token"], method_token_info="decode", key=key
  346. )
  347. ] == ["Permission: test-perm (resource)"]
  348. assert [
  349. repr(x)
  350. for x in oid.get_permissions(
  351. token=token["access_token"], method_token_info="decode", key=key
  352. )
  353. ] == ["<Permission: test-perm (resource)>"]
  354. oid.client_id = orig_client_id
  355. oid.logout(refresh_token=token["refresh_token"])
  356. with pytest.raises(KeycloakInvalidTokenError):
  357. oid.get_permissions(token=token["access_token"])
  358. def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  359. """Test UMA permissions.
  360. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  361. server with client credentials
  362. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  363. """
  364. oid, username, password = oid_with_credentials_authz
  365. token = oid.token(username=username, password=password)
  366. assert len(oid.uma_permissions(token=token["access_token"])) == 1
  367. assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource"
  368. def test_has_uma_access(
  369. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  370. ):
  371. """Test has UMA access.
  372. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
  373. server with client credentials
  374. :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
  375. :param admin: Keycloak Admin client
  376. :type admin: KeycloakAdmin
  377. """
  378. oid, username, password = oid_with_credentials_authz
  379. token = oid.token(username=username, password=password)
  380. assert (
  381. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  382. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  383. )
  384. assert (
  385. str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
  386. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  387. )
  388. with pytest.raises(KeycloakPostError):
  389. oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
  390. oid.logout(refresh_token=token["refresh_token"])
  391. assert (
  392. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  393. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  394. )
  395. assert (
  396. str(oid.has_uma_access(token=admin.token["access_token"], permissions="Default Resource"))
  397. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  398. + "{'Default Resource'})"
  399. )