Browse Source

Merge pull request #2 from PilotDataPlatform/async

linting, updating workflow
pull/585/head
Greg McCoy 2 years ago
committed by GitHub
parent
commit
b887d6617b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      .github/workflows/run_tests.yml
  2. 18
      poetry.lock
  3. 2
      src/keycloak/connection.py
  4. 21
      src/keycloak/keycloak_admin.py
  5. 8
      src/keycloak/keycloak_openid.py
  6. 18
      tests/conftest.py
  7. 133
      tests/test_keycloak_admin.py
  8. 52
      tests/test_keycloak_openid.py

10
.github/workflows/daily.yaml → .github/workflows/run_tests.yml

@ -1,8 +1,12 @@
name: Daily check
name: Run Tests
on:
schedule:
- cron: "0 4 * * *"
push:
branches:
- master
pull_request:
branches:
- master
jobs:
test:

18
poetry.lock

@ -1638,14 +1638,14 @@ jeepney = ">=0.6"
[[package]]
name = "setuptools"
version = "67.1.0"
version = "67.2.0"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "setuptools-67.1.0-py3-none-any.whl", hash = "sha256:a7687c12b444eaac951ea87a9627c4f904ac757e7abdc5aac32833234af90378"},
{file = "setuptools-67.1.0.tar.gz", hash = "sha256:e261cdf010c11a41cb5cb5f1bf3338a7433832029f559a6a7614bd42a967c300"},
{file = "setuptools-67.2.0-py3-none-any.whl", hash = "sha256:16ccf598aab3b506593c17378473978908a2734d7336755a8769b480906bec1c"},
{file = "setuptools-67.2.0.tar.gz", hash = "sha256:b440ee5f7e607bb8c9de15259dba2583dd41a38879a7abc1d43a71c59524da48"},
]
[package.extras]
@ -2029,14 +2029,14 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
[[package]]
name = "virtualenv"
version = "20.17.1"
version = "20.18.0"
description = "Virtual Python Environment builder"
category = "dev"
optional = false
python-versions = ">=3.6"
python-versions = ">=3.7"
files = [
{file = "virtualenv-20.17.1-py3-none-any.whl", hash = "sha256:ce3b1684d6e1a20a3e5ed36795a97dfc6af29bc3970ca8dab93e11ac6094b3c4"},
{file = "virtualenv-20.17.1.tar.gz", hash = "sha256:f8b927684efc6f1cc206c9db297a570ab9ad0e51c16fa9e45487d36d1905c058"},
{file = "virtualenv-20.18.0-py3-none-any.whl", hash = "sha256:9d61e4ec8d2c0345dab329fb825eb05579043766a4b26a2f66b28948de68c722"},
{file = "virtualenv-20.18.0.tar.gz", hash = "sha256:f262457a4d7298a6b733b920a196bf8b46c8af15bf1fd9da7142995eff15118e"},
]
[package.dependencies]
@ -2046,8 +2046,8 @@ importlib-metadata = {version = ">=4.8.3", markers = "python_version < \"3.8\""}
platformdirs = ">=2.4,<3"
[package.extras]
docs = ["proselint (>=0.13)", "sphinx (>=5.3)", "sphinx-argparse (>=0.3.2)", "sphinx-rtd-theme (>=1)", "towncrier (>=22.8)"]
testing = ["coverage (>=6.2)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=21.3)", "pytest (>=7.0.1)", "pytest-env (>=0.6.2)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.6.1)", "pytest-randomly (>=3.10.3)", "pytest-timeout (>=2.1)"]
docs = ["furo (>=2022.12.7)", "proselint (>=0.13)", "sphinx (>=6.1.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=22.12)"]
test = ["covdefaults (>=2.2.2)", "coverage (>=7.1)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23)", "pytest (>=7.2.1)", "pytest-env (>=0.8.1)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.10)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)"]
[[package]]
name = "wcwidth"

2
src/keycloak/connection.py

@ -214,7 +214,7 @@ class ConnectionManager(object):
urljoin(self.base_url, path),
params=kwargs,
data=data,
files=kwargs.get('files'),
files=kwargs.get("files"),
headers=self.headers,
timeout=self.timeout,
)

21
src/keycloak/keycloak_admin.py

@ -1689,7 +1689,10 @@ class KeycloakAdmin:
:return: Keycloak Server Response (UserRepresentation)
:rtype: list
"""
params_path = {"realm-name": self.realm_name, "role-id": await self.get_default_realm_role_id()}
params_path = {
"realm-name": self.realm_name,
"role-id": await self.get_default_realm_role_id(),
}
data_raw = await self.raw_get(
urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES_REALM.format(**params_path)
)
@ -1703,7 +1706,10 @@ class KeycloakAdmin:
:return: Keycloak Server Response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "role-id": await self.get_default_realm_role_id()}
params_path = {
"realm-name": self.realm_name,
"role-id": await self.get_default_realm_role_id(),
}
data_raw = await self.raw_delete(
urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path),
data=json.dumps(payload),
@ -1718,7 +1724,10 @@ class KeycloakAdmin:
:return: Keycloak Server Response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "role-id": await self.get_default_realm_role_id()}
params_path = {
"realm-name": self.realm_name,
"role-id": await self.get_default_realm_role_id(),
}
data_raw = await self.raw_post(
urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path),
data=json.dumps(payload),
@ -1799,7 +1808,9 @@ class KeycloakAdmin:
"""
if skip_exists:
try:
res = await self.get_client_role(client_id=client_role_id, role_name=payload["name"])
res = await self.get_client_role(
client_id=client_role_id, role_name=payload["name"]
)
return res["name"]
except KeycloakGetError:
pass
@ -3684,7 +3695,7 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
#async def upload_certificate(self, client_id, certcont):
# async def upload_certificate(self, client_id, certcont):
# """Upload a new certificate for the client.
# :param client_id: id of the client.

8
src/keycloak/keycloak_openid.py

@ -498,7 +498,9 @@ class KeycloakOpenID:
payload = self._add_secret_key(payload)
data_raw = await self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
data_raw = await self.connection.raw_post(
URL_INTROSPECT.format(**params_path), data=payload
)
return raise_error_from_response(data_raw, KeycloakPostError)
def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
@ -613,7 +615,7 @@ class KeycloakOpenID:
return list(set(permissions))
#async def uma_permissions(self, token, permissions=""):
# async def uma_permissions(self, token, permissions=""):
# """Get UMA permissions by user token with requested permissions.
# The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
@ -642,7 +644,7 @@ class KeycloakOpenID:
# data_raw = await self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
# return raise_error_from_response(data_raw, KeycloakPostError)
#async def has_uma_access(self, token, permissions):
# async def has_uma_access(self, token, permissions):
# """Determine whether user has uma permissions with specified user token.
# :param token: user token

18
tests/conftest.py

@ -278,15 +278,9 @@ async def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Ke
}
)
role = await admin.get_realm_role(role_name="offline_access")
payload = {
"name": "test-authz-rb-policy",
"roles": [{"id": role["id"]}],
}
payload = {"name": "test-authz-rb-policy", "roles": [{"id": role["id"]}]}
print(payload)
await admin.create_client_authz_role_based_policy(
client_id=client_id,
payload=payload,
)
await admin.create_client_authz_role_based_policy(client_id=client_id, payload=payload)
# Create user
username = str(uuid.uuid4())
password = str(uuid.uuid4())
@ -343,7 +337,9 @@ async def user(admin: KeycloakAdmin, realm: str) -> str:
"""
admin.realm_name = realm
username = str(uuid.uuid4())
user_id = await admin.create_user(payload={"username": username, "email": f"{username}@test.test"})
user_id = await admin.create_user(
payload={"username": username, "email": f"{username}@test.test"}
)
yield user_id
await admin.delete_user(user_id=user_id)
@ -405,7 +401,9 @@ async def client_role(admin: KeycloakAdmin, realm: str, client: str) -> str:
@pytest_asyncio.fixture
async def composite_client_role(admin: KeycloakAdmin, realm: str, client: str, client_role: str) -> str:
async def composite_client_role(
admin: KeycloakAdmin, realm: str, client: str, client_role: str
) -> str:
"""Fixture for a new random composite client role.
:param admin: Keycloak admin

133
tests/test_keycloak_admin.py

@ -600,7 +600,9 @@ async def test_groups(admin: KeycloakAdmin, user: str):
assert err.match('404: b\'{"error":"Could not find group by id"}\''), err
# Create 1 more subgroup
subsubgroup_id_1 = await admin.create_group(payload={"name": "subsubgroup-1"}, parent=subgroup_id_2)
subsubgroup_id_1 = await admin.create_group(
payload={"name": "subsubgroup-1"}, parent=subgroup_id_2
)
main_group = await admin.get_group(group_id=group_id)
# Test nested searches
@ -721,7 +723,9 @@ async def test_clients(admin: KeycloakAdmin, realm: str):
), clients
# Test create client
client_id = await admin.create_client(payload={"name": "test-client", "clientId": "test-client"})
client_id = await admin.create_client(
payload={"name": "test-client", "clientId": "test-client"}
)
assert client_id, client_id
with pytest.raises(KeycloakPostError) as err:
@ -753,7 +757,9 @@ async def test_clients(admin: KeycloakAdmin, realm: str):
assert res == dict(), res
with pytest.raises(KeycloakPutError) as err:
await admin.update_client(client_id="does-not-exist", payload={"name": "test-client-change"})
await admin.update_client(
client_id="does-not-exist", payload={"name": "test-client-change"}
)
assert err.match('404: b\'{"error":"Could not find client"}\'')
# Test client mappers
@ -778,10 +784,14 @@ async def test_clients(admin: KeycloakAdmin, realm: str):
mappers = await admin.get_mappers_from_client(client_id=client_id)
mapper = mappers[0]
with pytest.raises(KeycloakPutError) as err:
await admin.update_client_mapper(client_id=client_id, mapper_id="does-not-exist", payload=dict())
await admin.update_client_mapper(
client_id=client_id, mapper_id="does-not-exist", payload=dict()
)
assert err.match('404: b\'{"error":"Model not found"}\'')
mapper["config"]["user.attribute"] = "test"
res = await admin.update_client_mapper(client_id=client_id, mapper_id=mapper["id"], payload=mapper)
res = await admin.update_client_mapper(
client_id=client_id, mapper_id=mapper["id"], payload=mapper
)
assert res == dict()
res = await admin.remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"])
@ -977,7 +987,9 @@ async def test_clients(admin: KeycloakAdmin, realm: str):
)
assert res
assert (
await admin.get_client_secrets(client_id=await admin.get_client_id(client_id="test-confidential"))
await admin.get_client_secrets(
client_id=await admin.get_client_id(client_id="test-confidential")
)
== res
)
@ -1013,7 +1025,9 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str):
with pytest.raises(KeycloakPostError) as err:
await admin.create_realm_role(payload={"name": "test-realm-role"})
assert err.match('409: b\'{"errorMessage":"Role with name test-realm-role already exists"}\'')
role_id_2 = await admin.create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True)
role_id_2 = await admin.create_realm_role(
payload={"name": "test-realm-role"}, skip_exists=True
)
assert role_id == role_id_2
# Test update realm role
@ -1028,7 +1042,9 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str):
assert err.match('404: b\'{"error":"Could not find role"}\''), err
# Test realm role user assignment
user_id = await admin.create_user(payload={"username": "role-testing", "email": "test@test.test"})
user_id = await admin.create_user(
payload={"username": "role-testing", "email": "test@test.test"}
)
with pytest.raises(KeycloakPostError) as err:
await admin.assign_realm_roles(user_id=user_id, roles=["bad"])
assert err.match('500: b\'{"error":"unknown_error"}\'')
@ -1084,10 +1100,7 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str):
await admin.get_realm_role(role_name="offline_access"),
await admin.get_realm_role(role_name="test-realm-role-update"),
]
res = await admin.assign_group_realm_roles(
group_id=group_id,
roles=roles
)
res = await admin.assign_group_realm_roles(group_id=group_id, roles=roles)
assert res == dict(), res
roles = await admin.get_group_realm_roles(group_id=group_id)
@ -1112,7 +1125,8 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str):
await admin.add_composite_realm_roles_to_role(role_name=composite_role, roles=["bad"])
assert err.match('500: b\'{"error":"unknown_error"}\'')
res = await admin.add_composite_realm_roles_to_role(
role_name=composite_role, roles=[await admin.get_realm_role(role_name="test-realm-role-update")]
role_name=composite_role,
roles=[await admin.get_realm_role(role_name="test-realm-role-update")],
)
assert res == dict(), res
@ -1136,7 +1150,8 @@ async def test_realm_roles(admin: KeycloakAdmin, realm: str):
await admin.remove_composite_realm_roles_to_role(role_name=composite_role, roles=["bad"])
assert err.match('500: b\'{"error":"unknown_error"}\'')
res = await admin.remove_composite_realm_roles_to_role(
role_name=composite_role, roles=[await admin.get_realm_role(role_name="test-realm-role-update")]
role_name=composite_role,
roles=[await admin.get_realm_role(role_name="test-realm-role-update")],
)
assert res == dict(), res
@ -1254,10 +1269,7 @@ async def test_client_scope_realm_roles(admin: KeycloakAdmin, realm: str):
await admin.get_realm_role(role_name="offline_access"),
await admin.get_realm_role(role_name="test-realm-role"),
]
res = await admin.assign_realm_roles_to_client_scope(
client_id=client_id,
roles=roles
)
res = await admin.assign_realm_roles_to_client_scope(client_id=client_id, roles=roles)
assert res == dict(), res
roles = await admin.get_realm_roles_of_client_scope(client_id=client_id)
@ -1273,18 +1285,14 @@ async def test_client_scope_realm_roles(admin: KeycloakAdmin, realm: str):
assert err.match('500: b\'{"error":"unknown_error"}\'')
roles = [await admin.get_realm_role(role_name="offline_access")]
res = await admin.delete_realm_roles_of_client_scope(
client_id=client_id, roles=roles
)
res = await admin.delete_realm_roles_of_client_scope(client_id=client_id, roles=roles)
assert res == dict(), res
roles = await admin.get_realm_roles_of_client_scope(client_id=client_id)
assert len(roles) == 1
assert "test-realm-role" in [x["name"] for x in roles]
roles = [await admin.get_realm_role(role_name="test-realm-role")]
res = await admin.delete_realm_roles_of_client_scope(
client_id=client_id, roles=roles
)
res = await admin.delete_realm_roles_of_client_scope(client_id=client_id, roles=roles)
assert res == dict(), res
roles = await admin.get_realm_roles_of_client_scope(client_id=client_id)
assert len(roles) == 0
@ -1413,7 +1421,9 @@ async def test_client_roles(admin: KeycloakAdmin, client: str):
assert err.match('404: b\'{"error":"Could not find role"}\'')
# Test user with client role
res = await admin.get_client_role_members(client_id=client, role_name="client-role-test-update")
res = await admin.get_client_role_members(
client_id=client, role_name="client-role-test-update"
)
assert len(res) == 0
with pytest.raises(KeycloakGetError) as err:
await admin.get_client_role_members(client_id=client, role_name="bad")
@ -1430,7 +1440,11 @@ async def test_client_roles(admin: KeycloakAdmin, client: str):
)
assert res == dict()
assert (
len(await admin.get_client_role_members(client_id=client, role_name="client-role-test-update"))
len(
await admin.get_client_role_members(
client_id=client, role_name="client-role-test-update"
)
)
== 1
)
@ -1486,7 +1500,11 @@ async def test_client_roles(admin: KeycloakAdmin, client: str):
)
assert res == dict()
assert (
len(await admin.get_client_role_groups(client_id=client, role_name="client-role-test-update"))
len(
await admin.get_client_role_groups(
client_id=client, role_name="client-role-test-update"
)
)
== 1
)
assert len(await admin.get_group_client_roles(group_id=group_id, client_id=client)) == 1
@ -1514,12 +1532,12 @@ async def test_client_roles(admin: KeycloakAdmin, client: str):
)
assert res == dict()
role = await admin.get_client_role(client_id=client, role_name="client-role-test-update")
assert role[
"composite"
]
assert role["composite"]
# Test delete of client role
res = await admin.delete_client_role(client_role_id=client, role_name="client-role-test-update")
res = await admin.delete_client_role(
client_role_id=client, role_name="client-role-test-update"
)
assert res == dict()
with pytest.raises(KeycloakDeleteError) as err:
await admin.delete_client_role(client_role_id=client, role_name="client-role-test-update")
@ -1638,12 +1656,12 @@ async def test_email(admin: KeycloakAdmin, user: str):
# Emails will fail as we don't have SMTP test setup
with pytest.raises(KeycloakPutError) as err:
await admin.send_update_account(user_id=user, payload=dict())
#assert err.match('500: b\'{"error":"unknown_error"}\'')
# assert err.match('500: b\'{"error":"unknown_error"}\'')
await admin.update_user(user_id=user, payload={"enabled": True})
with pytest.raises(KeycloakPutError) as err:
await admin.send_verify_email(user_id=user)
#assert err.match('500: b\'{"errorMessage":"Failed to send execute actions email"}\'')
# assert err.match('500: b\'{"errorMessage":"Failed to send execute actions email"}\'')
@pytest.mark.asyncio
@ -1731,7 +1749,9 @@ async def test_auth_flows(admin: KeycloakAdmin, realm: str):
await admin.copy_authentication_flow(payload=dict(), flow_alias="bad")
assert err.match("404: b''")
res = await admin.copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser")
res = await admin.copy_authentication_flow(
payload={"newName": "test-browser"}, flow_alias="browser"
)
assert res == b"", res
assert len(await admin.get_authentication_flows()) == 9
@ -1791,7 +1811,9 @@ async def test_auth_flows(admin: KeycloakAdmin, realm: str):
flow = await admin.get_authentication_flow_executions(flow_alias="test-create")
payload = flow[0]
payload["displayName"] = "test"
res = await admin.update_authentication_flow_executions(payload=payload, flow_alias="test-create")
res = await admin.update_authentication_flow_executions(
payload=payload, flow_alias="test-create"
)
assert res
flow = await admin.get_authentication_flow_executions(flow_alias="test-create")
@ -1832,9 +1854,9 @@ async def test_auth_flows(admin: KeycloakAdmin, realm: str):
assert res == {"msg": "Already exists"}
# Test delete auth flow
flow_id = [x for x in await admin.get_authentication_flows() if x["alias"] == "test-browser"][0][
"id"
]
flow_id = [x for x in await admin.get_authentication_flows() if x["alias"] == "test-browser"][
0
]["id"]
res = await admin.delete_authentication_flow(flow_id=flow_id)
assert res == dict()
with pytest.raises(KeycloakDeleteError) as err:
@ -1982,10 +2004,7 @@ async def test_client_scopes(admin: KeycloakAdmin, realm: str):
)
assert res_update == dict()
mapper = await admin.get_mappers_from_client_scope(client_scope_id=res)
assert (
mapper[0]["config"]["user.attribute"]
== "test"
)
assert mapper[0]["config"]["user.attribute"] == "test"
# Test delete mapper
res_del = await admin.delete_mapper_from_client_scope(
@ -2118,12 +2137,7 @@ async def test_keys(admin: KeycloakAdmin, realm: str):
admin.realm_name = realm
keys = await admin.get_keys()
assert set(keys["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"}
assert {k["algorithm"] for k in keys["keys"]} == {
"HS256",
"RSA-OAEP",
"AES",
"RS256",
}
assert {k["algorithm"] for k in keys["keys"]} == {"HS256", "RSA-OAEP", "AES", "RS256"}
@pytest.mark.asyncio
@ -2144,7 +2158,9 @@ async def test_events(admin: KeycloakAdmin, realm: str):
await admin.set_events(payload={"bad": "conf"})
assert err.match('400: b\'{"error":"Unrecognized field')
res = await admin.set_events(payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True})
res = await admin.set_events(
payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True}
)
assert res == dict()
await admin.create_client(payload={"name": "test", "clientId": "test"})
@ -2226,7 +2242,8 @@ async def test_auto_refresh(admin: KeycloakAdmin, realm: str):
admin.auto_refresh_token = ["get", "post", "put"]
assert (
await admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict()
await admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
== dict()
)
# Test delete refresh
@ -2353,8 +2370,8 @@ async def test_get_role_client_level_children(
assert child["id"] in [x["id"] for x in res]
#@pytest.mark.asyncio
#async def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple):
# @pytest.mark.asyncio
# async def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple):
# """Test upload certificate.
#
# :param admin: Keycloak Admin client
@ -2506,7 +2523,13 @@ async def test_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> N
admin.realm_name = realm
assert f"default-roles-{realm}" in [x["name"] for x in await admin.get_realm_roles()]
assert (
len([x["name"] for x in await admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"])
len(
[
x["name"]
for x in await admin.get_realm_roles()
if x["name"] == f"default-roles-{realm}"
]
)
== 1
)
@ -2523,7 +2546,9 @@ async def test_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> No
admin.realm_name = realm
assert (
await admin.get_default_realm_role_id()
== [x["id"] for x in await admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"][0]
== [
x["id"] for x in await admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"
][0]
)

52
tests/test_keycloak_openid.py

@ -194,18 +194,9 @@ async def test_exchange_token(
admin.realm_name = oid.realm_name
user_id = await admin.get_user_id(username=username)
client_id = await admin.get_client_id(client_id="realm-management")
roles = [
await admin.get_client_role(
client_id=client_id,
role_name="impersonation",
)
]
roles = [await admin.get_client_role(client_id=client_id, role_name="impersonation")]
print(roles)
await admin.assign_client_role(
user_id=user_id,
client_id=client_id,
roles=roles
)
await admin.assign_client_role(user_id=user_id, client_id=client_id, roles=roles)
token = await oid.token(username=username, password=password)
assert await oid.userinfo(token=token["access_token"]) == {
@ -285,9 +276,7 @@ async def test_entitlement(
token = await oid.token(username=username, password=password)
client_id = await admin.get_client_id(oid.client_id)
with pytest.raises(KeycloakDeprecationError):
resource_servers = await admin.get_client_authz_resources(
client_id=client_id
)
resource_servers = await admin.get_client_authz_resources(client_id=client_id)
resource_server_id = resource_servers[0]["_id"]
await oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
@ -330,14 +319,13 @@ async def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str
key="-----BEGIN PUBLIC KEY-----\n" + public_key + "\n-----END PUBLIC KEY-----",
options={"verify_aud": False},
)
assert (
decoded_token["preferred_username"]
== username
)
assert decoded_token["preferred_username"] == username
@pytest.mark.asyncio
async def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
async def test_load_authorization_config(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
):
"""Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
@ -377,17 +365,24 @@ async def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, st
key = "-----BEGIN PUBLIC KEY-----\n" + await oid.public_key() + "\n-----END PUBLIC KEY-----"
orig_client_id = oid.client_id
oid.client_id = "account"
assert await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
assert (
await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
== []
)
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
for x in await oid.get_policies(
token=token["access_token"], method_token_info="decode", key=key
)
] == ["Policy: test (role)"]
assert [
repr(x)
for x in await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
for x in await oid.get_policies(
token=token["access_token"], method_token_info="decode", key=key
)
] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id
@ -417,7 +412,8 @@ async def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID,
orig_client_id = oid.client_id
oid.client_id = "account"
assert (
await oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
await oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key)
== []
)
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
@ -446,8 +442,8 @@ async def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID,
await oid.get_permissions(token=token["access_token"])
#@pytest.mark.asyncio
#async def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
# @pytest.mark.asyncio
# async def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
# """Test UMA permissions.
#
# :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
@ -462,10 +458,10 @@ async def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID,
# assert uma_permissions[0]["rsname"] == "Default Resource"
#
#
#@pytest.mark.asyncio
#async def test_has_uma_access(
# @pytest.mark.asyncio
# async def test_has_uma_access(
# oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
#):
# ):
# """Test has UMA access.
#
# :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization

Loading…
Cancel
Save