You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

392 lines
14 KiB

  1. """Test module for KeycloakOpenID."""
  2. from unittest import mock
  3. import pytest
  4. from keycloak.authorization import Authorization
  5. from keycloak.authorization.permission import Permission
  6. from keycloak.authorization.policy import Policy
  7. from keycloak.authorization.role import Role
  8. from keycloak.connection import ConnectionManager
  9. from keycloak.exceptions import (
  10. KeycloakAuthenticationError,
  11. KeycloakAuthorizationConfigError,
  12. KeycloakDeprecationError,
  13. KeycloakInvalidTokenError,
  14. KeycloakPostError,
  15. KeycloakRPTNotFound,
  16. )
  17. from keycloak.keycloak_admin import KeycloakAdmin
  18. from keycloak.keycloak_openid import KeycloakOpenID
  19. def test_keycloak_openid_init(env):
  20. """Test KeycloakOpenId's init method."""
  21. oid = KeycloakOpenID(
  22. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  23. realm_name="master",
  24. client_id="admin-cli",
  25. )
  26. assert oid.client_id == "admin-cli"
  27. assert oid.client_secret_key is None
  28. assert oid.realm_name == "master"
  29. assert isinstance(oid.connection, ConnectionManager)
  30. assert isinstance(oid.authorization, Authorization)
  31. def test_well_known(oid: KeycloakOpenID):
  32. """Test the well_known method."""
  33. res = oid.well_known()
  34. assert res is not None
  35. assert res != dict()
  36. for key in [
  37. "acr_values_supported",
  38. "authorization_encryption_alg_values_supported",
  39. "authorization_encryption_enc_values_supported",
  40. "authorization_endpoint",
  41. "authorization_signing_alg_values_supported",
  42. "backchannel_authentication_endpoint",
  43. "backchannel_authentication_request_signing_alg_values_supported",
  44. "backchannel_logout_session_supported",
  45. "backchannel_logout_supported",
  46. "backchannel_token_delivery_modes_supported",
  47. "check_session_iframe",
  48. "claim_types_supported",
  49. "claims_parameter_supported",
  50. "claims_supported",
  51. "code_challenge_methods_supported",
  52. "device_authorization_endpoint",
  53. "end_session_endpoint",
  54. "frontchannel_logout_session_supported",
  55. "frontchannel_logout_supported",
  56. "grant_types_supported",
  57. "id_token_encryption_alg_values_supported",
  58. "id_token_encryption_enc_values_supported",
  59. "id_token_signing_alg_values_supported",
  60. "introspection_endpoint",
  61. "introspection_endpoint_auth_methods_supported",
  62. "introspection_endpoint_auth_signing_alg_values_supported",
  63. "issuer",
  64. "jwks_uri",
  65. "mtls_endpoint_aliases",
  66. "pushed_authorization_request_endpoint",
  67. "registration_endpoint",
  68. "request_object_encryption_alg_values_supported",
  69. "request_object_encryption_enc_values_supported",
  70. "request_object_signing_alg_values_supported",
  71. "request_parameter_supported",
  72. "request_uri_parameter_supported",
  73. "require_pushed_authorization_requests",
  74. "require_request_uri_registration",
  75. "response_modes_supported",
  76. "response_types_supported",
  77. "revocation_endpoint",
  78. "revocation_endpoint_auth_methods_supported",
  79. "revocation_endpoint_auth_signing_alg_values_supported",
  80. "scopes_supported",
  81. "subject_types_supported",
  82. "tls_client_certificate_bound_access_tokens",
  83. "token_endpoint",
  84. "token_endpoint_auth_methods_supported",
  85. "token_endpoint_auth_signing_alg_values_supported",
  86. "userinfo_encryption_alg_values_supported",
  87. "userinfo_encryption_enc_values_supported",
  88. "userinfo_endpoint",
  89. "userinfo_signing_alg_values_supported",
  90. ]:
  91. assert key in res
  92. def test_auth_url(env, oid: KeycloakOpenID):
  93. """Test the auth_url method."""
  94. res = oid.auth_url(redirect_uri="http://test.test/*")
  95. assert (
  96. res
  97. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  98. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  99. + "&redirect_uri=http://test.test/*"
  100. )
  101. def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]):
  102. """Test the token method."""
  103. oid, username, password = oid_with_credentials
  104. token = oid.token(username=username, password=password)
  105. assert token == {
  106. "access_token": mock.ANY,
  107. "expires_in": 300,
  108. "not-before-policy": 0,
  109. "refresh_expires_in": 1800,
  110. "refresh_token": mock.ANY,
  111. "scope": mock.ANY,
  112. "session_state": mock.ANY,
  113. "token_type": "Bearer",
  114. }
  115. # Test with dummy totp
  116. token = oid.token(username=username, password=password, totp="123456")
  117. assert token == {
  118. "access_token": mock.ANY,
  119. "expires_in": 300,
  120. "not-before-policy": 0,
  121. "refresh_expires_in": 1800,
  122. "refresh_token": mock.ANY,
  123. "scope": mock.ANY,
  124. "session_state": mock.ANY,
  125. "token_type": "Bearer",
  126. }
  127. # Test with extra param
  128. token = oid.token(username=username, password=password, extra_param="foo")
  129. assert token == {
  130. "access_token": mock.ANY,
  131. "expires_in": 300,
  132. "not-before-policy": 0,
  133. "refresh_expires_in": 1800,
  134. "refresh_token": mock.ANY,
  135. "scope": mock.ANY,
  136. "session_state": mock.ANY,
  137. "token_type": "Bearer",
  138. }
  139. def test_exchange_token(
  140. oid_with_credentials: tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  141. ):
  142. """Test the exchange token method."""
  143. # Verify existing user
  144. oid, username, password = oid_with_credentials
  145. # Allow impersonation
  146. admin.realm_name = oid.realm_name
  147. admin.assign_client_role(
  148. user_id=admin.get_user_id(username=username),
  149. client_id=admin.get_client_id(client_name="realm-management"),
  150. roles=[
  151. admin.get_client_role(
  152. client_id=admin.get_client_id(client_name="realm-management"),
  153. role_name="impersonation",
  154. )
  155. ],
  156. )
  157. token = oid.token(username=username, password=password)
  158. assert oid.userinfo(token=token["access_token"]) == {
  159. "email": f"{username}@test.test",
  160. "email_verified": False,
  161. "preferred_username": username,
  162. "sub": mock.ANY,
  163. }
  164. # Exchange token with the new user
  165. new_token = oid.exchange_token(
  166. token=token["access_token"],
  167. client_id=oid.client_id,
  168. audience=oid.client_id,
  169. subject=username,
  170. )
  171. assert oid.userinfo(token=new_token["access_token"]) == {
  172. "email": f"{username}@test.test",
  173. "email_verified": False,
  174. "preferred_username": username,
  175. "sub": mock.ANY,
  176. }
  177. assert token != new_token
  178. def test_logout(oid_with_credentials):
  179. """Test logout."""
  180. oid, username, password = oid_with_credentials
  181. token = oid.token(username=username, password=password)
  182. assert oid.userinfo(token=token["access_token"]) != dict()
  183. assert oid.logout(refresh_token=token["refresh_token"]) == dict()
  184. with pytest.raises(KeycloakAuthenticationError):
  185. oid.userinfo(token=token["access_token"])
  186. def test_certs(oid: KeycloakOpenID):
  187. """Test certificates."""
  188. assert len(oid.certs()["keys"]) == 2
  189. def test_public_key(oid: KeycloakOpenID):
  190. """Test public key."""
  191. assert oid.public_key() is not None
  192. def test_entitlement(
  193. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  194. ):
  195. """Test entitlement."""
  196. oid, username, password = oid_with_credentials_authz
  197. token = oid.token(username=username, password=password)
  198. resource_server_id = admin.get_client_authz_resources(
  199. client_id=admin.get_client_id(oid.client_id)
  200. )[0]["_id"]
  201. with pytest.raises(KeycloakDeprecationError):
  202. oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  203. def test_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]):
  204. """Test introspect."""
  205. oid, username, password = oid_with_credentials
  206. token = oid.token(username=username, password=password)
  207. assert oid.introspect(token=token["access_token"])["active"]
  208. assert oid.introspect(
  209. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  210. ) == {"active": False}
  211. with pytest.raises(KeycloakRPTNotFound):
  212. oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
  213. def test_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]):
  214. """Test decode token."""
  215. oid, username, password = oid_with_credentials
  216. token = oid.token(username=username, password=password)
  217. assert (
  218. oid.decode_token(
  219. token=token["access_token"],
  220. key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----",
  221. options={"verify_aud": False},
  222. )["preferred_username"]
  223. == username
  224. )
  225. def test_load_authorization_config(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]):
  226. """Test load authorization config."""
  227. oid, username, password = oid_with_credentials_authz
  228. oid.load_authorization_config(path="tests/data/authz_settings.json")
  229. assert "test-authz-rb-policy" in oid.authorization.policies
  230. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  231. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  232. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  233. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  234. assert isinstance(
  235. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  236. )
  237. def test_get_policies(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]):
  238. """Test get policies."""
  239. oid, username, password = oid_with_credentials_authz
  240. token = oid.token(username=username, password=password)
  241. with pytest.raises(KeycloakAuthorizationConfigError):
  242. oid.get_policies(token=token["access_token"])
  243. oid.load_authorization_config(path="tests/data/authz_settings.json")
  244. assert oid.get_policies(token=token["access_token"]) is None
  245. key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
  246. orig_client_id = oid.client_id
  247. oid.client_id = "account"
  248. assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
  249. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  250. policy.add_role(role="account/view-profile")
  251. oid.authorization.policies["test"] = policy
  252. assert [
  253. str(x)
  254. for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
  255. ] == ["Policy: test (role)"]
  256. assert [
  257. repr(x)
  258. for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
  259. ] == ["<Policy: test (role)>"]
  260. oid.client_id = orig_client_id
  261. oid.logout(refresh_token=token["refresh_token"])
  262. with pytest.raises(KeycloakInvalidTokenError):
  263. oid.get_policies(token=token["access_token"])
  264. def test_get_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]):
  265. """Test get policies."""
  266. oid, username, password = oid_with_credentials_authz
  267. token = oid.token(username=username, password=password)
  268. with pytest.raises(KeycloakAuthorizationConfigError):
  269. oid.get_permissions(token=token["access_token"])
  270. oid.load_authorization_config(path="tests/data/authz_settings.json")
  271. assert oid.get_permissions(token=token["access_token"]) is None
  272. key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
  273. orig_client_id = oid.client_id
  274. oid.client_id = "account"
  275. assert (
  276. oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
  277. )
  278. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  279. policy.add_role(role="account/view-profile")
  280. policy.add_permission(
  281. permission=Permission(
  282. name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
  283. )
  284. )
  285. oid.authorization.policies["test"] = policy
  286. assert [
  287. str(x)
  288. for x in oid.get_permissions(
  289. token=token["access_token"], method_token_info="decode", key=key
  290. )
  291. ] == ["Permission: test-perm (resource)"]
  292. assert [
  293. repr(x)
  294. for x in oid.get_permissions(
  295. token=token["access_token"], method_token_info="decode", key=key
  296. )
  297. ] == ["<Permission: test-perm (resource)>"]
  298. oid.client_id = orig_client_id
  299. oid.logout(refresh_token=token["refresh_token"])
  300. with pytest.raises(KeycloakInvalidTokenError):
  301. oid.get_permissions(token=token["access_token"])
  302. def test_uma_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]):
  303. """Test UMA permissions."""
  304. oid, username, password = oid_with_credentials_authz
  305. token = oid.token(username=username, password=password)
  306. assert len(oid.uma_permissions(token=token["access_token"])) == 1
  307. assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource"
  308. def test_has_uma_access(
  309. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  310. ):
  311. """Test has UMA access."""
  312. oid, username, password = oid_with_credentials_authz
  313. token = oid.token(username=username, password=password)
  314. assert (
  315. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  316. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  317. )
  318. assert (
  319. str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
  320. == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
  321. )
  322. with pytest.raises(KeycloakPostError):
  323. oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
  324. oid.logout(refresh_token=token["refresh_token"])
  325. assert (
  326. str(oid.has_uma_access(token=token["access_token"], permissions=""))
  327. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
  328. )
  329. assert (
  330. str(oid.has_uma_access(token=admin.token["access_token"], permissions="Default Resource"))
  331. == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
  332. + "{'Default Resource'})"
  333. )