diff --git a/.github/workflows/daily.yaml b/.github/workflows/run_tests.yml similarity index 87% rename from .github/workflows/daily.yaml rename to .github/workflows/run_tests.yml index 6f4168a..dd28663 100644 --- a/.github/workflows/daily.yaml +++ b/.github/workflows/run_tests.yml @@ -1,8 +1,12 @@ -name: Daily check +name: Run Tests on: - schedule: - - cron: "0 4 * * *" + push: + branches: + - master + pull_request: + branches: + - master jobs: test: diff --git a/poetry.lock b/poetry.lock index 5f3f46b..d41942f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1638,14 +1638,14 @@ jeepney = ">=0.6" [[package]] name = "setuptools" -version = "67.1.0" +version = "67.2.0" description = "Easily download, build, install, upgrade, and uninstall Python packages" category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "setuptools-67.1.0-py3-none-any.whl", hash = "sha256:a7687c12b444eaac951ea87a9627c4f904ac757e7abdc5aac32833234af90378"}, - {file = "setuptools-67.1.0.tar.gz", hash = "sha256:e261cdf010c11a41cb5cb5f1bf3338a7433832029f559a6a7614bd42a967c300"}, + {file = "setuptools-67.2.0-py3-none-any.whl", hash = "sha256:16ccf598aab3b506593c17378473978908a2734d7336755a8769b480906bec1c"}, + {file = "setuptools-67.2.0.tar.gz", hash = "sha256:b440ee5f7e607bb8c9de15259dba2583dd41a38879a7abc1d43a71c59524da48"}, ] [package.extras] @@ -2029,14 +2029,14 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] [[package]] name = "virtualenv" -version = "20.17.1" +version = "20.18.0" description = "Virtual Python Environment builder" category = "dev" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" files = [ - {file = "virtualenv-20.17.1-py3-none-any.whl", hash = "sha256:ce3b1684d6e1a20a3e5ed36795a97dfc6af29bc3970ca8dab93e11ac6094b3c4"}, - {file = "virtualenv-20.17.1.tar.gz", hash = "sha256:f8b927684efc6f1cc206c9db297a570ab9ad0e51c16fa9e45487d36d1905c058"}, + {file = "virtualenv-20.18.0-py3-none-any.whl", hash = "sha256:9d61e4ec8d2c0345dab329fb825eb05579043766a4b26a2f66b28948de68c722"}, + {file = "virtualenv-20.18.0.tar.gz", hash = "sha256:f262457a4d7298a6b733b920a196bf8b46c8af15bf1fd9da7142995eff15118e"}, ] [package.dependencies] @@ -2046,8 +2046,8 @@ importlib-metadata = {version = ">=4.8.3", markers = "python_version < \"3.8\""} platformdirs = ">=2.4,<3" [package.extras] -docs = ["proselint (>=0.13)", "sphinx (>=5.3)", "sphinx-argparse (>=0.3.2)", "sphinx-rtd-theme (>=1)", "towncrier (>=22.8)"] -testing = ["coverage (>=6.2)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=21.3)", "pytest (>=7.0.1)", "pytest-env (>=0.6.2)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.6.1)", "pytest-randomly (>=3.10.3)", "pytest-timeout (>=2.1)"] +docs = ["furo (>=2022.12.7)", "proselint (>=0.13)", "sphinx (>=6.1.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=22.12)"] +test = ["covdefaults (>=2.2.2)", "coverage (>=7.1)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23)", "pytest (>=7.2.1)", "pytest-env (>=0.8.1)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.10)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)"] [[package]] name = "wcwidth" diff --git a/src/keycloak/connection.py b/src/keycloak/connection.py index 136213b..729da26 100644 --- a/src/keycloak/connection.py +++ b/src/keycloak/connection.py @@ -214,7 +214,7 @@ class ConnectionManager(object): urljoin(self.base_url, path), params=kwargs, data=data, - files=kwargs.get('files'), + files=kwargs.get("files"), headers=self.headers, timeout=self.timeout, ) diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 5af97d1..67c846b 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -1689,7 +1689,10 @@ class KeycloakAdmin: :return: Keycloak Server Response (UserRepresentation) :rtype: list """ - params_path = {"realm-name": self.realm_name, "role-id": await self.get_default_realm_role_id()} + params_path = { + "realm-name": self.realm_name, + "role-id": await self.get_default_realm_role_id(), + } data_raw = await self.raw_get( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES_REALM.format(**params_path) ) @@ -1703,7 +1706,10 @@ class KeycloakAdmin: :return: Keycloak Server Response :rtype: dict """ - params_path = {"realm-name": self.realm_name, "role-id": await self.get_default_realm_role_id()} + params_path = { + "realm-name": self.realm_name, + "role-id": await self.get_default_realm_role_id(), + } data_raw = await self.raw_delete( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path), data=json.dumps(payload), @@ -1718,7 +1724,10 @@ class KeycloakAdmin: :return: Keycloak Server Response :rtype: dict """ - params_path = {"realm-name": self.realm_name, "role-id": await self.get_default_realm_role_id()} + params_path = { + "realm-name": self.realm_name, + "role-id": await self.get_default_realm_role_id(), + } data_raw = await self.raw_post( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path), data=json.dumps(payload), @@ -1799,7 +1808,9 @@ class KeycloakAdmin: """ if skip_exists: try: - res = await self.get_client_role(client_id=client_role_id, role_name=payload["name"]) + res = await self.get_client_role( + client_id=client_role_id, role_name=payload["name"] + ) return res["name"] except KeycloakGetError: pass @@ -3684,7 +3695,7 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakGetError) - #async def upload_certificate(self, client_id, certcont): + # async def upload_certificate(self, client_id, certcont): # """Upload a new certificate for the client. # :param client_id: id of the client. diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 89a706e..c0e7f17 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -498,7 +498,9 @@ class KeycloakOpenID: payload = self._add_secret_key(payload) - data_raw = await self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload) + data_raw = await self.connection.raw_post( + URL_INTROSPECT.format(**params_path), data=payload + ) return raise_error_from_response(data_raw, KeycloakPostError) def decode_token(self, token, key, algorithms=["RS256"], **kwargs): @@ -613,7 +615,7 @@ class KeycloakOpenID: return list(set(permissions)) - #async def uma_permissions(self, token, permissions=""): + # async def uma_permissions(self, token, permissions=""): # """Get UMA permissions by user token with requested permissions. # The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be @@ -642,7 +644,7 @@ class KeycloakOpenID: # data_raw = await self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) # return raise_error_from_response(data_raw, KeycloakPostError) - #async def has_uma_access(self, token, permissions): + # async def has_uma_access(self, token, permissions): # """Determine whether user has uma permissions with specified user token. # :param token: user token diff --git a/tests/conftest.py b/tests/conftest.py index b4bffb3..ec96140 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -278,15 +278,9 @@ async def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Ke } ) role = await admin.get_realm_role(role_name="offline_access") - payload = { - "name": "test-authz-rb-policy", - "roles": [{"id": role["id"]}], - } + payload = {"name": "test-authz-rb-policy", "roles": [{"id": role["id"]}]} print(payload) - await admin.create_client_authz_role_based_policy( - client_id=client_id, - payload=payload, - ) + await admin.create_client_authz_role_based_policy(client_id=client_id, payload=payload) # Create user username = str(uuid.uuid4()) password = str(uuid.uuid4()) @@ -343,7 +337,9 @@ async def user(admin: KeycloakAdmin, realm: str) -> str: """ admin.realm_name = realm username = str(uuid.uuid4()) - user_id = await admin.create_user(payload={"username": username, "email": f"{username}@test.test"}) + user_id = await admin.create_user( + payload={"username": username, "email": f"{username}@test.test"} + ) yield user_id await admin.delete_user(user_id=user_id) @@ -405,7 +401,9 @@ async def client_role(admin: KeycloakAdmin, realm: str, client: str) -> str: @pytest_asyncio.fixture -async def composite_client_role(admin: KeycloakAdmin, realm: str, client: str, client_role: str) -> str: +async def composite_client_role( + admin: KeycloakAdmin, realm: str, client: str, client_role: str +) -> str: """Fixture for a new random composite client role. :param admin: Keycloak admin diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index df1ec9e..c9961bb 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -600,7 +600,9 @@ async def test_groups(admin: KeycloakAdmin, user: str): assert err.match('404: b\'{"error":"Could not find group by id"}\''), err # Create 1 more subgroup - subsubgroup_id_1 = await admin.create_group(payload={"name": "subsubgroup-1"}, parent=subgroup_id_2) + subsubgroup_id_1 = await admin.create_group( + payload={"name": "subsubgroup-1"}, parent=subgroup_id_2 + ) main_group = await admin.get_group(group_id=group_id) # Test nested searches @@ -721,7 +723,9 @@ async def test_clients(admin: KeycloakAdmin, realm: str): ), clients # Test create client - client_id = await admin.create_client(payload={"name": "test-client", "clientId": "test-client"}) + client_id = await admin.create_client( + payload={"name": "test-client", "clientId": "test-client"} + ) assert client_id, client_id with pytest.raises(KeycloakPostError) as err: @@ -753,7 +757,9 @@ async def test_clients(admin: KeycloakAdmin, realm: str): assert res == dict(), res with pytest.raises(KeycloakPutError) as err: - await admin.update_client(client_id="does-not-exist", payload={"name": "test-client-change"}) + await admin.update_client( + client_id="does-not-exist", payload={"name": "test-client-change"} + ) assert err.match('404: b\'{"error":"Could not find client"}\'') # Test client mappers @@ -778,10 +784,14 @@ async def test_clients(admin: KeycloakAdmin, realm: str): mappers = await admin.get_mappers_from_client(client_id=client_id) mapper = mappers[0] with pytest.raises(KeycloakPutError) as err: - await admin.update_client_mapper(client_id=client_id, mapper_id="does-not-exist", payload=dict()) + await admin.update_client_mapper( + client_id=client_id, mapper_id="does-not-exist", payload=dict() + ) assert err.match('404: b\'{"error":"Model not found"}\'') mapper["config"]["user.attribute"] = "test" - res = await admin.update_client_mapper(client_id=client_id, mapper_id=mapper["id"], payload=mapper) + res = await admin.update_client_mapper( + client_id=client_id, mapper_id=mapper["id"], payload=mapper + ) assert res == dict() res = await admin.remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"]) @@ -977,7 +987,9 @@ async def test_clients(admin: KeycloakAdmin, realm: str): ) assert res assert ( - await admin.get_client_secrets(client_id=await admin.get_client_id(client_id="test-confidential")) + await admin.get_client_secrets( + client_id=await admin.get_client_id(client_id="test-confidential") + ) == res ) @@ -1013,7 +1025,9 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str): with pytest.raises(KeycloakPostError) as err: await admin.create_realm_role(payload={"name": "test-realm-role"}) assert err.match('409: b\'{"errorMessage":"Role with name test-realm-role already exists"}\'') - role_id_2 = await admin.create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True) + role_id_2 = await admin.create_realm_role( + payload={"name": "test-realm-role"}, skip_exists=True + ) assert role_id == role_id_2 # Test update realm role @@ -1028,7 +1042,9 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str): assert err.match('404: b\'{"error":"Could not find role"}\''), err # Test realm role user assignment - user_id = await admin.create_user(payload={"username": "role-testing", "email": "test@test.test"}) + user_id = await admin.create_user( + payload={"username": "role-testing", "email": "test@test.test"} + ) with pytest.raises(KeycloakPostError) as err: await admin.assign_realm_roles(user_id=user_id, roles=["bad"]) assert err.match('500: b\'{"error":"unknown_error"}\'') @@ -1084,10 +1100,7 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str): await admin.get_realm_role(role_name="offline_access"), await admin.get_realm_role(role_name="test-realm-role-update"), ] - res = await admin.assign_group_realm_roles( - group_id=group_id, - roles=roles - ) + res = await admin.assign_group_realm_roles(group_id=group_id, roles=roles) assert res == dict(), res roles = await admin.get_group_realm_roles(group_id=group_id) @@ -1112,7 +1125,8 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str): await admin.add_composite_realm_roles_to_role(role_name=composite_role, roles=["bad"]) assert err.match('500: b\'{"error":"unknown_error"}\'') res = await admin.add_composite_realm_roles_to_role( - role_name=composite_role, roles=[await admin.get_realm_role(role_name="test-realm-role-update")] + role_name=composite_role, + roles=[await admin.get_realm_role(role_name="test-realm-role-update")], ) assert res == dict(), res @@ -1136,7 +1150,8 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str): await admin.remove_composite_realm_roles_to_role(role_name=composite_role, roles=["bad"]) assert err.match('500: b\'{"error":"unknown_error"}\'') res = await admin.remove_composite_realm_roles_to_role( - role_name=composite_role, roles=[await admin.get_realm_role(role_name="test-realm-role-update")] + role_name=composite_role, + roles=[await admin.get_realm_role(role_name="test-realm-role-update")], ) assert res == dict(), res @@ -1254,10 +1269,7 @@ async def test_client_scope_realm_roles(admin: KeycloakAdmin, realm: str): await admin.get_realm_role(role_name="offline_access"), await admin.get_realm_role(role_name="test-realm-role"), ] - res = await admin.assign_realm_roles_to_client_scope( - client_id=client_id, - roles=roles - ) + res = await admin.assign_realm_roles_to_client_scope(client_id=client_id, roles=roles) assert res == dict(), res roles = await admin.get_realm_roles_of_client_scope(client_id=client_id) @@ -1273,18 +1285,14 @@ async def test_client_scope_realm_roles(admin: KeycloakAdmin, realm: str): assert err.match('500: b\'{"error":"unknown_error"}\'') roles = [await admin.get_realm_role(role_name="offline_access")] - res = await admin.delete_realm_roles_of_client_scope( - client_id=client_id, roles=roles - ) + res = await admin.delete_realm_roles_of_client_scope(client_id=client_id, roles=roles) assert res == dict(), res roles = await admin.get_realm_roles_of_client_scope(client_id=client_id) assert len(roles) == 1 assert "test-realm-role" in [x["name"] for x in roles] roles = [await admin.get_realm_role(role_name="test-realm-role")] - res = await admin.delete_realm_roles_of_client_scope( - client_id=client_id, roles=roles - ) + res = await admin.delete_realm_roles_of_client_scope(client_id=client_id, roles=roles) assert res == dict(), res roles = await admin.get_realm_roles_of_client_scope(client_id=client_id) assert len(roles) == 0 @@ -1413,7 +1421,9 @@ async def test_client_roles(admin: KeycloakAdmin, client: str): assert err.match('404: b\'{"error":"Could not find role"}\'') # Test user with client role - res = await admin.get_client_role_members(client_id=client, role_name="client-role-test-update") + res = await admin.get_client_role_members( + client_id=client, role_name="client-role-test-update" + ) assert len(res) == 0 with pytest.raises(KeycloakGetError) as err: await admin.get_client_role_members(client_id=client, role_name="bad") @@ -1430,7 +1440,11 @@ async def test_client_roles(admin: KeycloakAdmin, client: str): ) assert res == dict() assert ( - len(await admin.get_client_role_members(client_id=client, role_name="client-role-test-update")) + len( + await admin.get_client_role_members( + client_id=client, role_name="client-role-test-update" + ) + ) == 1 ) @@ -1486,7 +1500,11 @@ async def test_client_roles(admin: KeycloakAdmin, client: str): ) assert res == dict() assert ( - len(await admin.get_client_role_groups(client_id=client, role_name="client-role-test-update")) + len( + await admin.get_client_role_groups( + client_id=client, role_name="client-role-test-update" + ) + ) == 1 ) assert len(await admin.get_group_client_roles(group_id=group_id, client_id=client)) == 1 @@ -1514,12 +1532,12 @@ async def test_client_roles(admin: KeycloakAdmin, client: str): ) assert res == dict() role = await admin.get_client_role(client_id=client, role_name="client-role-test-update") - assert role[ - "composite" - ] + assert role["composite"] # Test delete of client role - res = await admin.delete_client_role(client_role_id=client, role_name="client-role-test-update") + res = await admin.delete_client_role( + client_role_id=client, role_name="client-role-test-update" + ) assert res == dict() with pytest.raises(KeycloakDeleteError) as err: await admin.delete_client_role(client_role_id=client, role_name="client-role-test-update") @@ -1638,12 +1656,12 @@ async def test_email(admin: KeycloakAdmin, user: str): # Emails will fail as we don't have SMTP test setup with pytest.raises(KeycloakPutError) as err: await admin.send_update_account(user_id=user, payload=dict()) - #assert err.match('500: b\'{"error":"unknown_error"}\'') + # assert err.match('500: b\'{"error":"unknown_error"}\'') await admin.update_user(user_id=user, payload={"enabled": True}) with pytest.raises(KeycloakPutError) as err: await admin.send_verify_email(user_id=user) - #assert err.match('500: b\'{"errorMessage":"Failed to send execute actions email"}\'') + # assert err.match('500: b\'{"errorMessage":"Failed to send execute actions email"}\'') @pytest.mark.asyncio @@ -1731,7 +1749,9 @@ async def test_auth_flows(admin: KeycloakAdmin, realm: str): await admin.copy_authentication_flow(payload=dict(), flow_alias="bad") assert err.match("404: b''") - res = await admin.copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser") + res = await admin.copy_authentication_flow( + payload={"newName": "test-browser"}, flow_alias="browser" + ) assert res == b"", res assert len(await admin.get_authentication_flows()) == 9 @@ -1791,7 +1811,9 @@ async def test_auth_flows(admin: KeycloakAdmin, realm: str): flow = await admin.get_authentication_flow_executions(flow_alias="test-create") payload = flow[0] payload["displayName"] = "test" - res = await admin.update_authentication_flow_executions(payload=payload, flow_alias="test-create") + res = await admin.update_authentication_flow_executions( + payload=payload, flow_alias="test-create" + ) assert res flow = await admin.get_authentication_flow_executions(flow_alias="test-create") @@ -1832,9 +1854,9 @@ async def test_auth_flows(admin: KeycloakAdmin, realm: str): assert res == {"msg": "Already exists"} # Test delete auth flow - flow_id = [x for x in await admin.get_authentication_flows() if x["alias"] == "test-browser"][0][ - "id" - ] + flow_id = [x for x in await admin.get_authentication_flows() if x["alias"] == "test-browser"][ + 0 + ]["id"] res = await admin.delete_authentication_flow(flow_id=flow_id) assert res == dict() with pytest.raises(KeycloakDeleteError) as err: @@ -1982,10 +2004,7 @@ async def test_client_scopes(admin: KeycloakAdmin, realm: str): ) assert res_update == dict() mapper = await admin.get_mappers_from_client_scope(client_scope_id=res) - assert ( - mapper[0]["config"]["user.attribute"] - == "test" - ) + assert mapper[0]["config"]["user.attribute"] == "test" # Test delete mapper res_del = await admin.delete_mapper_from_client_scope( @@ -2118,12 +2137,7 @@ async def test_keys(admin: KeycloakAdmin, realm: str): admin.realm_name = realm keys = await admin.get_keys() assert set(keys["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} - assert {k["algorithm"] for k in keys["keys"]} == { - "HS256", - "RSA-OAEP", - "AES", - "RS256", - } + assert {k["algorithm"] for k in keys["keys"]} == {"HS256", "RSA-OAEP", "AES", "RS256"} @pytest.mark.asyncio @@ -2144,7 +2158,9 @@ async def test_events(admin: KeycloakAdmin, realm: str): await admin.set_events(payload={"bad": "conf"}) assert err.match('400: b\'{"error":"Unrecognized field') - res = await admin.set_events(payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True}) + res = await admin.set_events( + payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True} + ) assert res == dict() await admin.create_client(payload={"name": "test", "clientId": "test"}) @@ -2226,7 +2242,8 @@ async def test_auto_refresh(admin: KeycloakAdmin, realm: str): admin.auto_refresh_token = ["get", "post", "put"] assert ( - await admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict() + await admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) + == dict() ) # Test delete refresh @@ -2353,8 +2370,8 @@ async def test_get_role_client_level_children( assert child["id"] in [x["id"] for x in res] -#@pytest.mark.asyncio -#async def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple): +# @pytest.mark.asyncio +# async def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple): # """Test upload certificate. # # :param admin: Keycloak Admin client @@ -2506,7 +2523,13 @@ async def test_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> N admin.realm_name = realm assert f"default-roles-{realm}" in [x["name"] for x in await admin.get_realm_roles()] assert ( - len([x["name"] for x in await admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"]) + len( + [ + x["name"] + for x in await admin.get_realm_roles() + if x["name"] == f"default-roles-{realm}" + ] + ) == 1 ) @@ -2523,7 +2546,9 @@ async def test_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> No admin.realm_name = realm assert ( await admin.get_default_realm_role_id() - == [x["id"] for x in await admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"][0] + == [ + x["id"] for x in await admin.get_realm_roles() if x["name"] == f"default-roles-{realm}" + ][0] ) diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index afa59c6..34bce34 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -194,18 +194,9 @@ async def test_exchange_token( admin.realm_name = oid.realm_name user_id = await admin.get_user_id(username=username) client_id = await admin.get_client_id(client_id="realm-management") - roles = [ - await admin.get_client_role( - client_id=client_id, - role_name="impersonation", - ) - ] + roles = [await admin.get_client_role(client_id=client_id, role_name="impersonation")] print(roles) - await admin.assign_client_role( - user_id=user_id, - client_id=client_id, - roles=roles - ) + await admin.assign_client_role(user_id=user_id, client_id=client_id, roles=roles) token = await oid.token(username=username, password=password) assert await oid.userinfo(token=token["access_token"]) == { @@ -285,9 +276,7 @@ async def test_entitlement( token = await oid.token(username=username, password=password) client_id = await admin.get_client_id(oid.client_id) with pytest.raises(KeycloakDeprecationError): - resource_servers = await admin.get_client_authz_resources( - client_id=client_id - ) + resource_servers = await admin.get_client_authz_resources(client_id=client_id) resource_server_id = resource_servers[0]["_id"] await oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id) @@ -330,14 +319,13 @@ async def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str key="-----BEGIN PUBLIC KEY-----\n" + public_key + "\n-----END PUBLIC KEY-----", options={"verify_aud": False}, ) - assert ( - decoded_token["preferred_username"] - == username - ) + assert decoded_token["preferred_username"] == username @pytest.mark.asyncio -async def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): +async def test_load_authorization_config( + oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] +): """Test load authorization config. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization @@ -377,17 +365,24 @@ async def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, st key = "-----BEGIN PUBLIC KEY-----\n" + await oid.public_key() + "\n-----END PUBLIC KEY-----" orig_client_id = oid.client_id oid.client_id = "account" - assert await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == [] + assert ( + await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) + == [] + ) policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy.add_role(role="account/view-profile") oid.authorization.policies["test"] = policy assert [ str(x) - for x in await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) + for x in await oid.get_policies( + token=token["access_token"], method_token_info="decode", key=key + ) ] == ["Policy: test (role)"] assert [ repr(x) - for x in await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) + for x in await oid.get_policies( + token=token["access_token"], method_token_info="decode", key=key + ) ] == [""] oid.client_id = orig_client_id @@ -417,7 +412,8 @@ async def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, orig_client_id = oid.client_id oid.client_id = "account" assert ( - await oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == [] + await oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) + == [] ) policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy.add_role(role="account/view-profile") @@ -446,8 +442,8 @@ async def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, await oid.get_permissions(token=token["access_token"]) -#@pytest.mark.asyncio -#async def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): +# @pytest.mark.asyncio +# async def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): # """Test UMA permissions. # # :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization @@ -462,10 +458,10 @@ async def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, # assert uma_permissions[0]["rsname"] == "Default Resource" # # -#@pytest.mark.asyncio -#async def test_has_uma_access( +# @pytest.mark.asyncio +# async def test_has_uma_access( # oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin -#): +# ): # """Test has UMA access. # # :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization