Browse Source

fix: lint issues

pull/566/head
David 11 months ago
parent
commit
e4ea169c4b
  1. 4
      src/keycloak/connection.py
  2. 88
      src/keycloak/keycloak_admin.py
  3. 4
      src/keycloak/keycloak_openid.py
  4. 1
      src/keycloak/openid_connection.py
  5. 8
      tests/conftest.py
  6. 235
      tests/test_keycloak_admin.py
  7. 51
      tests/test_keycloak_openid.py
  8. 8
      tests/test_keycloak_uma.py

4
src/keycloak/connection.py

@ -308,7 +308,7 @@ class ConnectionManager(object):
urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
@ -332,7 +332,7 @@ class ConnectionManager(object):
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)

88
src/keycloak/keycloak_admin.py

@ -4250,7 +4250,7 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
#async functions start
# async functions start
async def a___fetch_all(self, url, query=None):
"""Paginate asynchronously over get requests .
@ -4297,7 +4297,9 @@ class KeycloakAdmin:
:rtype: dict
"""
query = query or {}
return raise_error_from_response(await self.connection.a_raw_get(url, **query), KeycloakGetError)
return raise_error_from_response(
await self.connection.a_raw_get(url, **query), KeycloakGetError
)
async def a_get_current_realm(self) -> str:
"""Return the currently configured realm asynchronously.
@ -4401,7 +4403,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": realm_name}
data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path))
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_REALM.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
async def a_create_realm(self, payload, skip_exists=False):
@ -4455,7 +4459,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": realm_name}
data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path))
data_raw = await self.connection.a_raw_delete(
urls_patterns.URL_ADMIN_REALM.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
async def a_get_users(self, query=None):
@ -4594,7 +4600,9 @@ class KeycloakAdmin:
:rtype: list
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path))
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_IDPS.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_get_idp(self, idp_alias):
@ -4611,7 +4619,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias}
data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_IDP.format(**params_path))
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_IDP.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_delete_idp(self, idp_alias):
@ -4623,7 +4633,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias}
data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_IDP.format(**params_path))
data_raw = await self.connection.a_raw_delete(
urls_patterns.URL_ADMIN_IDP.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
async def a_create_user(self, payload, exist_ok=False):
@ -4691,7 +4703,9 @@ class KeycloakAdmin:
:rtype: str
"""
lower_user_name = username.lower()
users = await self.a_get_users(query={"username": lower_user_name, "max": 1, "exact": True})
users = await self.a_get_users(
query={"username": lower_user_name, "max": 1, "exact": True}
)
return users[0]["id"] if len(users) == 1 else None
async def a_get_user(self, user_id):
@ -4799,7 +4813,9 @@ class KeycloakAdmin:
:rtype: bytes
"""
params_path = {"realm-name": self.connection.realm_name, "id": user_id}
data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_USER.format(**params_path))
data_raw = await self.connection.a_raw_delete(
urls_patterns.URL_ADMIN_USER.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
async def a_set_user_password(self, user_id, password, temporary=True):
@ -4918,7 +4934,9 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_add_user_social_login(self, user_id, provider_id, provider_userid, provider_username):
async def a_add_user_social_login(
self, user_id, provider_id, provider_userid, provider_username
):
"""Add a federated identity / social login provider asynchronously to the user.
:param user_id: User id
@ -5106,7 +5124,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "id": group_id}
response = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_GROUP.format(**params_path))
response = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_GROUP.format(**params_path)
)
if response.status_code >= 400:
return raise_error_from_response(response, KeycloakGetError)
@ -5368,7 +5388,9 @@ class KeycloakAdmin:
:rtype: bytes
"""
params_path = {"realm-name": self.connection.realm_name, "id": group_id}
data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_GROUP.format(**params_path))
data_raw = await self.connection.a_raw_delete(
urls_patterns.URL_ADMIN_GROUP.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
async def a_get_clients(self):
@ -5383,7 +5405,9 @@ class KeycloakAdmin:
:rtype: list
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw =await self.connection.a_raw_get(urls_patterns.URL_ADMIN_CLIENTS.format(**params_path))
data_raw =await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_CLIENTS.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_get_client(self, client_id):
@ -5398,7 +5422,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "id": client_id}
data_raw =await self.connection.a_raw_get(urls_patterns.URL_ADMIN_CLIENT.format(**params_path))
data_raw =await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_CLIENT.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_get_client_id(self, client_id):
@ -5633,7 +5659,9 @@ class KeycloakAdmin:
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
)
async def a_create_client_authz_resource_based_permission(self, client_id, payload, skip_exists=False):
async def a_create_client_authz_resource_based_permission(
self, client_id, payload, skip_exists=False
):
"""Create resource-based permission of client asynchronously.
Payload example::
@ -6007,7 +6035,9 @@ class KeycloakAdmin:
:rtype: bytes
"""
params_path = {"realm-name": self.connection.realm_name, "id": client_id}
data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_CLIENT.format(**params_path))
data_raw = await self.connection.a_raw_delete(
urls_patterns.URL_ADMIN_CLIENT.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
async def a_get_client_installation_provider(self, client_id, provider_id):
@ -6254,7 +6284,9 @@ class KeycloakAdmin:
"""
if skip_exists:
try:
res = await self.a_get_client_role(client_id=client_role_id, role_name=payload["name"])
res = await self.a_get_client_role(
client_id=client_role_id, role_name=payload["name"]
)
return res["name"]
except KeycloakGetError:
pass
@ -6336,7 +6368,7 @@ class KeycloakAdmin:
"id": client_role_id,
"role-name": role_name,
}
data_raw =await self.connection.a_raw_delete(
data_raw = await self.connection.a_raw_delete(
urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
@ -6967,7 +6999,9 @@ class KeycloakAdmin:
urls_patterns.URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE, user_id, client_id
)
async def a_get_composite_client_roles_of_user(self, user_id, client_id, brief_representation=False):
async def a_get_composite_client_roles_of_user(
self, user_id, client_id, brief_representation=False
):
"""Get composite client role-mappings for a user asynchronously.
:param user_id: id of user
@ -7046,7 +7080,9 @@ class KeycloakAdmin:
:rtype: list
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_FLOWS.format(**params_path))
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_FLOWS.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_get_authentication_flow_for_id(self, flow_id):
@ -7119,7 +7155,9 @@ class KeycloakAdmin:
:rtype: bytes
"""
params_path = {"realm-name": self.connection.realm_name, "id": flow_id}
data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_FLOW.format(**params_path))
data_raw = await self.connection.a_raw_delete(
urls_patterns.URL_ADMIN_FLOW.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
async def a_get_authentication_flow_executions(self, flow_alias):
@ -7800,7 +7838,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "component-id": component_id}
data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path))
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_update_component(self, component_id, payload):
@ -8155,7 +8195,9 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
async def a_get_composite_client_roles_of_group(self, client_id, group_id, brief_representation=True):
async def a_get_composite_client_roles_of_group(
self, client_id, group_id, brief_representation=True
):
"""Get the composite client roles of the given group for the given client asynchronously.
:param client_id: id of the client.

4
src/keycloak/keycloak_openid.py

@ -1073,7 +1073,9 @@ class KeycloakOpenID:
payload = self._add_secret_key(payload)
data_raw = await self.connection.a_raw_post(URL_INTROSPECT.format(**params_path), data=payload)
data_raw = await self.connection.a_raw_post(
URL_INTROSPECT.format(**params_path), data=payload
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_decode_token(self, token, validate: bool = True, **kwargs):

1
src/keycloak/openid_connection.py

@ -117,7 +117,6 @@ class KeycloakOpenIDConnection(ConnectionManager):
self.headers = {}
self.custom_headers = custom_headers
if self.token is None:
self.get_token()

8
tests/conftest.py

@ -32,10 +32,10 @@ class KeycloakTestEnv(object):
def __init__(
self,
host: str = os.environ["KEYCLOAK_HOST"],
port: str = os.environ["KEYCLOAK_PORT"],
username: str = os.environ["KEYCLOAK_ADMIN"],
password: str = os.environ["KEYCLOAK_ADMIN_PASSWORD"],
host: str = 'localhost',#os.environ["KEYCLOAK_HOST"],
port: str = '8080',#os.environ["KEYCLOAK_PORT"],
username: str = 'admin',#os.environ["KEYCLOAK_ADMIN"],
password: str = 'admin',#os.environ["KEYCLOAK_ADMIN_PASSWORD"],
):
"""Init method.

235
tests/test_keycloak_admin.py

@ -3064,7 +3064,7 @@ def test_refresh_token(admin: KeycloakAdmin):
admin.connection.refresh_token()
#async function start
# async function start
@pytest.mark.asyncio
async def test_a_realms(admin: KeycloakAdmin):
@ -3135,6 +3135,7 @@ async def test_a_realms(admin: KeycloakAdmin):
await admin.a_delete_realm(realm_name="non-existent")
assert err.match('404: b\'{"error":"Realm not found.".*}\'')
@pytest.mark.asyncio
async def test_a_changing_of_realms(admin: KeycloakAdmin, realm: str):
"""Test changing of realms.
@ -3148,6 +3149,7 @@ async def test_a_changing_of_realms(admin: KeycloakAdmin, realm: str):
await admin.a_change_current_realm(realm)
assert await admin.a_get_current_realm() == realm
@pytest.mark.asyncio
async def test_a_import_export_realms(admin: KeycloakAdmin, realm: str):
"""Test import and export of realms.
@ -3174,6 +3176,7 @@ async def test_a_import_export_realms(admin: KeycloakAdmin, realm: str):
'500: b\'{"error":"unknown_error"}\'|400: b\'{"errorMessage":"Realm name cannot be empty"}\'' # noqa: E501
)
@pytest.mark.asyncio
async def test_a_partial_import_realm(admin: KeycloakAdmin, realm: str):
"""Test partial import of realm configuration.
@ -3221,6 +3224,7 @@ async def test_a_partial_import_realm(admin: KeycloakAdmin, realm: str):
res = await admin.a_partial_import_realm(realm_name=realm, payload=payload)
assert res["overwritten"] == 3
@pytest.mark.asyncio
async def test_a_users(admin: KeycloakAdmin, realm: str):
"""Test users.
@ -3329,6 +3333,7 @@ async def test_a_users(admin: KeycloakAdmin, realm: str):
await admin.a_delete_user(user_id="non-existent-id")
assert err.match(USER_NOT_FOUND_REGEX)
@pytest.mark.asyncio
async def test_a_enable_disable_all_users(admin: KeycloakAdmin, realm: str):
"""Test enable and disable all users.
@ -3366,6 +3371,7 @@ async def test_a_enable_disable_all_users(admin: KeycloakAdmin, realm: str):
assert (await admin.a_get_user(user_id_2))["enabled"]
assert (await admin.a_get_user(user_id_3))["enabled"]
@pytest.mark.asyncio
async def test_a_users_roles(admin: KeycloakAdmin, realm: str):
"""Test users roles.
@ -3378,7 +3384,9 @@ async def test_a_users_roles(admin: KeycloakAdmin, realm: str):
user_id = await admin.a_create_user(payload={"username": "test", "email": "test@test.test"})
# Test all level user roles
client_id = await admin.a_create_client(payload={"name": "test-client", "clientId": "test-client"})
client_id = await admin.a_create_client(
payload={"name": "test-client", "clientId": "test-client"}
)
await admin.a_create_client_role(client_role_id=client_id, payload={"name": "test-role"})
await admin.a_assign_client_role(
client_id=client_id,
@ -3399,6 +3407,7 @@ async def test_a_users_roles(admin: KeycloakAdmin, realm: str):
await admin.a_delete_user(user_id)
await admin.a_delete_client(client_id)
@pytest.mark.asyncio
async def test_a_users_pagination(admin: KeycloakAdmin, realm: str):
"""Test user pagination.
@ -3423,6 +3432,7 @@ async def test_a_users_pagination(admin: KeycloakAdmin, realm: str):
users = await admin.a_get_users(query={"max": 20})
assert len(users) == 20, len(users)
@pytest.mark.asyncio
async def test_a_user_groups_pagination(admin: KeycloakAdmin, realm: str):
"""Test user groups pagination.
@ -3449,9 +3459,12 @@ async def test_a_user_groups_pagination(admin: KeycloakAdmin, realm: str):
groups = await admin.a_get_user_groups(user_id=user_id, query={"first": 100, "max": -1, "search": ""})
assert len(groups) == 50, len(groups)
groups = await admin.a_get_user_groups(user_id=user_id, query={"max": 20, "first": -1, "search": ""})
groups = await admin.a_get_user_groups(
user_id=user_id, query={"max": 20, "first": -1, "search": ""}
)
assert len(groups) == 20, len(groups)
@pytest.mark.asyncio
async def test_a_idps(admin: KeycloakAdmin, realm: str):
"""Test IDPs.
@ -3542,6 +3555,7 @@ async def test_a_idps(admin: KeycloakAdmin, realm: str):
await admin.a_delete_idp(idp_alias="does-not-exist")
assert err.match(HTTP_404_REGEX)
@pytest.mark.asyncio
async def test_a_user_credentials(admin: KeycloakAdmin, user: str):
"""Test user credentials.
@ -3576,6 +3590,7 @@ async def test_a_user_credentials(admin: KeycloakAdmin, user: str):
await admin.a_delete_credential(user_id=user, credential_id="does-not-exist")
assert err.match('404: b\'{"error":"Credential not found".*}\'')
@pytest.mark.asyncio
async def test_a_social_logins(admin: KeycloakAdmin, user: str):
"""Test social logins.
@ -3622,6 +3637,7 @@ async def test_a_social_logins(admin: KeycloakAdmin, user: str):
await admin.a_delete_user_social_login(user_id=user, provider_id="instagram")
assert err.match('404: b\'{"error":"Link not found".*}\''), err
@pytest.mark.asyncio
async def test_a_server_info(admin: KeycloakAdmin):
"""Test server info.
@ -3651,6 +3667,7 @@ async def test_a_server_info(admin: KeycloakAdmin):
}
), info.keys()
@pytest.mark.asyncio
async def test_a_groups(admin: KeycloakAdmin, user: str):
"""Test groups.
@ -3717,18 +3734,22 @@ async def test_a_groups(admin: KeycloakAdmin, user: str):
assert err.match('404: b\'{"error":"Could not find group by id".*}\''), err
# Create 1 more subgroup
subsubgroup_id_1 = await admin.a_create_group(payload={"name": "subsubgroup-1"}, parent=subgroup_id_2)
subsubgroup_id_1 = await admin.a_create_group(
payload={"name": "subsubgroup-1"}, parent=subgroup_id_2
)
main_group = await admin.a_get_group(group_id=group_id)
# Test nested searches
subgroup_2 = await admin.a_get_group(group_id=subgroup_id_2)
res = await admin.a_get_subgroups(group=subgroup_2, path="/main-group/subgroup-2/subsubgroup-1")
res = await admin.a_get_subgroups(
group=subgroup_2, path="/main-group/subgroup-2/subsubgroup-1"
)
assert res is not None, res
assert res["id"] == subsubgroup_id_1
# Test nested search from main group
res = await admin.a_get_subgroups(
group= await admin.a_get_group(group_id=group_id, full_hierarchy=True),
group = await admin.a_get_group(group_id=group_id, full_hierarchy=True),
path="/main-group/subgroup-2/subsubgroup-1",
)
assert res["id"] == subsubgroup_id_1
@ -3836,6 +3857,7 @@ async def test_a_groups(admin: KeycloakAdmin, user: str):
await admin.a_delete_group(group_id="does-not-exist")
assert err.match('404: b\'{"error":"Could not find group by id".*}\''), err
@pytest.mark.asyncio
async def test_a_clients(admin: KeycloakAdmin, realm: str):
"""Test clients.
@ -3862,7 +3884,9 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str):
), clients
# Test create client
client_id = await admin.a_create_client(payload={"name": "test-client", "clientId": "test-client"})
client_id = await admin.a_create_client(
payload={"name": "test-client", "clientId": "test-client"}
)
assert client_id, client_id
with pytest.raises(KeycloakPostError) as err:
@ -3894,7 +3918,9 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str):
assert res == dict(), res
with pytest.raises(KeycloakPutError) as err:
await admin.a_update_client(client_id="does-not-exist", payload={"name": "test-client-change"})
await admin.a_update_client(
client_id="does-not-exist", payload={"name": "test-client-change"}
)
assert err.match('404: b\'{"error":"Could not find client".*}\'')
# Test client mappers
@ -3918,10 +3944,14 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str):
mapper = (await admin.a_get_mappers_from_client(client_id=client_id))[0]
with pytest.raises(KeycloakPutError) as err:
await admin.a_update_client_mapper(client_id=client_id, mapper_id="does-not-exist", payload=dict())
await admin.a_update_client_mapper(
client_id=client_id, mapper_id="does-not-exist", payload=dict()
)
assert err.match('404: b\'{"error":"Model not found".*}\'')
mapper["config"]["user.attribute"] = "test"
res = await admin.a_update_client_mapper(client_id=client_id, mapper_id=mapper["id"], payload=mapper)
res = await admin.a_update_client_mapper(
client_id=client_id, mapper_id=mapper["id"], payload=mapper
)
assert res == dict()
res = await admin.a_remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"])
@ -3971,7 +4001,9 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str):
assert res["name"] == "test-resource", res
test_resource_id = res["_id"]
res = await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=test_resource_id)
res = await admin.a_get_client_authz_resource(
client_id=auth_client_id, resource_id=test_resource_id
)
assert res["_id"] == test_resource_id, res
assert res["name"] == "test-resource", res
@ -3999,7 +4031,9 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str):
resource_id=temp_resource_id,
payload={"name": "temp-updated-resource"},
)
res = await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id)
res = await admin.a_get_client_authz_resource(
client_id=auth_client_id, resource_id=temp_resource_id
)
assert res["name"] == "temp-updated-resource", res
with pytest.raises(KeycloakPutError) as err:
await admin.a_update_client_authz_resource(
@ -4008,9 +4042,13 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str):
payload={"name": "temp-updated-resource"},
)
assert err.match("404: b''"), err
await admin.a_delete_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id)
await admin.a_delete_client_authz_resource(
client_id=auth_client_id, resource_id=temp_resource_id
)
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id)
await admin.a_get_client_authz_resource(
client_id=auth_client_id, resource_id=temp_resource_id
)
assert err.match("404: b''")
# Authz policies
@ -4174,7 +4212,7 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str):
assert err.match('404: b\'{"error":"Could not find client".*}\'')
secrets = await admin.a_get_client_secrets(
client_id = await admin.a_get_client_id(client_id="test-confidential")
client_id=await admin.a_get_client_id(client_id="test-confidential")
)
assert secrets == {"type": "secret", "value": "test-secret"}
@ -4187,10 +4225,13 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str):
)
assert res
assert (
await admin.a_get_client_secrets(client_id=await admin.a_get_client_id(client_id="test-confidential"))
await admin.a_get_client_secrets(
client_id=await admin.a_get_client_id(client_id="test-confidential")
)
== res
)
@pytest.mark.asyncio
async def test_a_realm_roles(admin: KeycloakAdmin, realm: str):
"""Test realm roles.
@ -4223,12 +4264,16 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str):
assert members == list(), members
# Test create realm role
role_id = await admin.a_create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True)
role_id = await admin.a_create_realm_role(
payload={"name": "test-realm-role"}, skip_exists=True
)
assert role_id, role_id
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_realm_role(payload={"name": "test-realm-role"})
assert err.match('409: b\'{"errorMessage":"Role with name test-realm-role already exists"}\'')
role_id_2 = await admin.a_create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True)
role_id_2 = await admin.a_create_realm_role(
payload={"name": "test-realm-role"}, skip_exists=True
)
assert role_id == role_id_2
# Test get realm role by its id
@ -4248,7 +4293,9 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str):
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
# Test realm role user assignment
user_id = await admin.a_create_user(payload={"username": "role-testing", "email": "test@test.test"})
user_id = await admin.a_create_user(
payload={"username": "role-testing", "email": "test@test.test"}
)
with pytest.raises(KeycloakPostError) as err:
await admin.a_assign_realm_roles(user_id=user_id, roles=["bad"])
assert err.match(UNKOWN_ERROR_REGEX), err
@ -4264,7 +4311,8 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str):
x["username"] for x in await admin.a_get_realm_role_members(role_name="offline_access")
]
assert admin.get_user(user_id=user_id)["username"] in [
x["username"] for x in await admin.a_get_realm_role_members(role_name="test-realm-role-update")
x["username"]
for x in await admin.a_get_realm_role_members(role_name="test-realm-role-update")
]
roles = await admin.a_get_realm_roles_of_user(user_id=user_id)
@ -4445,6 +4493,7 @@ async def test_a_role_attributes(
res = await admin.a_delete_client_role(client, role_name=attribute_role)
assert res == dict(), res
@pytest.mark.asyncio
async def test_a_client_scope_realm_roles(admin: KeycloakAdmin, realm: str):
"""Test client realm roles.
@ -4464,7 +4513,9 @@ async def test_a_client_scope_realm_roles(admin: KeycloakAdmin, realm: str):
assert "offline_access" in role_names, role_names
# create realm role for test
role_id = await admin.a_create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True)
role_id = await admin.a_create_realm_role(
payload={"name": "test-realm-role"}, skip_exists=True
)
assert role_id, role_id
# Test realm role client assignment
@ -4509,6 +4560,7 @@ async def test_a_client_scope_realm_roles(admin: KeycloakAdmin, realm: str):
roles = await admin.a_get_realm_roles_of_client_scope(client_id=client_id)
assert len(roles) == 0
@pytest.mark.asyncio
async def test_a_client_scope_client_roles(admin: KeycloakAdmin, realm: str, client: str):
"""Test client assignment of other client roles.
@ -4667,6 +4719,7 @@ async def test_a_client_optional_client_scopes(admin: KeycloakAdmin, realm: str,
optional_client_scopes = await admin.a_get_client_optional_client_scopes(client_id)
assert len(optional_client_scopes) == 4, optional_client_scopes
@pytest.mark.asyncio
async def test_a_client_roles(admin: KeycloakAdmin, client: str):
"""Test client roles.
@ -4688,7 +4741,9 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
client_role_id=client, payload={"name": "client-role-test"}, skip_exists=True
)
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_client_role(client_role_id=client, payload={"name": "client-role-test"})
await admin.a_create_client_role(
client_role_id=client, payload={"name": "client-role-test"}
)
assert err.match('409: b\'{"errorMessage":"Role with name client-role-test already exists"}\'')
client_role_id_2 = await admin.a_create_client_role(
client_role_id=client, payload={"name": "client-role-test"}, skip_exists=True
@ -4723,7 +4778,9 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
# Test user with client role
res = await admin.a_get_client_role_members(client_id=client, role_name="client-role-test-update")
res = await admin.a_get_client_role_members(
client_id=client, role_name="client-role-test-update"
)
assert len(res) == 0
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_role_members(client_id=client, role_name="bad")
@ -4736,11 +4793,17 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
res = await admin.a_assign_client_role(
user_id=user_id,
client_id=client,
roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")],
roles=[
await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")
],
)
assert res == dict()
assert (
len(await admin.a_get_client_role_members(client_id=client, role_name="client-role-test-update"))
len(
await admin.a_get_client_role_members(
client_id=client, role_name="client-role-test-update"
)
)
== 1
)
@ -4768,12 +4831,16 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
await admin.a_delete_client_roles_of_user(
user_id=user_id,
client_id=client,
roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")],
roles=[
await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")
],
)
assert len(await admin.a_get_client_roles_of_user(user_id=user_id, client_id=client)) == 0
# Test groups and client roles
res = await admin.a_get_client_role_groups(client_id=client, role_name="client-role-test-update")
res = await admin.a_get_client_role_groups(
client_id=client, role_name="client-role-test-update"
)
assert len(res) == 0
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_role_groups(client_id=client, role_name="bad")
@ -4792,11 +4859,17 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
res = await admin.a_assign_group_client_roles(
group_id=group_id,
client_id=client,
roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")],
roles=[
await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")
],
)
assert res == dict()
assert (
len(await admin.a_get_client_role_groups(client_id=client, role_name="client-role-test-update"))
len(
await admin.a_get_client_role_groups(
client_id=client, role_name="client-role-test-update"
)
)
== 1
)
assert len(await admin.a_get_group_client_roles(group_id=group_id, client_id=client)) == 1
@ -4807,7 +4880,9 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
res = await admin.a_delete_group_client_roles(
group_id=group_id,
client_id=client,
roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")],
roles=[
await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")
],
)
assert res == dict()
@ -4828,10 +4903,14 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
]
# Test delete of client role
res = await admin.a_delete_client_role(client_role_id=client, role_name="client-role-test-update")
res = await admin.a_delete_client_role(
client_role_id=client, role_name="client-role-test-update"
)
assert res == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_client_role(client_role_id=client, role_name="client-role-test-update")
await admin.a_delete_client_role(
client_role_id=client, role_name="client-role-test-update"
)
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
# Test of roles by id - Get role
@ -4863,6 +4942,7 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
await admin.a_delete_role_by_id(role_id="bad")
assert err.match(COULD_NOT_FIND_ROLE_WITH_ID_REGEX)
@pytest.mark.asyncio
async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str):
"""Test enable token exchange.
@ -4984,6 +5064,7 @@ async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str):
)
assert err.match('404: b\'{"error":"Could not find client".*}\'')
@pytest.mark.asyncio
async def test_a_email(admin: KeycloakAdmin, user: str):
"""Test email.
@ -5003,6 +5084,7 @@ async def test_a_email(admin: KeycloakAdmin, user: str):
await admin.a_send_verify_email(user_id=user)
assert err.match('500: b\'{"errorMessage":"Failed to send .*"}\'')
@pytest.mark.asyncio
async def test_a_get_sessions(admin: KeycloakAdmin):
"""Test get sessions.
@ -5010,12 +5092,15 @@ async def test_a_get_sessions(admin: KeycloakAdmin):
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
sessions = await admin.a_get_sessions(user_id=admin.get_user_id(username=admin.connection.username))
sessions = await admin.a_get_sessions(
user_id=admin.get_user_id(username=admin.connection.username)
)
assert len(sessions) >= 1
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_sessions(user_id="bad")
assert err.match(USER_NOT_FOUND_REGEX)
@pytest.mark.asyncio
async def test_a_get_client_installation_provider(admin: KeycloakAdmin, client: str):
"""Test get client installation provider.
@ -5041,6 +5126,7 @@ async def test_a_get_client_installation_provider(admin: KeycloakAdmin, client:
"ssl-required",
}
@pytest.mark.asyncio
async def test_a_auth_flows(admin: KeycloakAdmin, realm: str):
"""Test auth flows.
@ -5101,7 +5187,9 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str):
await admin.a_copy_authentication_flow(payload=dict(), flow_alias="bad")
assert err.match("404: b''")
res = await admin.a_copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser")
res = await admin.a_copy_authentication_flow(
payload={"newName": "test-browser"}, flow_alias="browser"
)
assert res == b"", res
assert len(await admin.a_get_authentication_flows()) == (default_flows + 1)
@ -5111,7 +5199,9 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str):
)
assert res == b""
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_authentication_flow(payload={"alias": "test-create", "builtIn": False})
await admin.a_create_authentication_flow(
payload={"alias": "test-create", "builtIn": False}
)
assert err.match('409: b\'{"errorMessage":"Flow test-create already exists"}\'')
assert await admin.a_create_authentication_flow(
payload={"alias": "test-create"}, skip_exists=True
@ -5160,7 +5250,9 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str):
assert err.match('400: b\'{"error":"Unrecognized field')
payload = (await admin.a_get_authentication_flow_executions(flow_alias="test-create"))[0]
payload["displayName"] = "test"
res = await admin.a_update_authentication_flow_executions(payload=payload, flow_alias="test-create")
res = await admin.a_update_authentication_flow_executions(
payload=payload, flow_alias="test-create"
)
assert res
exec_id = (await admin.a_get_authentication_flow_executions(flow_alias="test-create"))[0]["id"]
@ -5200,15 +5292,16 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str):
assert res == {"msg": "Already exists"}
# Test delete auth flow
flow_id = [x for x in await admin.a_get_authentication_flows() if x["alias"] == "test-browser"][0][
"id"
]
flow_id = [
x for x in await admin.a_get_authentication_flows() if x["alias"] == "test-browser"
][0]["id"]
res = await admin.a_delete_authentication_flow(flow_id=flow_id)
assert res == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_authentication_flow(flow_id=flow_id)
assert err.match('404: b\'{"error":"Could not find flow with id".*}\'')
@pytest.mark.asyncio
async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str):
"""Test authentication configs.
@ -5247,6 +5340,7 @@ async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str):
await admin.a_delete_authenticator_config(config_id="bad")
assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'')
@pytest.mark.asyncio
async def test_a_sync_users(admin: KeycloakAdmin, realm: str):
"""Test sync users.
@ -5263,6 +5357,7 @@ async def test_a_sync_users(admin: KeycloakAdmin, realm: str):
await admin.a_sync_users(storage_id="does-not-exist", action="triggerFullSync")
assert err.match('404: b\'{"error":"Could not find component".*}\'')
@pytest.mark.asyncio
async def test_a_client_scopes(admin: KeycloakAdmin, realm: str):
"""Test client scopes.
@ -5344,9 +5439,9 @@ async def test_a_client_scopes(admin: KeycloakAdmin, realm: str):
client_scope_id=res, protocol_mapper_id=test_mapper["id"], payload=test_mapper
)
assert res_update == dict()
assert (
(await admin.a_get_mappers_from_client_scope(client_scope_id=res))[0]["config"]["user.attribute"]
== "test"
assert ((await admin.a_get_mappers_from_client_scope(client_scope_id=res))[0]["config"][
"user.attribute"
] == "test"
)
# Test delete mapper
@ -5407,6 +5502,7 @@ async def test_a_client_scopes(admin: KeycloakAdmin, realm: str):
await admin.a_delete_client_scope(client_scope_id=res)
assert err.match(NO_CLIENT_SCOPE_REGEX)
@pytest.mark.asyncio
async def test_a_components(admin: KeycloakAdmin, realm: str):
"""Test components.
@ -5464,6 +5560,7 @@ async def test_a_components(admin: KeycloakAdmin, realm: str):
await admin.a_delete_component(component_id=res)
assert err.match('404: b\'{"error":"Could not find component".*}\'')
@pytest.mark.asyncio
async def test_a_keys(admin: KeycloakAdmin, realm: str):
"""Test keys.
@ -5474,9 +5571,12 @@ async def test_a_keys(admin: KeycloakAdmin, realm: str):
:type realm: str
"""
await admin.a_change_current_realm(realm)
assert set((await admin.a_get_keys())["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} or set(
(await admin.a_get_keys())["active"].keys()
) == {"RSA-OAEP", "RS256", "HS512", "AES"}
assert set((await admin.a_get_keys())["active"].keys()) == {
"AES",
"HS256",
"RS256",
"RSA-OAEP"
} or set((await admin.a_get_keys())["active"].keys()) == {"RSA-OAEP", "RS256", "HS512", "AES"}
assert {k["algorithm"] for k in (await admin.a_get_keys())["keys"]} == {
"HS256",
"RSA-OAEP",
@ -5489,6 +5589,7 @@ async def test_a_keys(admin: KeycloakAdmin, realm: str):
"RS256",
}
@pytest.mark.asyncio
async def test_a_admin_events(admin: KeycloakAdmin, realm: str):
"""Test events.
@ -5505,6 +5606,7 @@ async def test_a_admin_events(admin: KeycloakAdmin, realm: str):
events = await admin.a_get_admin_events()
assert events == list()
@pytest.mark.asyncio
async def test_a_user_events(admin: KeycloakAdmin, realm: str):
"""Test events.
@ -5523,7 +5625,9 @@ async def test_a_user_events(admin: KeycloakAdmin, realm: str):
await admin.a_set_events(payload={"bad": "conf"})
assert err.match('400: b\'{"error":"Unrecognized field')
res = await admin.a_set_events(payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True})
res = await admin.a_set_events(
payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True}
)
assert res == dict()
await admin.a_create_client(payload={"name": "test", "clientId": "test"})
@ -5531,6 +5635,7 @@ async def test_a_user_events(admin: KeycloakAdmin, realm: str):
events = await admin.a_get_events()
assert events == list()
@pytest.mark.asyncio
@freezegun.freeze_time("2023-02-25 10:00:00")
async def test_a_auto_refresh(admin_frozen: KeycloakAdmin, realm: str):
@ -5593,6 +5698,7 @@ async def test_a_auto_refresh(admin_frozen: KeycloakAdmin, realm: str):
assert await admin.a_delete_realm(realm_name="test-refresh") == dict()
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00")
@pytest.mark.asyncio
async def test_a_get_required_actions(admin: KeycloakAdmin, realm: str):
"""Test required actions.
@ -5617,6 +5723,7 @@ async def test_a_get_required_actions(admin: KeycloakAdmin, realm: str):
]:
assert key in ra
@pytest.mark.asyncio
async def test_a_get_required_action_by_alias(admin: KeycloakAdmin, realm: str):
"""Test get required action by alias.
@ -5633,6 +5740,7 @@ async def test_a_get_required_action_by_alias(admin: KeycloakAdmin, realm: str):
assert ra["alias"] == "UPDATE_PASSWORD"
assert await admin.a_get_required_action_by_alias("does-not-exist") is None
@pytest.mark.asyncio
async def test_a_update_required_action(admin: KeycloakAdmin, realm: str):
"""Test update required action.
@ -5651,6 +5759,7 @@ async def test_a_update_required_action(admin: KeycloakAdmin, realm: str):
assert old != newra
assert newra["enabled"] is False
@pytest.mark.asyncio
async def test_a_get_composite_client_roles_of_group(
admin: KeycloakAdmin, realm: str, client: str, group: str, composite_client_role: str
@ -5674,6 +5783,7 @@ async def test_a_get_composite_client_roles_of_group(
result = await admin.a_get_composite_client_roles_of_group(client, group)
assert role["id"] in [x["id"] for x in result]
@pytest.mark.asyncio
async def test_a_get_role_client_level_children(
admin: KeycloakAdmin, realm: str, client: str, composite_client_role: str, client_role: str
@ -5697,8 +5807,11 @@ async def test_a_get_role_client_level_children(
res = await admin.a_get_role_client_level_children(client, parent["id"])
assert child["id"] in [x["id"] for x in res]
@pytest.mark.asyncio
async def test_a_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple):
async def test_a_upload_certificate(
admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple
):
"""Test upload certificate.
:param admin: Keycloak Admin client
@ -5717,6 +5830,7 @@ async def test_a_upload_certificate(admin: KeycloakAdmin, realm: str, client: st
cl = await admin.a_get_client(client)
assert cl["attributes"]["jwt.credential.certificate"] == "".join(cert.splitlines()[1:-1])
@pytest.mark.asyncio
async def test_a_get_bruteforce_status_for_user(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
@ -5754,6 +5868,7 @@ async def test_a_get_bruteforce_status_for_user(
res = await admin.a_get_realm(realm_name=realm)
assert res["bruteForceProtected"] is False
@pytest.mark.asyncio
async def test_a_clear_bruteforce_attempts_for_user(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
@ -5835,6 +5950,7 @@ async def test_a_clear_bruteforce_attempts_for_all_users(
res = await admin.a_get_realm(realm_name=realm)
assert res["bruteForceProtected"] is False
@pytest.mark.asyncio
async def test_a_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> None:
"""Test that the default realm role is present in a brand new realm.
@ -5847,10 +5963,17 @@ async def test_a_default_realm_role_present(realm: str, admin: KeycloakAdmin) ->
await admin.a_change_current_realm(realm)
assert f"default-roles-{realm}" in [x["name"] for x in admin.get_realm_roles()]
assert (
len([x["name"] for x in await admin.a_get_realm_roles() if x["name"] == f"default-roles-{realm}"])
len(
[
x["name"]
for x in await admin.a_get_realm_roles()
if x["name"] == f"default-roles-{realm}"
]
)
== 1
)
@pytest.mark.asyncio
async def test_a_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> None:
"""Test getter for the ID of the default realm role.
@ -5863,9 +5986,14 @@ async def test_a_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) ->
await admin.a_change_current_realm(realm)
assert (
await admin.a_get_default_realm_role_id()
== [x["id"] for x in await admin.a_get_realm_roles() if x["name"] == f"default-roles-{realm}"][0]
== [
x["id"]
for x in await admin.a_get_realm_roles()
if x["name"] == f"default-roles-{realm}"
][0]
)
@pytest.mark.asyncio
async def test_a_realm_default_roles(admin: KeycloakAdmin, realm: str) -> None:
"""Test getting, adding and deleting default realm roles.
@ -5908,6 +6036,7 @@ async def test_a_realm_default_roles(admin: KeycloakAdmin, realm: str) -> None:
await admin.a_add_realm_default_roles(payload=[{"id": "bad id"}])
assert err.match('404: b\'{"error":"Could not find composite role".*}\'')
@pytest.mark.asyncio
async def test_a_clear_keys_cache(realm: str, admin: KeycloakAdmin) -> None:
"""Test clearing the keys cache.
@ -5921,6 +6050,7 @@ async def test_a_clear_keys_cache(realm: str, admin: KeycloakAdmin) -> None:
res = await admin.a_clear_keys_cache()
assert res == {}
@pytest.mark.asyncio
async def test_a_clear_realm_cache(realm: str, admin: KeycloakAdmin) -> None:
"""Test clearing the realm cache.
@ -5934,6 +6064,7 @@ async def test_a_clear_realm_cache(realm: str, admin: KeycloakAdmin) -> None:
res = await admin.a_clear_realm_cache()
assert res == {}
@pytest.mark.asyncio
async def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None:
"""Test clearing the user cache.
@ -5947,6 +6078,7 @@ async def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None:
res = await admin.a_clear_user_cache()
assert res == {}
@pytest.mark.asyncio
async def test_a_initial_access_token(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -5983,9 +6115,12 @@ async def test_a_initial_access_token(
assert res["clientId"] == client
new_secret = str(uuid.uuid4())
res = await oid.a_update_client(res["registrationAccessToken"], client, payload={"secret": new_secret})
res = await oid.a_update_client(
res["registrationAccessToken"], client, payload={"secret": new_secret}
)
assert res["secret"] == new_secret
@pytest.mark.asyncio
async def test_a_refresh_token(admin: KeycloakAdmin):
"""Test refresh token on connection even if it is expired.

51
tests/test_keycloak_openid.py

@ -488,7 +488,7 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"interval": 5,
}
#async function start
# async function start
@pytest.mark.asyncio
async def test_a_well_known(oid: KeycloakOpenID):
@ -557,6 +557,7 @@ async def test_a_well_known(oid: KeycloakOpenID):
]:
assert key in res
@pytest.mark.asyncio
async def test_a_auth_url(env, oid: KeycloakOpenID):
"""Test the auth_url method.
@ -574,6 +575,7 @@ async def test_a_auth_url(env, oid: KeycloakOpenID):
+ "&redirect_uri=http://test.test/*&scope=email&state="
)
@pytest.mark.asyncio
async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test the token method.
@ -623,6 +625,7 @@ async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"token_type": "Bearer",
}
@pytest.mark.asyncio
async def test_a_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
@ -676,6 +679,7 @@ async def test_a_exchange_token(
}
assert token != new_token
@pytest.mark.asyncio
async def test_a_logout(oid_with_credentials):
"""Test logout.
@ -692,6 +696,7 @@ async def test_a_logout(oid_with_credentials):
with pytest.raises(KeycloakAuthenticationError):
await oid.a_userinfo(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_certs(oid: KeycloakOpenID):
"""Test certificates.
@ -701,6 +706,7 @@ async def test_a_certs(oid: KeycloakOpenID):
"""
assert len((await oid.a_certs())["keys"]) == 2
@pytest.mark.asyncio
async def test_a_public_key(oid: KeycloakOpenID):
"""Test public key.
@ -710,6 +716,7 @@ async def test_a_public_key(oid: KeycloakOpenID):
"""
assert await oid.a_public_key() is not None
@pytest.mark.asyncio
async def test_a_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
@ -731,6 +738,7 @@ async def test_a_entitlement(
with pytest.raises(KeycloakDeprecationError):
await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id)
@pytest.mark.asyncio
async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test introspect.
@ -747,7 +755,10 @@ async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str
) == {"active": False}
with pytest.raises(KeycloakRPTNotFound):
await oid.a_introspect(token=token["access_token"], token_type_hint="requesting_party_token")
await oid.a_introspect(
token=token["access_token"], token_type_hint="requesting_party_token"
)
@pytest.mark.asyncio
async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
@ -766,8 +777,11 @@ async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, s
assert decoded_access_token["preferred_username"] == username, decoded_access_token
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
@pytest.mark.asyncio
async def test_a_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
async def test_a_load_authorization_config(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
):
"""Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
@ -786,6 +800,7 @@ async def test_a_load_authorization_config(oid_with_credentials_authz: Tuple[Key
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
)
@pytest.mark.asyncio
async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
@ -810,10 +825,12 @@ async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID,
policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy
assert [
str(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
str(x)
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
] == ["Policy: test (role)"]
assert [
repr(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
repr(x)
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id
@ -821,6 +838,7 @@ async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID,
with pytest.raises(KeycloakInvalidTokenError):
await oid.a_get_policies(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
@ -840,7 +858,9 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
orig_client_id = oid.client_id
oid.client_id = "account"
assert await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == []
assert (
await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == []
)
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
policy.add_permission(
@ -851,11 +871,15 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode")
for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode"
)
] == ["Permission: test-perm (resource)"]
assert [
repr(x)
for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode")
for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode"
)
] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id
@ -863,6 +887,7 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
with pytest.raises(KeycloakInvalidTokenError):
await oid.a_get_permissions(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test UMA permissions.
@ -875,7 +900,10 @@ async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
token = await oid.a_token(username=username, password=password)
assert len(await oid.a_uma_permissions(token=token["access_token"])) == 1
assert (await oid.a_uma_permissions(token=token["access_token"]))[0]["rsname"] == "Default Resource"
assert (await oid.a_uma_permissions(token=token["access_token"]))[0][
"rsname"
] == "Default Resource"
@pytest.mark.asyncio
async def test_a_has_uma_access(
@ -897,7 +925,9 @@ async def test_a_has_uma_access(
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
assert (
str(await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource"))
str(
await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource")
)
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
@ -919,6 +949,7 @@ async def test_a_has_uma_access(
+ "{'Default Resource'})"
)
@pytest.mark.asyncio
async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"""Test device authorization flow.

8
tests/test_keycloak_uma.py

@ -311,7 +311,7 @@ def test_uma_permission_ticket(uma: KeycloakUMA):
uma.resource_set_delete(resource["_id"])
#async function start
# async function start
@pytest.mark.asyncio
async def test_a_uma_well_known(uma: KeycloakUMA):
@ -326,6 +326,7 @@ async def test_a_uma_well_known(uma: KeycloakUMA):
for key in ["resource_registration_endpoint"]:
assert key in res
@pytest.mark.asyncio
async def test_a_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets.
@ -334,7 +335,7 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
:type uma: KeycloakUMA
"""
# Check that only the default resource is present
resource_sets = await uma.a_resource_set_list()
resource_sets = uma.resource_set_list()
resource_set_list = list(resource_sets)
assert len(resource_set_list) == 1, resource_set_list
assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
@ -422,6 +423,7 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
await uma.a_resource_set_delete(resource_id=created_resource["_id"])
assert err.match("404: b''")
@pytest.mark.asyncio
async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
"""Test policies.
@ -521,6 +523,7 @@ async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
admin.delete_realm_role(role_id)
admin.delete_group(group_id)
@pytest.mark.asyncio
async def test_a_uma_access(uma: KeycloakUMA):
"""Test permission access checks.
@ -555,6 +558,7 @@ async def test_a_uma_access(uma: KeycloakUMA):
assert not await uma.a_permissions_check(token["access_token"], permissions)
uma.resource_set_delete(resource["_id"])
@pytest.mark.asyncio
async def test_a_uma_permission_ticket(uma: KeycloakUMA):
"""Test permission ticket generation.

Loading…
Cancel
Save