Browse Source

test: linting done

ci/fix-tests-26-1
Richard Nemeth 1 week ago
parent
commit
a1efb60d86
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 6
      pyproject.toml
  2. 194
      tests/conftest.py
  3. 4
      tests/test_authorization.py
  4. 15
      tests/test_connection.py
  5. 7
      tests/test_exceptions.py
  6. 1651
      tests/test_keycloak_admin.py
  7. 341
      tests/test_keycloak_openid.py
  8. 91
      tests/test_keycloak_uma.py
  9. 5
      tests/test_license.py
  10. 58
      tests/test_uma_permissions.py
  11. 6
      tests/test_urls_patterns.py
  12. 8
      tox.ini

6
pyproject.toml

@ -86,11 +86,17 @@ ignore = [
"D212", "D212",
"FBT001", "FBT001",
"FBT002", "FBT002",
"FBT003",
"N818", "N818",
"PLR0912", "PLR0912",
"PLR0913", "PLR0913",
"PLR0915",
"TRY003", "TRY003",
] ]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["ARG001","PLR2004", "PT011", "S101", "SLF001"]
"docs/*" = ["A001", "EXE001", "ERA001"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_default_fixture_loop_scope = "function" asyncio_default_fixture_loop_scope = "function"

194
tests/conftest.py

@ -4,8 +4,7 @@ import ipaddress
import os import os
import uuid import uuid
from collections.abc import Generator from collections.abc import Generator
from datetime import datetime, timedelta
from typing import Tuple
from datetime import datetime, timedelta, timezone
import freezegun import freezegun
import pytest import pytest
@ -19,7 +18,8 @@ from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakOpenIDConnection, Ke
class KeycloakTestEnv: class KeycloakTestEnv:
"""Wrapper for test Keycloak connection configuration.
"""
Wrapper for test Keycloak connection configuration.
:param host: Hostname :param host: Hostname
:type host: str :type host: str
@ -37,8 +37,9 @@ class KeycloakTestEnv:
port: str = os.environ["KEYCLOAK_PORT"], port: str = os.environ["KEYCLOAK_PORT"],
username: str = os.environ["KEYCLOAK_ADMIN"], username: str = os.environ["KEYCLOAK_ADMIN"],
password: str = os.environ["KEYCLOAK_ADMIN_PASSWORD"], password: str = os.environ["KEYCLOAK_ADMIN_PASSWORD"],
):
"""Init method.
) -> None:
"""
Init method.
:param host: Hostname :param host: Hostname
:type host: str :type host: str
@ -49,87 +50,96 @@ class KeycloakTestEnv:
:param password: Admin password :param password: Admin password
:type password: str :type password: str
""" """
self.KEYCLOAK_HOST = host
self.KEYCLOAK_PORT = port
self.KEYCLOAK_ADMIN = username
self.KEYCLOAK_ADMIN_PASSWORD = password
self.keycloak_host = host
self.keycloak_port = port
self.keycloak_admin = username
self.keycloak_admin_password = password
@property @property
def KEYCLOAK_HOST(self):
"""Hostname getter.
def keycloak_host(self) -> str:
"""
Hostname getter.
:returns: Keycloak host :returns: Keycloak host
:rtype: str :rtype: str
""" """
return self._KEYCLOAK_HOST
return self._keycloak_host
@KEYCLOAK_HOST.setter
def KEYCLOAK_HOST(self, value: str):
"""Hostname setter.
@keycloak_host.setter
def keycloak_host(self, value: str) -> None:
"""
Hostname setter.
:param value: Keycloak host :param value: Keycloak host
:type value: str :type value: str
""" """
self._KEYCLOAK_HOST = value
self._keycloak_host = value
@property @property
def KEYCLOAK_PORT(self):
"""Port getter.
def keycloak_port(self) -> str:
"""
Port getter.
:returns: Keycloak port :returns: Keycloak port
:rtype: str :rtype: str
""" """
return self._KEYCLOAK_PORT
return self._keycloak_port
@KEYCLOAK_PORT.setter
def KEYCLOAK_PORT(self, value: str):
"""Port setter.
@keycloak_port.setter
def keycloak_port(self, value: str) -> None:
"""
Port setter.
:param value: Keycloak port :param value: Keycloak port
:type value: str :type value: str
""" """
self._KEYCLOAK_PORT = value
self._keycloak_port = value
@property @property
def KEYCLOAK_ADMIN(self):
"""Admin username getter.
def keycloak_admin(self) -> str:
"""
Admin username getter.
:returns: Admin username :returns: Admin username
:rtype: str :rtype: str
""" """
return self._KEYCLOAK_ADMIN
return self._keycloak_admin
@KEYCLOAK_ADMIN.setter
def KEYCLOAK_ADMIN(self, value: str):
"""Admin username setter.
@keycloak_admin.setter
def keycloak_admin(self, value: str) -> None:
"""
Admin username setter.
:param value: Admin username :param value: Admin username
:type value: str :type value: str
""" """
self._KEYCLOAK_ADMIN = value
self._keycloak_admin = value
@property @property
def KEYCLOAK_ADMIN_PASSWORD(self):
"""Admin password getter.
def keycloak_admin_password(self) -> str:
"""
Admin password getter.
:returns: Admin password :returns: Admin password
:rtype: str :rtype: str
""" """
return self._KEYCLOAK_ADMIN_PASSWORD
return self._keycloak_admin_password
@KEYCLOAK_ADMIN_PASSWORD.setter
def KEYCLOAK_ADMIN_PASSWORD(self, value: str):
"""Admin password setter.
@keycloak_admin_password.setter
def keycloak_admin_password(self, value: str) -> None:
"""
Admin password setter.
:param value: Admin password :param value: Admin password
:type value: str :type value: str
""" """
self._KEYCLOAK_ADMIN_PASSWORD = value
self._keycloak_admin_password = value
@pytest.fixture @pytest.fixture
def env():
"""Fixture for getting the test environment configuration object.
def env() -> KeycloakTestEnv:
"""
Fixture for getting the test environment configuration object.
:returns: Keycloak test environment object :returns: Keycloak test environment object
:rtype: KeycloakTestEnv :rtype: KeycloakTestEnv
@ -138,8 +148,9 @@ def env():
@pytest.fixture @pytest.fixture
def admin(env: KeycloakTestEnv):
"""Fixture for initialized KeycloakAdmin class.
def admin(env: KeycloakTestEnv) -> KeycloakAdmin:
"""
Fixture for initialized KeycloakAdmin class.
:param env: Keycloak test environment :param env: Keycloak test environment
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
@ -147,16 +158,17 @@ def admin(env: KeycloakTestEnv):
:rtype: KeycloakAdmin :rtype: KeycloakAdmin
""" """
return KeycloakAdmin( return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
username=env.keycloak_admin,
password=env.keycloak_admin_password,
) )
@pytest.fixture @pytest.fixture
@freezegun.freeze_time("2023-02-25 10:00:00") @freezegun.freeze_time("2023-02-25 10:00:00")
def admin_frozen(env: KeycloakTestEnv):
"""Fixture for initialized KeycloakAdmin class, with time frozen.
def admin_frozen(env: KeycloakTestEnv) -> KeycloakAdmin:
"""
Fixture for initialized KeycloakAdmin class, with time frozen.
:param env: Keycloak test environment :param env: Keycloak test environment
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
@ -164,15 +176,20 @@ def admin_frozen(env: KeycloakTestEnv):
:rtype: KeycloakAdmin :rtype: KeycloakAdmin
""" """
return KeycloakAdmin( return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
username=env.keycloak_admin,
password=env.keycloak_admin_password,
) )
@pytest.fixture @pytest.fixture
def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for initialized KeycloakOpenID class.
def oid(
env: KeycloakTestEnv,
realm: str,
admin: KeycloakAdmin,
) -> Generator[KeycloakOpenID, None, None]:
"""
Fixture for initialized KeycloakOpenID class.
:param env: Keycloak test environment :param env: Keycloak test environment
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
@ -198,7 +215,7 @@ def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
) )
# Return OID # Return OID
yield KeycloakOpenID( yield KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name=realm, realm_name=realm,
client_id=client, client_id=client,
) )
@ -207,8 +224,13 @@ def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
@pytest.fixture @pytest.fixture
def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for an initialized KeycloakOpenID class and a random user credentials.
def oid_with_credentials(
env: KeycloakTestEnv,
realm: str,
admin: KeycloakAdmin,
) -> Generator[tuple[KeycloakOpenID, str, str], None, None]:
"""
Fixture for an initialized KeycloakOpenID class and a random user credentials.
:param env: Keycloak test environment :param env: Keycloak test environment
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
@ -253,7 +275,7 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
yield ( yield (
KeycloakOpenID( KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name=realm, realm_name=realm,
client_id=client, client_id=client,
client_secret_key=secret, client_secret_key=secret,
@ -268,8 +290,13 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
@pytest.fixture @pytest.fixture
def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for an initialized KeycloakOpenID class and a random user credentials.
def oid_with_credentials_authz(
env: KeycloakTestEnv,
realm: str,
admin: KeycloakAdmin,
) -> Generator[tuple[KeycloakOpenID, str, str], None, None]:
"""
Fixture for an initialized KeycloakOpenID class and a random user credentials.
:param env: Keycloak test environment :param env: Keycloak test environment
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
@ -323,7 +350,7 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
yield ( yield (
KeycloakOpenID( KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name=realm, realm_name=realm,
client_id=client, client_id=client,
client_secret_key=secret, client_secret_key=secret,
@ -338,8 +365,13 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
@pytest.fixture @pytest.fixture
def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for an initialized KeycloakOpenID class and a random user credentials.
def oid_with_credentials_device(
env: KeycloakTestEnv,
realm: str,
admin: KeycloakAdmin,
) -> Generator[tuple[KeycloakOpenID, str, str], None, None]:
"""
Fixture for an initialized KeycloakOpenID class and a random user credentials.
:param env: Keycloak test environment :param env: Keycloak test environment
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
@ -385,7 +417,7 @@ def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: Keycloa
yield ( yield (
KeycloakOpenID( KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name=realm, realm_name=realm,
client_id=client, client_id=client,
client_secret_key=secret, client_secret_key=secret,
@ -401,7 +433,8 @@ def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: Keycloa
@pytest.fixture @pytest.fixture
def realm(admin: KeycloakAdmin) -> Generator[str, None, None]: def realm(admin: KeycloakAdmin) -> Generator[str, None, None]:
"""Fixture for a new random realm.
"""
Fixture for a new random realm.
:param admin: Keycloak admin :param admin: Keycloak admin
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
@ -416,7 +449,8 @@ def realm(admin: KeycloakAdmin) -> Generator[str, None, None]:
@pytest.fixture @pytest.fixture
def user(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]: def user(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
"""Fixture for a new random user.
"""
Fixture for a new random user.
:param admin: Keycloak admin :param admin: Keycloak admin
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
@ -434,7 +468,8 @@ def user(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
@pytest.fixture @pytest.fixture
def group(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]: def group(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
"""Fixture for a new random group.
"""
Fixture for a new random group.
:param admin: Keycloak admin :param admin: Keycloak admin
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
@ -452,7 +487,8 @@ def group(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
@pytest.fixture @pytest.fixture
def client(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]: def client(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
"""Fixture for a new random client.
"""
Fixture for a new random client.
:param admin: Keycloak admin :param admin: Keycloak admin
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
@ -470,7 +506,8 @@ def client(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
@pytest.fixture @pytest.fixture
def client_role(admin: KeycloakAdmin, realm: str, client: str) -> Generator[str, None, None]: def client_role(admin: KeycloakAdmin, realm: str, client: str) -> Generator[str, None, None]:
"""Fixture for a new random client role.
"""
Fixture for a new random client role.
:param admin: Keycloak admin :param admin: Keycloak admin
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
@ -495,7 +532,8 @@ def composite_client_role(
client: str, client: str,
client_role: str, client_role: str,
) -> Generator[str, None, None]: ) -> Generator[str, None, None]:
"""Fixture for a new random composite client role.
"""
Fixture for a new random composite client role.
:param admin: Keycloak admin :param admin: Keycloak admin
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
@ -518,8 +556,9 @@ def composite_client_role(
@pytest.fixture @pytest.fixture
def selfsigned_cert():
"""Generate self signed certificate for a hostname, and optional IP addresses.
def selfsigned_cert() -> tuple[str, str]:
"""
Generate self signed certificate for a hostname, and optional IP addresses.
:returns: Selfsigned certificate :returns: Selfsigned certificate
:rtype: Tuple[str, str] :rtype: Tuple[str, str]
@ -551,7 +590,7 @@ def selfsigned_cert():
# path_len=0 means this cert can only sign itself, not other certs. # path_len=0 means this cert can only sign itself, not other certs.
basic_contraints = x509.BasicConstraints(ca=True, path_length=0) basic_contraints = x509.BasicConstraints(ca=True, path_length=0)
now = datetime.utcnow()
now = datetime.now(tz=timezone.utc)
cert = ( cert = (
x509.CertificateBuilder() x509.CertificateBuilder()
.subject_name(name) .subject_name(name)
@ -575,8 +614,11 @@ def selfsigned_cert():
@pytest.fixture @pytest.fixture
def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Fixture for initialized KeycloakUMA class.
def oid_connection_with_authz(
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> KeycloakOpenIDConnection:
"""
Fixture for initialized KeycloakUMA class.
:param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
@ -584,19 +626,19 @@ def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID,
:rtype: KeycloakOpenIDConnection :rtype: KeycloakOpenIDConnection
""" """
oid, _, _ = oid_with_credentials_authz oid, _, _ = oid_with_credentials_authz
connection = KeycloakOpenIDConnection(
return KeycloakOpenIDConnection(
server_url=oid.connection.base_url, server_url=oid.connection.base_url,
realm_name=oid.realm_name, realm_name=oid.realm_name,
client_id=oid.client_id, client_id=oid.client_id,
client_secret_key=oid.client_secret_key, client_secret_key=oid.client_secret_key,
timeout=60, timeout=60,
) )
return connection
@pytest.fixture @pytest.fixture
def uma(oid_connection_with_authz: KeycloakOpenIDConnection):
"""Fixture for initialized KeycloakUMA class.
def uma(oid_connection_with_authz: KeycloakOpenIDConnection) -> KeycloakUMA:
"""
Fixture for initialized KeycloakUMA class.
:param oid_connection_with_authz: Keycloak open id connection with pre-configured authz client :param oid_connection_with_authz: Keycloak open id connection with pre-configured authz client
:type oid_connection_with_authz: KeycloakOpenIDConnection :type oid_connection_with_authz: KeycloakOpenIDConnection

4
tests/test_authorization.py

@ -6,7 +6,7 @@ from keycloak.authorization import Permission, Policy, Role
from keycloak.exceptions import KeycloakAuthorizationConfigError from keycloak.exceptions import KeycloakAuthorizationConfigError
def test_authorization_objects():
def test_authorization_objects() -> None:
"""Test authorization objects.""" """Test authorization objects."""
# Test permission # Test permission
p = Permission(name="test", type="test", logic="test", decision_strategy="test") p = Permission(name="test", type="test", logic="test", decision_strategy="test")
@ -39,5 +39,5 @@ def test_authorization_objects():
assert r.name == "test" assert r.name == "test"
assert not r.required assert not r.required
assert r.get_name() == "test" assert r.get_name() == "test"
assert r == r
assert r == r # noqa: PLR0124
assert r == "test" assert r == "test"

15
tests/test_connection.py

@ -8,21 +8,22 @@ from keycloak.connection import ConnectionManager
from keycloak.exceptions import KeycloakConnectionError from keycloak.exceptions import KeycloakConnectionError
def test_connection_proxy():
def test_connection_proxy() -> None:
"""Test proxies of connection manager.""" """Test proxies of connection manager."""
cm = ConnectionManager( cm = ConnectionManager(
base_url="http://test.test", proxies={"http://test.test": "http://localhost:8080"},
base_url="http://test.test",
proxies={"http://test.test": "http://localhost:8080"},
) )
assert cm._s.proxies == {"http://test.test": "http://localhost:8080"} assert cm._s.proxies == {"http://test.test": "http://localhost:8080"}
def test_headers():
def test_headers() -> None:
"""Test headers manipulation.""" """Test headers manipulation."""
cm = ConnectionManager(base_url="http://test.test", headers={"H": "A"}) cm = ConnectionManager(base_url="http://test.test", headers={"H": "A"})
assert cm.param_headers(key="H") == "A" assert cm.param_headers(key="H") == "A"
assert cm.param_headers(key="A") is None assert cm.param_headers(key="A") is None
cm.clean_headers() cm.clean_headers()
assert cm.headers == dict()
assert cm.headers == {}
cm.add_param_headers(key="H", value="B") cm.add_param_headers(key="H", value="B")
assert cm.exist_param_headers(key="H") assert cm.exist_param_headers(key="H")
assert not cm.exist_param_headers(key="B") assert not cm.exist_param_headers(key="B")
@ -30,7 +31,7 @@ def test_headers():
assert not cm.exist_param_headers(key="H") assert not cm.exist_param_headers(key="H")
def test_bad_connection():
def test_bad_connection() -> None:
"""Test bad connection.""" """Test bad connection."""
cm = ConnectionManager(base_url="http://not.real.domain") cm = ConnectionManager(base_url="http://not.real.domain")
with pytest.raises(KeycloakConnectionError): with pytest.raises(KeycloakConnectionError):
@ -44,7 +45,7 @@ def test_bad_connection():
@pytest.mark.asyncio @pytest.mark.asyncio
async def a_test_bad_connection():
async def a_test_bad_connection() -> None:
"""Test bad connection.""" """Test bad connection."""
cm = ConnectionManager(base_url="http://not.real.domain") cm = ConnectionManager(base_url="http://not.real.domain")
with pytest.raises(KeycloakConnectionError): with pytest.raises(KeycloakConnectionError):
@ -57,7 +58,7 @@ async def a_test_bad_connection():
await cm.a_raw_put(path="bad", data={}) await cm.a_raw_put(path="bad", data={})
def test_counter_part():
def test_counter_part() -> None:
"""Test that each function has its async counter part.""" """Test that each function has its async counter part."""
con_methods = [ con_methods = [
func for func in dir(ConnectionManager) if callable(getattr(ConnectionManager, func)) func for func in dir(ConnectionManager) if callable(getattr(ConnectionManager, func))

7
tests/test_exceptions.py

@ -7,7 +7,7 @@ import pytest
from keycloak.exceptions import KeycloakOperationError, raise_error_from_response from keycloak.exceptions import KeycloakOperationError, raise_error_from_response
def test_raise_error_from_response_from_dict():
def test_raise_error_from_response_from_dict() -> None:
"""Test raise error from response using a dictionary.""" """Test raise error from response using a dictionary."""
response = Mock() response = Mock()
response.json.return_value = {"key": "value"} response.json.return_value = {"key": "value"}
@ -16,5 +16,8 @@ def test_raise_error_from_response_from_dict():
with pytest.raises(KeycloakOperationError): with pytest.raises(KeycloakOperationError):
raise_error_from_response( raise_error_from_response(
response=response, error=dict(), expected_codes=[200], skip_exists=False,
response=response,
error={},
expected_codes=[200],
skip_exists=False,
) )

1651
tests/test_keycloak_admin.py
File diff suppressed because it is too large
View File

341
tests/test_keycloak_openid.py

@ -1,7 +1,6 @@
"""Test module for KeycloakOpenID.""" """Test module for KeycloakOpenID."""
from inspect import iscoroutinefunction, signature from inspect import iscoroutinefunction, signature
from typing import Tuple
from unittest import mock from unittest import mock
import jwcrypto.jwk import jwcrypto.jwk
@ -22,16 +21,18 @@ from keycloak.exceptions import (
KeycloakPostError, KeycloakPostError,
KeycloakRPTNotFound, KeycloakRPTNotFound,
) )
from tests.conftest import KeycloakTestEnv
def test_keycloak_openid_init(env):
"""Test KeycloakOpenId's init method.
def test_keycloak_openid_init(env: KeycloakTestEnv) -> None:
"""
Test KeycloakOpenId's init method.
:param env: Environment fixture :param env: Environment fixture
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
""" """
oid = KeycloakOpenID( oid = KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name="master", realm_name="master",
client_id="admin-cli", client_id="admin-cli",
) )
@ -43,15 +44,16 @@ def test_keycloak_openid_init(env):
assert isinstance(oid.authorization, Authorization) assert isinstance(oid.authorization, Authorization)
def test_well_known(oid: KeycloakOpenID):
"""Test the well_known method.
def test_well_known(oid: KeycloakOpenID) -> None:
"""
Test the well_known method.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
""" """
res = oid.well_known() res = oid.well_known()
assert res is not None assert res is not None
assert res != dict()
assert res != {}
for key in [ for key in [
"acr_values_supported", "acr_values_supported",
"authorization_encryption_alg_values_supported", "authorization_encryption_alg_values_supported",
@ -110,8 +112,9 @@ def test_well_known(oid: KeycloakOpenID):
assert key in res assert key in res
def test_auth_url(env, oid: KeycloakOpenID):
"""Test the auth_url method.
def test_auth_url(env: KeycloakTestEnv, oid: KeycloakOpenID) -> None:
"""
Test the auth_url method.
:param env: Environment fixture :param env: Environment fixture
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
@ -120,15 +123,15 @@ def test_auth_url(env, oid: KeycloakOpenID):
""" """
res = oid.auth_url(redirect_uri="http://test.test/*") res = oid.auth_url(redirect_uri="http://test.test/*")
assert ( assert (
res
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
+ f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
+ "&redirect_uri=http://test.test/*&scope=email&state=&nonce="
res == f"http://{env.keycloak_host}:{env.keycloak_port}/realms/{oid.realm_name}"
f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
"&redirect_uri=http://test.test/*&scope=email&state=&nonce="
) )
def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test the token method.
def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test the token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -177,9 +180,11 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
def test_exchange_token( def test_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
):
"""Test the exchange token method.
oid_with_credentials: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test the exchange token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -215,7 +220,9 @@ def test_exchange_token(
# Exchange token with the new user # Exchange token with the new user
new_token = oid.exchange_token( new_token = oid.exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username,
token=token["access_token"],
audience=oid.client_id,
subject=username,
) )
assert oid.userinfo(token=new_token["access_token"]) == { assert oid.userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test", "email": f"{username}@test.test",
@ -229,8 +236,9 @@ def test_exchange_token(
assert token != new_token assert token != new_token
def test_logout(oid_with_credentials):
"""Test logout.
def test_logout(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test logout.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -238,15 +246,16 @@ def test_logout(oid_with_credentials):
oid, username, password = oid_with_credentials oid, username, password = oid_with_credentials
token = oid.token(username=username, password=password) token = oid.token(username=username, password=password)
assert oid.userinfo(token=token["access_token"]) != dict()
assert oid.logout(refresh_token=token["refresh_token"]) == dict()
assert oid.userinfo(token=token["access_token"]) != {}
assert oid.logout(refresh_token=token["refresh_token"]) == {}
with pytest.raises(KeycloakAuthenticationError): with pytest.raises(KeycloakAuthenticationError):
oid.userinfo(token=token["access_token"]) oid.userinfo(token=token["access_token"])
def test_certs(oid: KeycloakOpenID):
"""Test certificates.
def test_certs(oid: KeycloakOpenID) -> None:
"""
Test certificates.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
@ -254,8 +263,9 @@ def test_certs(oid: KeycloakOpenID):
assert len(oid.certs()["keys"]) == 2 assert len(oid.certs()["keys"]) == 2
def test_public_key(oid: KeycloakOpenID):
"""Test public key.
def test_public_key(oid: KeycloakOpenID) -> None:
"""
Test public key.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
@ -264,9 +274,11 @@ def test_public_key(oid: KeycloakOpenID):
def test_entitlement( def test_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
):
"""Test entitlement.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test entitlement.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -284,8 +296,9 @@ def test_entitlement(
oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id) oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test introspect.
def test_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test introspect.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -295,15 +308,18 @@ def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
assert oid.introspect(token=token["access_token"])["active"] assert oid.introspect(token=token["access_token"])["active"]
assert oid.introspect( assert oid.introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token",
token=token["access_token"],
rpt="some",
token_type_hint="requesting_party_token", # noqa: S106
) == {"active": False} ) == {"active": False}
with pytest.raises(KeycloakRPTNotFound): with pytest.raises(KeycloakRPTNotFound):
oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token") # noqa: S106
def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token.
def test_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test decode token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -319,8 +335,9 @@ def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
def test_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token with an invalid token.
def test_decode_token_invalid_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test decode token with an invalid token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -340,20 +357,27 @@ def test_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID,
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = oid.decode_token( decoded_invalid_access_token = oid.decode_token(
token=invalid_access_token, validate=True, key=key,
token=invalid_access_token,
validate=True,
key=key,
) )
decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False) decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False)
assert decoded_access_token == decoded_invalid_access_token assert decoded_access_token == decoded_invalid_access_token
decoded_invalid_access_token = oid.decode_token( decoded_invalid_access_token = oid.decode_token(
token=invalid_access_token, validate=False, key=key,
token=invalid_access_token,
validate=False,
key=key,
) )
assert decoded_access_token == decoded_invalid_access_token assert decoded_access_token == decoded_invalid_access_token
def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test load authorization config.
def test_load_authorization_config(
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -368,12 +392,14 @@ def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpe
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role) assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2 assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
assert isinstance( assert isinstance(
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission,
oid.authorization.policies["test-authz-rb-policy"].permissions[0],
Permission,
) )
def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
def test_get_policies(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -390,15 +416,17 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str
orig_client_id = oid.client_id orig_client_id = oid.client_id
oid.client_id = "account" oid.client_id = "account"
assert oid.get_policies(token=token["access_token"], method_token_info="decode") == []
assert oid.get_policies(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
str(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["Policy: test (role)"] ] == ["Policy: test (role)"]
assert [ assert [
repr(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
repr(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["<Policy: test (role)>"] ] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id
@ -407,8 +435,9 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str
oid.get_policies(token=token["access_token"]) oid.get_policies(token=token["access_token"])
def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
def test_get_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -425,22 +454,25 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
orig_client_id = oid.client_id orig_client_id = oid.client_id
oid.client_id = "account" oid.client_id = "account"
assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == []
assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
policy.add_permission( policy.add_permission(
permission=Permission( permission=Permission(
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS",
name="test-perm",
type="resource",
logic="POSITIVE",
decision_strategy="UNANIMOUS",
), ),
) )
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x) str(x)
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["Permission: test-perm (resource)"] ] == ["Permission: test-perm (resource)"]
assert [ assert [
repr(x) repr(x)
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["<Permission: test-perm (resource)>"] ] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id
@ -449,8 +481,9 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
oid.get_permissions(token=token["access_token"]) oid.get_permissions(token=token["access_token"])
def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test UMA permissions.
def test_uma_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test UMA permissions.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -464,9 +497,11 @@ def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
def test_has_uma_access( def test_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
):
"""Test has UMA access.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test has UMA access.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -497,16 +532,18 @@ def test_has_uma_access(
assert ( assert (
str( str(
oid.has_uma_access( oid.has_uma_access(
token=admin.connection.token["access_token"], permissions="Default Resource",
token=admin.connection.token["access_token"],
permissions="Default Resource",
), ),
) )
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})"
"{'Default Resource'})"
) )
def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"""Test device authorization flow.
def test_device(oid_with_credentials_device: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test device authorization flow.
:param oid_with_credentials_device: Keycloak OpenID client with pre-configured user :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
credentials and device authorization flow enabled credentials and device authorization flow enabled
@ -519,7 +556,7 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"user_code": mock.ANY, "user_code": mock.ANY,
"verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device", "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
"verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/" "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
+ f"device?user_code={res['user_code']}",
f"device?user_code={res['user_code']}",
"expires_in": 600, "expires_in": 600,
"interval": 5, "interval": 5,
} }
@ -529,15 +566,16 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_well_known(oid: KeycloakOpenID):
"""Test the well_known method.
async def test_a_well_known(oid: KeycloakOpenID) -> None:
"""
Test the well_known method.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
""" """
res = await oid.a_well_known() res = await oid.a_well_known()
assert res is not None assert res is not None
assert res != dict()
assert res != {}
for key in [ for key in [
"acr_values_supported", "acr_values_supported",
"authorization_encryption_alg_values_supported", "authorization_encryption_alg_values_supported",
@ -597,8 +635,9 @@ async def test_a_well_known(oid: KeycloakOpenID):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_auth_url(env, oid: KeycloakOpenID):
"""Test the auth_url method.
async def test_a_auth_url(env: KeycloakTestEnv, oid: KeycloakOpenID) -> None:
"""
Test the auth_url method.
:param env: Environment fixture :param env: Environment fixture
:type env: KeycloakTestEnv :type env: KeycloakTestEnv
@ -607,16 +646,16 @@ async def test_a_auth_url(env, oid: KeycloakOpenID):
""" """
res = await oid.a_auth_url(redirect_uri="http://test.test/*") res = await oid.a_auth_url(redirect_uri="http://test.test/*")
assert ( assert (
res
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
+ f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
+ "&redirect_uri=http://test.test/*&scope=email&state=&nonce="
res == f"http://{env.keycloak_host}:{env.keycloak_port}/realms/{oid.realm_name}"
f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
"&redirect_uri=http://test.test/*&scope=email&state=&nonce="
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test the token method.
async def test_a_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test the token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -666,9 +705,11 @@ async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_exchange_token( async def test_a_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
):
"""Test the exchange token method.
oid_with_credentials: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test the exchange token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -704,7 +745,9 @@ async def test_a_exchange_token(
# Exchange token with the new user # Exchange token with the new user
new_token = oid.exchange_token( new_token = oid.exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username,
token=token["access_token"],
audience=oid.client_id,
subject=username,
) )
assert await oid.a_userinfo(token=new_token["access_token"]) == { assert await oid.a_userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test", "email": f"{username}@test.test",
@ -719,8 +762,9 @@ async def test_a_exchange_token(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_logout(oid_with_credentials):
"""Test logout.
async def test_a_logout(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test logout.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -728,16 +772,17 @@ async def test_a_logout(oid_with_credentials):
oid, username, password = oid_with_credentials oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password) token = await oid.a_token(username=username, password=password)
assert await oid.a_userinfo(token=token["access_token"]) != dict()
assert await oid.a_logout(refresh_token=token["refresh_token"]) == dict()
assert await oid.a_userinfo(token=token["access_token"]) != {}
assert await oid.a_logout(refresh_token=token["refresh_token"]) == {}
with pytest.raises(KeycloakAuthenticationError): with pytest.raises(KeycloakAuthenticationError):
await oid.a_userinfo(token=token["access_token"]) await oid.a_userinfo(token=token["access_token"])
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_certs(oid: KeycloakOpenID):
"""Test certificates.
async def test_a_certs(oid: KeycloakOpenID) -> None:
"""
Test certificates.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
@ -746,8 +791,9 @@ async def test_a_certs(oid: KeycloakOpenID):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_public_key(oid: KeycloakOpenID):
"""Test public key.
async def test_a_public_key(oid: KeycloakOpenID) -> None:
"""
Test public key.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
@ -757,9 +803,11 @@ async def test_a_public_key(oid: KeycloakOpenID):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_entitlement( async def test_a_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
):
"""Test entitlement.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test entitlement.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -778,8 +826,9 @@ async def test_a_entitlement(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test introspect.
async def test_a_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test introspect.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -789,18 +838,22 @@ async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str
assert (await oid.a_introspect(token=token["access_token"]))["active"] assert (await oid.a_introspect(token=token["access_token"]))["active"]
assert await oid.a_introspect( assert await oid.a_introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token",
token=token["access_token"],
rpt="some",
token_type_hint="requesting_party_token", # noqa: S106
) == {"active": False} ) == {"active": False}
with pytest.raises(KeycloakRPTNotFound): with pytest.raises(KeycloakRPTNotFound):
await oid.a_introspect( await oid.a_introspect(
token=token["access_token"], token_type_hint="requesting_party_token",
token=token["access_token"],
token_type_hint="requesting_party_token", # noqa: S106
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token asynchronously.
async def test_a_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test decode token asynchronously.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -817,8 +870,11 @@ async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, s
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token asynchronously an invalid token.
async def test_a_decode_token_invalid_token(
oid_with_credentials: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test decode token asynchronously an invalid token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -835,30 +891,37 @@ async def test_a_decode_token_invalid_token(oid_with_credentials: Tuple[Keycloak
invalid_access_token = access_token + "a" invalid_access_token = access_token + "a"
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = await oid.a_decode_token( decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=True,
token=invalid_access_token,
validate=True,
) )
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = await oid.a_decode_token( decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=True, key=key,
token=invalid_access_token,
validate=True,
key=key,
) )
decoded_invalid_access_token = await oid.a_decode_token( decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=False,
token=invalid_access_token,
validate=False,
) )
assert decoded_access_token == decoded_invalid_access_token assert decoded_access_token == decoded_invalid_access_token
decoded_invalid_access_token = await oid.a_decode_token( decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=False, key=key,
token=invalid_access_token,
validate=False,
key=key,
) )
assert decoded_access_token == decoded_invalid_access_token assert decoded_access_token == decoded_invalid_access_token
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_load_authorization_config( async def test_a_load_authorization_config(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str],
):
"""Test load authorization config.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -873,15 +936,18 @@ async def test_a_load_authorization_config(
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role) assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2 assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
assert isinstance( assert isinstance(
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission,
oid.authorization.policies["test-authz-rb-policy"].permissions[0],
Permission,
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_has_uma_access( async def test_a_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
):
"""Test has UMA access.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test has UMA access.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -898,7 +964,10 @@ async def test_a_has_uma_access(
) )
assert ( assert (
str( str(
await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource"),
await oid.a_has_uma_access(
token=token["access_token"],
permissions="Default Resource",
),
) )
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
) )
@ -914,17 +983,19 @@ async def test_a_has_uma_access(
assert ( assert (
str( str(
await oid.a_has_uma_access( await oid.a_has_uma_access(
token=admin.connection.token["access_token"], permissions="Default Resource",
token=admin.connection.token["access_token"],
permissions="Default Resource",
), ),
) )
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})"
"{'Default Resource'})"
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
async def test_a_get_policies(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -941,17 +1012,17 @@ async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID,
orig_client_id = oid.client_id orig_client_id = oid.client_id
oid.client_id = "account" oid.client_id = "account"
assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == []
assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x) str(x)
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["Policy: test (role)"] ] == ["Policy: test (role)"]
assert [ assert [
repr(x) repr(x)
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["<Policy: test (role)>"] ] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id
@ -961,8 +1032,11 @@ async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID,
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
async def test_a_get_permissions(
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -980,26 +1054,31 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
orig_client_id = oid.client_id orig_client_id = oid.client_id
oid.client_id = "account" oid.client_id = "account"
assert ( assert (
await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == []
await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
) )
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
policy.add_permission( policy.add_permission(
permission=Permission( permission=Permission(
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS",
name="test-perm",
type="resource",
logic="POSITIVE",
decision_strategy="UNANIMOUS",
), ),
) )
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x) str(x)
for x in await oid.a_get_permissions( for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode",
token=token["access_token"],
method_token_info="decode", # noqa: S106
) )
] == ["Permission: test-perm (resource)"] ] == ["Permission: test-perm (resource)"]
assert [ assert [
repr(x) repr(x)
for x in await oid.a_get_permissions( for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode",
token=token["access_token"],
method_token_info="decode", # noqa: S106
) )
] == ["<Permission: test-perm (resource)>"] ] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id
@ -1010,8 +1089,11 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test UMA permissions.
async def test_a_uma_permissions(
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test UMA permissions.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials server with client credentials
@ -1027,8 +1109,9 @@ async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"""Test device authorization flow.
async def test_a_device(oid_with_credentials_device: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test device authorization flow.
:param oid_with_credentials_device: Keycloak OpenID client with pre-configured user :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
credentials and device authorization flow enabled credentials and device authorization flow enabled
@ -1041,13 +1124,13 @@ async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str,
"user_code": mock.ANY, "user_code": mock.ANY,
"verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device", "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
"verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/" "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
+ f"device?user_code={res['user_code']}",
f"device?user_code={res['user_code']}",
"expires_in": 600, "expires_in": 600,
"interval": 5, "interval": 5,
} }
def test_counter_part():
def test_counter_part() -> None:
"""Test that each function has its async counter part.""" """Test that each function has its async counter part."""
openid_methods = [ openid_methods = [
func for func in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, func)) func for func in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, func))

91
tests/test_keycloak_uma.py

@ -15,8 +15,9 @@ from keycloak.exceptions import (
from keycloak.uma_permissions import UMAPermission from keycloak.uma_permissions import UMAPermission
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
"""Test KeycloakUMA's init method.
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection) -> None:
"""
Test KeycloakUMA's init method.
:param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
:type oid_connection_with_authz: KeycloakOpenIDConnection :type oid_connection_with_authz: KeycloakOpenIDConnection
@ -32,21 +33,23 @@ def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
assert uma._well_known is not None assert uma._well_known is not None
def test_uma_well_known(uma: KeycloakUMA):
"""Test the well_known method.
def test_uma_well_known(uma: KeycloakUMA) -> None:
"""
Test the well_known method.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
""" """
res = uma.uma_well_known res = uma.uma_well_known
assert res is not None assert res is not None
assert res != dict()
assert res != {}
for key in ["resource_registration_endpoint"]: for key in ["resource_registration_endpoint"]:
assert key in res assert key in res
def test_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets.
def test_uma_resource_sets(uma: KeycloakUMA) -> None:
"""
Test resource sets.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
@ -109,7 +112,8 @@ def test_uma_resource_sets(uma: KeycloakUMA):
assert len(resource_set_list_ids) == 0 assert len(resource_set_list_ids) == 0
# With matchingUri query option # With matchingUri query option
resource_set_list_ids = uma.resource_set_list_ids( resource_set_list_ids = uma.resource_set_list_ids(
uri="/some_resources/resource", matchingUri=True,
uri="/some_resources/resource",
matchingUri=True,
) )
assert len(resource_set_list_ids) == 1 assert len(resource_set_list_ids) == 1
@ -130,7 +134,7 @@ def test_uma_resource_sets(uma: KeycloakUMA):
# Test update resource set # Test update resource set
latest_resource["name"] = "New Resource Name" latest_resource["name"] = "New Resource Name"
res = uma.resource_set_update(created_resource["_id"], latest_resource) res = uma.resource_set_update(created_resource["_id"], latest_resource)
assert res == dict(), res
assert res == {}, res
updated_resource = uma.resource_set_read(created_resource["_id"]) updated_resource = uma.resource_set_read(created_resource["_id"])
assert updated_resource["name"] == "New Resource Name" assert updated_resource["name"] == "New Resource Name"
@ -141,7 +145,7 @@ def test_uma_resource_sets(uma: KeycloakUMA):
# Test delete resource set # Test delete resource set
res = uma.resource_set_delete(resource_id=created_resource["_id"]) res = uma.resource_set_delete(resource_id=created_resource["_id"])
assert res == dict(), res
assert res == {}, res
with pytest.raises(KeycloakGetError) as err: with pytest.raises(KeycloakGetError) as err:
uma.resource_set_read(created_resource["_id"]) uma.resource_set_read(created_resource["_id"])
err.match("404: b''") err.match("404: b''")
@ -152,8 +156,9 @@ def test_uma_resource_sets(uma: KeycloakUMA):
assert err.match("404: b''") assert err.match("404: b''")
def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
"""Test policies.
def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin) -> None:
"""
Test policies.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
@ -215,7 +220,8 @@ def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
with pytest.raises(KeycloakDeleteError) as err: with pytest.raises(KeycloakDeleteError) as err:
uma.policy_delete(policy_id) uma.policy_delete(policy_id)
assert err.match( assert err.match(
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\'',
'404: b\'{"error":"invalid_request","error_description":'
'"Policy with .* does not exist"}\'',
) )
policies = uma.policy_query() policies = uma.policy_query()
@ -251,8 +257,9 @@ def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
admin.delete_group(group_id) admin.delete_group(group_id)
def test_uma_access(uma: KeycloakUMA):
"""Test permission access checks.
def test_uma_access(uma: KeycloakUMA) -> None:
"""
Test permission access checks.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
@ -274,7 +281,7 @@ def test_uma_access(uma: KeycloakUMA):
uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
token = uma.connection.token token = uma.connection.token
permissions = list()
permissions = []
assert uma.permissions_check(token["access_token"], permissions) assert uma.permissions_check(token["access_token"], permissions)
permissions.append(UMAPermission(resource=resource_to_create["name"])) permissions.append(UMAPermission(resource=resource_to_create["name"]))
@ -285,8 +292,9 @@ def test_uma_access(uma: KeycloakUMA):
uma.resource_set_delete(resource["_id"]) uma.resource_set_delete(resource["_id"])
def test_uma_permission_ticket(uma: KeycloakUMA):
"""Test permission ticket generation.
def test_uma_permission_ticket(uma: KeycloakUMA) -> None:
"""
Test permission ticket generation.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
@ -312,7 +320,8 @@ def test_uma_permission_ticket(uma: KeycloakUMA):
response = uma.permission_ticket_create(permissions) response = uma.permission_ticket_create(permissions)
rpt = uma.connection.keycloak_openid.token( rpt = uma.connection.keycloak_openid.token(
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"],
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket",
ticket=response["ticket"],
) )
assert rpt assert rpt
assert "access_token" in rpt assert "access_token" in rpt
@ -328,22 +337,24 @@ def test_uma_permission_ticket(uma: KeycloakUMA):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_uma_well_known(uma: KeycloakUMA):
"""Test the well_known method.
async def test_a_uma_well_known(uma: KeycloakUMA) -> None:
"""
Test the well_known method.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
""" """
res = uma.uma_well_known res = uma.uma_well_known
assert res is not None assert res is not None
assert res != dict()
assert res != {}
for key in ["resource_registration_endpoint"]: for key in ["resource_registration_endpoint"]:
assert key in res assert key in res
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets.
async def test_a_uma_resource_sets(uma: KeycloakUMA) -> None:
"""
Test resource sets.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
@ -406,7 +417,8 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
assert len(resource_set_list_ids) == 0 assert len(resource_set_list_ids) == 0
# With matchingUri query option # With matchingUri query option
resource_set_list_ids = await uma.a_resource_set_list_ids( resource_set_list_ids = await uma.a_resource_set_list_ids(
uri="/some_resources/resource", matchingUri=True,
uri="/some_resources/resource",
matchingUri=True,
) )
assert len(resource_set_list_ids) == 1 assert len(resource_set_list_ids) == 1
@ -427,7 +439,7 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
# Test update resource set # Test update resource set
latest_resource["name"] = "New Resource Name" latest_resource["name"] = "New Resource Name"
res = await uma.a_resource_set_update(created_resource["_id"], latest_resource) res = await uma.a_resource_set_update(created_resource["_id"], latest_resource)
assert res == dict(), res
assert res == {}, res
updated_resource = await uma.a_resource_set_read(created_resource["_id"]) updated_resource = await uma.a_resource_set_read(created_resource["_id"])
assert updated_resource["name"] == "New Resource Name" assert updated_resource["name"] == "New Resource Name"
@ -438,7 +450,7 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
# Test delete resource set # Test delete resource set
res = await uma.a_resource_set_delete(resource_id=created_resource["_id"]) res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
assert res == dict(), res
assert res == {}, res
with pytest.raises(KeycloakGetError) as err: with pytest.raises(KeycloakGetError) as err:
await uma.a_resource_set_read(created_resource["_id"]) await uma.a_resource_set_read(created_resource["_id"])
err.match("404: b''") err.match("404: b''")
@ -450,8 +462,9 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
"""Test policies.
async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin) -> None:
"""
Test policies.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
@ -513,7 +526,8 @@ async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
with pytest.raises(KeycloakDeleteError) as err: with pytest.raises(KeycloakDeleteError) as err:
await uma.a_policy_delete(policy_id) await uma.a_policy_delete(policy_id)
assert err.match( assert err.match(
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\'',
'404: b\'{"error":"invalid_request","error_description":'
'"Policy with .* does not exist"}\'',
) )
policies = await uma.a_policy_query() policies = await uma.a_policy_query()
@ -550,8 +564,9 @@ async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_uma_access(uma: KeycloakUMA):
"""Test permission access checks.
async def test_a_uma_access(uma: KeycloakUMA) -> None:
"""
Test permission access checks.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
@ -573,7 +588,7 @@ async def test_a_uma_access(uma: KeycloakUMA):
await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
token = uma.connection.token token = uma.connection.token
permissions = list()
permissions = []
assert await uma.a_permissions_check(token["access_token"], permissions) assert await uma.a_permissions_check(token["access_token"], permissions)
permissions.append(UMAPermission(resource=resource_to_create["name"])) permissions.append(UMAPermission(resource=resource_to_create["name"]))
@ -585,8 +600,9 @@ async def test_a_uma_access(uma: KeycloakUMA):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_uma_permission_ticket(uma: KeycloakUMA):
"""Test permission ticket generation.
async def test_a_uma_permission_ticket(uma: KeycloakUMA) -> None:
"""
Test permission ticket generation.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
@ -612,7 +628,8 @@ async def test_a_uma_permission_ticket(uma: KeycloakUMA):
response = await uma.a_permission_ticket_create(permissions) response = await uma.a_permission_ticket_create(permissions)
rpt = await uma.connection.keycloak_openid.a_token( rpt = await uma.connection.keycloak_openid.a_token(
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"],
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket",
ticket=response["ticket"],
) )
assert rpt assert rpt
assert "access_token" in rpt assert "access_token" in rpt
@ -624,7 +641,7 @@ async def test_a_uma_permission_ticket(uma: KeycloakUMA):
await uma.a_resource_set_delete(resource["_id"]) await uma.a_resource_set_delete(resource["_id"])
def test_counter_part():
def test_counter_part() -> None:
"""Test that each function has its async counter part.""" """Test that each function has its async counter part."""
uma_methods = [func for func in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, func))] uma_methods = [func for func in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, func))]
sync_methods = [ sync_methods = [

5
tests/test_license.py

@ -1,14 +1,15 @@
"""Tests for license.""" """Tests for license."""
import os import os
import pathlib
def test_license_present():
def test_license_present() -> None:
"""Test that the MIT license is present in the header of each module file.""" """Test that the MIT license is present in the header of each module file."""
for path, _, files in os.walk("src/keycloak"): for path, _, files in os.walk("src/keycloak"):
for _file in files: for _file in files:
if _file.endswith(".py"): if _file.endswith(".py"):
with open(os.path.join(path, _file)) as fp:
with pathlib.Path(pathlib.Path(path) / _file).open("r") as fp:
content = fp.read() content = fp.read()
assert content.startswith( assert content.startswith(
"# -*- coding: utf-8 -*-\n#\n# The MIT License (MIT)\n#\n#", "# -*- coding: utf-8 -*-\n#\n# The MIT License (MIT)\n#\n#",

58
tests/test_uma_permissions.py

@ -30,7 +30,7 @@ from keycloak.uma_permissions import (
) )
def test_uma_permission_obj():
def test_uma_permission_obj() -> None:
"""Test generic UMA permission.""" """Test generic UMA permission."""
with pytest.raises(PermissionDefinitionError): with pytest.raises(PermissionDefinitionError):
UMAPermission(permission="bad") UMAPermission(permission="bad")
@ -49,35 +49,35 @@ def test_uma_permission_obj():
assert {p1, p1} != {p2, p2} assert {p1, p1} != {p2, p2}
def test_resource_with_scope_obj():
def test_resource_with_scope_obj() -> None:
"""Test resource with scope.""" """Test resource with scope."""
r = Resource("Resource1") r = Resource("Resource1")
s = Scope("Scope1") s = Scope("Scope1")
assert r(s) == "Resource1#Scope1" assert r(s) == "Resource1#Scope1"
def test_scope_with_resource_obj():
def test_scope_with_resource_obj() -> None:
"""Test scope with resource.""" """Test scope with resource."""
r = Resource("Resource1") r = Resource("Resource1")
s = Scope("Scope1") s = Scope("Scope1")
assert s(r) == "Resource1#Scope1" assert s(r) == "Resource1#Scope1"
def test_resource_scope_str():
def test_resource_scope_str() -> None:
"""Test resource scope as string.""" """Test resource scope as string."""
r = Resource("Resource1") r = Resource("Resource1")
s = "Scope1" s = "Scope1"
assert r(scope=s) == "Resource1#Scope1" assert r(scope=s) == "Resource1#Scope1"
def test_scope_resource_str():
def test_scope_resource_str() -> None:
"""Test scope resource as string.""" """Test scope resource as string."""
r = "Resource1" r = "Resource1"
s = Scope("Scope1") s = Scope("Scope1")
assert s(resource=r) == "Resource1#Scope1" assert s(resource=r) == "Resource1#Scope1"
def test_resource_scope_list():
def test_resource_scope_list() -> None:
"""Test resource scope as list.""" """Test resource scope as list."""
r = Resource("Resource1") r = Resource("Resource1")
s = ["Scope1"] s = ["Scope1"]
@ -86,126 +86,126 @@ def test_resource_scope_list():
assert err.match(re.escape("can't determine if '['Scope1']' is a resource or scope")) assert err.match(re.escape("can't determine if '['Scope1']' is a resource or scope"))
def test_build_permission_none():
def test_build_permission_none() -> None:
"""Test build permission param with None.""" """Test build permission param with None."""
assert build_permission_param(None) == set() assert build_permission_param(None) == set()
def test_build_permission_empty_str():
def test_build_permission_empty_str() -> None:
"""Test build permission param with an empty string.""" """Test build permission param with an empty string."""
assert build_permission_param("") == set() assert build_permission_param("") == set()
def test_build_permission_empty_list():
def test_build_permission_empty_list() -> None:
"""Test build permission param with an empty list.""" """Test build permission param with an empty list."""
assert build_permission_param([]) == set() assert build_permission_param([]) == set()
def test_build_permission_empty_tuple():
def test_build_permission_empty_tuple() -> None:
"""Test build permission param with an empty tuple.""" """Test build permission param with an empty tuple."""
assert build_permission_param(()) == set() assert build_permission_param(()) == set()
def test_build_permission_empty_set():
def test_build_permission_empty_set() -> None:
"""Test build permission param with an empty set.""" """Test build permission param with an empty set."""
assert build_permission_param(set()) == set() assert build_permission_param(set()) == set()
def test_build_permission_empty_dict():
def test_build_permission_empty_dict() -> None:
"""Test build permission param with an empty dict.""" """Test build permission param with an empty dict."""
assert build_permission_param({}) == set() assert build_permission_param({}) == set()
def test_build_permission_str():
def test_build_permission_str() -> None:
"""Test build permission param as string.""" """Test build permission param as string."""
assert build_permission_param("resource1") == {"resource1"} assert build_permission_param("resource1") == {"resource1"}
def test_build_permission_list_str():
def test_build_permission_list_str() -> None:
"""Test build permission param with list of strings.""" """Test build permission param with list of strings."""
assert build_permission_param(["res1#scope1", "res1#scope2"]) == {"res1#scope1", "res1#scope2"} assert build_permission_param(["res1#scope1", "res1#scope2"]) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_str():
def test_build_permission_tuple_str() -> None:
"""Test build permission param with tuple of strings.""" """Test build permission param with tuple of strings."""
assert build_permission_param(("res1#scope1", "res1#scope2")) == {"res1#scope1", "res1#scope2"} assert build_permission_param(("res1#scope1", "res1#scope2")) == {"res1#scope1", "res1#scope2"}
def test_build_permission_set_str():
def test_build_permission_set_str() -> None:
"""Test build permission param with set of strings.""" """Test build permission param with set of strings."""
assert build_permission_param({"res1#scope1", "res1#scope2"}) == {"res1#scope1", "res1#scope2"} assert build_permission_param({"res1#scope1", "res1#scope2"}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_str():
def test_build_permission_tuple_dict_str_str() -> None:
"""Test build permission param with dictionary.""" """Test build permission param with dictionary."""
assert build_permission_param({"res1": "scope1"}) == {"res1#scope1"} assert build_permission_param({"res1": "scope1"}) == {"res1#scope1"}
def test_build_permission_tuple_dict_str_list_str():
def test_build_permission_tuple_dict_str_list_str() -> None:
"""Test build permission param with dictionary of list.""" """Test build permission param with dictionary of list."""
assert build_permission_param({"res1": ["scope1", "scope2"]}) == {"res1#scope1", "res1#scope2"} assert build_permission_param({"res1": ["scope1", "scope2"]}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_list_str2():
def test_build_permission_tuple_dict_str_list_str2() -> None:
"""Test build permission param with mutliple-keyed dictionary.""" """Test build permission param with mutliple-keyed dictionary."""
assert build_permission_param( assert build_permission_param(
{"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]}, {"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]},
) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"} ) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}
def test_build_permission_uma():
def test_build_permission_uma() -> None:
"""Test build permission param with UMA.""" """Test build permission param with UMA."""
assert build_permission_param(Resource("res1")(Scope("scope1"))) == {"res1#scope1"} assert build_permission_param(Resource("res1")(Scope("scope1"))) == {"res1#scope1"}
def test_build_permission_uma_list():
def test_build_permission_uma_list() -> None:
"""Test build permission param with list of UMAs.""" """Test build permission param with list of UMAs."""
assert build_permission_param( assert build_permission_param(
[Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))], [Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))],
) == {"res1#scope1", "res1#scope2"} ) == {"res1#scope1", "res1#scope2"}
def test_build_permission_misbuilt_dict_str_list_list_str():
def test_build_permission_misbuilt_dict_str_list_list_str() -> None:
"""Test bad build of permission param from dictionary.""" """Test bad build of permission param from dictionary."""
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": [["scope1", "scope2"]]}) build_permission_param({"res1": [["scope1", "scope2"]]})
assert err.match(re.escape("misbuilt permission {'res1': [['scope1', 'scope2']]}")) assert err.match(re.escape("misbuilt permission {'res1': [['scope1', 'scope2']]}"))
def test_build_permission_misbuilt_list_list_str():
def test_build_permission_misbuilt_list_list_str() -> None:
"""Test bad build of permission param from list.""" """Test bad build of permission param from list."""
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
print(build_permission_param([["scope1", "scope2"]]))
build_permission_param([["scope1", "scope2"]])
assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]")) assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]"))
def test_build_permission_misbuilt_list_set_str():
def test_build_permission_misbuilt_list_set_str() -> None:
"""Test bad build of permission param from set.""" """Test bad build of permission param from set."""
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1", "scope2"}]) build_permission_param([{"scope1", "scope2"}])
assert err.match("misbuilt permission.*") assert err.match("misbuilt permission.*")
def test_build_permission_misbuilt_set_set_str():
def test_build_permission_misbuilt_set_set_str() -> None:
"""Test bad build of permission param from list of set.""" """Test bad build of permission param from list of set."""
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1"}]) build_permission_param([{"scope1"}])
assert err.match(re.escape("misbuilt permission [{'scope1'}]")) assert err.match(re.escape("misbuilt permission [{'scope1'}]"))
def test_build_permission_misbuilt_dict_non_iterable():
def test_build_permission_misbuilt_dict_non_iterable() -> None:
"""Test bad build of permission param from non-iterable.""" """Test bad build of permission param from non-iterable."""
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": 5}) build_permission_param({"res1": 5})
assert err.match(re.escape("misbuilt permission {'res1': 5}")) assert err.match(re.escape("misbuilt permission {'res1': 5}"))
def test_auth_status_bool():
def test_auth_status_bool() -> None:
"""Test bool method of AuthStatus.""" """Test bool method of AuthStatus."""
assert not bool(AuthStatus(is_logged_in=True, is_authorized=False, missing_permissions="")) assert not bool(AuthStatus(is_logged_in=True, is_authorized=False, missing_permissions=""))
assert bool(AuthStatus(is_logged_in=True, is_authorized=True, missing_permissions="")) assert bool(AuthStatus(is_logged_in=True, is_authorized=True, missing_permissions=""))
def test_build_permission_without_scopes():
def test_build_permission_without_scopes() -> None:
"""Test build permission param with scopes.""" """Test build permission param with scopes."""
assert build_permission_param(permissions={"Resource": None}) == {"Resource"} assert build_permission_param(permissions={"Resource": None}) == {"Resource"}

6
tests/test_urls_patterns.py

@ -5,7 +5,7 @@ import inspect
from keycloak import urls_patterns from keycloak import urls_patterns
def test_correctness_of_patterns():
def test_correctness_of_patterns() -> None:
"""Test that there are no duplicate url patterns.""" """Test that there are no duplicate url patterns."""
# Test that the patterns are present # Test that the patterns are present
urls = [x for x in dir(urls_patterns) if not x.startswith("__")] urls = [x for x in dir(urls_patterns) if not x.startswith("__")]
@ -16,7 +16,7 @@ def test_correctness_of_patterns():
assert url.startswith("URL_"), f"The url pattern {url} does not begin with URL_" assert url.startswith("URL_"), f"The url pattern {url} does not begin with URL_"
# Test that the patterns have unique names # Test that the patterns have unique names
seen_urls = list()
seen_urls = []
urls_from_src = [ urls_from_src = [
x.split("=")[0].strip() x.split("=")[0].strip()
for x in inspect.getsource(urls_patterns).splitlines() for x in inspect.getsource(urls_patterns).splitlines()
@ -27,7 +27,7 @@ def test_correctness_of_patterns():
seen_urls.append(url) seen_urls.append(url)
# Test that the pattern values are unique # Test that the pattern values are unique
seen_url_values = list()
seen_url_values = []
for url in urls: for url in urls:
url_value = urls_patterns.__dict__[url] url_value = urls_patterns.__dict__[url]
assert url_value not in seen_url_values, f"The url {url} has a duplicate value {url_value}" assert url_value not in seen_url_values, f"The url {url} has a duplicate value {url_value}"

8
tox.ini

@ -10,16 +10,12 @@ commands_pre =
[testenv:check] [testenv:check]
commands = commands =
black --check --diff src/keycloak tests docs
isort -c --df src/keycloak tests docs
flake8 src/keycloak tests docs
ruff check src/keycloak tests docs
codespell src tests docs codespell src tests docs
[testenv:apply-check] [testenv:apply-check]
commands = commands =
black -C src/keycloak tests docs
black src/keycloak tests docs
isort src/keycloak tests docs
ruff check --fix src/keycloak tests docs
[testenv:docs] [testenv:docs]
commands = commands =

Loading…
Cancel
Save