From 4833a6ffc7d7f43de130cfbbb18639b80b838135 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 14 May 2024 15:42:26 -0500 Subject: [PATCH] update docstring and write async tests for keycloak openid class --- src/keycloak/keycloak_openid.py | 44 ++-- src/keycloak/keycloak_uma.py | 24 +- tests/test_keycloak_openid.py | 451 ++++++++++++++++++++++++++++++++ 3 files changed, 485 insertions(+), 34 deletions(-) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 49abbc9..c09bb60 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -605,7 +605,7 @@ class KeycloakOpenID: return list(set(policies)) def get_permissions(self, token, method_token_info="introspect", **kwargs): - """Get permission by user token. + """Get permission by user token . :param token: user token :type token: str @@ -620,7 +620,7 @@ class KeycloakOpenID: """ if not self.authorization.policies: raise KeycloakAuthorizationConfigError( - "Keycloak settings not found. Load Authorization Keycloak settings." + "Keycloak settings not found. Load Authorization Keycloak settings ." ) token_info = self._token_info(token, method_token_info, **kwargs) @@ -785,7 +785,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakPutError) async def a_well_known(self): - """Get the well_known object. + """Get the well_known object asynchronously. The most important endpoint to understand is the well-known configuration endpoint. It lists endpoints and other configuration options relevant to @@ -799,7 +799,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakGetError) async def a_auth_url(self, redirect_uri, scope="email", state=""): - """Get authorization URL endpoint. + """Get authorization URL endpoint asynchronously. :param redirect_uri: Redirect url to receive oauth code :type redirect_uri: str @@ -830,7 +830,7 @@ class KeycloakOpenID: scope="openid", **extra ): - """Retrieve user token. + """Retrieve user token asynchronously. The token endpoint is used to obtain tokens. Tokens can either be obtained by exchanging an authorization code or by supplying credentials directly depending on @@ -879,7 +879,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakPostError) async def a_refresh_token(self, refresh_token, grant_type=["refresh_token"]): - """Refresh the user token. + """Refresh the user token asynchronously. The token endpoint is used to obtain tokens. Tokens can either be obtained by exchanging an authorization code or by supplying credentials directly depending on @@ -916,7 +916,7 @@ class KeycloakOpenID: requested_token_type: str = "urn:ietf:params:oauth:token-type:refresh_token", scope: str = "openid", ) -> dict: - """Exchange user token. + """Exchange user token asynchronously. Use a token to obtain an entirely different token. See https://www.keycloak.org/docs/latest/securing_apps/index.html#_token-exchange @@ -958,7 +958,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakPostError) async def a_userinfo(self, token): - """Get the user info object. + """Get the user info object asynchronously. The userinfo endpoint returns standard claims about the authenticated user, and is protected by a bearer token. @@ -976,7 +976,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakGetError) async def a_logout(self, refresh_token): - """Log out the authenticated user. + """Log out the authenticated user asynchronously. :param refresh_token: Refresh token from Keycloak :type refresh_token: str @@ -990,7 +990,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) async def a_certs(self): - """Get certificates. + """Get certificates asynchronously. The certificate endpoint returns the public keys enabled by the realm, encoded as a JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled @@ -1006,7 +1006,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakGetError) async def a_public_key(self): - """Retrieve the public key. + """Retrieve the public key asynchronously. The public key is exposed by the realm page directly. @@ -1018,7 +1018,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakGetError)["public_key"] async def a_entitlement(self, token, resource_server_id): - """Get entitlements from the token. + """Get entitlements from the token asynchronously. Client applications can use a specific endpoint to obtain a special security token called a requesting party token (RPT). This token consists of all the entitlements @@ -1043,7 +1043,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover async def a_introspect(self, token, rpt=None, token_type_hint=None): - """Introspect the user token. + """Introspect the user token asynchronously. The introspection endpoint is used to retrieve the active state of a token. It is can only be invoked by confidential clients. @@ -1077,7 +1077,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakPostError) async def a_decode_token(self, token, validate: bool = True, **kwargs): - """Decode user token. + """Decode user token asynchronously. A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data structure that represents a cryptographic key. This specification @@ -1116,7 +1116,7 @@ class KeycloakOpenID: return json.loads(full_jwt.token.payload.decode("utf-8")) async def a_load_authorization_config(self, path): - """Load Keycloak settings (authorization). + """Load Keycloak settings (authorization) asynchronously. :param path: settings file (json) :type path: str @@ -1127,7 +1127,7 @@ class KeycloakOpenID: self.authorization.load_config(authorization_json) async def a_get_policies(self, token, method_token_info="introspect", **kwargs): - """Get policies by user token. + """Get policies by user token asynchronously. :param token: User token :type token: str @@ -1165,7 +1165,7 @@ class KeycloakOpenID: return list(set(policies)) async def a_get_permissions(self, token, method_token_info="introspect", **kwargs): - """Get permission by user token. + """Get permission by user token asynchronously. :param token: user token :type token: str @@ -1203,7 +1203,7 @@ class KeycloakOpenID: return list(set(permissions)) async def a_uma_permissions(self, token, permissions=""): - """Get UMA permissions by user token with requested permissions. + """Get UMA permissions by user token with requested permissions asynchronously. The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be invoked by confidential clients. @@ -1232,7 +1232,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakPostError) async def a_has_uma_access(self, token, permissions): - """Determine whether user has uma permissions with specified user token. + """Determine whether user has uma permissions with specified user token asynchronously. :param token: user token :type token: str @@ -1271,7 +1271,7 @@ class KeycloakOpenID: ) async def a_register_client(self, token: str, payload: dict): - """Create a client. + """Create a client asynchronously. ClientRepresentation: https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation @@ -1292,7 +1292,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakPostError) async def a_device(self): - """Get device authorization grant. + """Get device authorization grant asynchronously. The device endpoint is used to obtain a user code verification and user authentication. The response contains a device_code, user_code, verification_uri, @@ -1317,7 +1317,7 @@ class KeycloakOpenID: return raise_error_from_response(data_raw, KeycloakPostError) async def a_update_client(self, token: str, client_id: str, payload: dict): - """Update a client. + """Update a client asynchronously. ClientRepresentation: https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation diff --git a/src/keycloak/keycloak_uma.py b/src/keycloak/keycloak_uma.py index 97ac1d8..5dbe2fd 100644 --- a/src/keycloak/keycloak_uma.py +++ b/src/keycloak/keycloak_uma.py @@ -422,7 +422,7 @@ class KeycloakUMA: return raise_error_from_response(data_raw, KeycloakGetError) async def a_resource_set_create(self, payload): - """Create a resource set. + """Create a resource set asynchronously. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1 @@ -441,7 +441,7 @@ class KeycloakUMA: return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) async def a_resource_set_update(self, resource_id, payload): - """Update a resource set. + """Update a resource set asynchronously. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set @@ -463,7 +463,7 @@ class KeycloakUMA: return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) async def a_resource_set_read(self, resource_id): - """Read a resource set. + """Read a resource set asynchronously. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set @@ -483,7 +483,7 @@ class KeycloakUMA: return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) async def a_resource_set_delete(self, resource_id): - """Delete a resource set. + """Delete a resource set asynchronously. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set @@ -510,7 +510,7 @@ class KeycloakUMA: first: int = 0, maximum: int = -1, ): - """Query for list of resource set ids. + """Query for list of resource set ids asynchronously. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets @@ -558,7 +558,7 @@ class KeycloakUMA: return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) async def a_resource_set_list(self): - """List all resource sets. + """List all resource sets asynchronously. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets @@ -574,7 +574,7 @@ class KeycloakUMA: yield resource async def a_permission_ticket_create(self, permissions: Iterable[UMAPermission]): - """Create a permission ticket. + """Create a permission ticket asynchronously. :param permissions: Iterable of uma permissions to validate the token against :type permissions: Iterable[UMAPermission] @@ -611,7 +611,7 @@ class KeycloakUMA: return raise_error_from_response(data_raw, KeycloakPostError) async def a_permissions_check(self, token, permissions: Iterable[UMAPermission]): - """Check UMA permissions by user token with requested permissions. + """Check UMA permissions by user token with requested permissions asynchronously. The token endpoint is used to check UMA permissions from Keycloak. It can only be invoked by confidential clients. @@ -648,7 +648,7 @@ class KeycloakUMA: return data.get("result", False) async def a_policy_resource_create(self, resource_id, payload): - """Create permission policy for resource. + """Create permission policy for resource asynchronously. Supports name, description, scopes, roles, groups, clients @@ -667,7 +667,7 @@ class KeycloakUMA: return raise_error_from_response(data_raw, KeycloakPostError) async def a_policy_update(self, policy_id, payload): - """Update permission policy. + """Update permission policy asynchronously. https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation @@ -685,7 +685,7 @@ class KeycloakUMA: return raise_error_from_response(data_raw, KeycloakPutError) async def a_policy_delete(self, policy_id): - """Delete permission policy. + """Delete permission policy asynchronously. https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation @@ -708,7 +708,7 @@ class KeycloakUMA: first: int = 0, maximum: int = -1, ): - """Query permission policies. + """Query permission policies asynchronously. https://www.keycloak.org/docs/latest/authorization_services/#querying-permission diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index dd3067a..bb0708d 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -487,3 +487,454 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]): "expires_in": 600, "interval": 5, } + +#async function start + +@pytest.mark.asyncio +async def test_a_well_known(oid: KeycloakOpenID): + """Test the well_known method. + + :param oid: Keycloak OpenID client + :type oid: KeycloakOpenID + """ + res = await oid.a_well_known() + assert res is not None + assert res != dict() + for key in [ + "acr_values_supported", + "authorization_encryption_alg_values_supported", + "authorization_encryption_enc_values_supported", + "authorization_endpoint", + "authorization_signing_alg_values_supported", + "backchannel_authentication_endpoint", + "backchannel_authentication_request_signing_alg_values_supported", + "backchannel_logout_session_supported", + "backchannel_logout_supported", + "backchannel_token_delivery_modes_supported", + "check_session_iframe", + "claim_types_supported", + "claims_parameter_supported", + "claims_supported", + "code_challenge_methods_supported", + "device_authorization_endpoint", + "end_session_endpoint", + "frontchannel_logout_session_supported", + "frontchannel_logout_supported", + "grant_types_supported", + "id_token_encryption_alg_values_supported", + "id_token_encryption_enc_values_supported", + "id_token_signing_alg_values_supported", + "introspection_endpoint", + "introspection_endpoint_auth_methods_supported", + "introspection_endpoint_auth_signing_alg_values_supported", + "issuer", + "jwks_uri", + "mtls_endpoint_aliases", + "pushed_authorization_request_endpoint", + "registration_endpoint", + "request_object_encryption_alg_values_supported", + "request_object_encryption_enc_values_supported", + "request_object_signing_alg_values_supported", + "request_parameter_supported", + "request_uri_parameter_supported", + "require_pushed_authorization_requests", + "require_request_uri_registration", + "response_modes_supported", + "response_types_supported", + "revocation_endpoint", + "revocation_endpoint_auth_methods_supported", + "revocation_endpoint_auth_signing_alg_values_supported", + "scopes_supported", + "subject_types_supported", + "tls_client_certificate_bound_access_tokens", + "token_endpoint", + "token_endpoint_auth_methods_supported", + "token_endpoint_auth_signing_alg_values_supported", + "userinfo_encryption_alg_values_supported", + "userinfo_encryption_enc_values_supported", + "userinfo_endpoint", + "userinfo_signing_alg_values_supported", + ]: + assert key in res + +@pytest.mark.asyncio +async def test_a_auth_url(env, oid: KeycloakOpenID): + """Test the auth_url method. + + :param env: Environment fixture + :type env: KeycloakTestEnv + :param oid: Keycloak OpenID client + :type oid: KeycloakOpenID + """ + res = await oid.a_auth_url(redirect_uri="http://test.test/*") + assert ( + res + == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}" + + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code" + + "&redirect_uri=http://test.test/*&scope=email&state=" + ) + +@pytest.mark.asyncio +async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test the token method. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + token = await oid.a_token(username=username, password=password) + assert token == { + "access_token": mock.ANY, + "expires_in": mock.ANY, + "id_token": mock.ANY, + "not-before-policy": 0, + "refresh_expires_in": mock.ANY, + "refresh_token": mock.ANY, + "scope": mock.ANY, + "session_state": mock.ANY, + "token_type": "Bearer", + } + + # Test with dummy totp + token = await oid.a_token(username=username, password=password, totp="123456") + assert token == { + "access_token": mock.ANY, + "expires_in": mock.ANY, + "id_token": mock.ANY, + "not-before-policy": 0, + "refresh_expires_in": mock.ANY, + "refresh_token": mock.ANY, + "scope": mock.ANY, + "session_state": mock.ANY, + "token_type": "Bearer", + } + + # Test with extra param + token = await oid.a_token(username=username, password=password, extra_param="foo") + assert token == { + "access_token": mock.ANY, + "expires_in": mock.ANY, + "id_token": mock.ANY, + "not-before-policy": 0, + "refresh_expires_in": mock.ANY, + "refresh_token": mock.ANY, + "scope": mock.ANY, + "session_state": mock.ANY, + "token_type": "Bearer", + } + +@pytest.mark.asyncio +async def test_a_exchange_token( + oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin +): + """Test the exchange token method. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + # Verify existing user + oid, username, password = oid_with_credentials + + # Allow impersonation + admin.change_current_realm(oid.realm_name) + admin.assign_client_role( + user_id=admin.get_user_id(username=username), + client_id=admin.get_client_id(client_id="realm-management"), + roles=[ + admin.get_client_role( + client_id=admin.get_client_id(client_id="realm-management"), + role_name="impersonation", + ) + ], + ) + + token = await oid.a_token(username=username, password=password) + assert await oid.a_userinfo(token=token["access_token"]) == { + "email": f"{username}@test.test", + "email_verified": True, + "family_name": "last", + "given_name": "first", + "name": "first last", + "preferred_username": username, + "sub": mock.ANY, + } + + # Exchange token with the new user + new_token = await oid.a_exchange_token( + token=token["access_token"], audience=oid.client_id, subject=username + ) + assert await oid.a_userinfo(token=new_token["access_token"]) == { + "email": f"{username}@test.test", + "email_verified": True, + "family_name": "last", + "given_name": "first", + "name": "first last", + "preferred_username": username, + "sub": mock.ANY, + } + assert token != new_token + +@pytest.mark.asyncio +async def test_a_logout(oid_with_credentials): + """Test logout. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + + token = await oid.a_token(username=username, password=password) + assert await oid.a_userinfo(token=token["access_token"]) != dict() + assert await oid.a_logout(refresh_token=token["refresh_token"]) == dict() + + with pytest.raises(KeycloakAuthenticationError): + await oid.a_userinfo(token=token["access_token"]) + +@pytest.mark.asyncio +async def test_a_certs(oid: KeycloakOpenID): + """Test certificates. + + :param oid: Keycloak OpenID client + :type oid: KeycloakOpenID + """ + assert len((await oid.a_certs())["keys"]) == 2 + +@pytest.mark.asyncio +async def test_a_public_key(oid: KeycloakOpenID): + """Test public key. + + :param oid: Keycloak OpenID client + :type oid: KeycloakOpenID + """ + assert await oid.a_public_key() is not None + +@pytest.mark.asyncio +async def test_a_entitlement( + oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin +): + """Test entitlement. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + resource_server_id = admin.get_client_authz_resources( + client_id=admin.get_client_id(oid.client_id) + )[0]["_id"] + + with pytest.raises(KeycloakDeprecationError): + await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id) + +@pytest.mark.asyncio +async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test introspect. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + token = await oid.a_token(username=username, password=password) + + assert await oid.a_introspect(token=token["access_token"])["active"] + assert await oid.a_introspect( + token=token["access_token"], rpt="some", token_type_hint="requesting_party_token" + ) == {"active": False} + + with pytest.raises(KeycloakRPTNotFound): + await oid.a_introspect(token=token["access_token"], token_type_hint="requesting_party_token") + +@pytest.mark.asyncio +async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test decode token. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + token = await oid.a_token(username=username, password=password) + decoded_access_token = await oid.a_decode_token(token=token["access_token"]) + decoded_access_token_2 = await oid.a_decode_token(token=token["access_token"], validate=False) + decoded_refresh_token = await oid.a_decode_token(token=token["refresh_token"], validate=False) + + assert decoded_access_token == decoded_access_token_2 + assert decoded_access_token["preferred_username"] == username, decoded_access_token + assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token + +@pytest.mark.asyncio +async def test_a_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): + """Test load authorization config. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials_authz + + await oid.a_load_authorization_config(path="tests/data/authz_settings.json") + assert "test-authz-rb-policy" in oid.authorization.policies + assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy) + assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1 + assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role) + assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2 + assert isinstance( + oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission + ) + +@pytest.mark.asyncio +async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): + """Test get policies. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + with pytest.raises(KeycloakAuthorizationConfigError): + await oid.a_get_policies(token=token["access_token"]) + + await oid.a_load_authorization_config(path="tests/data/authz_settings.json") + assert await oid.a_get_policies(token=token["access_token"]) is None + + orig_client_id = oid.client_id + oid.client_id = "account" + assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == [] + policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") + policy.add_role(role="account/view-profile") + oid.authorization.policies["test"] = policy + assert [ + str(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") + ] == ["Policy: test (role)"] + assert [ + repr(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") + ] == [""] + oid.client_id = orig_client_id + + await oid.a_logout(refresh_token=token["refresh_token"]) + with pytest.raises(KeycloakInvalidTokenError): + await oid.a_get_policies(token=token["access_token"]) + +@pytest.mark.asyncio +def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): + """Test get policies. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + with pytest.raises(KeycloakAuthorizationConfigError): + await oid.a_get_permissions(token=token["access_token"]) + + await oid.a_load_authorization_config(path="tests/data/authz_settings.json") + assert await oid.a_get_permissions(token=token["access_token"]) is None + + orig_client_id = oid.client_id + oid.client_id = "account" + assert await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == [] + policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") + policy.add_role(role="account/view-profile") + policy.add_permission( + permission=Permission( + name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS" + ) + ) + oid.authorization.policies["test"] = policy + assert [ + str(x) + for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") + ] == ["Permission: test-perm (resource)"] + assert [ + repr(x) + for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") + ] == [""] + oid.client_id = orig_client_id + + await oid.a_logout(refresh_token=token["refresh_token"]) + with pytest.raises(KeycloakInvalidTokenError): + await oid.a_get_permissions(token=token["access_token"]) + +@pytest.mark.asyncio +async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): + """Test UMA permissions. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + assert len(await oid.a_uma_permissions(token=token["access_token"])) == 1 + assert (await oid.a_uma_permissions(token=token["access_token"]))[0]["rsname"] == "Default Resource" + +@pytest.mark.asyncio +async def test_a_has_uma_access( + oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin +): + """Test has UMA access. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + assert ( + str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) + == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" + ) + assert ( + str(await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource")) + == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" + ) + + with pytest.raises(KeycloakPostError): + await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist") + + await oid.a_logout(refresh_token=token["refresh_token"]) + assert ( + str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) + == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())" + ) + assert ( + str( + await oid.a_has_uma_access( + token=admin.connection.token["access_token"], permissions="Default Resource" + ) + ) + == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" + + "{'Default Resource'})" + ) + +@pytest.mark.asyncio +async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]): + """Test device authorization flow. + + :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user + credentials and device authorization flow enabled + :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str] + """ + oid, _, _ = oid_with_credentials_device + res = await oid.a_device() + assert res == { + "device_code": mock.ANY, + "user_code": mock.ANY, + "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device", + "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/" + + f"device?user_code={res['user_code']}", + "expires_in": 600, + "interval": 5, + } \ No newline at end of file