You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

393 lines
14 KiB

  1. """Test module for KeycloakOpenID."""
  2. from typing import Tuple
  3. from unittest import mock
  4. import pytest
  5. from keycloak.authorization import Authorization
  6. from keycloak.authorization.permission import Permission
  7. from keycloak.authorization.policy import Policy
  8. from keycloak.authorization.role import Role
  9. from keycloak.connection import ConnectionManager
  10. from keycloak.exceptions import (
  11. KeycloakAuthenticationError,
  12. KeycloakAuthorizationConfigError,
  13. KeycloakDeprecationError,
  14. KeycloakInvalidTokenError,
  15. KeycloakPostError,
  16. KeycloakRPTNotFound,
  17. )
  18. from keycloak.keycloak_admin import KeycloakAdmin
  19. from keycloak.keycloak_openid import KeycloakOpenID
  20. def test_keycloak_openid_init(env):
  21. """Test KeycloakOpenId's init method."""
  22. oid = KeycloakOpenID(
  23. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  24. realm_name="master",
  25. client_id="admin-cli",
  26. )
  27. assert oid.client_id == "admin-cli"
  28. assert oid.client_secret_key is None
  29. assert oid.realm_name == "master"
  30. assert isinstance(oid.connection, ConnectionManager)
  31. assert isinstance(oid.authorization, Authorization)
  32. def test_well_known(oid: KeycloakOpenID):
  33. """Test the well_known method."""
  34. res = oid.well_known()
  35. assert res is not None
  36. assert res != dict()
  37. for key in [
  38. "acr_values_supported",
  39. "authorization_encryption_alg_values_supported",
  40. "authorization_encryption_enc_values_supported",
  41. "authorization_endpoint",
  42. "authorization_signing_alg_values_supported",
  43. "backchannel_authentication_endpoint",
  44. "backchannel_authentication_request_signing_alg_values_supported",
  45. "backchannel_logout_session_supported",
  46. "backchannel_logout_supported",
  47. "backchannel_token_delivery_modes_supported",
  48. "check_session_iframe",
  49. "claim_types_supported",
  50. "claims_parameter_supported",
  51. "claims_supported",
  52. "code_challenge_methods_supported",
  53. "device_authorization_endpoint",
  54. "end_session_endpoint",
  55. "frontchannel_logout_session_supported",
  56. "frontchannel_logout_supported",
  57. "grant_types_supported",
  58. "id_token_encryption_alg_values_supported",
  59. "id_token_encryption_enc_values_supported",
  60. "id_token_signing_alg_values_supported",
  61. "introspection_endpoint",
  62. "introspection_endpoint_auth_methods_supported",
  63. "introspection_endpoint_auth_signing_alg_values_supported",
  64. "issuer",
  65. "jwks_uri",
  66. "mtls_endpoint_aliases",
  67. "pushed_authorization_request_endpoint",
  68. "registration_endpoint",
  69. "request_object_encryption_alg_values_supported",
  70. "request_object_encryption_enc_values_supported",
  71. "request_object_signing_alg_values_supported",
  72. "request_parameter_supported",
  73. "request_uri_parameter_supported",
  74. "require_pushed_authorization_requests",
  75. "require_request_uri_registration",
  76. "response_modes_supported",
  77. "response_types_supported",
  78. "revocation_endpoint",
  79. "revocation_endpoint_auth_methods_supported",
  80. "revocation_endpoint_auth_signing_alg_values_supported",
  81. "scopes_supported",
  82. "subject_types_supported",
  83. "tls_client_certificate_bound_access_tokens",
  84. "token_endpoint",
  85. "token_endpoint_auth_methods_supported",
  86. "token_endpoint_auth_signing_alg_values_supported",
  87. "userinfo_encryption_alg_values_supported",
  88. "userinfo_encryption_enc_values_supported",
  89. "userinfo_endpoint",
  90. "userinfo_signing_alg_values_supported",
  91. ]:
  92. assert key in res
  93. def test_auth_url(env, oid: KeycloakOpenID):
  94. """Test the auth_url method."""
  95. res = oid.auth_url(redirect_uri="http://test.test/*")
  96. assert (
  97. res
  98. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  99. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  100. + "&redirect_uri=http://test.test/*&scope=email&state="
  101. )
  102. def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  103. """Test the token method."""
  104. oid, username, password = oid_with_credentials
  105. token = oid.token(username=username, password=password)
  106. assert token == {
  107. "access_token": mock.ANY,
  108. "expires_in": 300,
  109. "not-before-policy": 0,
  110. "refresh_expires_in": 1800,
  111. "refresh_token": mock.ANY,
  112. "scope": mock.ANY,
  113. "session_state": mock.ANY,
  114. "token_type": "Bearer",
  115. }
  116. # Test with dummy totp
  117. token = oid.token(username=username, password=password, totp="123456")
  118. assert token == {
  119. "access_token": mock.ANY,
  120. "expires_in": 300,
  121. "not-before-policy": 0,
  122. "refresh_expires_in": 1800,
  123. "refresh_token": mock.ANY,
  124. "scope": mock.ANY,
  125. "session_state": mock.ANY,
  126. "token_type": "Bearer",
  127. }
  128. # Test with extra param
  129. token = oid.token(username=username, password=password, extra_param="foo")
  130. assert token == {
  131. "access_token": mock.ANY,
  132. "expires_in": 300,
  133. "not-before-policy": 0,
  134. "refresh_expires_in": 1800,
  135. "refresh_token": mock.ANY,
  136. "scope": mock.ANY,
  137. "session_state": mock.ANY,
  138. "token_type": "Bearer",
  139. }
  140. def test_exchange_token(
  141. oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  142. ):
  143. """Test the exchange token method."""
  144. # Verify existing user
  145. oid, username, password = oid_with_credentials
  146. # Allow impersonation
  147. admin.realm_name = oid.realm_name
  148. admin.assign_client_role(
  149. user_id=admin.get_user_id(username=username),
  150. client_id=admin.get_client_id(client_id="realm-management"),
  151. roles=[
  152. admin.get_client_role(
  153. client_id=admin.get_client_id(client_id="realm-management"),
  154. role_name="impersonation",
  155. )
  156. ],
  157. )
  158. token = oid.token(username=username, password=password)
  159. assert oid.userinfo(token=token["access_token"]) == {
  160. "email": f"{username}@test.test",
  161. "email_verified": False,
  162. "preferred_username": username,
  163. "sub": mock.ANY,
  164. }
  165. # Exchange token with the new user
  166. new_token = oid.exchange_token(
  167. token=token["access_token"],
  168. client_id=oid.client_id,
  169. audience=oid.client_id,
  170. subject=username,
  171. )
  172. assert oid.userinfo(token=new_token["access_token"]) == {
  173. "email": f"{username}@test.test",
  174. "email_verified": False,
  175. "preferred_username": username,
  176. "sub": mock.ANY,
  177. }
  178. assert token != new_token
  179. def test_logout(oid_with_credentials):
  180. """Test logout."""
  181. oid, username, password = oid_with_credentials
  182. token = oid.token(username=username, password=password)
  183. assert oid.userinfo(token=token["access_token"]) != dict()
  184. assert oid.logout(refresh_token=token["refresh_token"]) == dict()
  185. with pytest.raises(KeycloakAuthenticationError):
  186. oid.userinfo(token=token["access_token"])
  187. def test_certs(oid: KeycloakOpenID):
  188. """Test certificates."""
  189. assert len(oid.certs()["keys"]) == 2
  190. def test_public_key(oid: KeycloakOpenID):
  191. """Test public key."""
  192. assert oid.public_key() is not None
  193. def test_entitlement(
  194. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  195. ):
  196. """Test entitlement."""
  197. oid, username, password = oid_with_credentials_authz
  198. token = oid.token(username=username, password=password)
  199. resource_server_id = admin.get_client_authz_resources(
  200. client_id=admin.get_client_id(oid.client_id)
  201. )[0]["_id"]
  202. with pytest.raises(KeycloakDeprecationError):
  203. oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  204. def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  205. """Test introspect."""
  206. oid, username, password = oid_with_credentials
  207. token = oid.token(username=username, password=password)
  208. assert oid.introspect(token=token["access_token"])["active"]
  209. assert oid.introspect(
  210. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  211. ) == {"active": False}
  212. with pytest.raises(KeycloakRPTNotFound):
  213. oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
  214. def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
  215. """Test decode token."""
  216. oid, username, password = oid_with_credentials
  217. token = oid.token(username=username, password=password)
  218. assert (
  219. oid.decode_token(
  220. token=token["access_token"],
  221. key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----",
  222. options={"verify_aud": False},
  223. )["preferred_username"]
  224. == username
  225. )
  226. def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  227. """Test load authorization config."""
  228. oid, username, password = oid_with_credentials_authz
  229. oid.load_authorization_config(path="tests/data/authz_settings.json")
  230. assert "test-authz-rb-policy" in oid.authorization.policies
  231. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  232. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  233. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  234. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  235. assert isinstance(
  236. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  237. )
  238. def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  239. """Test get policies."""
  240. oid, username, password = oid_with_credentials_authz
  241. token = oid.token(username=username, password=password)
  242. with pytest.raises(KeycloakAuthorizationConfigError):
  243. oid.get_policies(token=token["access_token"])
  244. oid.load_authorization_config(path="tests/data/authz_settings.json")
  245. assert oid.get_policies(token=token["access_token"]) is None
  246. key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
  247. orig_client_id = oid.client_id
  248. oid.client_id = "account"
  249. assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
  250. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  251. policy.add_role(role="account/view-profile")
  252. oid.authorization.policies["test"] = policy
  253. assert [
  254. str(x)
  255. for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
  256. ] == ["Policy: test (role)"]
  257. assert [
  258. repr(x)
  259. for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
  260. ] == ["<Policy: test (role)>"]
  261. oid.client_id = orig_client_id
  262. oid.logout(refresh_token=token["refresh_token"])
  263. with pytest.raises(KeycloakInvalidTokenError):
  264. oid.get_policies(token=token["access_token"])
  265. def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  266. """Test get policies."""
  267. oid, username, password = oid_with_credentials_authz
  268. token = oid.token(username=username, password=password)
  269. with pytest.raises(KeycloakAuthorizationConfigError):
  270. oid.get_permissions(token=token["access_token"])
  271. oid.load_authorization_config(path="tests/data/authz_settings.json")
  272. assert oid.get_permissions(token=token["access_token"]) is None
  273. key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
  274. orig_client_id = oid.client_id
  275. oid.client_id = "account"
  276. assert (
  277. oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
  278. )
  279. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  280. policy.add_role(role="account/view-profile")
  281. policy.add_permission(
  282. permission=Permission(
  283. name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
  284. )
  285. )
  286. oid.authorization.policies["test"] = policy
  287. assert [
  288. str(x)
  289. for x in oid.get_permissions(
  290. token=token["access_token"], method_token_info="decode", key=key
  291. )
  292. ] == ["Permission: test-perm (resource)"]
  293. assert [
  294. repr(x)
  295. for x in oid.get_permissions(
  296. token=token["access_token"], method_token_info="decode", key=key
  297. )
  298. ] == ["<Permission: test-perm (resource)>"]
  299. oid.client_id = orig_client_id
  300. oid.logout(refresh_token=token["refresh_token"])
  301. with pytest.raises(KeycloakInvalidTokenError):
  302. oid.get_permissions(token=token["access_token"])
  303. def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
  304. """Test UMA permissions."""
  305. oid, username, password = oid_with_credentials_authz
  306. token = oid.token(username=username, password=password)
  307. assert len(oid.uma_permissions(token=token["access_token"])) == 1
  308. assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource"
  309. def test_has_uma_access(
  310. oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  311. ):
  312. """Test has UMA access."""
  313. oid, username, password = oid_with_credentials_authz
  314. token = oid.token(username=username, password=password)
  315. assert (
  316. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  317. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  318. )
  319. assert (
  320. str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
  321. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  322. )
  323. with pytest.raises(KeycloakPostError):
  324. oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
  325. oid.logout(refresh_token=token["refresh_token"])
  326. assert (
  327. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  328. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  329. )
  330. assert (
  331. str(oid.has_uma_access(token=admin.token["access_token"], permissions="Default Resource"))
  332. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  333. + "{'Default Resource'})"
  334. )