|
|
@ -3270,12 +3270,12 @@ async def test_a_users(admin: KeycloakAdmin, realm: str): |
|
|
|
# Test disable user |
|
|
|
res = await admin.a_disable_user(user_id=user_id) |
|
|
|
assert res == {}, res |
|
|
|
assert not await admin.a_get_user(user_id=user_id)["enabled"] |
|
|
|
assert not (await admin.a_get_user(user_id=user_id))["enabled"] |
|
|
|
|
|
|
|
# Test enable user |
|
|
|
res = await admin.a_enable_user(user_id=user_id) |
|
|
|
assert res == {}, res |
|
|
|
assert await admin.a_get_user(user_id=user_id)["enabled"] |
|
|
|
assert (await admin.a_get_user(user_id=user_id))["enabled"] |
|
|
|
|
|
|
|
# Test get users again |
|
|
|
users = await admin.a_get_users() |
|
|
@ -3350,21 +3350,21 @@ async def test_a_enable_disable_all_users(admin: KeycloakAdmin, realm: str): |
|
|
|
payload={"username": "test3", "email": "test3@test.test", "enabled": True} |
|
|
|
) |
|
|
|
|
|
|
|
assert await admin.a_get_user(user_id_1)["enabled"] |
|
|
|
assert await admin.a_get_user(user_id_2)["enabled"] |
|
|
|
assert await admin.a_get_user(user_id_3)["enabled"] |
|
|
|
assert (await admin.a_get_user(user_id_1))["enabled"] |
|
|
|
assert (await admin.a_get_user(user_id_2))["enabled"] |
|
|
|
assert (await admin.a_get_user(user_id_3))["enabled"] |
|
|
|
|
|
|
|
admin.a_disable_all_users() |
|
|
|
await admin.a_disable_all_users() |
|
|
|
|
|
|
|
assert not await admin.a_get_user(user_id_1)["enabled"] |
|
|
|
assert not await admin.a_get_user(user_id_2)["enabled"] |
|
|
|
assert not await admin.a_get_user(user_id_3)["enabled"] |
|
|
|
assert not (await admin.a_get_user(user_id_1))["enabled"] |
|
|
|
assert not (await admin.a_get_user(user_id_2))["enabled"] |
|
|
|
assert not (await admin.a_get_user(user_id_3))["enabled"] |
|
|
|
|
|
|
|
admin.a_enable_all_users() |
|
|
|
await admin.a_enable_all_users() |
|
|
|
|
|
|
|
assert await admin.a_get_user(user_id_1)["enabled"] |
|
|
|
assert await admin.a_get_user(user_id_2)["enabled"] |
|
|
|
assert await admin.a_get_user(user_id_3)["enabled"] |
|
|
|
assert (await admin.a_get_user(user_id_1))["enabled"] |
|
|
|
assert (await admin.a_get_user(user_id_2))["enabled"] |
|
|
|
assert (await admin.a_get_user(user_id_3))["enabled"] |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_users_roles(admin: KeycloakAdmin, realm: str): |
|
|
@ -3593,7 +3593,7 @@ async def test_a_social_logins(admin: KeycloakAdmin, user: str): |
|
|
|
user_id=user, provider_id="github", provider_userid="test", provider_username="test" |
|
|
|
) |
|
|
|
assert res == dict(), res |
|
|
|
|
|
|
|
|
|
|
|
# Test add social login fail |
|
|
|
with pytest.raises(KeycloakPostError) as err: |
|
|
|
await admin.a_add_user_social_login( |
|
|
@ -3629,7 +3629,7 @@ async def test_a_server_info(admin: KeycloakAdmin): |
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
info = admin.a_get_server_info() |
|
|
|
info = await admin.a_get_server_info() |
|
|
|
assert set(info.keys()).issubset( |
|
|
|
{ |
|
|
|
"systemInfo", |
|
|
@ -3894,7 +3894,7 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): |
|
|
|
assert res == dict(), res |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPutError) as err: |
|
|
|
admin.update_client(client_id="does-not-exist", payload={"name": "test-client-change"}) |
|
|
|
await admin.a_update_client(client_id="does-not-exist", payload={"name": "test-client-change"}) |
|
|
|
assert err.match('404: b\'{"error":"Could not find client".*}\'') |
|
|
|
|
|
|
|
# Test client mappers |
|
|
@ -3916,7 +3916,7 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): |
|
|
|
assert res == b"" |
|
|
|
assert len(await admin.a_get_mappers_from_client(client_id=client_id)) == 1 |
|
|
|
|
|
|
|
mapper = await admin.a_get_mappers_from_client(client_id=client_id)[0] |
|
|
|
mapper = (await admin.a_get_mappers_from_client(client_id=client_id))[0] |
|
|
|
with pytest.raises(KeycloakPutError) as err: |
|
|
|
await admin.a_update_client_mapper(client_id=client_id, mapper_id="does-not-exist", payload=dict()) |
|
|
|
assert err.match('404: b\'{"error":"Model not found".*}\'') |
|
|
@ -3939,7 +3939,7 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): |
|
|
|
assert await admin.a_get_client_sessions_stats() == list() |
|
|
|
|
|
|
|
# Test authz |
|
|
|
auth_client_id = admin.a_create_client( |
|
|
|
auth_client_id = await admin.a_create_client( |
|
|
|
payload={ |
|
|
|
"name": "authz-client", |
|
|
|
"clientId": "authz-client", |
|
|
@ -3953,7 +3953,7 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): |
|
|
|
assert len(res["policies"]) >= 0 |
|
|
|
|
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
aawait admin.a_get_client_authz_settings(client_id=client_id) |
|
|
|
await admin.a_get_client_authz_settings(client_id=client_id) |
|
|
|
assert err.match(HTTP_404_REGEX) |
|
|
|
|
|
|
|
# Authz resources |
|
|
@ -4022,7 +4022,7 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): |
|
|
|
await admin.a_get_client_authz_policies(client_id="does-not-exist") |
|
|
|
assert err.match('404: b\'{"error":"Could not find client".*}\'') |
|
|
|
|
|
|
|
role_id = await admin.a_get_realm_role(role_name="offline_access")["id"] |
|
|
|
role_id = (await admin.a_get_realm_role(role_name="offline_access"))["id"] |
|
|
|
res = await admin.a_create_client_authz_role_based_policy( |
|
|
|
client_id=auth_client_id, |
|
|
|
payload={"name": "test-authz-rb-policy", "roles": [{"id": role_id}]}, |
|
|
@ -4232,7 +4232,7 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str): |
|
|
|
assert role_id == role_id_2 |
|
|
|
|
|
|
|
# Test get realm role by its id |
|
|
|
role_id = await admin.a_get_realm_role(role_name="test-realm-role")["id"] |
|
|
|
role_id = (await admin.a_get_realm_role(role_name="test-realm-role"))["id"] |
|
|
|
res = await admin.a_get_realm_role_by_id(role_id) |
|
|
|
assert res["name"] == "test-realm-role" |
|
|
|
|
|
|
@ -4702,7 +4702,7 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): |
|
|
|
await admin.a_get_client_role(client_id=client, role_name="bad") |
|
|
|
assert err.match(COULD_NOT_FIND_ROLE_REGEX) |
|
|
|
|
|
|
|
res_ = admin.a_get_client_role_id(client_id=client, role_name="client-role-test") |
|
|
|
res_ = await admin.a_get_client_role_id(client_id=client, role_name="client-role-test") |
|
|
|
assert res_ == res["id"] |
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
await admin.a_get_client_role_id(client_id=client, role_name="bad") |
|
|
@ -4736,11 +4736,11 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): |
|
|
|
res = await admin.a_assign_client_role( |
|
|
|
user_id=user_id, |
|
|
|
client_id=client, |
|
|
|
roles=[admin.get_client_role(client_id=client, role_name="client-role-test-update")], |
|
|
|
roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")], |
|
|
|
) |
|
|
|
assert res == dict() |
|
|
|
assert ( |
|
|
|
len(admin.get_client_role_members(client_id=client, role_name="client-role-test-update")) |
|
|
|
len(await admin.a_get_client_role_members(client_id=client, role_name="client-role-test-update")) |
|
|
|
== 1 |
|
|
|
) |
|
|
|
|
|
|
@ -5158,12 +5158,12 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str): |
|
|
|
payload={"required": "yes"}, flow_alias="test-create" |
|
|
|
) |
|
|
|
assert err.match('400: b\'{"error":"Unrecognized field') |
|
|
|
payload = await admin.a_get_authentication_flow_executions(flow_alias="test-create")[0] |
|
|
|
payload = (await admin.a_get_authentication_flow_executions(flow_alias="test-create"))[0] |
|
|
|
payload["displayName"] = "test" |
|
|
|
res = await admin.a_update_authentication_flow_executions(payload=payload, flow_alias="test-create") |
|
|
|
assert res |
|
|
|
|
|
|
|
exec_id = await admin.a_get_authentication_flow_executions(flow_alias="test-create")[0]["id"] |
|
|
|
exec_id = (await admin.a_get_authentication_flow_executions(flow_alias="test-create"))[0]["id"] |
|
|
|
res = await admin.a_delete_authentication_flow_execution(execution_id=exec_id) |
|
|
|
assert res == dict() |
|
|
|
with pytest.raises(KeycloakDeleteError) as err: |
|
|
@ -5310,7 +5310,7 @@ async def test_a_client_scopes(admin: KeycloakAdmin, realm: str): |
|
|
|
client_scope_id=res, payload={"name": "test-scope-update"} |
|
|
|
) |
|
|
|
assert res_update == dict() |
|
|
|
assert await admin.a_get_client_scope(client_scope_id=res)["name"] == "test-scope-update" |
|
|
|
assert (await admin.a_get_client_scope(client_scope_id=res))["name"] == "test-scope-update" |
|
|
|
|
|
|
|
# Test get mappers |
|
|
|
mappers = await admin.a_get_mappers_from_client_scope(client_scope_id=res) |
|
|
@ -5333,7 +5333,7 @@ async def test_a_client_scopes(admin: KeycloakAdmin, realm: str): |
|
|
|
assert len(await admin.a_get_mappers_from_client_scope(client_scope_id=res)) == 1 |
|
|
|
|
|
|
|
# Test update mapper |
|
|
|
test_mapper = await admin.a_get_mappers_from_client_scope(client_scope_id=res)[0] |
|
|
|
test_mapper = (await admin.a_get_mappers_from_client_scope(client_scope_id=res))[0] |
|
|
|
with pytest.raises(KeycloakPutError) as err: |
|
|
|
await admin.a_update_mapper_in_client_scope( |
|
|
|
client_scope_id="does-not-exist", protocol_mapper_id=test_mapper["id"], payload=dict() |
|
|
@ -5345,7 +5345,7 @@ async def test_a_client_scopes(admin: KeycloakAdmin, realm: str): |
|
|
|
) |
|
|
|
assert res_update == dict() |
|
|
|
assert ( |
|
|
|
await admin.a_get_mappers_from_client_scope(client_scope_id=res)[0]["config"]["user.attribute"] |
|
|
|
(await admin.a_get_mappers_from_client_scope(client_scope_id=res))[0]["config"]["user.attribute"] |
|
|
|
== "test" |
|
|
|
) |
|
|
|
|
|
|
@ -5444,7 +5444,7 @@ async def test_a_components(admin: KeycloakAdmin, realm: str): |
|
|
|
} |
|
|
|
) |
|
|
|
assert res |
|
|
|
assert await admin.a_get_component(component_id=res)["name"] == "Test Component" |
|
|
|
assert (await admin.a_get_component(component_id=res))["name"] == "Test Component" |
|
|
|
|
|
|
|
# Test update component |
|
|
|
component = await admin.a_get_component(component_id=res) |
|
|
@ -5455,7 +5455,7 @@ async def test_a_components(admin: KeycloakAdmin, realm: str): |
|
|
|
assert err.match('404: b\'{"error":"Could not find component".*}\'') |
|
|
|
res_upd = await admin.a_update_component(component_id=res, payload=component) |
|
|
|
assert res_upd == dict() |
|
|
|
assert await admin.a_get_component(component_id=res)["name"] == "Test Component Update" |
|
|
|
assert (await admin.a_get_component(component_id=res))["name"] == "Test Component Update" |
|
|
|
|
|
|
|
# Test delete component |
|
|
|
res_del = await admin.a_delete_component(component_id=res) |
|
|
@ -5474,15 +5474,15 @@ async def test_a_keys(admin: KeycloakAdmin, realm: str): |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
assert set(await admin.a_get_keys()["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} or set( |
|
|
|
await admin.a_get_keys()["active"].keys() |
|
|
|
assert set((await admin.a_get_keys())["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} or set( |
|
|
|
(await admin.a_get_keys())["active"].keys() |
|
|
|
) == {"RSA-OAEP", "RS256", "HS512", "AES"} |
|
|
|
assert {k["algorithm"] for k in await admin.a_get_keys()["keys"]} == { |
|
|
|
assert {k["algorithm"] for k in (await admin.a_get_keys())["keys"]} == { |
|
|
|
"HS256", |
|
|
|
"RSA-OAEP", |
|
|
|
"AES", |
|
|
|
"RS256", |
|
|
|
} or {k["algorithm"] for k in await admin.a_get_keys()["keys"]} == { |
|
|
|
} or {k["algorithm"] for k in (await admin.a_get_keys())["keys"]} == { |
|
|
|
"HS512", |
|
|
|
"RSA-OAEP", |
|
|
|
"AES", |
|
|
@ -5935,7 +5935,7 @@ async def test_a_clear_realm_cache(realm: str, admin: KeycloakAdmin) -> None: |
|
|
|
assert res == {} |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None: |
|
|
|
async def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None: |
|
|
|
"""Test clearing the user cache. |
|
|
|
|
|
|
|
:param realm: Realm name |
|
|
|