diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 56268b7..55b10ce 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -5209,3 +5209,446 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str): await admin.a_delete_authentication_flow(flow_id=flow_id) assert err.match('404: b\'{"error":"Could not find flow with id".*}\'') +@pytest.mark.asyncio +async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str): + """Test authentication configs. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + admin.change_current_realm(realm) + + # Test list of auth providers + res = await admin.a_get_authenticator_providers() + assert len(res) <= 38 + + res = await admin.a_get_authenticator_provider_config_description(provider_id="auth-cookie") + assert res == { + "helpText": "Validates the SSO cookie set by the auth server.", + "name": "Cookie", + "properties": [], + "providerId": "auth-cookie", + } + + # Test authenticator config + # Currently unable to find a sustainable way to fetch the config id, + # therefore testing only failures + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_authenticator_config(config_id="bad") + assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'') + + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_authenticator_config(payload=dict(), config_id="bad") + assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'') + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_authenticator_config(config_id="bad") + assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'') + +@pytest.mark.asyncio +async def test_a_sync_users(admin: KeycloakAdmin, realm: str): + """Test sync users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + # Only testing the error message + with pytest.raises(KeycloakPostError) as err: + await admin.a_sync_users(storage_id="does-not-exist", action="triggerFullSync") + assert err.match('404: b\'{"error":"Could not find component".*}\'') + +@pytest.mark.asyncio +async def test_a_client_scopes(admin: KeycloakAdmin, realm: str): + """Test client scopes. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + # Test get client scopes + res = await admin.a_get_client_scopes() + scope_names = {x["name"] for x in res} + assert len(res) == 10 + assert "email" in scope_names + assert "profile" in scope_names + assert "offline_access" in scope_names + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_scope(client_scope_id="does-not-exist") + assert err.match(NO_CLIENT_SCOPE_REGEX) + + scope = await admin.a_get_client_scope(client_scope_id=res[0]["id"]) + assert res[0] == scope + + scope = await admin.a_get_client_scope_by_name(client_scope_name=res[0]["name"]) + assert res[0] == scope + + # Test create client scope + res = await admin.a_create_client_scope(payload={"name": "test-scope"}, skip_exists=True) + assert res + res2 = await admin.a_create_client_scope(payload={"name": "test-scope"}, skip_exists=True) + assert res == res2 + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client_scope(payload={"name": "test-scope"}, skip_exists=False) + assert err.match('409: b\'{"errorMessage":"Client Scope test-scope already exists"}\'') + + # Test update client scope + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_client_scope(client_scope_id="does-not-exist", payload=dict()) + assert err.match(NO_CLIENT_SCOPE_REGEX) + + res_update = await admin.a_update_client_scope( + client_scope_id=res, payload={"name": "test-scope-update"} + ) + assert res_update == dict() + assert await admin.a_get_client_scope(client_scope_id=res)["name"] == "test-scope-update" + + # Test get mappers + mappers = await admin.a_get_mappers_from_client_scope(client_scope_id=res) + assert mappers == list() + + # Test add mapper + with pytest.raises(KeycloakPostError) as err: + await admin.a_add_mapper_to_client_scope(client_scope_id=res, payload=dict()) + assert err.match('404: b\'{"error":"ProtocolMapper provider not found".*}\'') + + res_add = await admin.a_add_mapper_to_client_scope( + client_scope_id=res, + payload={ + "name": "test-mapper", + "protocol": "openid-connect", + "protocolMapper": "oidc-usermodel-attribute-mapper", + }, + ) + assert res_add == b"" + assert len(await admin.a_get_mappers_from_client_scope(client_scope_id=res)) == 1 + + # Test update mapper + test_mapper = await admin.a_get_mappers_from_client_scope(client_scope_id=res)[0] + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_mapper_in_client_scope( + client_scope_id="does-not-exist", protocol_mapper_id=test_mapper["id"], payload=dict() + ) + assert err.match(NO_CLIENT_SCOPE_REGEX) + test_mapper["config"]["user.attribute"] = "test" + res_update = await admin.a_update_mapper_in_client_scope( + client_scope_id=res, protocol_mapper_id=test_mapper["id"], payload=test_mapper + ) + assert res_update == dict() + assert ( + await admin.a_get_mappers_from_client_scope(client_scope_id=res)[0]["config"]["user.attribute"] + == "test" + ) + + # Test delete mapper + res_del = await admin.a_delete_mapper_from_client_scope( + client_scope_id=res, protocol_mapper_id=test_mapper["id"] + ) + assert res_del == dict() + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_mapper_from_client_scope( + client_scope_id=res, protocol_mapper_id=test_mapper["id"] + ) + assert err.match('404: b\'{"error":"Model not found".*}\'') + + # Test default default scopes + res_defaults = await admin.a_get_default_default_client_scopes() + assert len(res_defaults) == 6 + + with pytest.raises(KeycloakPutError) as err: + await admin.a_add_default_default_client_scope(scope_id="does-not-exist") + assert err.match(CLIENT_SCOPE_NOT_FOUND_REGEX) + + res_add = await admin.a_add_default_default_client_scope(scope_id=res) + assert res_add == dict() + assert len(await admin.a_get_default_default_client_scopes()) == 7 + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_default_default_client_scope(scope_id="does-not-exist") + assert err.match(CLIENT_SCOPE_NOT_FOUND_REGEX) + + res_del = await admin.a_delete_default_default_client_scope(scope_id=res) + assert res_del == dict() + assert len(await admin.a_get_default_default_client_scopes()) == 6 + + # Test default optional scopes + res_defaults = await admin.a_get_default_optional_client_scopes() + assert len(res_defaults) == 4 + + with pytest.raises(KeycloakPutError) as err: + await admin.a_add_default_optional_client_scope(scope_id="does-not-exist") + assert err.match(CLIENT_SCOPE_NOT_FOUND_REGEX) + + res_add = await admin.a_add_default_optional_client_scope(scope_id=res) + assert res_add == dict() + assert len(await admin.a_get_default_optional_client_scopes()) == 5 + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_default_optional_client_scope(scope_id="does-not-exist") + assert err.match(CLIENT_SCOPE_NOT_FOUND_REGEX) + + res_del = await admin.a_delete_default_optional_client_scope(scope_id=res) + assert res_del == dict() + assert len(await admin.a_get_default_optional_client_scopes()) == 4 + + # Test client scope delete + res_del = await admin.a_delete_client_scope(client_scope_id=res) + assert res_del == dict() + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_client_scope(client_scope_id=res) + assert err.match(NO_CLIENT_SCOPE_REGEX) + +@pytest.mark.asyncio +async def test_a_components(admin: KeycloakAdmin, realm: str): + """Test components. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + # Test get components + res = await admin.a_get_components() + assert len(res) == 12 + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_component(component_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find component".*}\'') + + res_get = await admin.a_get_component(component_id=res[0]["id"]) + assert res_get == res[0] + + # Test create component + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_component(payload={"bad": "dict"}) + assert err.match('400: b\'{"error":"Unrecognized field') + + res = await admin.a_create_component( + payload={ + "name": "Test Component", + "providerId": "max-clients", + "providerType": "org.keycloak.services.clientregistration." + + "policy.ClientRegistrationPolicy", + "config": {"max-clients": ["1000"]}, + } + ) + assert res + assert await admin.a_get_component(component_id=res)["name"] == "Test Component" + + # Test update component + component = await admin.a_get_component(component_id=res) + component["name"] = "Test Component Update" + + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_component(component_id="does-not-exist", payload=dict()) + assert err.match('404: b\'{"error":"Could not find component".*}\'') + res_upd = await admin.a_update_component(component_id=res, payload=component) + assert res_upd == dict() + assert await admin.a_get_component(component_id=res)["name"] == "Test Component Update" + + # Test delete component + res_del = await admin.a_delete_component(component_id=res) + assert res_del == dict() + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_component(component_id=res) + assert err.match('404: b\'{"error":"Could not find component".*}\'') + +@pytest.mark.asyncio +async def test_a_keys(admin: KeycloakAdmin, realm: str): + """Test keys. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + assert set(await admin.a_get_keys()["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} or set( + await admin.a_get_keys()["active"].keys() + ) == {"RSA-OAEP", "RS256", "HS512", "AES"} + assert {k["algorithm"] for k in await admin.a_get_keys()["keys"]} == { + "HS256", + "RSA-OAEP", + "AES", + "RS256", + } or {k["algorithm"] for k in await admin.a_get_keys()["keys"]} == { + "HS512", + "RSA-OAEP", + "AES", + "RS256", + } + +@pytest.mark.asyncio +async def test_a_admin_events(admin: KeycloakAdmin, realm: str): + """Test events. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + await admin.a_create_client(payload={"name": "test", "clientId": "test"}) + + events = await admin.a_get_admin_events() + assert events == list() + +@pytest.mark.asyncio +async def test_a_user_events(admin: KeycloakAdmin, realm: str): + """Test events. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + events = await admin.a_get_events() + assert events == list() + + with pytest.raises(KeycloakPutError) as err: + await admin.a_set_events(payload={"bad": "conf"}) + assert err.match('400: b\'{"error":"Unrecognized field') + + res = await admin.a_set_events(payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True}) + assert res == dict() + + await admin.a_create_client(payload={"name": "test", "clientId": "test"}) + + events = await admin.a_get_events() + assert events == list() + +@pytest.mark.asyncio +@freezegun.freeze_time("2023-02-25 10:00:00") +async def test_a_auto_refresh(admin_frozen: KeycloakAdmin, realm: str): + """Test auto refresh token. + + :param admin_frozen: Keycloak Admin client with time frozen in place + :type admin_frozen: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + admin = admin_frozen + # Test get refresh + admin.connection.custom_headers = { + "Authorization": "Bearer bad", + "Content-Type": "application/json", + } + + with pytest.raises(KeycloakAuthenticationError) as err: + await admin.a_get_realm(realm_name=realm) + assert err.match('401: b\'{"error":"HTTP 401 Unauthorized".*}\'') + + # Freeze time to simulate the access token expiring + with freezegun.freeze_time("2023-02-25 10:05:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:05:00") + assert await admin.a_get_realm(realm_name=realm) + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:05:00") + + # Test bad refresh token, but first make sure access token has expired again + with freezegun.freeze_time("2023-02-25 10:10:00"): + admin.connection.custom_headers = {"Content-Type": "application/json"} + admin.connection.token["refresh_token"] = "bad" + with pytest.raises(KeycloakPostError) as err: + await admin.a_get_realm(realm_name="test-refresh") + assert err.match( + '400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\'' + ) + admin.connection.get_token() + + # Test post refresh + with freezegun.freeze_time("2023-02-25 10:15:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:15:00") + admin.connection.token = None + assert await admin.a_create_realm(payload={"realm": "test-refresh"}) == b"" + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:15:00") + + # Test update refresh + with freezegun.freeze_time("2023-02-25 10:25:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00") + admin.connection.token = None + assert ( + await admin.a_update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) + == dict() + ) + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00") + + # Test delete refresh + with freezegun.freeze_time("2023-02-25 10:35:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:35:00") + admin.connection.token = None + assert await admin.a_delete_realm(realm_name="test-refresh") == dict() + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00") + +@pytest.mark.asyncio +async def test_a_get_required_actions(admin: KeycloakAdmin, realm: str): + """Test required actions. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + ractions = await admin.a_get_required_actions() + assert isinstance(ractions, list) + for ra in ractions: + for key in [ + "alias", + "name", + "providerId", + "enabled", + "defaultAction", + "priority", + "config", + ]: + assert key in ra + +@pytest.mark.asyncio +async def test_a_get_required_action_by_alias(admin: KeycloakAdmin, realm: str): + """Test get required action by alias. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + ractions = await admin.a_get_required_actions() + ra = await admin.a_get_required_action_by_alias("UPDATE_PASSWORD") + assert ra in ractions + assert ra["alias"] == "UPDATE_PASSWORD" + assert await admin.a_get_required_action_by_alias("does-not-exist") is None + +@pytest.mark.asyncio +async def test_a_update_required_action(admin: KeycloakAdmin, realm: str): + """Test update required action. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + ra = awaitadmin.a_get_required_action_by_alias("UPDATE_PASSWORD") + old = copy.deepcopy(ra) + ra["enabled"] = False + admin.update_required_action("UPDATE_PASSWORD", ra) + newra = await admin.a_get_required_action_by_alias("UPDATE_PASSWORD") + assert old != newra + assert newra["enabled"] is False + +