Browse Source

write tests for keycloak admin async functions

pull/565/head
David 12 months ago
parent
commit
3409d63eee
  1. 590
      tests/test_keycloak_admin.py

590
tests/test_keycloak_admin.py

@ -4619,3 +4619,593 @@ async def test_a_client_default_client_scopes(admin: KeycloakAdmin, realm: str,
await admin.a_delete_client_default_client_scope(client_id, new_client_scope_id)
default_client_scopes = await admin.a_get_client_default_client_scopes(client_id)
assert len(default_client_scopes) == 5, default_client_scopes
@pytest.mark.asyncio
async def test_a_client_optional_client_scopes(admin: KeycloakAdmin, realm: str, client: str):
"""Test client assignment of optional client scopes.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
:param client: Keycloak client
:type client: str
"""
await admin.a_change_current_realm(realm)
client_id = await admin.a_create_client(
payload={"name": "role-testing-client", "clientId": "role-testing-client"}
)
# Test get client optional scopes
# keycloak optional roles: microprofile-jwt, offline_access, address, phone
optional_client_scopes = await admin.a_get_client_optional_client_scopes(client_id)
assert len(optional_client_scopes) == 4, optional_client_scopes
# Test add a client scope to client optional scopes
optional_client_scope = "test-client-optional-scope"
new_client_scope = {
"name": optional_client_scope,
"description": f"Test Client Scope: {optional_client_scope}",
"protocol": "openid-connect",
"attributes": {},
}
new_client_scope_id = await admin.a_create_client_scope(new_client_scope, skip_exists=False)
new_optional_client_scope_data = {
"realm": realm,
"client": client_id,
"clientScopeId": new_client_scope_id,
}
await admin.a_add_client_optional_client_scope(
client_id, new_client_scope_id, new_optional_client_scope_data
)
optional_client_scopes = await admin.a_get_client_optional_client_scopes(client_id)
assert len(optional_client_scopes) == 5, optional_client_scopes
# Test remove a client optional scope
await admin.a_delete_client_optional_client_scope(client_id, new_client_scope_id)
optional_client_scopes = await admin.a_get_client_optional_client_scopes(client_id)
assert len(optional_client_scopes) == 4, optional_client_scopes
@pytest.mark.asyncio
async def test_a_client_roles(admin: KeycloakAdmin, client: str):
"""Test client roles.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param client: Keycloak client
:type client: str
"""
# Test get client roles
res = await admin.a_get_client_roles(client_id=client)
assert len(res) == 0
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_roles(client_id="bad")
assert err.match('404: b\'{"error":"Could not find client".*}\'')
# Test create client role
client_role_id = await admin.a_create_client_role(
client_role_id=client, payload={"name": "client-role-test"}, skip_exists=True
)
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_client_role(client_role_id=client, payload={"name": "client-role-test"})
assert err.match('409: b\'{"errorMessage":"Role with name client-role-test already exists"}\'')
client_role_id_2 = await admin.a_create_client_role(
client_role_id=client, payload={"name": "client-role-test"}, skip_exists=True
)
assert client_role_id == client_role_id_2
# Test get client role
res = await admin.a_get_client_role(client_id=client, role_name="client-role-test")
assert res["name"] == client_role_id
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_role(client_id=client, role_name="bad")
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
res_ = admin.a_get_client_role_id(client_id=client, role_name="client-role-test")
assert res_ == res["id"]
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_role_id(client_id=client, role_name="bad")
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
assert len(await admin.a_get_client_roles(client_id=client)) == 1
# Test update client role
res = await admin.a_update_client_role(
client_id=client, role_name="client-role-test", payload={"name": "client-role-test-update"}
)
assert res == dict()
with pytest.raises(KeycloakPutError) as err:
res = await admin.a_update_client_role(
client_id=client,
role_name="client-role-test",
payload={"name": "client-role-test-update"},
)
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
# Test user with client role
res = await admin.a_get_client_role_members(client_id=client, role_name="client-role-test-update")
assert len(res) == 0
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_role_members(client_id=client, role_name="bad")
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
user_id = await admin.a_create_user(payload={"username": "test", "email": "test@test.test"})
with pytest.raises(KeycloakPostError) as err:
await admin.a_assign_client_role(user_id=user_id, client_id=client, roles=["bad"])
assert err.match(UNKOWN_ERROR_REGEX), err
res = await admin.a_assign_client_role(
user_id=user_id,
client_id=client,
roles=[admin.get_client_role(client_id=client, role_name="client-role-test-update")],
)
assert res == dict()
assert (
len(admin.get_client_role_members(client_id=client, role_name="client-role-test-update"))
== 1
)
roles = await admin.a_get_client_roles_of_user(user_id=user_id, client_id=client)
assert len(roles) == 1, roles
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_roles_of_user(user_id=user_id, client_id="bad")
assert err.match(CLIENT_NOT_FOUND_REGEX)
roles = await admin.a_get_composite_client_roles_of_user(user_id=user_id, client_id=client)
assert len(roles) == 1, roles
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_composite_client_roles_of_user(user_id=user_id, client_id="bad")
assert err.match(CLIENT_NOT_FOUND_REGEX)
roles = await admin.a_get_available_client_roles_of_user(user_id=user_id, client_id=client)
assert len(roles) == 0, roles
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_composite_client_roles_of_user(user_id=user_id, client_id="bad")
assert err.match(CLIENT_NOT_FOUND_REGEX)
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_client_roles_of_user(user_id=user_id, client_id=client, roles=["bad"])
assert err.match(UNKOWN_ERROR_REGEX), err
await admin.a_delete_client_roles_of_user(
user_id=user_id,
client_id=client,
roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")],
)
assert len(await admin.a_get_client_roles_of_user(user_id=user_id, client_id=client)) == 0
# Test groups and client roles
res = await admin.a_get_client_role_groups(client_id=client, role_name="client-role-test-update")
assert len(res) == 0
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_role_groups(client_id=client, role_name="bad")
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
group_id = await admin.a_create_group(payload={"name": "test-group"})
res = await admin.a_get_group_client_roles(group_id=group_id, client_id=client)
assert len(res) == 0
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_group_client_roles(group_id=group_id, client_id="bad")
assert err.match(CLIENT_NOT_FOUND_REGEX)
with pytest.raises(KeycloakPostError) as err:
await admin.a_assign_group_client_roles(group_id=group_id, client_id=client, roles=["bad"])
assert err.match(UNKOWN_ERROR_REGEX), err
res = await admin.a_assign_group_client_roles(
group_id=group_id,
client_id=client,
roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")],
)
assert res == dict()
assert (
len(await admin.a_get_client_role_groups(client_id=client, role_name="client-role-test-update"))
== 1
)
assert len(await admin.a_get_group_client_roles(group_id=group_id, client_id=client)) == 1
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_group_client_roles(group_id=group_id, client_id=client, roles=["bad"])
assert err.match(UNKOWN_ERROR_REGEX), err
res = await admin.a_delete_group_client_roles(
group_id=group_id,
client_id=client,
roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")],
)
assert res == dict()
# Test composite client roles
with pytest.raises(KeycloakPostError) as err:
await admin.a_add_composite_client_roles_to_role(
client_role_id=client, role_name="client-role-test-update", roles=["bad"]
)
assert err.match(UNKOWN_ERROR_REGEX), err
res = await admin.a_add_composite_client_roles_to_role(
client_role_id=client,
role_name="client-role-test-update",
roles=[await admin.a_get_realm_role(role_name="offline_access")],
)
assert res == dict()
assert await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")[
"composite"
]
# Test delete of client role
res = await admin.a_delete_client_role(client_role_id=client, role_name="client-role-test-update")
assert res == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_client_role(client_role_id=client, role_name="client-role-test-update")
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
# Test of roles by id - Get role
await admin.a_create_client_role(
client_role_id=client, payload={"name": "client-role-by-id-test"}, skip_exists=True
)
role = await admin.a_get_client_role(client_id=client, role_name="client-role-by-id-test")
res = admin.a_get_role_by_id(role_id=role["id"])
assert res["name"] == "client-role-by-id-test"
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_role_by_id(role_id="bad")
assert err.match(COULD_NOT_FIND_ROLE_WITH_ID_REGEX)
# Test of roles by id - Update role
res = await admin.a_update_role_by_id(
role_id=role["id"], payload={"name": "client-role-by-id-test-update"}
)
assert res == dict()
with pytest.raises(KeycloakPutError) as err:
res = await admin.a_update_role_by_id(
role_id="bad", payload={"name": "client-role-by-id-test-update"}
)
assert err.match(COULD_NOT_FIND_ROLE_WITH_ID_REGEX)
# Test of roles by id - Delete role
res = await admin.a_delete_role_by_id(role_id=role["id"])
assert res == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_role_by_id(role_id="bad")
assert err.match(COULD_NOT_FIND_ROLE_WITH_ID_REGEX)
@pytest.mark.asyncio
async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str):
"""Test enable token exchange.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
:raises AssertionError: In case of bad configuration
"""
# Test enabling token exchange between two confidential clients
await admin.a_change_current_realm(realm)
# Create test clients
source_client_id = await admin.a_create_client(
payload={"name": "Source Client", "clientId": "source-client"}
)
target_client_id = await admin.a_create_client(
payload={"name": "Target Client", "clientId": "target-client"}
)
for c in await admin.a_get_clients():
if c["clientId"] == "realm-management":
realm_management_id = c["id"]
break
else:
raise AssertionError("Missing realm management client")
# Enable permissions on the Superset client
await admin.a_update_client_management_permissions(
payload={"enabled": True}, client_id=target_client_id
)
# Fetch various IDs and strings needed when creating the permission
token_exchange_permission_id = await admin.a_get_client_management_permissions(
client_id=target_client_id
)["scopePermissions"]["token-exchange"]
scopes = await admin.a_get_client_authz_policy_scopes(
client_id=realm_management_id, policy_id=token_exchange_permission_id
)
for s in scopes:
if s["name"] == "token-exchange":
token_exchange_scope_id = s["id"]
break
else:
raise AssertionError("Missing token-exchange scope")
resources = await admin.a_get_client_authz_policy_resources(
client_id=realm_management_id, policy_id=token_exchange_permission_id
)
for r in resources:
if r["name"] == f"client.resource.{target_client_id}":
token_exchange_resource_id = r["_id"]
break
else:
raise AssertionError("Missing client resource")
# Create a client policy for source client
policy_name = "Exchange source client token with target client token"
client_policy_id = await admin.a_create_client_authz_client_policy(
payload={
"type": "client",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"name": policy_name,
"clients": [source_client_id],
},
client_id=realm_management_id,
)["id"]
policies = await admin.a_get_client_authz_client_policies(client_id=realm_management_id)
for policy in policies:
if policy["name"] == policy_name:
assert policy["clients"] == [source_client_id]
break
else:
raise AssertionError("Missing client policy")
# Update permissions on the target client to reference this policy
permission_name = await admin.a_get_client_authz_scope_permission(
client_id=realm_management_id, scope_id=token_exchange_permission_id
)["name"]
await admin.a_update_client_authz_scope_permission(
payload={
"id": token_exchange_permission_id,
"name": permission_name,
"type": "scope",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"resources": [token_exchange_resource_id],
"scopes": [token_exchange_scope_id],
"policies": [client_policy_id],
},
client_id=realm_management_id,
scope_id=token_exchange_permission_id,
)
# Create permissions on the target client to reference this policy
await admin.a_create_client_authz_scope_permission(
payload={
"id": "some-id",
"name": "test-permission",
"type": "scope",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"resources": [token_exchange_resource_id],
"scopes": [token_exchange_scope_id],
"policies": [client_policy_id],
},
client_id=realm_management_id,
)
permission_name = await admin.a_get_client_authz_scope_permission(
client_id=realm_management_id, scope_id=token_exchange_permission_id
)["name"]
assert permission_name.startswith("token-exchange.permission.client.")
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_client_authz_scope_permission(
payload={"name": "test-permission", "scopes": [token_exchange_scope_id]},
client_id="realm_management_id",
)
assert err.match('404: b\'{"error":"Could not find client".*}\'')
@pytest.mark.asyncio
async def test_a_email(admin: KeycloakAdmin, user: str):
"""Test email.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param user: Keycloak user
:type user: str
"""
# Emails will fail as we don't have SMTP test setup
with pytest.raises(KeycloakPutError) as err:
await admin.a_send_update_account(user_id=user, payload=dict())
assert err.match(UNKOWN_ERROR_REGEX), err
admin.update_user(user_id=user, payload={"enabled": True})
with pytest.raises(KeycloakPutError) as err:
await admin.a_send_verify_email(user_id=user)
assert err.match('500: b\'{"errorMessage":"Failed to send .*"}\'')
@pytest.mark.asyncio
async def test_a_get_sessions(admin: KeycloakAdmin):
"""Test get sessions.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
sessions = await admin.a_get_sessions(user_id=admin.get_user_id(username=admin.connection.username))
assert len(sessions) >= 1
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_sessions(user_id="bad")
assert err.match(USER_NOT_FOUND_REGEX)
@pytest.mark.asyncio
async def test_a_get_client_installation_provider(admin: KeycloakAdmin, client: str):
"""Test get client installation provider.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param client: Keycloak client
:type client: str
"""
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_installation_provider(client_id=client, provider_id="bad")
assert err.match('404: b\'{"error":"Unknown Provider".*}\'')
installation = await admin.a_get_client_installation_provider(
client_id=client, provider_id="keycloak-oidc-keycloak-json"
)
assert set(installation.keys()) == {
"auth-server-url",
"confidential-port",
"credentials",
"realm",
"resource",
"ssl-required",
}
@pytest.mark.asyncio
async def test_a_auth_flows(admin: KeycloakAdmin, realm: str):
"""Test auth flows.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
res = await admin.a_get_authentication_flows()
assert len(res) <= 8, res
default_flows = len(res)
assert {x["alias"] for x in res}.issubset(
{
"reset credentials",
"browser",
"registration",
"http challenge",
"docker auth",
"direct grant",
"first broker login",
"clients",
}
)
assert set(res[0].keys()) == {
"alias",
"authenticationExecutions",
"builtIn",
"description",
"id",
"providerId",
"topLevel",
}
assert {x["alias"] for x in res}.issubset(
{
"reset credentials",
"browser",
"registration",
"docker auth",
"direct grant",
"first broker login",
"clients",
"http challenge",
}
)
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_authentication_flow_for_id(flow_id="bad")
assert err.match('404: b\'{"error":"Could not find flow with id".*}\'')
browser_flow_id = [x for x in res if x["alias"] == "browser"][0]["id"]
res = await admin.a_get_authentication_flow_for_id(flow_id=browser_flow_id)
assert res["alias"] == "browser"
# Test copying
with pytest.raises(KeycloakPostError) as err:
await admin.a_copy_authentication_flow(payload=dict(), flow_alias="bad")
assert err.match("404: b''")
res = await admin.a_copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser")
assert res == b"", res
assert len(await admin.a_get_authentication_flows()) == (default_flows + 1)
# Test create
res = await admin.a_create_authentication_flow(
payload={"alias": "test-create", "providerId": "basic-flow"}
)
assert res == b""
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_authentication_flow(payload={"alias": "test-create", "builtIn": False})
assert err.match('409: b\'{"errorMessage":"Flow test-create already exists"}\'')
assert await admin.a_create_authentication_flow(
payload={"alias": "test-create"}, skip_exists=True
) == {"msg": "Already exists"}
# Test flow executions
res = await admin.a_get_authentication_flow_executions(flow_alias="browser")
assert len(res) == 8, res
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_authentication_flow_executions(flow_alias="bad")
assert err.match("404: b''")
exec_id = res[0]["id"]
res = await admin.a_get_authentication_flow_execution(execution_id=exec_id)
assert set(res.keys()) == {
"alternative",
"authenticator",
"authenticatorFlow",
"conditional",
"disabled",
"enabled",
"id",
"parentFlow",
"priority",
"required",
"requirement",
}, res
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_authentication_flow_execution(execution_id="bad")
assert err.match(ILLEGAL_EXECUTION_REGEX)
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_authentication_flow_execution(payload=dict(), flow_alias="browser")
assert err.match('400: b\'{"error":"It is illegal to add execution to a built in flow".*}\'')
res = await admin.a_create_authentication_flow_execution(
payload={"provider": "auth-cookie"}, flow_alias="test-create"
)
assert res == b""
assert len(await admin.a_get_authentication_flow_executions(flow_alias="test-create")) == 1
with pytest.raises(KeycloakPutError) as err:
await admin.a_update_authentication_flow_executions(
payload={"required": "yes"}, flow_alias="test-create"
)
assert err.match('400: b\'{"error":"Unrecognized field')
payload = await admin.a_get_authentication_flow_executions(flow_alias="test-create")[0]
payload["displayName"] = "test"
res = await admin.a_update_authentication_flow_executions(payload=payload, flow_alias="test-create")
assert res
exec_id = await admin.a_get_authentication_flow_executions(flow_alias="test-create")[0]["id"]
res = await admin.a_delete_authentication_flow_execution(execution_id=exec_id)
assert res == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_authentication_flow_execution(execution_id=exec_id)
assert err.match(ILLEGAL_EXECUTION_REGEX)
# Test subflows
res = await admin.a_create_authentication_flow_subflow(
payload={
"alias": "test-subflow",
"provider": "basic-flow",
"type": "something",
"description": "something",
},
flow_alias="test-browser",
)
assert res == b""
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_authentication_flow_subflow(
payload={"alias": "test-subflow", "providerId": "basic-flow"},
flow_alias="test-browser",
)
assert err.match('409: b\'{"errorMessage":"New flow alias name already exists"}\'')
res = await admin.a_create_authentication_flow_subflow(
payload={
"alias": "test-subflow",
"provider": "basic-flow",
"type": "something",
"description": "something",
},
flow_alias="test-create",
skip_exists=True,
)
assert res == {"msg": "Already exists"}
# Test delete auth flow
flow_id = [x for x in await admin.a_get_authentication_flows() if x["alias"] == "test-browser"][0][
"id"
]
res = await admin.a_delete_authentication_flow(flow_id=flow_id)
assert res == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_authentication_flow(flow_id=flow_id)
assert err.match('404: b\'{"error":"Could not find flow with id".*}\'')
Loading…
Cancel
Save