Browse Source

write tests for keycloak admin async functions

pull/565/head
David 12 months ago
parent
commit
0a33cd5106
  1. 443
      tests/test_keycloak_admin.py

443
tests/test_keycloak_admin.py

@ -5209,3 +5209,446 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str):
await admin.a_delete_authentication_flow(flow_id=flow_id)
assert err.match('404: b\'{"error":"Could not find flow with id".*}\'')
@pytest.mark.asyncio
async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str):
"""Test authentication configs.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin.change_current_realm(realm)
# Test list of auth providers
res = await admin.a_get_authenticator_providers()
assert len(res) <= 38
res = await admin.a_get_authenticator_provider_config_description(provider_id="auth-cookie")
assert res == {
"helpText": "Validates the SSO cookie set by the auth server.",
"name": "Cookie",
"properties": [],
"providerId": "auth-cookie",
}
# Test authenticator config
# Currently unable to find a sustainable way to fetch the config id,
# therefore testing only failures
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_authenticator_config(config_id="bad")
assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'')
with pytest.raises(KeycloakPutError) as err:
await admin.a_update_authenticator_config(payload=dict(), config_id="bad")
assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'')
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_authenticator_config(config_id="bad")
assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'')
@pytest.mark.asyncio
async def test_a_sync_users(admin: KeycloakAdmin, realm: str):
"""Test sync users.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
# Only testing the error message
with pytest.raises(KeycloakPostError) as err:
await admin.a_sync_users(storage_id="does-not-exist", action="triggerFullSync")
assert err.match('404: b\'{"error":"Could not find component".*}\'')
@pytest.mark.asyncio
async def test_a_client_scopes(admin: KeycloakAdmin, realm: str):
"""Test client scopes.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
# Test get client scopes
res = await admin.a_get_client_scopes()
scope_names = {x["name"] for x in res}
assert len(res) == 10
assert "email" in scope_names
assert "profile" in scope_names
assert "offline_access" in scope_names
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_client_scope(client_scope_id="does-not-exist")
assert err.match(NO_CLIENT_SCOPE_REGEX)
scope = await admin.a_get_client_scope(client_scope_id=res[0]["id"])
assert res[0] == scope
scope = await admin.a_get_client_scope_by_name(client_scope_name=res[0]["name"])
assert res[0] == scope
# Test create client scope
res = await admin.a_create_client_scope(payload={"name": "test-scope"}, skip_exists=True)
assert res
res2 = await admin.a_create_client_scope(payload={"name": "test-scope"}, skip_exists=True)
assert res == res2
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_client_scope(payload={"name": "test-scope"}, skip_exists=False)
assert err.match('409: b\'{"errorMessage":"Client Scope test-scope already exists"}\'')
# Test update client scope
with pytest.raises(KeycloakPutError) as err:
await admin.a_update_client_scope(client_scope_id="does-not-exist", payload=dict())
assert err.match(NO_CLIENT_SCOPE_REGEX)
res_update = await admin.a_update_client_scope(
client_scope_id=res, payload={"name": "test-scope-update"}
)
assert res_update == dict()
assert await admin.a_get_client_scope(client_scope_id=res)["name"] == "test-scope-update"
# Test get mappers
mappers = await admin.a_get_mappers_from_client_scope(client_scope_id=res)
assert mappers == list()
# Test add mapper
with pytest.raises(KeycloakPostError) as err:
await admin.a_add_mapper_to_client_scope(client_scope_id=res, payload=dict())
assert err.match('404: b\'{"error":"ProtocolMapper provider not found".*}\'')
res_add = await admin.a_add_mapper_to_client_scope(
client_scope_id=res,
payload={
"name": "test-mapper",
"protocol": "openid-connect",
"protocolMapper": "oidc-usermodel-attribute-mapper",
},
)
assert res_add == b""
assert len(await admin.a_get_mappers_from_client_scope(client_scope_id=res)) == 1
# Test update mapper
test_mapper = await admin.a_get_mappers_from_client_scope(client_scope_id=res)[0]
with pytest.raises(KeycloakPutError) as err:
await admin.a_update_mapper_in_client_scope(
client_scope_id="does-not-exist", protocol_mapper_id=test_mapper["id"], payload=dict()
)
assert err.match(NO_CLIENT_SCOPE_REGEX)
test_mapper["config"]["user.attribute"] = "test"
res_update = await admin.a_update_mapper_in_client_scope(
client_scope_id=res, protocol_mapper_id=test_mapper["id"], payload=test_mapper
)
assert res_update == dict()
assert (
await admin.a_get_mappers_from_client_scope(client_scope_id=res)[0]["config"]["user.attribute"]
== "test"
)
# Test delete mapper
res_del = await admin.a_delete_mapper_from_client_scope(
client_scope_id=res, protocol_mapper_id=test_mapper["id"]
)
assert res_del == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_mapper_from_client_scope(
client_scope_id=res, protocol_mapper_id=test_mapper["id"]
)
assert err.match('404: b\'{"error":"Model not found".*}\'')
# Test default default scopes
res_defaults = await admin.a_get_default_default_client_scopes()
assert len(res_defaults) == 6
with pytest.raises(KeycloakPutError) as err:
await admin.a_add_default_default_client_scope(scope_id="does-not-exist")
assert err.match(CLIENT_SCOPE_NOT_FOUND_REGEX)
res_add = await admin.a_add_default_default_client_scope(scope_id=res)
assert res_add == dict()
assert len(await admin.a_get_default_default_client_scopes()) == 7
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_default_default_client_scope(scope_id="does-not-exist")
assert err.match(CLIENT_SCOPE_NOT_FOUND_REGEX)
res_del = await admin.a_delete_default_default_client_scope(scope_id=res)
assert res_del == dict()
assert len(await admin.a_get_default_default_client_scopes()) == 6
# Test default optional scopes
res_defaults = await admin.a_get_default_optional_client_scopes()
assert len(res_defaults) == 4
with pytest.raises(KeycloakPutError) as err:
await admin.a_add_default_optional_client_scope(scope_id="does-not-exist")
assert err.match(CLIENT_SCOPE_NOT_FOUND_REGEX)
res_add = await admin.a_add_default_optional_client_scope(scope_id=res)
assert res_add == dict()
assert len(await admin.a_get_default_optional_client_scopes()) == 5
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_default_optional_client_scope(scope_id="does-not-exist")
assert err.match(CLIENT_SCOPE_NOT_FOUND_REGEX)
res_del = await admin.a_delete_default_optional_client_scope(scope_id=res)
assert res_del == dict()
assert len(await admin.a_get_default_optional_client_scopes()) == 4
# Test client scope delete
res_del = await admin.a_delete_client_scope(client_scope_id=res)
assert res_del == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_client_scope(client_scope_id=res)
assert err.match(NO_CLIENT_SCOPE_REGEX)
@pytest.mark.asyncio
async def test_a_components(admin: KeycloakAdmin, realm: str):
"""Test components.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
# Test get components
res = await admin.a_get_components()
assert len(res) == 12
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_component(component_id="does-not-exist")
assert err.match('404: b\'{"error":"Could not find component".*}\'')
res_get = await admin.a_get_component(component_id=res[0]["id"])
assert res_get == res[0]
# Test create component
with pytest.raises(KeycloakPostError) as err:
await admin.a_create_component(payload={"bad": "dict"})
assert err.match('400: b\'{"error":"Unrecognized field')
res = await admin.a_create_component(
payload={
"name": "Test Component",
"providerId": "max-clients",
"providerType": "org.keycloak.services.clientregistration."
+ "policy.ClientRegistrationPolicy",
"config": {"max-clients": ["1000"]},
}
)
assert res
assert await admin.a_get_component(component_id=res)["name"] == "Test Component"
# Test update component
component = await admin.a_get_component(component_id=res)
component["name"] = "Test Component Update"
with pytest.raises(KeycloakPutError) as err:
await admin.a_update_component(component_id="does-not-exist", payload=dict())
assert err.match('404: b\'{"error":"Could not find component".*}\'')
res_upd = await admin.a_update_component(component_id=res, payload=component)
assert res_upd == dict()
assert await admin.a_get_component(component_id=res)["name"] == "Test Component Update"
# Test delete component
res_del = await admin.a_delete_component(component_id=res)
assert res_del == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_component(component_id=res)
assert err.match('404: b\'{"error":"Could not find component".*}\'')
@pytest.mark.asyncio
async def test_a_keys(admin: KeycloakAdmin, realm: str):
"""Test keys.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
assert set(await admin.a_get_keys()["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} or set(
await admin.a_get_keys()["active"].keys()
) == {"RSA-OAEP", "RS256", "HS512", "AES"}
assert {k["algorithm"] for k in await admin.a_get_keys()["keys"]} == {
"HS256",
"RSA-OAEP",
"AES",
"RS256",
} or {k["algorithm"] for k in await admin.a_get_keys()["keys"]} == {
"HS512",
"RSA-OAEP",
"AES",
"RS256",
}
@pytest.mark.asyncio
async def test_a_admin_events(admin: KeycloakAdmin, realm: str):
"""Test events.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
await admin.a_create_client(payload={"name": "test", "clientId": "test"})
events = await admin.a_get_admin_events()
assert events == list()
@pytest.mark.asyncio
async def test_a_user_events(admin: KeycloakAdmin, realm: str):
"""Test events.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
events = await admin.a_get_events()
assert events == list()
with pytest.raises(KeycloakPutError) as err:
await admin.a_set_events(payload={"bad": "conf"})
assert err.match('400: b\'{"error":"Unrecognized field')
res = await admin.a_set_events(payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True})
assert res == dict()
await admin.a_create_client(payload={"name": "test", "clientId": "test"})
events = await admin.a_get_events()
assert events == list()
@pytest.mark.asyncio
@freezegun.freeze_time("2023-02-25 10:00:00")
async def test_a_auto_refresh(admin_frozen: KeycloakAdmin, realm: str):
"""Test auto refresh token.
:param admin_frozen: Keycloak Admin client with time frozen in place
:type admin_frozen: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin = admin_frozen
# Test get refresh
admin.connection.custom_headers = {
"Authorization": "Bearer bad",
"Content-Type": "application/json",
}
with pytest.raises(KeycloakAuthenticationError) as err:
await admin.a_get_realm(realm_name=realm)
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized".*}\'')
# Freeze time to simulate the access token expiring
with freezegun.freeze_time("2023-02-25 10:05:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:05:00")
assert await admin.a_get_realm(realm_name=realm)
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:05:00")
# Test bad refresh token, but first make sure access token has expired again
with freezegun.freeze_time("2023-02-25 10:10:00"):
admin.connection.custom_headers = {"Content-Type": "application/json"}
admin.connection.token["refresh_token"] = "bad"
with pytest.raises(KeycloakPostError) as err:
await admin.a_get_realm(realm_name="test-refresh")
assert err.match(
'400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\''
)
admin.connection.get_token()
# Test post refresh
with freezegun.freeze_time("2023-02-25 10:15:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:15:00")
admin.connection.token = None
assert await admin.a_create_realm(payload={"realm": "test-refresh"}) == b""
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:15:00")
# Test update refresh
with freezegun.freeze_time("2023-02-25 10:25:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00")
admin.connection.token = None
assert (
await admin.a_update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
== dict()
)
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00")
# Test delete refresh
with freezegun.freeze_time("2023-02-25 10:35:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:35:00")
admin.connection.token = None
assert await admin.a_delete_realm(realm_name="test-refresh") == dict()
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00")
@pytest.mark.asyncio
async def test_a_get_required_actions(admin: KeycloakAdmin, realm: str):
"""Test required actions.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
ractions = await admin.a_get_required_actions()
assert isinstance(ractions, list)
for ra in ractions:
for key in [
"alias",
"name",
"providerId",
"enabled",
"defaultAction",
"priority",
"config",
]:
assert key in ra
@pytest.mark.asyncio
async def test_a_get_required_action_by_alias(admin: KeycloakAdmin, realm: str):
"""Test get required action by alias.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
ractions = await admin.a_get_required_actions()
ra = await admin.a_get_required_action_by_alias("UPDATE_PASSWORD")
assert ra in ractions
assert ra["alias"] == "UPDATE_PASSWORD"
assert await admin.a_get_required_action_by_alias("does-not-exist") is None
@pytest.mark.asyncio
async def test_a_update_required_action(admin: KeycloakAdmin, realm: str):
"""Test update required action.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
ra = awaitadmin.a_get_required_action_by_alias("UPDATE_PASSWORD")
old = copy.deepcopy(ra)
ra["enabled"] = False
admin.update_required_action("UPDATE_PASSWORD", ra)
newra = await admin.a_get_required_action_by_alias("UPDATE_PASSWORD")
assert old != newra
assert newra["enabled"] is False
Loading…
Cancel
Save