Browse Source

update docstring and write async tests for keycloak openid class

pull/565/head
David 12 months ago
parent
commit
4833a6ffc7
  1. 44
      src/keycloak/keycloak_openid.py
  2. 24
      src/keycloak/keycloak_uma.py
  3. 451
      tests/test_keycloak_openid.py

44
src/keycloak/keycloak_openid.py

@ -605,7 +605,7 @@ class KeycloakOpenID:
return list(set(policies))
def get_permissions(self, token, method_token_info="introspect", **kwargs):
"""Get permission by user token.
"""Get permission by user token .
:param token: user token
:type token: str
@ -620,7 +620,7 @@ class KeycloakOpenID:
"""
if not self.authorization.policies:
raise KeycloakAuthorizationConfigError(
"Keycloak settings not found. Load Authorization Keycloak settings."
"Keycloak settings not found. Load Authorization Keycloak settings ."
)
token_info = self._token_info(token, method_token_info, **kwargs)
@ -785,7 +785,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPutError)
async def a_well_known(self):
"""Get the well_known object.
"""Get the well_known object asynchronously.
The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
@ -799,7 +799,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_auth_url(self, redirect_uri, scope="email", state=""):
"""Get authorization URL endpoint.
"""Get authorization URL endpoint asynchronously.
:param redirect_uri: Redirect url to receive oauth code
:type redirect_uri: str
@ -830,7 +830,7 @@ class KeycloakOpenID:
scope="openid",
**extra
):
"""Retrieve user token.
"""Retrieve user token asynchronously.
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
@ -879,7 +879,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_refresh_token(self, refresh_token, grant_type=["refresh_token"]):
"""Refresh the user token.
"""Refresh the user token asynchronously.
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
@ -916,7 +916,7 @@ class KeycloakOpenID:
requested_token_type: str = "urn:ietf:params:oauth:token-type:refresh_token",
scope: str = "openid",
) -> dict:
"""Exchange user token.
"""Exchange user token asynchronously.
Use a token to obtain an entirely different token. See
https://www.keycloak.org/docs/latest/securing_apps/index.html#_token-exchange
@ -958,7 +958,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_userinfo(self, token):
"""Get the user info object.
"""Get the user info object asynchronously.
The userinfo endpoint returns standard claims about the authenticated user,
and is protected by a bearer token.
@ -976,7 +976,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_logout(self, refresh_token):
"""Log out the authenticated user.
"""Log out the authenticated user asynchronously.
:param refresh_token: Refresh token from Keycloak
:type refresh_token: str
@ -990,7 +990,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
async def a_certs(self):
"""Get certificates.
"""Get certificates asynchronously.
The certificate endpoint returns the public keys enabled by the realm, encoded as a
JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled
@ -1006,7 +1006,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_public_key(self):
"""Retrieve the public key.
"""Retrieve the public key asynchronously.
The public key is exposed by the realm page directly.
@ -1018,7 +1018,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"]
async def a_entitlement(self, token, resource_server_id):
"""Get entitlements from the token.
"""Get entitlements from the token asynchronously.
Client applications can use a specific endpoint to obtain a special security token
called a requesting party token (RPT). This token consists of all the entitlements
@ -1043,7 +1043,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover
async def a_introspect(self, token, rpt=None, token_type_hint=None):
"""Introspect the user token.
"""Introspect the user token asynchronously.
The introspection endpoint is used to retrieve the active state of a token.
It is can only be invoked by confidential clients.
@ -1077,7 +1077,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_decode_token(self, token, validate: bool = True, **kwargs):
"""Decode user token.
"""Decode user token asynchronously.
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
structure that represents a cryptographic key. This specification
@ -1116,7 +1116,7 @@ class KeycloakOpenID:
return json.loads(full_jwt.token.payload.decode("utf-8"))
async def a_load_authorization_config(self, path):
"""Load Keycloak settings (authorization).
"""Load Keycloak settings (authorization) asynchronously.
:param path: settings file (json)
:type path: str
@ -1127,7 +1127,7 @@ class KeycloakOpenID:
self.authorization.load_config(authorization_json)
async def a_get_policies(self, token, method_token_info="introspect", **kwargs):
"""Get policies by user token.
"""Get policies by user token asynchronously.
:param token: User token
:type token: str
@ -1165,7 +1165,7 @@ class KeycloakOpenID:
return list(set(policies))
async def a_get_permissions(self, token, method_token_info="introspect", **kwargs):
"""Get permission by user token.
"""Get permission by user token asynchronously.
:param token: user token
:type token: str
@ -1203,7 +1203,7 @@ class KeycloakOpenID:
return list(set(permissions))
async def a_uma_permissions(self, token, permissions=""):
"""Get UMA permissions by user token with requested permissions.
"""Get UMA permissions by user token with requested permissions asynchronously.
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients.
@ -1232,7 +1232,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_has_uma_access(self, token, permissions):
"""Determine whether user has uma permissions with specified user token.
"""Determine whether user has uma permissions with specified user token asynchronously.
:param token: user token
:type token: str
@ -1271,7 +1271,7 @@ class KeycloakOpenID:
)
async def a_register_client(self, token: str, payload: dict):
"""Create a client.
"""Create a client asynchronously.
ClientRepresentation:
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation
@ -1292,7 +1292,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_device(self):
"""Get device authorization grant.
"""Get device authorization grant asynchronously.
The device endpoint is used to obtain a user code verification and user authentication.
The response contains a device_code, user_code, verification_uri,
@ -1317,7 +1317,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_update_client(self, token: str, client_id: str, payload: dict):
"""Update a client.
"""Update a client asynchronously.
ClientRepresentation:
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation

24
src/keycloak/keycloak_uma.py

@ -422,7 +422,7 @@ class KeycloakUMA:
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_resource_set_create(self, payload):
"""Create a resource set.
"""Create a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
@ -441,7 +441,7 @@ class KeycloakUMA:
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
async def a_resource_set_update(self, resource_id, payload):
"""Update a resource set.
"""Update a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
@ -463,7 +463,7 @@ class KeycloakUMA:
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
async def a_resource_set_read(self, resource_id):
"""Read a resource set.
"""Read a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
@ -483,7 +483,7 @@ class KeycloakUMA:
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
async def a_resource_set_delete(self, resource_id):
"""Delete a resource set.
"""Delete a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
@ -510,7 +510,7 @@ class KeycloakUMA:
first: int = 0,
maximum: int = -1,
):
"""Query for list of resource set ids.
"""Query for list of resource set ids asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -558,7 +558,7 @@ class KeycloakUMA:
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
async def a_resource_set_list(self):
"""List all resource sets.
"""List all resource sets asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -574,7 +574,7 @@ class KeycloakUMA:
yield resource
async def a_permission_ticket_create(self, permissions: Iterable[UMAPermission]):
"""Create a permission ticket.
"""Create a permission ticket asynchronously.
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
@ -611,7 +611,7 @@ class KeycloakUMA:
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_permissions_check(self, token, permissions: Iterable[UMAPermission]):
"""Check UMA permissions by user token with requested permissions.
"""Check UMA permissions by user token with requested permissions asynchronously.
The token endpoint is used to check UMA permissions from Keycloak. It can only be
invoked by confidential clients.
@ -648,7 +648,7 @@ class KeycloakUMA:
return data.get("result", False)
async def a_policy_resource_create(self, resource_id, payload):
"""Create permission policy for resource.
"""Create permission policy for resource asynchronously.
Supports name, description, scopes, roles, groups, clients
@ -667,7 +667,7 @@ class KeycloakUMA:
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_policy_update(self, policy_id, payload):
"""Update permission policy.
"""Update permission policy asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -685,7 +685,7 @@ class KeycloakUMA:
return raise_error_from_response(data_raw, KeycloakPutError)
async def a_policy_delete(self, policy_id):
"""Delete permission policy.
"""Delete permission policy asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -708,7 +708,7 @@ class KeycloakUMA:
first: int = 0,
maximum: int = -1,
):
"""Query permission policies.
"""Query permission policies asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#querying-permission

451
tests/test_keycloak_openid.py

@ -487,3 +487,454 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"expires_in": 600,
"interval": 5,
}
#async function start
@pytest.mark.asyncio
async def test_a_well_known(oid: KeycloakOpenID):
"""Test the well_known method.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
res = await oid.a_well_known()
assert res is not None
assert res != dict()
for key in [
"acr_values_supported",
"authorization_encryption_alg_values_supported",
"authorization_encryption_enc_values_supported",
"authorization_endpoint",
"authorization_signing_alg_values_supported",
"backchannel_authentication_endpoint",
"backchannel_authentication_request_signing_alg_values_supported",
"backchannel_logout_session_supported",
"backchannel_logout_supported",
"backchannel_token_delivery_modes_supported",
"check_session_iframe",
"claim_types_supported",
"claims_parameter_supported",
"claims_supported",
"code_challenge_methods_supported",
"device_authorization_endpoint",
"end_session_endpoint",
"frontchannel_logout_session_supported",
"frontchannel_logout_supported",
"grant_types_supported",
"id_token_encryption_alg_values_supported",
"id_token_encryption_enc_values_supported",
"id_token_signing_alg_values_supported",
"introspection_endpoint",
"introspection_endpoint_auth_methods_supported",
"introspection_endpoint_auth_signing_alg_values_supported",
"issuer",
"jwks_uri",
"mtls_endpoint_aliases",
"pushed_authorization_request_endpoint",
"registration_endpoint",
"request_object_encryption_alg_values_supported",
"request_object_encryption_enc_values_supported",
"request_object_signing_alg_values_supported",
"request_parameter_supported",
"request_uri_parameter_supported",
"require_pushed_authorization_requests",
"require_request_uri_registration",
"response_modes_supported",
"response_types_supported",
"revocation_endpoint",
"revocation_endpoint_auth_methods_supported",
"revocation_endpoint_auth_signing_alg_values_supported",
"scopes_supported",
"subject_types_supported",
"tls_client_certificate_bound_access_tokens",
"token_endpoint",
"token_endpoint_auth_methods_supported",
"token_endpoint_auth_signing_alg_values_supported",
"userinfo_encryption_alg_values_supported",
"userinfo_encryption_enc_values_supported",
"userinfo_endpoint",
"userinfo_signing_alg_values_supported",
]:
assert key in res
@pytest.mark.asyncio
async def test_a_auth_url(env, oid: KeycloakOpenID):
"""Test the auth_url method.
:param env: Environment fixture
:type env: KeycloakTestEnv
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
res = await oid.a_auth_url(redirect_uri="http://test.test/*")
assert (
res
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
+ f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
+ "&redirect_uri=http://test.test/*&scope=email&state="
)
@pytest.mark.asyncio
async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test the token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
assert token == {
"access_token": mock.ANY,
"expires_in": mock.ANY,
"id_token": mock.ANY,
"not-before-policy": 0,
"refresh_expires_in": mock.ANY,
"refresh_token": mock.ANY,
"scope": mock.ANY,
"session_state": mock.ANY,
"token_type": "Bearer",
}
# Test with dummy totp
token = await oid.a_token(username=username, password=password, totp="123456")
assert token == {
"access_token": mock.ANY,
"expires_in": mock.ANY,
"id_token": mock.ANY,
"not-before-policy": 0,
"refresh_expires_in": mock.ANY,
"refresh_token": mock.ANY,
"scope": mock.ANY,
"session_state": mock.ANY,
"token_type": "Bearer",
}
# Test with extra param
token = await oid.a_token(username=username, password=password, extra_param="foo")
assert token == {
"access_token": mock.ANY,
"expires_in": mock.ANY,
"id_token": mock.ANY,
"not-before-policy": 0,
"refresh_expires_in": mock.ANY,
"refresh_token": mock.ANY,
"scope": mock.ANY,
"session_state": mock.ANY,
"token_type": "Bearer",
}
@pytest.mark.asyncio
async def test_a_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test the exchange token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
# Verify existing user
oid, username, password = oid_with_credentials
# Allow impersonation
admin.change_current_realm(oid.realm_name)
admin.assign_client_role(
user_id=admin.get_user_id(username=username),
client_id=admin.get_client_id(client_id="realm-management"),
roles=[
admin.get_client_role(
client_id=admin.get_client_id(client_id="realm-management"),
role_name="impersonation",
)
],
)
token = await oid.a_token(username=username, password=password)
assert await oid.a_userinfo(token=token["access_token"]) == {
"email": f"{username}@test.test",
"email_verified": True,
"family_name": "last",
"given_name": "first",
"name": "first last",
"preferred_username": username,
"sub": mock.ANY,
}
# Exchange token with the new user
new_token = await oid.a_exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username
)
assert await oid.a_userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test",
"email_verified": True,
"family_name": "last",
"given_name": "first",
"name": "first last",
"preferred_username": username,
"sub": mock.ANY,
}
assert token != new_token
@pytest.mark.asyncio
async def test_a_logout(oid_with_credentials):
"""Test logout.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
assert await oid.a_userinfo(token=token["access_token"]) != dict()
assert await oid.a_logout(refresh_token=token["refresh_token"]) == dict()
with pytest.raises(KeycloakAuthenticationError):
await oid.a_userinfo(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_certs(oid: KeycloakOpenID):
"""Test certificates.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
assert len((await oid.a_certs())["keys"]) == 2
@pytest.mark.asyncio
async def test_a_public_key(oid: KeycloakOpenID):
"""Test public key.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
assert await oid.a_public_key() is not None
@pytest.mark.asyncio
async def test_a_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test entitlement.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
resource_server_id = admin.get_client_authz_resources(
client_id=admin.get_client_id(oid.client_id)
)[0]["_id"]
with pytest.raises(KeycloakDeprecationError):
await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id)
@pytest.mark.asyncio
async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test introspect.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
assert await oid.a_introspect(token=token["access_token"])["active"]
assert await oid.a_introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
) == {"active": False}
with pytest.raises(KeycloakRPTNotFound):
await oid.a_introspect(token=token["access_token"], token_type_hint="requesting_party_token")
@pytest.mark.asyncio
async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
decoded_access_token = await oid.a_decode_token(token=token["access_token"])
decoded_access_token_2 = await oid.a_decode_token(token=token["access_token"], validate=False)
decoded_refresh_token = await oid.a_decode_token(token=token["refresh_token"], validate=False)
assert decoded_access_token == decoded_access_token_2
assert decoded_access_token["preferred_username"] == username, decoded_access_token
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
@pytest.mark.asyncio
async def test_a_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
assert "test-authz-rb-policy" in oid.authorization.policies
assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
assert isinstance(
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
)
@pytest.mark.asyncio
async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
with pytest.raises(KeycloakAuthorizationConfigError):
await oid.a_get_policies(token=token["access_token"])
await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
assert await oid.a_get_policies(token=token["access_token"]) is None
orig_client_id = oid.client_id
oid.client_id = "account"
assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == []
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy
assert [
str(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
] == ["Policy: test (role)"]
assert [
repr(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id
await oid.a_logout(refresh_token=token["refresh_token"])
with pytest.raises(KeycloakInvalidTokenError):
await oid.a_get_policies(token=token["access_token"])
@pytest.mark.asyncio
def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
with pytest.raises(KeycloakAuthorizationConfigError):
await oid.a_get_permissions(token=token["access_token"])
await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
assert await oid.a_get_permissions(token=token["access_token"]) is None
orig_client_id = oid.client_id
oid.client_id = "account"
assert await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == []
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
policy.add_permission(
permission=Permission(
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
)
)
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode")
] == ["Permission: test-perm (resource)"]
assert [
repr(x)
for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode")
] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id
await oid.a_logout(refresh_token=token["refresh_token"])
with pytest.raises(KeycloakInvalidTokenError):
await oid.a_get_permissions(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test UMA permissions.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
assert len(await oid.a_uma_permissions(token=token["access_token"])) == 1
assert (await oid.a_uma_permissions(token=token["access_token"]))[0]["rsname"] == "Default Resource"
@pytest.mark.asyncio
async def test_a_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test has UMA access.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
assert (
str(await oid.a_has_uma_access(token=token["access_token"], permissions=""))
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
assert (
str(await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource"))
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
with pytest.raises(KeycloakPostError):
await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist")
await oid.a_logout(refresh_token=token["refresh_token"])
assert (
str(await oid.a_has_uma_access(token=token["access_token"], permissions=""))
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
)
assert (
str(
await oid.a_has_uma_access(
token=admin.connection.token["access_token"], permissions="Default Resource"
)
)
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})"
)
@pytest.mark.asyncio
async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"""Test device authorization flow.
:param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
credentials and device authorization flow enabled
:type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
"""
oid, _, _ = oid_with_credentials_device
res = await oid.a_device()
assert res == {
"device_code": mock.ANY,
"user_code": mock.ANY,
"verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
"verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
+ f"device?user_code={res['user_code']}",
"expires_in": 600,
"interval": 5,
}
Loading…
Cancel
Save