From 76088fa3c9399b906edec2c475853c88eb738f70 Mon Sep 17 00:00:00 2001 From: David Date: Wed, 15 May 2024 16:59:51 -0500 Subject: [PATCH] tests for uma and admin classes --- tests/test_keycloak_admin.py | 1129 ++++++++++++++++++++++++++++++++++ tests/test_keycloak_uma.py | 283 +++++++++ 2 files changed, 1412 insertions(+) diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index cfd724e..2df7fcf 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -3062,3 +3062,1132 @@ def test_refresh_token(admin: KeycloakAdmin): assert admin.connection.token is not None admin.user_logout(admin.get_user_id(admin.connection.username)) admin.connection.refresh_token() + + +#async function start + +@pytest.mark.asyncio +async def test_a_realms(admin: KeycloakAdmin): + """Test realms. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + # Get realms + realms = await admin.a_get_realms() + assert len(realms) == 1, realms + assert "master" == realms[0]["realm"] + + # Create a test realm + res = await admin.a_create_realm(payload={"realm": "test"}) + assert res == b"", res + + # Create the same realm, should fail + with pytest.raises(KeycloakPostError) as err: + res = await admin.a_create_realm(payload={"realm": "test"}) + assert err.match('409: b\'{"errorMessage":"Conflict detected. See logs for details"}\'') + + # Create the same realm, skip_exists true + res = await admin.a_create_realm(payload={"realm": "test"}, skip_exists=True) + assert res == {"msg": "Already exists"}, res + + # Get a single realm + res = await admin.a_get_realm(realm_name="test") + assert res["realm"] == "test" + + # Get non-existing realm + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_realm(realm_name="non-existent") + assert err.match('404: b\'{"error":"Realm not found.".*\'') + + # Update realm + res = await admin.a_update_realm(realm_name="test", payload={"accountTheme": "test"}) + assert res == dict(), res + + # Check that the update worked + res = await admin.a_get_realm(realm_name="test") + assert res["realm"] == "test" + assert res["accountTheme"] == "test" + + # Update wrong payload + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_realm(realm_name="test", payload={"wrong": "payload"}) + assert err.match('400: b\'{"error":"Unrecognized field') + + # Check that get realms returns both realms + realms = await admin.a_get_realms() + realm_names = [x["realm"] for x in realms] + assert len(realms) == 2, realms + assert "master" in realm_names, realm_names + assert "test" in realm_names, realm_names + + # Delete the realm + res = await admin.a_delete_realm(realm_name="test") + assert res == dict(), res + + # Check that the realm does not exist anymore + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_realm(realm_name="test") + assert err.match('404: b\'{"error":"Realm not found.".*}\'') + + # Delete non-existing realm + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_realm(realm_name="non-existent") + assert err.match('404: b\'{"error":"Realm not found.".*}\'') + +@pytest.mark.asyncio +async def test_a_changing_of_realms(admin: KeycloakAdmin, realm: str): + """Test changing of realms. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + assert await admin.a_get_current_realm() == "master" + await admin.a_change_current_realm(realm) + assert await admin.a_get_current_realm() == realm + +@pytest.mark.asyncio +async def test_a_import_export_realms(admin: KeycloakAdmin, realm: str): + """Test import and export of realms. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + realm_export = await admin.a_export_realm(export_clients=True, export_groups_and_role=True) + assert realm_export != dict(), realm_export + + await admin.a_delete_realm(realm_name=realm) + admin.realm_name = "master" + res = await admin.a_import_realm(payload=realm_export) + assert res == b"", res + + # Test bad import + with pytest.raises(KeycloakPostError) as err: + await admin.a_import_realm(payload=dict()) + assert err.match( + '500: b\'{"error":"unknown_error"}\'|400: b\'{"errorMessage":"Realm name cannot be empty"}\'' # noqa: E501 + ) + +@pytest.mark.asyncio +async def test_a_partial_import_realm(admin: KeycloakAdmin, realm: str): + """Test partial import of realm configuration. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + test_realm_role = str(uuid.uuid4()) + test_user = str(uuid.uuid4()) + test_client = str(uuid.uuid4()) + + await admin.a_change_current_realm(realm) + client_id = await admin.a_create_client(payload={"name": test_client, "clientId": test_client}) + + realm_export = await admin.a_export_realm(export_clients=True, export_groups_and_role=False) + + client_config = [ + client_entry for client_entry in realm_export["clients"] if client_entry["id"] == client_id + ][0] + + # delete before partial import + await admin.a_delete_client(client_id) + + payload = { + "ifResourceExists": "SKIP", + "id": realm_export["id"], + "realm": realm, + "clients": [client_config], + "roles": {"realm": [{"name": test_realm_role}]}, + "users": [{"username": test_user, "email": f"{test_user}@test.test"}], + } + + # check add + res = await admin.a_partial_import_realm(realm_name=realm, payload=payload) + assert res["added"] == 3 + + # check skip + res = await admin.a_partial_import_realm(realm_name=realm, payload=payload) + assert res["skipped"] == 3 + + # check overwrite + payload["ifResourceExists"] = "OVERWRITE" + res = await admin.a_partial_import_realm(realm_name=realm, payload=payload) + assert res["overwritten"] == 3 + +@pytest.mark.asyncio +async def test_a_users(admin: KeycloakAdmin, realm: str): + """Test users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + # Check no users present + users = await admin.a_get_users() + assert users == list(), users + + # Test create user + user_id = await admin.a_create_user(payload={"username": "test", "email": "test@test.test"}) + assert user_id is not None, user_id + + # Test create the same user + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_user(payload={"username": "test", "email": "test@test.test"}) + assert err.match(".*User exists with same.*") + + # Test create the same user, exists_ok true + user_id_2 = await admin.a_create_user( + payload={"username": "test", "email": "test@test.test"}, exist_ok=True + ) + assert user_id == user_id_2 + + # Test get user + user = await admin.a_get_user(user_id=user_id) + assert user["username"] == "test", user["username"] + assert user["email"] == "test@test.test", user["email"] + + # Test update user + res = await admin.a_update_user(user_id=user_id, payload={"firstName": "Test"}) + assert res == dict(), res + user = await admin.a_get_user(user_id=user_id) + assert user["firstName"] == "Test" + + # Test update user fail + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_user(user_id=user_id, payload={"wrong": "payload"}) + assert err.match('400: b\'{"error":"Unrecognized field') + + # Test disable user + res = await admin.a_disable_user(user_id=user_id) + assert res == {}, res + assert not await admin.a_get_user(user_id=user_id)["enabled"] + + # Test enable user + res = await admin.a_enable_user(user_id=user_id) + assert res == {}, res + assert await admin.a_get_user(user_id=user_id)["enabled"] + + # Test get users again + users = await admin.a_get_users() + usernames = [x["username"] for x in users] + assert "test" in usernames + + # Test users counts + count = await admin.a_users_count() + assert count == 1, count + + # Test users count with query + count = await admin.a_users_count(query={"username": "notpresent"}) + assert count == 0 + + # Test user groups + groups = await admin.a_get_user_groups(user_id=user["id"]) + assert len(groups) == 0 + + # Test user groups bad id + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_user_groups(user_id="does-not-exist") + assert err.match(USER_NOT_FOUND_REGEX) + + # Test logout + res = await admin.a_user_logout(user_id=user["id"]) + assert res == dict(), res + + # Test logout fail + with pytest.raises(KeycloakPostError) as err: + await admin.a_user_logout(user_id="non-existent-id") + assert err.match(USER_NOT_FOUND_REGEX) + + # Test consents + res = await admin.a_user_consents(user_id=user["id"]) + assert len(res) == 0, res + + # Test consents fail + with pytest.raises(KeycloakGetError) as err: + await admin.a_user_consents(user_id="non-existent-id") + assert err.match(USER_NOT_FOUND_REGEX) + + # Test delete user + res = await admin.a_delete_user(user_id=user_id) + assert res == dict(), res + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_user(user_id=user_id) + err.match(USER_NOT_FOUND_REGEX) + + # Test delete fail + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_user(user_id="non-existent-id") + assert err.match(USER_NOT_FOUND_REGEX) + +@pytest.mark.asyncio +async def test_a_enable_disable_all_users(admin: KeycloakAdmin, realm: str): + """Test enable and disable all users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + admin.change_current_realm(realm) + + user_id_1 = await admin.a_create_user( + payload={"username": "test", "email": "test@test.test", "enabled": True} + ) + user_id_2 = await admin.a_create_user( + payload={"username": "test2", "email": "test2@test.test", "enabled": True} + ) + user_id_3 = await admin.a_create_user( + payload={"username": "test3", "email": "test3@test.test", "enabled": True} + ) + + assert await admin.a_get_user(user_id_1)["enabled"] + assert await admin.a_get_user(user_id_2)["enabled"] + assert await admin.a_get_user(user_id_3)["enabled"] + + admin.a_disable_all_users() + + assert not await admin.a_get_user(user_id_1)["enabled"] + assert not await admin.a_get_user(user_id_2)["enabled"] + assert not await admin.a_get_user(user_id_3)["enabled"] + + admin.a_enable_all_users() + + assert await admin.a_get_user(user_id_1)["enabled"] + assert await admin.a_get_user(user_id_2)["enabled"] + assert await admin.a_get_user(user_id_3)["enabled"] + +@pytest.mark.asyncio +async def test_a_users_roles(admin: KeycloakAdmin, realm: str): + """Test users roles. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + user_id = await admin.a_create_user(payload={"username": "test", "email": "test@test.test"}) + + # Test all level user roles + client_id = await admin.a_create_client(payload={"name": "test-client", "clientId": "test-client"}) + await admin.a_create_client_role(client_role_id=client_id, payload={"name": "test-role"}) + await admin.a_assign_client_role( + client_id=client_id, + user_id=user_id, + roles=[admin.get_client_role(client_id=client_id, role_name="test-role")], + ) + all_roles = await admin.a_get_all_roles_of_user(user_id=user_id) + realm_roles = all_roles["realmMappings"] + assert len(realm_roles) == 1, realm_roles + client_roles = all_roles["clientMappings"] + assert len(client_roles) == 1, client_roles + + # Test all level user roles fail + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_all_roles_of_user(user_id="non-existent-id") + err.match('404: b\'{"error":"User not found"') + + await admin.a_delete_user(user_id) + await admin.a_delete_client(client_id) + +@pytest.mark.asyncio +async def test_a_users_pagination(admin: KeycloakAdmin, realm: str): + """Test user pagination. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + for ind in range(admin.PAGE_SIZE + 50): + username = f"user_{ind}" + admin.create_user(payload={"username": username, "email": f"{username}@test.test"}) + + users = await admin.a_get_users() + assert len(users) == admin.PAGE_SIZE + 50, len(users) + + users = await admin.a_get_users(query={"first": 100}) + assert len(users) == 50, len(users) + + users = await admin.a_get_users(query={"max": 20}) + assert len(users) == 20, len(users) + +@pytest.mark.asyncio +async def test_a_user_groups_pagination(admin: KeycloakAdmin, realm: str): + """Test user groups pagination. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + user_id = await admin.a_create_user( + payload={"username": "username_1", "email": "username_1@test.test"} + ) + + for ind in range(admin.PAGE_SIZE + 50): + group_name = f"group_{ind}" + group_id = await admin.a_create_group(payload={"name": group_name}) + await admin.a_group_user_add(user_id=user_id, group_id=group_id) + + groups = await admin.a_get_user_groups(user_id=user_id) + assert len(groups) == admin.PAGE_SIZE + 50, len(groups) + + groups = await admin.a_get_user_groups(user_id=user_id, query={"first": 100, "max": -1, "search": ""}) + assert len(groups) == 50, len(groups) + + groups = await admin.a_get_user_groups(user_id=user_id, query={"max": 20, "first": -1, "search": ""}) + assert len(groups) == 20, len(groups) + +@pytest.mark.asyncio +async def test_a_idps(admin: KeycloakAdmin, realm: str): + """Test IDPs. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + # Create IDP + res = await admin.a_create_idp( + payload=dict( + providerId="github", alias="github", config=dict(clientId="test", clientSecret="test") + ) + ) + assert res == b"", res + + # Test create idp fail + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_idp(payload={"providerId": "does-not-exist", "alias": "something"}) + assert err.match("Invalid identity provider id"), err + + # Test listing + idps = await admin.a_get_idps() + assert len(idps) == 1 + assert "github" == idps[0]["alias"] + + # Test get idp + idp = await admin.a_get_idp("github") + assert "github" == idp["alias"] + assert idp.get("config") + assert "test" == idp["config"]["clientId"] + assert "**********" == idp["config"]["clientSecret"] + + # Test get idp fail + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_idp("does-not-exist") + assert err.match(HTTP_404_REGEX) + + # Test IdP update + res = await admin.a_update_idp(idp_alias="github", payload=idps[0]) + + assert res == {}, res + + # Test adding a mapper + res = await admin.a_add_mapper_to_idp( + idp_alias="github", + payload={ + "identityProviderAlias": "github", + "identityProviderMapper": "github-user-attribute-mapper", + "name": "test", + }, + ) + assert res == b"", res + + # Test mapper fail + with pytest.raises(KeycloakPostError) as err: + await admin.a_add_mapper_to_idp(idp_alias="does-no-texist", payload=dict()) + assert err.match(HTTP_404_REGEX) + + # Test IdP mappers listing + idp_mappers = await admin.a_get_idp_mappers(idp_alias="github") + assert len(idp_mappers) == 1 + + # Test IdP mapper update + res = await admin.a_update_mapper_in_idp( + idp_alias="github", + mapper_id=idp_mappers[0]["id"], + # For an obscure reason, keycloak expect all fields + payload={ + "id": idp_mappers[0]["id"], + "identityProviderAlias": "github-alias", + "identityProviderMapper": "github-user-attribute-mapper", + "name": "test", + "config": idp_mappers[0]["config"], + }, + ) + assert res == dict(), res + + # Test delete + res = await admin.a_delete_idp(idp_alias="github") + assert res == dict(), res + + # Test delete fail + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_idp(idp_alias="does-not-exist") + assert err.match(HTTP_404_REGEX) + +@pytest.mark.asyncio +async def test_a_user_credentials(admin: KeycloakAdmin, user: str): + """Test user credentials. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param user: Keycloak user + :type user: str + """ + res = await admin.a_set_user_password(user_id=user, password="booya", temporary=True) + assert res == dict(), res + + # Test user password set fail + with pytest.raises(KeycloakPutError) as err: + await admin.a_set_user_password(user_id="does-not-exist", password="") + assert err.match(USER_NOT_FOUND_REGEX) + + credentials = await admin.a_get_credentials(user_id=user) + assert len(credentials) == 1 + assert credentials[0]["type"] == "password", credentials + + # Test get credentials fail + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_credentials(user_id="does-not-exist") + assert err.match(USER_NOT_FOUND_REGEX) + + res = await admin.a_delete_credential(user_id=user, credential_id=credentials[0]["id"]) + assert res == dict(), res + + # Test delete fail + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_credential(user_id=user, credential_id="does-not-exist") + assert err.match('404: b\'{"error":"Credential not found".*}\'') + +@pytest.mark.asyncio +async def test_a_social_logins(admin: KeycloakAdmin, user: str): + """Test social logins. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param user: Keycloak user + :type user: str + """ + res = await admin.a_add_user_social_login( + user_id=user, provider_id="gitlab", provider_userid="test", provider_username="test" + ) + assert res == dict(), res + await admin.a_add_user_social_login( + user_id=user, provider_id="github", provider_userid="test", provider_username="test" + ) + assert res == dict(), res + + # Test add social login fail + with pytest.raises(KeycloakPostError) as err: + await admin.a_add_user_social_login( + user_id="does-not-exist", + provider_id="does-not-exist", + provider_userid="test", + provider_username="test", + ) + assert err.match(USER_NOT_FOUND_REGEX) + + res = await admin.a_get_user_social_logins(user_id=user) + assert res == list(), res + + # Test get social logins fail + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_user_social_logins(user_id="does-not-exist") + assert err.match(USER_NOT_FOUND_REGEX) + + res = await admin.a_delete_user_social_login(user_id=user, provider_id="gitlab") + assert res == {}, res + + res = await admin.a_delete_user_social_login(user_id=user, provider_id="github") + assert res == {}, res + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_user_social_login(user_id=user, provider_id="instagram") + assert err.match('404: b\'{"error":"Link not found".*}\''), err + +@pytest.mark.asyncio +async def test_a_server_info(admin: KeycloakAdmin): + """Test server info. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + info = admin.a_get_server_info() + assert set(info.keys()).issubset( + { + "systemInfo", + "memoryInfo", + "profileInfo", + "features", + "themes", + "socialProviders", + "identityProviders", + "providers", + "protocolMapperTypes", + "builtinProtocolMappers", + "clientInstallations", + "componentTypes", + "passwordPolicies", + "enums", + "cryptoInfo", + "features", + } + ), info.keys() + +@pytest.mark.asyncio +async def test_a_groups(admin: KeycloakAdmin, user: str): + """Test groups. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param user: Keycloak user + :type user: str + """ + # Test get groups + groups = await admin.a_get_groups() + assert len(groups) == 0 + + # Test create group + group_id = await admin.a_create_group(payload={"name": "main-group"}) + assert group_id is not None, group_id + + # Test group count + count = await admin.a_groups_count() + assert count.get("count") == 1, count + + # Test group count with query + count = await admin.a_groups_count(query={"search": "notpresent"}) + assert count.get("count") == 0 + + # Test create subgroups + subgroup_id_1 = await admin.a_create_group(payload={"name": "subgroup-1"}, parent=group_id) + subgroup_id_2 = await admin.a_create_group(payload={"name": "subgroup-2"}, parent=group_id) + + # Test create group fail + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_group(payload={"name": "subgroup-1"}, parent=group_id) + assert err.match("409"), err + + # Test skip exists OK + subgroup_id_1_eq = await admin.a_create_group( + payload={"name": "subgroup-1"}, parent=group_id, skip_exists=True + ) + assert subgroup_id_1_eq is None + + # Test get groups again + groups = await admin.a_get_groups() + assert len(groups) == 1, groups + assert len(groups[0]["subGroups"]) == 2, groups[0]["subGroups"] + assert groups[0]["id"] == group_id + assert {x["id"] for x in groups[0]["subGroups"]} == {subgroup_id_1, subgroup_id_2} + + # Test get groups query + groups = await admin.a_get_groups(query={"max": 10}) + assert len(groups) == 1, groups + assert len(groups[0]["subGroups"]) == 2, groups[0]["subGroups"] + assert groups[0]["id"] == group_id + assert {x["id"] for x in groups[0]["subGroups"]} == {subgroup_id_1, subgroup_id_2} + + # Test get group + res = await admin.a_get_group(group_id=subgroup_id_1) + assert res["id"] == subgroup_id_1, res + assert res["name"] == "subgroup-1" + assert res["path"] == "/main-group/subgroup-1" + + # Test get group fail + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_group(group_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find group by id".*}\''), err + + # Create 1 more subgroup + subsubgroup_id_1 = await admin.a_create_group(payload={"name": "subsubgroup-1"}, parent=subgroup_id_2) + main_group = await admin.a_get_group(group_id=group_id) + + # Test nested searches + subgroup_2 = await admin.a_get_group(group_id=subgroup_id_2) + res = await admin.a_get_subgroups(group=subgroup_2, path="/main-group/subgroup-2/subsubgroup-1") + assert res is not None, res + assert res["id"] == subsubgroup_id_1 + + # Test nested search from main group + res = await admin.a_get_subgroups( + group= await admin.a_get_group(group_id=group_id, full_hierarchy=True), + path="/main-group/subgroup-2/subsubgroup-1", + ) + assert res["id"] == subsubgroup_id_1 + + # Test nested search from all groups + res = await admin.a_get_groups(full_hierarchy=True) + assert len(res) == 1 + assert len(res[0]["subGroups"]) == 2 + assert len([x for x in res[0]["subGroups"] if x["id"] == subgroup_id_1][0]["subGroups"]) == 0 + assert len([x for x in res[0]["subGroups"] if x["id"] == subgroup_id_2][0]["subGroups"]) == 1 + + # Test that query params are not allowed for full hierarchy + with pytest.raises(ValueError) as err: + await admin.a_get_group_children(group_id=group_id, full_hierarchy=True, query={"max": 10}) + + # Test that query params are passed + if os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"] == "latest" or Version( + os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"] + ) >= Version("23"): + res = await admin.a_get_group_children(group_id=group_id, query={"max": 1}) + assert len(res) == 1 + + assert err.match("Cannot use both query and full_hierarchy parameters") + + main_group_id_2 = await admin.a_create_group(payload={"name": "main-group-2"}) + assert len(await admin.a_get_groups(full_hierarchy=True)) == 2 + + # Test empty search + res = await admin.a_get_subgroups(group=main_group, path="/none") + assert res is None, res + + # Test get group by path + res = await admin.a_get_group_by_path(path="/main-group/subgroup-1") + assert res is not None, res + assert res["id"] == subgroup_id_1, res + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1/test") + assert err.match('404: b\'{"error":"Group path does not exist".*}\'') + + res = await admin.a_get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1") + assert res is not None, res + assert res["id"] == subsubgroup_id_1 + + res = await admin.a_get_group_by_path(path="/main-group") + assert res is not None, res + assert res["id"] == group_id, res + + # Test group members + res = await admin.a_get_group_members(group_id=subgroup_id_2) + assert len(res) == 0, res + + # Test fail group members + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_group_members(group_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find group by id".*}\'') + + res = await admin.a_group_user_add(user_id=user, group_id=subgroup_id_2) + assert res == dict(), res + + res = await admin.a_get_group_members(group_id=subgroup_id_2) + assert len(res) == 1, res + assert res[0]["id"] == user + + # Test get group members query + res = await admin.a_get_group_members(group_id=subgroup_id_2, query={"max": 10}) + assert len(res) == 1, res + assert res[0]["id"] == user + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_group_user_remove(user_id="does-not-exist", group_id=subgroup_id_2) + assert err.match(USER_NOT_FOUND_REGEX), err + + res = await admin.a_group_user_remove(user_id=user, group_id=subgroup_id_2) + assert res == dict(), res + + # Test set permissions + res = await admin.a_group_set_permissions(group_id=subgroup_id_2, enabled=True) + assert res["enabled"], res + res = await admin.a_group_set_permissions(group_id=subgroup_id_2, enabled=False) + assert not res["enabled"], res + with pytest.raises(KeycloakPutError) as err: + await admin.a_group_set_permissions(group_id=subgroup_id_2, enabled="blah") + assert err.match(UNKOWN_ERROR_REGEX), err + + # Test update group + res = await admin.a_update_group(group_id=subgroup_id_2, payload={"name": "new-subgroup-2"}) + assert res == dict(), res + assert await admin.a_get_group(group_id=subgroup_id_2)["name"] == "new-subgroup-2" + + # test update fail + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_group(group_id="does-not-exist", payload=dict()) + assert err.match('404: b\'{"error":"Could not find group by id".*}\''), err + + # Test delete + res = await admin.a_delete_group(group_id=group_id) + assert res == dict(), res + res = await admin.a_delete_group(group_id=main_group_id_2) + assert res == dict(), res + assert len(await admin.a_get_groups()) == 0 + + # Test delete fail + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_group(group_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find group by id".*}\''), err + +@pytest.mark.asyncio +async def test_a_clients(admin: KeycloakAdmin, realm: str): + """Test clients. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + # Test get clients + clients = await admin.a_get_clients() + assert len(clients) == 6, clients + assert {x["name"] for x in clients} == set( + [ + "${client_admin-cli}", + "${client_security-admin-console}", + "${client_account-console}", + "${client_broker}", + "${client_account}", + "${client_realm-management}", + ] + ), clients + + # Test create client + client_id = await admin.a_create_client(payload={"name": "test-client", "clientId": "test-client"}) + assert client_id, client_id + + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client(payload={"name": "test-client", "clientId": "test-client"}) + assert err.match('409: b\'{"errorMessage":"Client test-client already exists"}\''), err + + client_id_2 = await admin.a_create_client( + payload={"name": "test-client", "clientId": "test-client"}, skip_exists=True + ) + assert client_id == client_id_2, client_id_2 + + # Test get client + res = await admin.a_get_client(client_id=client_id) + assert res["clientId"] == "test-client", res + assert res["name"] == "test-client", res + assert res["id"] == client_id, res + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client".*}\'') + assert len(await admin.a_get_clients()) == 7 + + # Test get client id + assert await admin.a_get_client_id(client_id="test-client") == client_id + assert await admin.a_get_client_id(client_id="does-not-exist") is None + + # Test update client + res = await admin.a_update_client(client_id=client_id, payload={"name": "test-client-change"}) + assert res == dict(), res + + with pytest.raises(KeycloakPutError) as err: + admin.update_client(client_id="does-not-exist", payload={"name": "test-client-change"}) + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + # Test client mappers + res = await admin.a_get_mappers_from_client(client_id=client_id) + assert len(res) == 0 + + with pytest.raises(KeycloakPostError) as err: + await admin.a_add_mapper_to_client(client_id="does-not-exist", payload=dict()) + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + res = await admin.a_add_mapper_to_client( + client_id=client_id, + payload={ + "name": "test-mapper", + "protocol": "openid-connect", + "protocolMapper": "oidc-usermodel-attribute-mapper", + }, + ) + assert res == b"" + assert len(await admin.a_get_mappers_from_client(client_id=client_id)) == 1 + + mapper = await admin.a_get_mappers_from_client(client_id=client_id)[0] + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_client_mapper(client_id=client_id, mapper_id="does-not-exist", payload=dict()) + assert err.match('404: b\'{"error":"Model not found".*}\'') + mapper["config"]["user.attribute"] = "test" + res = await admin.a_update_client_mapper(client_id=client_id, mapper_id=mapper["id"], payload=mapper) + assert res == dict() + + res = await admin.a_remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"]) + assert res == dict() + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"]) + assert err.match('404: b\'{"error":"Model not found".*}\'') + + # Test client sessions + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_all_sessions(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + assert await admin.a_get_client_all_sessions(client_id=client_id) == list() + assert await admin.a_get_client_sessions_stats() == list() + + # Test authz + auth_client_id = admin.a_create_client( + payload={ + "name": "authz-client", + "clientId": "authz-client", + "authorizationServicesEnabled": True, + "serviceAccountsEnabled": True, + } + ) + res = await admin.a_get_client_authz_settings(client_id=auth_client_id) + assert res["allowRemoteResourceManagement"] + assert res["decisionStrategy"] == "UNANIMOUS" + assert len(res["policies"]) >= 0 + + with pytest.raises(KeycloakGetError) as err: + aawait admin.a_get_client_authz_settings(client_id=client_id) + assert err.match(HTTP_404_REGEX) + + # Authz resources + res = await admin.a_get_client_authz_resources(client_id=auth_client_id) + assert len(res) == 1 + assert res[0]["name"] == "Default Resource" + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_authz_resources(client_id=client_id) + assert err.match(HTTP_404_REGEX) + + res = await admin.a_create_client_authz_resource( + client_id=auth_client_id, payload={"name": "test-resource"} + ) + assert res["name"] == "test-resource", res + test_resource_id = res["_id"] + + res = await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=test_resource_id) + assert res["_id"] == test_resource_id, res + assert res["name"] == "test-resource", res + + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client_authz_resource( + client_id=auth_client_id, payload={"name": "test-resource"} + ) + assert err.match('409: b\'{"error":"invalid_request"') + assert await admin.a_create_client_authz_resource( + client_id=auth_client_id, payload={"name": "test-resource"}, skip_exists=True + ) == {"msg": "Already exists"} + + res = await admin.a_get_client_authz_resources(client_id=auth_client_id) + assert len(res) == 2 + assert {x["name"] for x in res} == {"Default Resource", "test-resource"} + + res = await admin.a_create_client_authz_resource( + client_id=auth_client_id, payload={"name": "temp-resource"} + ) + assert res["name"] == "temp-resource", res + temp_resource_id: str = res["_id"] + # Test update authz resources + await admin.a_update_client_authz_resource( + client_id=auth_client_id, + resource_id=temp_resource_id, + payload={"name": "temp-updated-resource"}, + ) + res = await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id) + assert res["name"] == "temp-updated-resource", res + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_client_authz_resource( + client_id=auth_client_id, + resource_id="invalid_resource_id", + payload={"name": "temp-updated-resource"}, + ) + assert err.match("404: b''"), err + await admin.a_delete_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id) + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id) + assert err.match("404: b''") + + # Authz policies + res = await admin.a_get_client_authz_policies(client_id=auth_client_id) + assert len(res) == 1, res + assert res[0]["name"] == "Default Policy" + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_authz_policies(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + role_id = await admin.a_get_realm_role(role_name="offline_access")["id"] + res = await admin.a_create_client_authz_role_based_policy( + client_id=auth_client_id, + payload={"name": "test-authz-rb-policy", "roles": [{"id": role_id}]}, + ) + assert res["name"] == "test-authz-rb-policy", res + + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client_authz_role_based_policy( + client_id=auth_client_id, + payload={"name": "test-authz-rb-policy", "roles": [{"id": role_id}]}, + ) + assert err.match('409: b\'{"error":"Policy with name') + assert await admin.a_create_client_authz_role_based_policy( + client_id=auth_client_id, + payload={"name": "test-authz-rb-policy", "roles": [{"id": role_id}]}, + skip_exists=True, + ) == {"msg": "Already exists"} + assert len(await admin.a_get_client_authz_policies(client_id=auth_client_id)) == 2 + + res = await admin.a_create_client_authz_role_based_policy( + client_id=auth_client_id, + payload={"name": "test-authz-rb-policy-delete", "roles": [{"id": role_id}]}, + ) + res2 = await admin.a_get_client_authz_policy(client_id=auth_client_id, policy_id=res["id"]) + assert res["id"] == res2["id"] + await admin.a_delete_client_authz_policy(client_id=auth_client_id, policy_id=res["id"]) + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_authz_policy(client_id=auth_client_id, policy_id=res["id"]) + assert err.match("404: b''") + + res = await admin.a_create_client_authz_policy( + client_id=auth_client_id, + payload={ + "name": "test-authz-policy", + "type": "time", + "config": {"hourEnd": "18", "hour": "9"}, + }, + ) + assert res["name"] == "test-authz-policy", res + + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client_authz_policy( + client_id=auth_client_id, + payload={ + "name": "test-authz-policy", + "type": "time", + "config": {"hourEnd": "18", "hour": "9"}, + }, + ) + assert err.match('409: b\'{"error":"Policy with name') + assert await admin.a_create_client_authz_policy( + client_id=auth_client_id, + payload={ + "name": "test-authz-policy", + "type": "time", + "config": {"hourEnd": "18", "hour": "9"}, + }, + skip_exists=True, + ) == {"msg": "Already exists"} + assert len(await admin.a_get_client_authz_policies(client_id=auth_client_id)) == 3 + + # Test authz permissions + res = await admin.a_get_client_authz_permissions(client_id=auth_client_id) + assert len(res) == 1, res + assert res[0]["name"] == "Default Permission" + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_authz_permissions(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + res = await admin.a_create_client_authz_resource_based_permission( + client_id=auth_client_id, + payload={"name": "test-permission-rb", "resources": [test_resource_id]}, + ) + assert res, res + assert res["name"] == "test-permission-rb" + assert res["resources"] == [test_resource_id] + + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client_authz_resource_based_permission( + client_id=auth_client_id, + payload={"name": "test-permission-rb", "resources": [test_resource_id]}, + ) + assert err.match('409: b\'{"error":"Policy with name') + assert await admin.a_create_client_authz_resource_based_permission( + client_id=auth_client_id, + payload={"name": "test-permission-rb", "resources": [test_resource_id]}, + skip_exists=True, + ) == {"msg": "Already exists"} + assert len(await admin.a_get_client_authz_permissions(client_id=auth_client_id)) == 2 + + # Test authz scopes + res = await admin.a_get_client_authz_scopes(client_id=auth_client_id) + assert len(res) == 0, res + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_authz_scopes(client_id=client_id) + assert err.match(HTTP_404_REGEX) + + res = await admin.a_create_client_authz_scopes( + client_id=auth_client_id, payload={"name": "test-authz-scope"} + ) + assert res["name"] == "test-authz-scope", res + + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client_authz_scopes( + client_id="invalid_client_id", payload={"name": "test-authz-scope"} + ) + assert err.match('404: b\'{"error":"Could not find client".*}\'') + assert await admin.a_create_client_authz_scopes( + client_id=auth_client_id, payload={"name": "test-authz-scope"} + ) + + res = await admin.a_get_client_authz_scopes(client_id=auth_client_id) + assert len(res) == 1 + assert {x["name"] for x in res} == {"test-authz-scope"} + + # Test service account user + res = await admin.a_get_client_service_account_user(client_id=auth_client_id) + assert res["username"] == "service-account-authz-client", res + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_service_account_user(client_id=client_id) + assert err.match(UNKOWN_ERROR_REGEX) + + # Test delete client + res = await admin.a_delete_client(client_id=auth_client_id) + assert res == dict(), res + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_client(client_id=auth_client_id) + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + # Test client credentials + await admin.a_create_client( + payload={ + "name": "test-confidential", + "enabled": True, + "protocol": "openid-connect", + "publicClient": False, + "redirectUris": ["http://localhost/*"], + "webOrigins": ["+"], + "clientId": "test-confidential", + "secret": "test-secret", + "clientAuthenticatorType": "client-secret", + } + ) + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_secrets(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + secrets = await admin.a_get_client_secrets( + client_id = await admin.a_get_client_id(client_id="test-confidential") + ) + assert secrets == {"type": "secret", "value": "test-secret"} + + with pytest.raises(KeycloakPostError) as err: + await admin.a_generate_client_secrets(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + res = await admin.a_generate_client_secrets( + client_id=await admin.a_get_client_id(client_id="test-confidential") + ) + assert res + assert ( + await admin.a_get_client_secrets(client_id=await admin.a_get_client_id(client_id="test-confidential")) + == res + ) + diff --git a/tests/test_keycloak_uma.py b/tests/test_keycloak_uma.py index 76682a5..6591e4b 100644 --- a/tests/test_keycloak_uma.py +++ b/tests/test_keycloak_uma.py @@ -310,3 +310,286 @@ def test_uma_permission_ticket(uma: KeycloakUMA): uma.permission_ticket_create(permissions) uma.resource_set_delete(resource["_id"]) + +#async function start + +@pytest.mark.asyncio +async def test_a_uma_well_known(uma: KeycloakUMA): + """Test the well_known method. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + res = uma.uma_well_known + assert res is not None + assert res != dict() + for key in ["resource_registration_endpoint"]: + assert key in res + +@pytest.mark.asyncio +async def test_a_uma_resource_sets(uma: KeycloakUMA): + """Test resource sets. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + # Check that only the default resource is present + resource_sets = await uma.a_resource_set_list() + resource_set_list = list(resource_sets) + assert len(resource_set_list) == 1, resource_set_list + assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] + + # Test query for resource sets + resource_set_list_ids = await uma.a_resource_set_list_ids() + assert len(resource_set_list_ids) == 1 + + resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default") + assert resource_set_list_ids2 == resource_set_list_ids + + resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default Resource") + assert resource_set_list_ids2 == resource_set_list_ids + + resource_set_list_ids = await uma.a_resource_set_list_ids(name="Default", exact_name=True) + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(first=1) + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(scope="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(owner="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(resource_type="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(name="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(uri="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(maximum=0) + assert len(resource_set_list_ids) == 0 + + # Test create resource set + resource_to_create = { + "name": "mytest", + "scopes": ["test:read", "test:write"], + "type": "urn:test", + } + created_resource = await uma.a_resource_set_create(resource_to_create) + assert created_resource + assert created_resource["_id"], created_resource + assert set(resource_to_create).issubset(set(created_resource)), created_resource + + # Test create the same resource set + with pytest.raises(KeycloakPostError) as err: + await uma.a_resource_set_create(resource_to_create) + assert err.match( + re.escape( + '409: b\'{"error":"invalid_request","error_description":' + '"Resource with name [mytest] already exists."}\'' + ) + ) + + # Test get resource set + latest_resource = await uma.a_resource_set_read(created_resource["_id"]) + assert latest_resource["name"] == created_resource["name"] + + # Test update resource set + latest_resource["name"] = "New Resource Name" + res = await uma.a_resource_set_update(created_resource["_id"], latest_resource) + assert res == dict(), res + updated_resource = await uma.a_resource_set_read(created_resource["_id"]) + assert updated_resource["name"] == "New Resource Name" + + # Test update resource set fail + with pytest.raises(KeycloakPutError) as err: + uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"}) + assert err.match('400: b\'{"error":"Unrecognized field') + + # Test delete resource set + res = await uma.a_resource_set_delete(resource_id=created_resource["_id"]) + assert res == dict(), res + with pytest.raises(KeycloakGetError) as err: + await uma.a_resource_set_read(created_resource["_id"]) + err.match("404: b''") + + # Test delete fail + with pytest.raises(KeycloakDeleteError) as err: + await uma.a_resource_set_delete(resource_id=created_resource["_id"]) + assert err.match("404: b''") + +@pytest.mark.asyncio +async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin): + """Test policies. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + # Create some required test data + resource_to_create = { + "name": "mytest", + "scopes": ["test:read", "test:write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + created_resource = await uma.a_resource_set_create(resource_to_create) + group_id = admin.create_group({"name": "UMAPolicyGroup"}) + role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"}) + other_client_id = admin.create_client({"name": "UMAOtherClient"}) + client = admin.get_client(other_client_id) + + resource_id = created_resource["_id"] + + # Create a role policy + policy_to_create = { + "name": "TestPolicyRole", + "description": "Test resource policy description", + "scopes": ["test:read", "test:write"], + "roles": ["roleUMAPolicy"], + } + policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + # Create a client policy + policy_to_create = { + "name": "TestPolicyClient", + "description": "Test resource policy description", + "scopes": ["test:read"], + "clients": [client["clientId"]], + } + policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + policy_to_create = { + "name": "TestPolicyGroup", + "description": "Test resource policy description", + "scopes": ["test:read"], + "groups": ["/UMAPolicyGroup"], + } + policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + policies = await uma.a_policy_query() + assert len(policies) == 3 + + policies = await uma.a_policy_query(name="TestPolicyGroup") + assert len(policies) == 1 + + policy_id = policy["id"] + await uma.a_policy_delete(policy_id) + with pytest.raises(KeycloakDeleteError) as err: + await uma.a_policy_delete(policy_id) + assert err.match( + '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\'' + ) + + policies = await uma.a_policy_query() + assert len(policies) == 2 + + policy = policies[0] + await uma.a_policy_update(policy_id=policy["id"], payload=policy) + + policies = await uma.a_policy_query() + assert len(policies) == 2 + + policies = await uma.a_policy_query(name="Invalid") + assert len(policies) == 0 + policies = await uma.a_policy_query(scope="Invalid") + assert len(policies) == 0 + policies = await uma.a_policy_query(resource="Invalid") + assert len(policies) == 0 + policies = await uma.a_policy_query(first=3) + assert len(policies) == 0 + policies = await uma.a_policy_query(maximum=0) + assert len(policies) == 0 + + policies = await uma.a_policy_query(name=policy["name"]) + assert len(policies) == 1 + policies = await uma.a_policy_query(scope=policy["scopes"][0]) + assert len(policies) == 2 + policies = await uma.a_policy_query(resource=resource_id) + assert len(policies) == 2 + + uma.a_resource_set_delete(resource_id) + admin.delete_client(other_client_id) + admin.delete_realm_role(role_id) + admin.delete_group(group_id) + +@pytest.mark.asyncio +async def test_a_uma_access(uma: KeycloakUMA): + """Test permission access checks. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + resource_to_create = { + "name": "mytest", + "scopes": ["read", "write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + resource = await uma.a_resource_set_create(resource_to_create) + + policy_to_create = { + "name": "TestPolicy", + "description": "Test resource policy description", + "scopes": [resource_to_create["scopes"][0]], + "clients": [uma.connection.client_id], + } + await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) + + token = uma.connection.token + permissions = list() + assert await uma.a_permissions_check(token["access_token"], permissions) + + permissions.append(UMAPermission(resource=resource_to_create["name"])) + assert await uma.a_permissions_check(token["access_token"], permissions) + + permissions.append(UMAPermission(resource="not valid")) + assert not await uma.a_permissions_check(token["access_token"], permissions) + uma.resource_set_delete(resource["_id"]) + +@pytest.mark.asyncio +async def test_a_uma_permission_ticket(uma: KeycloakUMA): + """Test permission ticket generation. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + resource_to_create = { + "name": "mytest", + "scopes": ["read", "write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + resource = await uma.a_resource_set_create(resource_to_create) + + policy_to_create = { + "name": "TestPolicy", + "description": "Test resource policy description", + "scopes": [resource_to_create["scopes"][0]], + "clients": [uma.connection.client_id], + } + await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) + permissions = ( + UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]), + ) + response = await uma.a_permission_ticket_create(permissions) + + rpt = await uma.connection.keycloak_openid.a_token( + grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"] + ) + assert rpt + assert "access_token" in rpt + + permissions = (UMAPermission(resource="invalid"),) + with pytest.raises(KeycloakPostError): + uma.permission_ticket_create(permissions) + + await uma.a_resource_set_delete(resource["_id"])