You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

351 lines
13 KiB

  1. """Test module for KeycloakOpenID."""
  2. from unittest import mock
  3. import pytest
  4. from keycloak.authorization import Authorization
  5. from keycloak.authorization.permission import Permission
  6. from keycloak.authorization.policy import Policy
  7. from keycloak.authorization.role import Role
  8. from keycloak.connection import ConnectionManager
  9. from keycloak.exceptions import (
  10. KeycloakAuthenticationError,
  11. KeycloakAuthorizationConfigError,
  12. KeycloakDeprecationError,
  13. KeycloakInvalidTokenError,
  14. KeycloakRPTNotFound,
  15. )
  16. from keycloak.keycloak_admin import KeycloakAdmin
  17. from keycloak.keycloak_openid import KeycloakOpenID
  18. def test_keycloak_openid_init(env):
  19. """Test KeycloakOpenId's init method."""
  20. oid = KeycloakOpenID(
  21. server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
  22. realm_name="master",
  23. client_id="admin-cli",
  24. )
  25. assert oid.client_id == "admin-cli"
  26. assert oid.client_secret_key is None
  27. assert oid.realm_name == "master"
  28. assert isinstance(oid.connection, ConnectionManager)
  29. assert isinstance(oid.authorization, Authorization)
  30. def test_well_known(oid: KeycloakOpenID):
  31. """Test the well_known method."""
  32. res = oid.well_known()
  33. assert res is not None
  34. assert res != dict()
  35. for key in [
  36. "acr_values_supported",
  37. "authorization_encryption_alg_values_supported",
  38. "authorization_encryption_enc_values_supported",
  39. "authorization_endpoint",
  40. "authorization_signing_alg_values_supported",
  41. "backchannel_authentication_endpoint",
  42. "backchannel_authentication_request_signing_alg_values_supported",
  43. "backchannel_logout_session_supported",
  44. "backchannel_logout_supported",
  45. "backchannel_token_delivery_modes_supported",
  46. "check_session_iframe",
  47. "claim_types_supported",
  48. "claims_parameter_supported",
  49. "claims_supported",
  50. "code_challenge_methods_supported",
  51. "device_authorization_endpoint",
  52. "end_session_endpoint",
  53. "frontchannel_logout_session_supported",
  54. "frontchannel_logout_supported",
  55. "grant_types_supported",
  56. "id_token_encryption_alg_values_supported",
  57. "id_token_encryption_enc_values_supported",
  58. "id_token_signing_alg_values_supported",
  59. "introspection_endpoint",
  60. "introspection_endpoint_auth_methods_supported",
  61. "introspection_endpoint_auth_signing_alg_values_supported",
  62. "issuer",
  63. "jwks_uri",
  64. "mtls_endpoint_aliases",
  65. "pushed_authorization_request_endpoint",
  66. "registration_endpoint",
  67. "request_object_encryption_alg_values_supported",
  68. "request_object_encryption_enc_values_supported",
  69. "request_object_signing_alg_values_supported",
  70. "request_parameter_supported",
  71. "request_uri_parameter_supported",
  72. "require_pushed_authorization_requests",
  73. "require_request_uri_registration",
  74. "response_modes_supported",
  75. "response_types_supported",
  76. "revocation_endpoint",
  77. "revocation_endpoint_auth_methods_supported",
  78. "revocation_endpoint_auth_signing_alg_values_supported",
  79. "scopes_supported",
  80. "subject_types_supported",
  81. "tls_client_certificate_bound_access_tokens",
  82. "token_endpoint",
  83. "token_endpoint_auth_methods_supported",
  84. "token_endpoint_auth_signing_alg_values_supported",
  85. "userinfo_encryption_alg_values_supported",
  86. "userinfo_encryption_enc_values_supported",
  87. "userinfo_endpoint",
  88. "userinfo_signing_alg_values_supported",
  89. ]:
  90. assert key in res
  91. def test_auth_url(env, oid: KeycloakOpenID):
  92. """Test the auth_url method."""
  93. res = oid.auth_url(redirect_uri="http://test.test/*")
  94. assert (
  95. res
  96. == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
  97. + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
  98. + "&redirect_uri=http://test.test/*"
  99. )
  100. def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]):
  101. """Test the token method."""
  102. oid, username, password = oid_with_credentials
  103. token = oid.token(username=username, password=password)
  104. assert token == {
  105. "access_token": mock.ANY,
  106. "expires_in": 300,
  107. "not-before-policy": 0,
  108. "refresh_expires_in": 1800,
  109. "refresh_token": mock.ANY,
  110. "scope": mock.ANY,
  111. "session_state": mock.ANY,
  112. "token_type": "Bearer",
  113. }
  114. # Test with dummy totp
  115. token = oid.token(username=username, password=password, totp="123456")
  116. assert token == {
  117. "access_token": mock.ANY,
  118. "expires_in": 300,
  119. "not-before-policy": 0,
  120. "refresh_expires_in": 1800,
  121. "refresh_token": mock.ANY,
  122. "scope": mock.ANY,
  123. "session_state": mock.ANY,
  124. "token_type": "Bearer",
  125. }
  126. # Test with extra param
  127. token = oid.token(username=username, password=password, extra_param="foo")
  128. assert token == {
  129. "access_token": mock.ANY,
  130. "expires_in": 300,
  131. "not-before-policy": 0,
  132. "refresh_expires_in": 1800,
  133. "refresh_token": mock.ANY,
  134. "scope": mock.ANY,
  135. "session_state": mock.ANY,
  136. "token_type": "Bearer",
  137. }
  138. def test_exchange_token(
  139. oid_with_credentials: tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  140. ):
  141. """Test the exchange token method."""
  142. # Verify existing user
  143. oid, username, password = oid_with_credentials
  144. # Allow impersonation
  145. admin.realm_name = oid.realm_name
  146. admin.assign_client_role(
  147. user_id=admin.get_user_id(username=username),
  148. client_id=admin.get_client_id(client_name="realm-management"),
  149. roles=[
  150. admin.get_client_role(
  151. client_id=admin.get_client_id(client_name="realm-management"),
  152. role_name="impersonation",
  153. )
  154. ],
  155. )
  156. token = oid.token(username=username, password=password)
  157. assert oid.userinfo(token=token["access_token"]) == {
  158. "email": f"{username}@test.test",
  159. "email_verified": False,
  160. "preferred_username": username,
  161. "sub": mock.ANY,
  162. }
  163. # Exchange token with the new user
  164. new_token = oid.exchange_token(
  165. token=token["access_token"],
  166. client_id=oid.client_id,
  167. audience=oid.client_id,
  168. subject=username,
  169. )
  170. assert oid.userinfo(token=new_token["access_token"]) == {
  171. "email": f"{username}@test.test",
  172. "email_verified": False,
  173. "preferred_username": username,
  174. "sub": mock.ANY,
  175. }
  176. assert token != new_token
  177. def test_logout(oid_with_credentials):
  178. """Test logout."""
  179. oid, username, password = oid_with_credentials
  180. token = oid.token(username=username, password=password)
  181. assert oid.userinfo(token=token["access_token"]) != dict()
  182. assert oid.logout(refresh_token=token["refresh_token"]) == dict()
  183. with pytest.raises(KeycloakAuthenticationError):
  184. oid.userinfo(token=token["access_token"])
  185. def test_certs(oid: KeycloakOpenID):
  186. """Test certificates."""
  187. assert len(oid.certs()["keys"]) == 2
  188. def test_public_key(oid: KeycloakOpenID):
  189. """Test public key."""
  190. assert oid.public_key() is not None
  191. def test_entitlement(
  192. oid_with_credentials_authz: tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
  193. ):
  194. """Test entitlement."""
  195. oid, username, password = oid_with_credentials_authz
  196. token = oid.token(username=username, password=password)
  197. resource_server_id = admin.get_client_authz_resources(
  198. client_id=admin.get_client_id(oid.client_id)
  199. )[0]["_id"]
  200. with pytest.raises(KeycloakDeprecationError):
  201. oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
  202. def test_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]):
  203. """Test introspect."""
  204. oid, username, password = oid_with_credentials
  205. token = oid.token(username=username, password=password)
  206. assert oid.introspect(token=token["access_token"])["active"]
  207. assert oid.introspect(
  208. token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
  209. ) == {"active": False}
  210. with pytest.raises(KeycloakRPTNotFound):
  211. oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
  212. def test_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]):
  213. """Test decode token."""
  214. oid, username, password = oid_with_credentials
  215. token = oid.token(username=username, password=password)
  216. assert (
  217. oid.decode_token(
  218. token=token["access_token"],
  219. key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----",
  220. options={"verify_aud": False},
  221. )["preferred_username"]
  222. == username
  223. )
  224. def test_load_authorization_config(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]):
  225. """Test load authorization config."""
  226. oid, username, password = oid_with_credentials_authz
  227. oid.load_authorization_config(path="tests/data/authz_settings.json")
  228. assert "test-authz-rb-policy" in oid.authorization.policies
  229. assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
  230. assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
  231. assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
  232. assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
  233. assert isinstance(
  234. oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
  235. )
  236. def test_get_policies(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]):
  237. """Test get policies."""
  238. oid, username, password = oid_with_credentials_authz
  239. token = oid.token(username=username, password=password)
  240. with pytest.raises(KeycloakAuthorizationConfigError):
  241. oid.get_policies(token=token["access_token"])
  242. oid.load_authorization_config(path="tests/data/authz_settings.json")
  243. assert oid.get_policies(token=token["access_token"]) is None
  244. key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
  245. orig_client_id = oid.client_id
  246. oid.client_id = "account"
  247. assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
  248. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  249. policy.add_role(role="account/view-profile")
  250. oid.authorization.policies["test"] = policy
  251. assert [
  252. str(x)
  253. for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
  254. ] == ["Policy: test (role)"]
  255. assert [
  256. repr(x)
  257. for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
  258. ] == ["<Policy: test (role)>"]
  259. oid.client_id = orig_client_id
  260. oid.logout(refresh_token=token["refresh_token"])
  261. with pytest.raises(KeycloakInvalidTokenError):
  262. oid.get_policies(token=token["access_token"])
  263. def test_get_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]):
  264. """Test get policies."""
  265. oid, username, password = oid_with_credentials_authz
  266. token = oid.token(username=username, password=password)
  267. with pytest.raises(KeycloakAuthorizationConfigError):
  268. oid.get_permissions(token=token["access_token"])
  269. oid.load_authorization_config(path="tests/data/authz_settings.json")
  270. assert oid.get_permissions(token=token["access_token"]) is None
  271. key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
  272. orig_client_id = oid.client_id
  273. oid.client_id = "account"
  274. assert (
  275. oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
  276. )
  277. policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
  278. policy.add_role(role="account/view-profile")
  279. policy.add_permission(
  280. permission=Permission(
  281. name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
  282. )
  283. )
  284. oid.authorization.policies["test"] = policy
  285. assert [
  286. str(x)
  287. for x in oid.get_permissions(
  288. token=token["access_token"], method_token_info="decode", key=key
  289. )
  290. ] == ["Permission: test-perm (resource)"]
  291. assert [
  292. repr(x)
  293. for x in oid.get_permissions(
  294. token=token["access_token"], method_token_info="decode", key=key
  295. )
  296. ] == ["<Permission: test-perm (resource)>"]
  297. oid.client_id = orig_client_id
  298. oid.logout(refresh_token=token["refresh_token"])
  299. with pytest.raises(KeycloakInvalidTokenError):
  300. oid.get_permissions(token=token["access_token"])