Browse Source

fix: small bugs, use ruff as linter, added annotations

* fix: ruffing up

* test: linting done

* fix: tests and code

* fix: new docs

* docs: updated conf

* docs: update
pull/634/head v5.1.2
Richard Nemeth 2 weeks ago
committed by GitHub
parent
commit
327efd8d6f
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      .gitignore
  2. 7
      .readthedocs.yaml
  3. 9
      docs/source/conf.py
  4. 1049
      poetry.lock
  5. 35
      pyproject.toml
  6. 9
      src/keycloak/__init__.py
  7. 1
      src/keycloak/_version.py
  8. 18
      src/keycloak/authorization/__init__.py
  9. 65
      src/keycloak/authorization/permission.py
  10. 82
      src/keycloak/authorization/policy.py
  11. 30
      src/keycloak/authorization/role.py
  12. 170
      src/keycloak/connection.py
  13. 95
      src/keycloak/exceptions.py
  14. 4561
      src/keycloak/keycloak_admin.py
  15. 570
      src/keycloak/keycloak_openid.py
  16. 313
      src/keycloak/keycloak_uma.py
  17. 247
      src/keycloak/openid_connection.py
  18. 121
      src/keycloak/uma_permissions.py
  19. 5
      src/keycloak/urls_patterns.py
  20. 222
      tests/conftest.py
  21. 9
      tests/test_authorization.py
  22. 15
      tests/test_connection.py
  23. 7
      tests/test_exceptions.py
  24. 1867
      tests/test_keycloak_admin.py
  25. 357
      tests/test_keycloak_openid.py
  26. 103
      tests/test_keycloak_uma.py
  27. 7
      tests/test_license.py
  28. 63
      tests/test_uma_permissions.py
  29. 12
      tests/test_urls_patterns.py
  30. 10
      tox.ini

3
.gitignore

@ -108,6 +108,7 @@ main2.py
s3air-authz-config.json
.vscode
_build
.ruff_cache
.DS_Store
test.py

7
.readthedocs.yaml

@ -1,9 +1,12 @@
version: 2
sphinx:
configuration: docs/source/conf.py
build:
os: "ubuntu-22.04"
os: "ubuntu-24.04"
tools:
python: "3.12"
python: "3.13"
jobs:
post_create_environment:
- python -m pip install poetry

9
docs/source/conf.py

@ -1,5 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# python-keycloak documentation build configuration file, created by
# sphinx-quickstart on Tue Aug 15 11:02:59 2017.
@ -101,12 +100,12 @@ todo_include_todos = True
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = "sphinx_rtd_theme"
html_theme = "sphinx_book_theme"
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
@ -169,7 +168,7 @@ latex_documents = [
"python-keycloak Documentation",
"Marcos Pereira",
"manual",
)
),
]
@ -194,5 +193,5 @@ texinfo_documents = [
"python-keycloak",
"One line description of project.",
"Miscellaneous",
)
),
]

1049
poetry.lock
File diff suppressed because it is too large
View File

35
pyproject.toml

@ -37,17 +37,17 @@ deprecation = ">=2.1.0"
jwcrypto = ">=1.5.4"
httpx = ">=0.23.2"
async-property = ">=0.2.2"
aiofiles = ">=24.1.0"
[tool.poetry.group.docs.dependencies]
alabaster = ">=0.7.0"
commonmark = ">=0.9.1"
recommonmark = ">=0.7.1"
Sphinx = ">=7.0.0"
sphinx-rtd-theme = ">=1.0.0"
readthedocs-sphinx-ext = ">=2.1.9"
m2r2 = ">=0.3.2"
sphinx-autoapi = ">=3.0.0"
setuptools = ">=70.0.0"
sphinx-book-theme = ">=1.1.3"
[tool.poetry.group.dev.dependencies]
tox = ">=4.0.0"
@ -56,10 +56,6 @@ pytest-cov = ">=3.0.0"
pytest-asyncio = ">=0.23.7"
wheel = ">=0.38.4"
pre-commit = ">=3.5.0"
isort = ">=5.10.1"
black = ">=22.3.0"
flake8 = ">=7.0.0"
flake8-docstrings = ">=1.6.0"
commitizen = ">=2.28.0"
cryptography = ">=42.0.0"
codespell = ">=2.1.0"
@ -67,6 +63,7 @@ darglint = ">=1.8.1"
twine = ">=4.0.2"
freezegun = ">=1.2.2"
docutils = "<0.21"
ruff = ">=0.9.3"
[[tool.poetry.source]]
name = "PyPI"
@ -76,15 +73,29 @@ priority = "primary"
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.black]
[tool.ruff]
line-length = 99
[tool.isort]
line_length = 99
profile = "black"
[tool.ruff.lint]
select = ["ALL"]
ignore = [
"BLE001",
"C901",
"D203",
"D212",
"FBT001",
"FBT002",
"FBT003",
"N818",
"PLR0912",
"PLR0913",
"PLR0915",
"TRY003",
]
[tool.darglint]
enable = "DAR104"
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["ARG001","PLR2004", "PT011", "S101", "SLF001"]
"docs/*" = ["A001", "EXE001", "ERA001"]
[tool.pytest.ini_options]
asyncio_default_fixture_loop_scope = "function"

9
src/keycloak/__init__.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -46,8 +45,8 @@ from .keycloak_uma import KeycloakUMA
from .openid_connection import KeycloakOpenIDConnection
__all__ = [
"__version__",
"ConnectionManager",
"KeycloakAdmin",
"KeycloakAuthenticationError",
"KeycloakAuthorizationConfigError",
"KeycloakConnectionError",
@ -56,13 +55,13 @@ __all__ = [
"KeycloakError",
"KeycloakGetError",
"KeycloakInvalidTokenError",
"KeycloakOpenID",
"KeycloakOpenIDConnection",
"KeycloakOperationError",
"KeycloakPostError",
"KeycloakPutError",
"KeycloakRPTNotFound",
"KeycloakSecretNotFound",
"KeycloakAdmin",
"KeycloakOpenID",
"KeycloakOpenIDConnection",
"KeycloakUMA",
"__version__",
]

1
src/keycloak/_version.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#

18
src/keycloak/authorization/__init__.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -32,19 +31,21 @@ from .role import Role
class Authorization:
"""Keycloak Authorization (policies, roles, scopes and resources).
"""
Keycloak Authorization (policies, roles, scopes and resources).
https://keycloak.gitbooks.io/documentation/authorization_services/index.html
"""
def __init__(self):
def __init__(self) -> None:
"""Init method."""
self.policies = {}
@property
def policies(self):
"""Get policies.
def policies(self) -> dict:
"""
Get policies.
:returns: Policies
:rtype: dict
@ -52,11 +53,12 @@ class Authorization:
return self._policies
@policies.setter
def policies(self, value):
def policies(self, value: dict) -> None:
self._policies = value
def load_config(self, data):
"""Load policies, roles and permissions (scope/resources).
def load_config(self, data: dict) -> None:
"""
Load policies, roles and permissions (scope/resources).
:param data: keycloak authorization data (dict)
:type data: dict

65
src/keycloak/authorization/permission.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -25,7 +24,8 @@
class Permission:
"""Base permission class.
"""
Base permission class.
Consider this simple and very common permission:
@ -56,8 +56,9 @@ class Permission:
"""
def __init__(self, name, type, logic, decision_strategy):
"""Init method.
def __init__(self, name: str, type: str, logic: str, decision_strategy: str) -> None: # noqa: A002
"""
Init method.
:param name: Name
:type name: str
@ -75,25 +76,28 @@ class Permission:
self.resources = []
self.scopes = []
def __repr__(self):
"""Repr method.
def __repr__(self) -> str:
"""
Repr method.
:returns: Class representation
:rtype: str
"""
return "<Permission: %s (%s)>" % (self.name, self.type)
return f"<Permission: {self.name} ({self.type})>"
def __str__(self):
"""Str method.
def __str__(self) -> str:
"""
Str method.
:returns: Class string representation
:rtype: str
"""
return "Permission: %s (%s)" % (self.name, self.type)
return f"Permission: {self.name} ({self.type})"
@property
def name(self):
"""Get name.
def name(self) -> str:
"""
Get name.
:returns: name
:rtype: str
@ -101,12 +105,13 @@ class Permission:
return self._name
@name.setter
def name(self, value):
def name(self, value: str) -> None:
self._name = value
@property
def type(self):
"""Get type.
def type(self) -> str:
"""
Get type.
:returns: type
:rtype: str
@ -114,12 +119,13 @@ class Permission:
return self._type
@type.setter
def type(self, value):
def type(self, value: str) -> None:
self._type = value
@property
def logic(self):
"""Get logic.
def logic(self) -> str:
"""
Get logic.
:returns: Logic
:rtype: str
@ -127,12 +133,13 @@ class Permission:
return self._logic
@logic.setter
def logic(self, value):
def logic(self, value: str) -> str:
self._logic = value
@property
def decision_strategy(self):
"""Get decision strategy.
def decision_strategy(self) -> str:
"""
Get decision strategy.
:returns: Decision strategy
:rtype: str
@ -140,12 +147,13 @@ class Permission:
return self._decision_strategy
@decision_strategy.setter
def decision_strategy(self, value):
def decision_strategy(self, value: str) -> None:
self._decision_strategy = value
@property
def resources(self):
"""Get resources.
def resources(self) -> list:
"""
Get resources.
:returns: Resources
:rtype: list
@ -153,12 +161,13 @@ class Permission:
return self._resources
@resources.setter
def resources(self, value):
def resources(self, value: list) -> None:
self._resources = value
@property
def scopes(self):
"""Get scopes.
def scopes(self) -> list:
"""
Get scopes.
:returns: Scopes
:rtype: list
@ -166,5 +175,5 @@ class Permission:
return self._scopes
@scopes.setter
def scopes(self, value):
def scopes(self, value: list) -> None:
self._scopes = value

82
src/keycloak/authorization/policy.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -23,11 +22,12 @@
"""Keycloak authorization Policy module."""
from ..exceptions import KeycloakAuthorizationConfigError
from keycloak.exceptions import KeycloakAuthorizationConfigError
class Policy:
"""Base policy class.
"""
Base policy class.
A policy defines the conditions that must be satisfied to grant access to an object.
Unlike permissions, you do not specify the object being protected but rather the conditions
@ -50,8 +50,9 @@ class Policy:
"""
def __init__(self, name, type, logic, decision_strategy):
"""Init method.
def __init__(self, name: str, type: str, logic: str, decision_strategy: str) -> None: # noqa: A002
"""
Init method.
:param name: Name
:type name: str
@ -69,25 +70,28 @@ class Policy:
self.roles = []
self.permissions = []
def __repr__(self):
"""Repr method.
def __repr__(self) -> str:
"""
Repr method.
:returns: Class representation
:rtype: str
"""
return "<Policy: %s (%s)>" % (self.name, self.type)
return f"<Policy: {self.name} ({self.type})>"
def __str__(self):
"""Str method.
def __str__(self) -> str:
"""
Str method.
:returns: Class string representation
:rtype: str
"""
return "Policy: %s (%s)" % (self.name, self.type)
return f"Policy: {self.name} ({self.type})"
@property
def name(self):
"""Get name.
def name(self) -> str:
"""
Get name.
:returns: Name
:rtype: str
@ -95,12 +99,13 @@ class Policy:
return self._name
@name.setter
def name(self, value):
def name(self, value: str) -> None:
self._name = value
@property
def type(self):
"""Get type.
def type(self) -> str:
"""
Get type.
:returns: Type
:rtype: str
@ -108,12 +113,13 @@ class Policy:
return self._type
@type.setter
def type(self, value):
def type(self, value: str) -> None:
self._type = value
@property
def logic(self):
"""Get logic.
def logic(self) -> str:
"""
Get logic.
:returns: Logic
:rtype: str
@ -121,12 +127,13 @@ class Policy:
return self._logic
@logic.setter
def logic(self, value):
def logic(self, value: str) -> None:
self._logic = value
@property
def decision_strategy(self):
"""Get decision strategy.
def decision_strategy(self) -> str:
"""
Get decision strategy.
:returns: Decision strategy
:rtype: str
@ -134,12 +141,13 @@ class Policy:
return self._decision_strategy
@decision_strategy.setter
def decision_strategy(self, value):
def decision_strategy(self, value: str) -> None:
self._decision_strategy = value
@property
def roles(self):
"""Get roles.
def roles(self) -> list:
"""
Get roles.
:returns: Roles
:rtype: list
@ -147,12 +155,13 @@ class Policy:
return self._roles
@roles.setter
def roles(self, value):
def roles(self, value: list) -> None:
self._roles = value
@property
def permissions(self):
"""Get permissions.
def permissions(self) -> list:
"""
Get permissions.
:returns: Permissions
:rtype: list
@ -160,24 +169,25 @@ class Policy:
return self._permissions
@permissions.setter
def permissions(self, value):
def permissions(self, value: list) -> None:
self._permissions = value
def add_role(self, role):
"""Add keycloak role in policy.
def add_role(self, role: dict) -> None:
"""
Add keycloak role in policy.
:param role: Keycloak role
:type role: keycloak.authorization.Role
:raises KeycloakAuthorizationConfigError: In case of misconfigured policy type
"""
if self.type != "role":
raise KeycloakAuthorizationConfigError(
"Can't add role. Policy type is different of role"
)
error_msg = "Can't add role. Policy type is different of role"
raise KeycloakAuthorizationConfigError(error_msg)
self._roles.append(role)
def add_permission(self, permission):
"""Add keycloak permission in policy.
def add_permission(self, permission: dict) -> None:
"""
Add keycloak permission in policy.
:param permission: Keycloak permission
:type permission: keycloak.authorization.Permission

30
src/keycloak/authorization/role.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -23,9 +22,12 @@
"""The authorization Role module."""
from __future__ import annotations
class Role:
"""Authorization Role base class.
"""
Authorization Role base class.
Roles identify a type or category of user. Admin, user,
manager, and employee are all typical roles that may exist in an organization.
@ -38,8 +40,9 @@ class Role:
:type required: bool
"""
def __init__(self, name, required=False):
"""Init method.
def __init__(self, name: str, required: bool = False) -> None:
"""
Init method.
:param name: Name
:type name: str
@ -49,22 +52,29 @@ class Role:
self.name = name
self.required = required
def get_name(self):
"""Get name.
def get_name(self) -> str:
"""
Get name.
:returns: Name
:rtype: str
"""
return self.name
def __eq__(self, other):
"""Eq method.
def __eq__(self, other: str | Role) -> bool:
"""
Eq method.
:param other: The other object
:type other: str
:returns: Equality bool
:rtype: bool | NotImplemented
:rtype: bool
"""
if isinstance(other, str):
return self.name == other
return NotImplemented
if isinstance(other, Role):
return self.name == other.name
msg = f"Cannot compare Role with {type(other)}"
raise NotImplementedError(msg)

170
src/keycloak/connection.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -23,6 +22,8 @@
"""Connection manager module."""
from __future__ import annotations
try:
from urllib.parse import urljoin
except ImportError: # pragma: no cover
@ -30,13 +31,16 @@ except ImportError: # pragma: no cover
import httpx
import requests
from httpx import Response as AsyncResponse
from requests import Response
from requests.adapters import HTTPAdapter
from .exceptions import KeycloakConnectionError
class ConnectionManager(object):
"""Represents a simple server connection.
class ConnectionManager:
"""
Represents a simple server connection.
:param base_url: The server URL.
:type base_url: str
@ -58,9 +62,17 @@ class ConnectionManager(object):
"""
def __init__(
self, base_url, headers={}, timeout=60, verify=True, proxies=None, cert=None, max_retries=1
):
"""Init method.
self,
base_url: str,
headers: dict | None = None,
timeout: int = 60,
verify: bool = True,
proxies: dict | None = None,
cert: str | tuple | None = None,
max_retries: int = 1,
) -> None:
"""
Init method.
:param base_url: The server URL.
:type base_url: str
@ -106,19 +118,20 @@ class ConnectionManager(object):
self.async_s.auth = None # don't let requests add auth headers
self.async_s.transport = httpx.AsyncHTTPTransport(retries=1)
async def aclose(self):
async def aclose(self) -> None:
"""Close the async connection on delete."""
if hasattr(self, "_s"):
await self.async_s.aclose()
def __del__(self):
def __del__(self) -> None:
"""Del method."""
if hasattr(self, "_s"):
self._s.close()
@property
def base_url(self):
"""Return base url in use for requests to the server.
def base_url(self) -> str:
"""
Return base url in use for requests to the server.
:returns: Base URL
:rtype: str
@ -126,12 +139,13 @@ class ConnectionManager(object):
return self._base_url
@base_url.setter
def base_url(self, value):
def base_url(self, value: str) -> None:
self._base_url = value
@property
def timeout(self):
"""Return timeout in use for request to the server.
def timeout(self) -> int:
"""
Return timeout in use for request to the server.
:returns: Timeout
:rtype: int
@ -139,12 +153,13 @@ class ConnectionManager(object):
return self._timeout
@timeout.setter
def timeout(self, value):
def timeout(self, value: int) -> None:
self._timeout = value
@property
def verify(self):
"""Return verify in use for request to the server.
def verify(self) -> bool:
"""
Return verify in use for request to the server.
:returns: Verify indicator
:rtype: bool
@ -152,12 +167,13 @@ class ConnectionManager(object):
return self._verify
@verify.setter
def verify(self, value):
def verify(self, value: bool) -> None:
self._verify = value
@property
def cert(self):
"""Return client certificates in use for request to the server.
def cert(self) -> str | tuple:
"""
Return client certificates in use for request to the server.
:returns: Client certificate
:rtype: Union[str,Tuple[str,str]]
@ -165,12 +181,13 @@ class ConnectionManager(object):
return self._cert
@cert.setter
def cert(self, value):
def cert(self, value: str | tuple) -> None:
self._cert = value
@property
def headers(self):
"""Return header request to the server.
def headers(self) -> dict:
"""
Return header request to the server.
:returns: Request headers
:rtype: dict
@ -178,11 +195,12 @@ class ConnectionManager(object):
return self._headers
@headers.setter
def headers(self, value):
self._headers = value
def headers(self, value: dict) -> None:
self._headers = value or {}
def param_headers(self, key):
"""Return a specific header parameter.
def param_headers(self, key: str) -> str | None:
"""
Return a specific header parameter.
:param key: Header parameters key.
:type key: str
@ -191,12 +209,13 @@ class ConnectionManager(object):
"""
return self.headers.get(key)
def clean_headers(self):
def clean_headers(self) -> None:
"""Clear header parameters."""
self.headers = {}
def exist_param_headers(self, key):
"""Check if the parameter exists in the header.
def exist_param_headers(self, key: str) -> bool:
"""
Check if the parameter exists in the header.
:param key: Header parameters key.
:type key: str
@ -205,8 +224,9 @@ class ConnectionManager(object):
"""
return self.param_headers(key) is not None
def add_param_headers(self, key, value):
"""Add a single parameter inside the header.
def add_param_headers(self, key: str, value: str) -> None:
"""
Add a single parameter inside the header.
:param key: Header parameters key.
:type key: str
@ -215,16 +235,18 @@ class ConnectionManager(object):
"""
self.headers[key] = value
def del_param_headers(self, key):
"""Remove a specific parameter.
def del_param_headers(self, key: str) -> None:
"""
Remove a specific parameter.
:param key: Key of the header parameters.
:type key: str
"""
self.headers.pop(key, None)
def raw_get(self, path, **kwargs):
"""Submit get request to the path.
def raw_get(self, path: str, **kwargs: dict) -> Response:
"""
Submit get request to the path.
:param path: Path for request.
:type path: str
@ -244,10 +266,12 @@ class ConnectionManager(object):
cert=self.cert,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
def raw_post(self, path, data, **kwargs):
"""Submit post request to the path.
def raw_post(self, path: str, data: dict, **kwargs: dict) -> Response:
"""
Submit post request to the path.
:param path: Path for request.
:type path: str
@ -270,10 +294,12 @@ class ConnectionManager(object):
cert=self.cert,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
def raw_put(self, path, data, **kwargs):
"""Submit put request to the path.
def raw_put(self, path: str, data: dict, **kwargs: dict) -> Response:
"""
Submit put request to the path.
:param path: Path for request.
:type path: str
@ -296,10 +322,12 @@ class ConnectionManager(object):
cert=self.cert,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
def raw_delete(self, path, data=None, **kwargs):
"""Submit delete request to the path.
def raw_delete(self, path: str, data: dict | None = None, **kwargs: dict) -> Response:
"""
Submit delete request to the path.
:param path: Path for request.
:type path: str
@ -312,21 +340,22 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
r = self._s.delete(
return self._s.delete(
urljoin(self.base_url, path),
params=kwargs,
data=data or dict(),
data=data or {},
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
cert=self.cert,
)
return r
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
async def a_raw_get(self, path, **kwargs):
"""Submit get request to the path.
async def a_raw_get(self, path: str, **kwargs: dict) -> AsyncResponse:
"""
Submit get request to the path.
:param path: Path for request.
:type path: str
@ -344,10 +373,12 @@ class ConnectionManager(object):
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
async def a_raw_post(self, path, data, **kwargs):
"""Submit post request to the path.
async def a_raw_post(self, path: str, data: dict, **kwargs: dict) -> AsyncResponse:
"""
Submit post request to the path.
:param path: Path for request.
:type path: str
@ -369,10 +400,12 @@ class ConnectionManager(object):
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
async def a_raw_put(self, path, data, **kwargs):
"""Submit put request to the path.
async def a_raw_put(self, path: str, data: dict, **kwargs: dict) -> AsyncResponse:
"""
Submit put request to the path.
:param path: Path for request.
:type path: str
@ -393,10 +426,17 @@ class ConnectionManager(object):
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
async def a_raw_delete(self, path, data=None, **kwargs):
"""Submit delete request to the path.
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
async def a_raw_delete(
self,
path: str,
data: dict | None = None,
**kwargs: dict,
) -> AsyncResponse:
"""
Submit delete request to the path.
:param path: Path for request.
:type path: str
@ -412,17 +452,19 @@ class ConnectionManager(object):
return await self.async_s.request(
method="DELETE",
url=urljoin(self.base_url, path),
data=data or dict(),
data=data or {},
params=self._filter_query_params(kwargs),
headers=self.headers,
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
@staticmethod
def _filter_query_params(query_params):
"""Explicitly filter query params with None values for compatibility.
def _filter_query_params(query_params: dict) -> dict:
"""
Explicitly filter query params with None values for compatibility.
Httpx and requests differ in the way they handle query params with the value None,
requests does not include params with the value None while httpx includes them as-is.

95
src/keycloak/exceptions.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -23,11 +22,32 @@
"""Keycloak custom exceptions module."""
from __future__ import annotations
from typing import TYPE_CHECKING
import requests
if TYPE_CHECKING:
from httpx import Response as AsyncResponse
from requests import Response
HTTP_OK = 200
HTTP_CREATED = 201
HTTP_ACCEPTED = 202
HTTP_NO_CONTENT = 204
HTTP_BAD_REQUEST = 400
HTTP_UNAUTHORIZED = 401
HTTP_FORBIDDEN = 403
HTTP_NOT_FOUND = 404
HTTP_NOT_ALLOWED = 405
HTTP_CONFLICT = 409
class KeycloakError(Exception):
"""Base class for custom Keycloak errors.
"""
Base class for custom Keycloak errors.
:param error_message: The error message
:type error_message: str
@ -35,8 +55,14 @@ class KeycloakError(Exception):
:type response_code: int
"""
def __init__(self, error_message="", response_code=None, response_body=None):
"""Init method.
def __init__(
self,
error_message: str = "",
response_code: int | None = None,
response_body: bytes | None = None,
) -> None:
"""
Init method.
:param error_message: The error message
:type error_message: str
@ -51,104 +77,82 @@ class KeycloakError(Exception):
self.response_body = response_body
self.error_message = error_message
def __str__(self):
"""Str method.
def __str__(self) -> str:
"""
Str method.
:returns: String representation of the object
:rtype: str
"""
if self.response_code is not None:
return "{0}: {1}".format(self.response_code, self.error_message)
else:
return "{0}".format(self.error_message)
return f"{self.response_code}: {self.error_message}"
return f"{self.error_message}"
class KeycloakAuthenticationError(KeycloakError):
"""Keycloak authentication error exception."""
pass
class KeycloakConnectionError(KeycloakError):
"""Keycloak connection error exception."""
pass
class KeycloakOperationError(KeycloakError):
"""Keycloak operation error exception."""
pass
class KeycloakDeprecationError(KeycloakError):
"""Keycloak deprecation error exception."""
pass
class KeycloakGetError(KeycloakOperationError):
"""Keycloak request get error exception."""
pass
class KeycloakPostError(KeycloakOperationError):
"""Keycloak request post error exception."""
pass
class KeycloakPutError(KeycloakOperationError):
"""Keycloak request put error exception."""
pass
class KeycloakDeleteError(KeycloakOperationError):
"""Keycloak request delete error exception."""
pass
class KeycloakSecretNotFound(KeycloakOperationError):
"""Keycloak secret not found exception."""
pass
class KeycloakRPTNotFound(KeycloakOperationError):
"""Keycloak RPT not found exception."""
pass
class KeycloakAuthorizationConfigError(KeycloakOperationError):
"""Keycloak authorization config exception."""
pass
class KeycloakInvalidTokenError(KeycloakOperationError):
"""Keycloak invalid token exception."""
pass
class KeycloakPermissionFormatError(KeycloakOperationError):
"""Keycloak permission format exception."""
pass
class PermissionDefinitionError(Exception):
"""Keycloak permission definition exception."""
pass
def raise_error_from_response(response, error, expected_codes=None, skip_exists=False):
"""Raise an exception for the response.
def raise_error_from_response(
response: Response | AsyncResponse,
error: dict | Exception,
expected_codes: list[int] | None = None,
skip_exists: bool = False,
) -> bytes | dict | list:
"""
Raise an exception for the response.
:param response: The response object
:type response: Response
@ -162,9 +166,9 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
:returns: Content of the response message
:type: bytes or dict
:raises KeycloakError: In case of unexpected status codes
""" # noqa: DAR401,DAR402
"""
if expected_codes is None:
expected_codes = [200, 201, 204]
expected_codes = [HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT]
if response.status_code in expected_codes:
if response.status_code == requests.codes.no_content:
@ -175,7 +179,7 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
except ValueError:
return response.content
if skip_exists and response.status_code == 409:
if skip_exists and response.status_code == HTTP_CONFLICT:
return {"msg": "Already exists"}
try:
@ -185,10 +189,11 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
if isinstance(error, dict):
error = error.get(response.status_code, KeycloakOperationError)
else:
if response.status_code == 401:
elif response.status_code == HTTP_UNAUTHORIZED:
error = KeycloakAuthenticationError
raise error(
error_message=message, response_code=response.status_code, response_body=response.content
error_message=message,
response_code=response.status_code,
response_body=response.content,
)

4561
src/keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

570
src/keycloak/keycloak_openid.py
File diff suppressed because it is too large
View File

313
src/keycloak/keycloak_uma.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -21,38 +20,51 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak UMA module.
"""
Keycloak UMA module.
The module contains a UMA compatible client for keycloak:
https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html
"""
from __future__ import annotations
import json
from typing import Iterable
from typing import TYPE_CHECKING
from urllib.parse import quote_plus
from async_property import async_property
from .connection import ConnectionManager
from .exceptions import (
HTTP_CREATED,
HTTP_NO_CONTENT,
HTTP_OK,
KeycloakDeleteError,
KeycloakGetError,
KeycloakPostError,
KeycloakPutError,
raise_error_from_response,
)
from .openid_connection import KeycloakOpenIDConnection
from .uma_permissions import UMAPermission
from .urls_patterns import URL_UMA_WELL_KNOWN
if TYPE_CHECKING:
from collections.abc import Iterable
from .openid_connection import KeycloakOpenIDConnection
from .uma_permissions import UMAPermission
class KeycloakUMA:
"""Keycloak UMA client.
"""
Keycloak UMA client.
:param connection: OpenID connection manager
"""
def __init__(self, connection: KeycloakOpenIDConnection):
"""Init method.
def __init__(self, connection: KeycloakOpenIDConnection) -> None:
"""
Init method.
:param connection: OpenID connection manager
:type connection: KeycloakOpenIDConnection
@ -60,14 +72,15 @@ class KeycloakUMA:
self.connection = connection
self._well_known = None
def _fetch_well_known(self):
def _fetch_well_known(self) -> dict:
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@staticmethod
def format_url(url, **kwargs):
"""Substitute url path parameters.
def format_url(url: str, **kwargs: dict) -> str:
"""
Substitute url path parameters.
Given a parameterized url string, returns the string after url encoding and substituting
the given params. For example,
@ -84,8 +97,9 @@ class KeycloakUMA:
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
@staticmethod
async def a_format_url(url, **kwargs):
"""Substitute url path parameters.
async def a_format_url(url: str, **kwargs: dict) -> str:
"""
Substitute url path parameters.
Given a parameterized url string, returns the string after url encoding and substituting
the given params. For example,
@ -102,8 +116,9 @@ class KeycloakUMA:
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
@property
def uma_well_known(self):
"""Get the well_known UMA2 config.
def uma_well_known(self) -> dict:
"""
Get the well_known UMA2 config.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
@ -111,21 +126,25 @@ class KeycloakUMA:
# per instance cache
if not self._well_known:
self._well_known = self._fetch_well_known()
return self._well_known
@async_property
async def a_uma_well_known(self):
"""Get the well_known UMA2 config async.
async def a_uma_well_known(self) -> dict:
"""
Get the well_known UMA2 config async.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
"""
if not self._well_known:
self._well_known = await self.a__fetch_well_known()
return self._well_known
def resource_set_create(self, payload):
"""Create a resource set.
def resource_set_create(self, payload: dict) -> dict | bytes:
"""
Create a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
@ -139,12 +158,18 @@ class KeycloakUMA:
:rtype: dict
"""
data_raw = self.connection.raw_post(
self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload)
self.uma_well_known["resource_registration_endpoint"],
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_CREATED],
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def resource_set_update(self, resource_id, payload):
"""Update a resource set.
def resource_set_update(self, resource_id: str, payload: dict) -> bytes:
"""
Update a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
@ -157,16 +182,22 @@ class KeycloakUMA:
:param payload: ResourceRepresentation
:type payload: dict
:return: Response dict (empty)
:rtype: dict
:rtype: bytes
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
self.uma_well_known["resource_registration_endpoint"] + "/{id}",
id=resource_id,
)
data_raw = self.connection.raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
return raise_error_from_response(
data_raw,
KeycloakPutError,
expected_codes=[HTTP_NO_CONTENT],
)
def resource_set_read(self, resource_id):
"""Read a resource set.
def resource_set_read(self, resource_id: str) -> dict:
"""
Read a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
@ -180,13 +211,15 @@ class KeycloakUMA:
:rtype: dict
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
self.uma_well_known["resource_registration_endpoint"] + "/{id}",
id=resource_id,
)
data_raw = self.connection.raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
def resource_set_delete(self, resource_id):
"""Delete a resource set.
def resource_set_delete(self, resource_id: str) -> bytes:
"""
Delete a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
@ -197,10 +230,15 @@ class KeycloakUMA:
:rtype: dict
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
self.uma_well_known["resource_registration_endpoint"] + "/{id}",
id=resource_id,
)
data_raw = self.connection.raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
return raise_error_from_response(
data_raw,
KeycloakDeleteError,
expected_codes=[HTTP_NO_CONTENT],
)
def resource_set_list_ids(
self,
@ -210,11 +248,12 @@ class KeycloakUMA:
owner: str = "",
resource_type: str = "",
scope: str = "",
matchingUri: bool = False,
matchingUri: bool = False, # noqa: N803
first: int = 0,
maximum: int = -1,
):
"""Query for list of resource set ids.
) -> list:
"""
Query for list of resource set ids.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -240,7 +279,7 @@ class KeycloakUMA:
:return: List of ids
:rtype: List[str]
"""
query = dict()
query = {}
if name:
query["name"] = name
if exact_name:
@ -261,12 +300,14 @@ class KeycloakUMA:
query["max"] = maximum
data_raw = self.connection.raw_get(
self.uma_well_known["resource_registration_endpoint"], **query
self.uma_well_known["resource_registration_endpoint"],
**query,
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
def resource_set_list(self):
"""List all resource sets.
def resource_set_list(self) -> list:
"""
List all resource sets.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -281,8 +322,9 @@ class KeycloakUMA:
resource = self.resource_set_read(resource_id)
yield resource
def permission_ticket_create(self, permissions: Iterable[UMAPermission]):
"""Create a permission ticket.
def permission_ticket_create(self, permissions: Iterable[UMAPermission]) -> dict:
"""
Create a permission ticket.
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
@ -290,19 +332,23 @@ class KeycloakUMA:
:rtype: boolean
:raises KeycloakPostError: In case permission resource not found
"""
resources = dict()
resources = {}
for permission in permissions:
resource_id = getattr(permission, "resource_id", None)
if resource_id is None:
resource_ids = self.resource_set_list_ids(
exact_name=True, name=permission.resource, first=0, maximum=1
exact_name=True,
name=permission.resource,
first=0,
maximum=1,
)
if not resource_ids:
raise KeycloakPostError("Invalid resource specified")
msg = "Invalid resource specified"
raise KeycloakPostError(msg)
setattr(permission, "resource_id", resource_ids[0])
permission.resource_id = resource_ids[0]
resources.setdefault(resource_id, set())
if permission.scope:
@ -314,12 +360,19 @@ class KeycloakUMA:
]
data_raw = self.connection.raw_post(
self.uma_well_known["permission_endpoint"], data=json.dumps(payload)
self.uma_well_known["permission_endpoint"],
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError)
def permissions_check(self, token, permissions: Iterable[UMAPermission], **extra_payload):
"""Check UMA permissions by user token with requested permissions.
def permissions_check(
self,
token: str,
permissions: Iterable[UMAPermission],
**extra_payload: dict,
) -> bool:
"""
Check UMA permissions by user token with requested permissions.
The token endpoint is used to check UMA permissions from Keycloak. It can only be
invoked by confidential clients.
@ -358,8 +411,9 @@ class KeycloakUMA:
return False
return data.get("result", False)
def policy_resource_create(self, resource_id, payload):
"""Create permission policy for resource.
def policy_resource_create(self, resource_id: str, payload: dict) -> dict:
"""
Create permission policy for resource.
Supports name, description, scopes, roles, groups, clients
@ -373,12 +427,14 @@ class KeycloakUMA:
:rtype: dict
"""
data_raw = self.connection.raw_post(
self.uma_well_known["policy_endpoint"] + f"/{resource_id}", data=json.dumps(payload)
self.uma_well_known["policy_endpoint"] + f"/{resource_id}",
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError)
def policy_update(self, policy_id, payload):
"""Update permission policy.
def policy_update(self, policy_id: str, payload: dict) -> dict:
"""
Update permission policy.
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -391,12 +447,14 @@ class KeycloakUMA:
:rtype: dict
"""
data_raw = self.connection.raw_put(
self.uma_well_known["policy_endpoint"] + f"/{policy_id}", data=json.dumps(payload)
self.uma_well_known["policy_endpoint"] + f"/{policy_id}",
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPutError)
def policy_delete(self, policy_id):
"""Delete permission policy.
def policy_delete(self, policy_id: str) -> dict:
"""
Delete permission policy.
https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -407,7 +465,7 @@ class KeycloakUMA:
:rtype: dict
"""
data_raw = self.connection.raw_delete(
self.uma_well_known["policy_endpoint"] + f"/{policy_id}"
self.uma_well_known["policy_endpoint"] + f"/{policy_id}",
)
return raise_error_from_response(data_raw, KeycloakDeleteError)
@ -418,8 +476,9 @@ class KeycloakUMA:
scope: str = "",
first: int = 0,
maximum: int = -1,
):
"""Query permission policies.
) -> list:
"""
Query permission policies.
https://www.keycloak.org/docs/latest/authorization_services/#querying-permission
@ -437,7 +496,7 @@ class KeycloakUMA:
:return: List of ids
:rtype: List[str]
"""
query = dict()
query = {}
if name:
query["name"] = name
if resource:
@ -452,8 +511,9 @@ class KeycloakUMA:
data_raw = self.connection.raw_get(self.uma_well_known["policy_endpoint"], **query)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a__fetch_well_known(self):
"""Get the well_known UMA2 config async.
async def a__fetch_well_known(self) -> dict:
"""
Get the well_known UMA2 config async.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
@ -462,8 +522,9 @@ class KeycloakUMA:
data_raw = await self.connection.a_raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_resource_set_create(self, payload):
"""Create a resource set asynchronously.
async def a_resource_set_create(self, payload: dict) -> dict:
"""
Create a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
@ -480,10 +541,15 @@ class KeycloakUMA:
(await self.a_uma_well_known)["resource_registration_endpoint"],
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
return raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_CREATED],
)
async def a_resource_set_update(self, resource_id, payload):
"""Update a resource set asynchronously.
async def a_resource_set_update(self, resource_id: str, payload: dict) -> bytes:
"""
Update a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
@ -496,17 +562,22 @@ class KeycloakUMA:
:param payload: ResourceRepresentation
:type payload: dict
:return: Response dict (empty)
:rtype: dict
:rtype: bytes
"""
url = self.format_url(
(await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}",
id=resource_id,
)
data_raw = await self.connection.a_raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
return raise_error_from_response(
data_raw,
KeycloakPutError,
expected_codes=[HTTP_NO_CONTENT],
)
async def a_resource_set_read(self, resource_id):
"""Read a resource set asynchronously.
async def a_resource_set_read(self, resource_id: str) -> dict:
"""
Read a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
@ -524,10 +595,11 @@ class KeycloakUMA:
id=resource_id,
)
data_raw = await self.connection.a_raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
async def a_resource_set_delete(self, resource_id):
"""Delete a resource set asynchronously.
async def a_resource_set_delete(self, resource_id: str) -> bytes:
"""
Delete a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
@ -535,14 +607,18 @@ class KeycloakUMA:
:param resource_id: id of the resource
:type resource_id: str
:return: Response dict (empty)
:rtype: dict
:rtype: bytes
"""
url = self.format_url(
(await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}",
id=resource_id,
)
data_raw = await self.connection.a_raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
return raise_error_from_response(
data_raw,
KeycloakDeleteError,
expected_codes=[HTTP_NO_CONTENT],
)
async def a_resource_set_list_ids(
self,
@ -552,11 +628,12 @@ class KeycloakUMA:
owner: str = "",
resource_type: str = "",
scope: str = "",
matchingUri: bool = False,
matchingUri: bool = False, # noqa: N803
first: int = 0,
maximum: int = -1,
):
"""Query for list of resource set ids asynchronously.
) -> list:
"""
Query for list of resource set ids asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -582,7 +659,7 @@ class KeycloakUMA:
:return: List of ids
:rtype: List[str]
"""
query = dict()
query = {}
if name:
query["name"] = name
if exact_name:
@ -603,12 +680,14 @@ class KeycloakUMA:
query["max"] = maximum
data_raw = await self.connection.a_raw_get(
(await self.a_uma_well_known)["resource_registration_endpoint"], **query
(await self.a_uma_well_known)["resource_registration_endpoint"],
**query,
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
async def a_resource_set_list(self):
"""List all resource sets asynchronously.
async def a_resource_set_list(self) -> list:
"""
List all resource sets asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -623,8 +702,9 @@ class KeycloakUMA:
resource = await self.a_resource_set_read(resource_id)
yield resource
async def a_permission_ticket_create(self, permissions: Iterable[UMAPermission]):
"""Create a permission ticket asynchronously.
async def a_permission_ticket_create(self, permissions: Iterable[UMAPermission]) -> bool:
"""
Create a permission ticket asynchronously.
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
@ -632,19 +712,23 @@ class KeycloakUMA:
:rtype: boolean
:raises KeycloakPostError: In case permission resource not found
"""
resources = dict()
resources = {}
for permission in permissions:
resource_id = getattr(permission, "resource_id", None)
if resource_id is None:
resource_ids = await self.a_resource_set_list_ids(
exact_name=True, name=permission.resource, first=0, maximum=1
exact_name=True,
name=permission.resource,
first=0,
maximum=1,
)
if not resource_ids:
raise KeycloakPostError("Invalid resource specified")
msg = "Invalid resource specified"
raise KeycloakPostError(msg)
setattr(permission, "resource_id", resource_ids[0])
permission.resource_id = resource_ids[0]
resources.setdefault(resource_id, set())
if permission.scope:
@ -656,14 +740,19 @@ class KeycloakUMA:
]
data_raw = await self.connection.a_raw_post(
(await self.a_uma_well_known)["permission_endpoint"], data=json.dumps(payload)
(await self.a_uma_well_known)["permission_endpoint"],
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_permissions_check(
self, token, permissions: Iterable[UMAPermission], **extra_payload
):
"""Check UMA permissions by user token with requested permissions asynchronously.
self,
token: str,
permissions: Iterable[UMAPermission],
**extra_payload: dict,
) -> bool:
"""
Check UMA permissions by user token with requested permissions asynchronously.
The token endpoint is used to check UMA permissions from Keycloak. It can only be
invoked by confidential clients.
@ -696,7 +785,8 @@ class KeycloakUMA:
connection.add_param_headers("Authorization", "Bearer " + token)
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = await connection.a_raw_post(
(await self.a_uma_well_known)["token_endpoint"], data=payload
(await self.a_uma_well_known)["token_endpoint"],
data=payload,
)
try:
data = raise_error_from_response(data_raw, KeycloakPostError)
@ -704,8 +794,9 @@ class KeycloakUMA:
return False
return data.get("result", False)
async def a_policy_resource_create(self, resource_id, payload):
"""Create permission policy for resource asynchronously.
async def a_policy_resource_create(self, resource_id: str, payload: dict) -> dict:
"""
Create permission policy for resource asynchronously.
Supports name, description, scopes, roles, groups, clients
@ -724,8 +815,9 @@ class KeycloakUMA:
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_policy_update(self, policy_id, payload):
"""Update permission policy asynchronously.
async def a_policy_update(self, policy_id: str, payload: dict) -> dict:
"""
Update permission policy asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -743,8 +835,9 @@ class KeycloakUMA:
)
return raise_error_from_response(data_raw, KeycloakPutError)
async def a_policy_delete(self, policy_id):
"""Delete permission policy asynchronously.
async def a_policy_delete(self, policy_id: str) -> dict:
"""
Delete permission policy asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -755,7 +848,7 @@ class KeycloakUMA:
:rtype: dict
"""
data_raw = await self.connection.a_raw_delete(
(await self.a_uma_well_known)["policy_endpoint"] + f"/{policy_id}"
(await self.a_uma_well_known)["policy_endpoint"] + f"/{policy_id}",
)
return raise_error_from_response(data_raw, KeycloakDeleteError)
@ -766,8 +859,9 @@ class KeycloakUMA:
scope: str = "",
first: int = 0,
maximum: int = -1,
):
"""Query permission policies asynchronously.
) -> list:
"""
Query permission policies asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#querying-permission
@ -785,7 +879,7 @@ class KeycloakUMA:
:return: List of ids
:rtype: List[str]
"""
query = dict()
query = {}
if name:
query["name"] = name
if resource:
@ -798,6 +892,7 @@ class KeycloakUMA:
query["max"] = maximum
data_raw = await self.connection.a_raw_get(
(await self.a_uma_well_known)["policy_endpoint"], **query
(await self.a_uma_well_known)["policy_endpoint"],
**query,
)
return raise_error_from_response(data_raw, KeycloakGetError)

247
src/keycloak/openid_connection.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -21,22 +20,31 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak OpenID Connection Manager module.
"""
Keycloak OpenID Connection Manager module.
The module contains mainly the implementation of KeycloakOpenIDConnection class.
This is an extension of the ConnectionManager class, and handles the automatic refresh
of openid tokens when required.
"""
from datetime import datetime, timedelta
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from httpx import Response as AsyncResponse
from requests import Response
from .connection import ConnectionManager
from .exceptions import KeycloakPostError
from .exceptions import HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED, KeycloakPostError
from .keycloak_openid import KeycloakOpenID
class KeycloakOpenIDConnection(ConnectionManager):
"""A class to help with OpenID connections which can auto refresh tokens.
"""
A class to help with OpenID connections which can auto refresh tokens.
:param object: _description_
:type object: _type_
@ -59,23 +67,24 @@ class KeycloakOpenIDConnection(ConnectionManager):
def __init__(
self,
server_url,
grant_type=None,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
verify=True,
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
timeout=60,
cert=None,
max_retries=1,
):
"""Init method.
server_url: str,
grant_type: str | None = None,
username: str | None = None,
password: str | None = None,
token: str | None = None,
totp: str | None = None,
realm_name: str = "master",
client_id: str = "admin-cli",
verify: str | bool = True,
client_secret_key: str | None = None,
custom_headers: dict | None = None,
user_realm_name: str | None = None,
timeout: int | None = 60,
cert: str | tuple | None = None,
max_retries: int = 1,
) -> None:
"""
Init method.
:param server_url: Keycloak server url
:type server_url: str
@ -144,11 +153,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
timeout=self.timeout,
verify=self.verify,
cert=cert,
max_retries=max_retries,
)
@property
def server_url(self):
"""Get server url.
def server_url(self) -> str:
"""
Get server url.
:returns: Keycloak server url
:rtype: str
@ -156,12 +167,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self.base_url
@server_url.setter
def server_url(self, value):
def server_url(self, value: str) -> None:
self.base_url = value
@property
def grant_type(self):
"""Get grant type.
def grant_type(self) -> str:
"""
Get grant type.
:returns: Grant type
:rtype: str
@ -169,12 +181,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._grant_type
@grant_type.setter
def grant_type(self, value):
def grant_type(self, value: str) -> None:
self._grant_type = value
@property
def realm_name(self):
"""Get realm name.
def realm_name(self) -> str:
"""
Get realm name.
:returns: Realm name
:rtype: str
@ -182,12 +195,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._realm_name
@realm_name.setter
def realm_name(self, value):
def realm_name(self, value: str) -> None:
self._realm_name = value
@property
def client_id(self):
"""Get client id.
def client_id(self) -> str:
"""
Get client id.
:returns: Client id
:rtype: str
@ -195,12 +209,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._client_id
@client_id.setter
def client_id(self, value):
def client_id(self, value: str) -> None:
self._client_id = value
@property
def client_secret_key(self):
"""Get client secret key.
def client_secret_key(self) -> str:
"""
Get client secret key.
:returns: Client secret key
:rtype: str
@ -208,12 +223,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._client_secret_key
@client_secret_key.setter
def client_secret_key(self, value):
def client_secret_key(self, value: str) -> None:
self._client_secret_key = value
@property
def username(self):
"""Get username.
def username(self) -> str:
"""
Get username.
:returns: Admin username
:rtype: str
@ -221,12 +237,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._username
@username.setter
def username(self, value):
def username(self, value: str) -> None:
self._username = value
@property
def password(self):
"""Get password.
def password(self) -> str:
"""
Get password.
:returns: Admin password
:rtype: str
@ -234,12 +251,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._password
@password.setter
def password(self, value):
def password(self, value: str) -> None:
self._password = value
@property
def totp(self):
"""Get totp.
def totp(self) -> str:
"""
Get totp.
:returns: TOTP
:rtype: str
@ -247,12 +265,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._totp
@totp.setter
def totp(self, value):
def totp(self, value: str) -> None:
self._totp = value
@property
def token(self):
"""Get token.
def token(self) -> dict:
"""
Get token.
:returns: Access and refresh token
:rtype: dict
@ -260,17 +279,18 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._token
@token.setter
def token(self, value):
def token(self, value: dict) -> None:
self._token = value
self._expires_at = datetime.now() + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
self._expires_at = datetime.now(tz=timezone.utc) + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0),
)
if value is not None:
self.add_param_headers("Authorization", "Bearer " + value.get("access_token"))
@property
def expires_at(self):
"""Get token expiry time.
def expires_at(self) -> datetime:
"""
Get token expiry time.
:returns: Datetime at which the current token will expire
:rtype: datetime
@ -278,8 +298,9 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._expires_at
@property
def user_realm_name(self):
"""Get user realm name.
def user_realm_name(self) -> str:
"""
Get user realm name.
:returns: User realm name
:rtype: str
@ -287,12 +308,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._user_realm_name
@user_realm_name.setter
def user_realm_name(self, value):
def user_realm_name(self, value: str) -> None:
self._user_realm_name = value
@property
def custom_headers(self):
"""Get custom headers.
def custom_headers(self) -> dict:
"""
Get custom headers.
:returns: Custom headers
:rtype: dict
@ -300,7 +322,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._custom_headers
@custom_headers.setter
def custom_headers(self, value):
def custom_headers(self, value: dict) -> None:
self._custom_headers = value
if self.custom_headers is not None:
# merge custom headers to main headers
@ -308,7 +330,8 @@ class KeycloakOpenIDConnection(ConnectionManager):
@property
def keycloak_openid(self) -> KeycloakOpenID:
"""Get the KeycloakOpenID object.
"""
Get the KeycloakOpenID object.
The KeycloakOpenID is used to refresh tokens
@ -321,7 +344,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
token_realm_name = "master" # noqa: S105
self._keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
@ -336,20 +359,25 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._keycloak_openid
def get_token(self):
"""Get admin token.
def get_token(self) -> None:
"""
Get admin token.
The admin token is then set in the `token` attribute.
"""
if self.grant_type:
self.token = self.keycloak_openid.token(
self.username, self.password, grant_type=self.grant_type, totp=self.totp
self.username,
self.password,
grant_type=self.grant_type,
totp=self.totp,
)
else:
self.token = None
def refresh_token(self):
"""Refresh the token.
def refresh_token(self) -> None:
"""
Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed.
"""
@ -365,17 +393,20 @@ class KeycloakOpenIDConnection(ConnectionManager):
b"Token is not active",
b"Session not active",
]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
if e.response_code == HTTP_BAD_REQUEST and any(
err in e.response_body for err in list_errors
):
self.get_token()
else:
raise
def _refresh_if_required(self):
if datetime.now() >= self.expires_at:
def _refresh_if_required(self) -> None:
if datetime.now(tz=timezone.utc) >= self.expires_at:
self.refresh_token()
def raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
def raw_get(self, *args: list, **kwargs: dict) -> Response:
"""
Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more.
@ -389,14 +420,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
"""
self._refresh_if_required()
r = super().raw_get(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
self.refresh_token()
r = super().raw_get(*args, **kwargs)
return r
def raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
def raw_post(self, *args: list, **kwargs: dict) -> Response:
"""
Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more.
@ -410,14 +442,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
"""
self._refresh_if_required()
r = super().raw_post(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
self.refresh_token()
r = super().raw_post(*args, **kwargs)
return r
def raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
def raw_put(self, *args: list, **kwargs: dict) -> Response:
"""
Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more.
@ -431,14 +464,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
"""
self._refresh_if_required()
r = super().raw_put(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
self.refresh_token()
r = super().raw_put(*args, **kwargs)
return r
def raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
def raw_delete(self, *args: list, **kwargs: dict) -> Response:
"""
Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more.
@ -452,26 +486,31 @@ class KeycloakOpenIDConnection(ConnectionManager):
"""
self._refresh_if_required()
r = super().raw_delete(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
self.refresh_token()
r = super().raw_delete(*args, **kwargs)
return r
async def a_get_token(self):
"""Get admin token.
async def a_get_token(self) -> None:
"""
Get admin token.
The admin token is then set in the `token` attribute.
"""
if self.grant_type:
self.token = await self.keycloak_openid.a_token(
self.username, self.password, grant_type=self.grant_type, totp=self.totp
self.username,
self.password,
grant_type=self.grant_type,
totp=self.totp,
)
else:
self.token = None
async def a_refresh_token(self):
"""Refresh the token.
async def a_refresh_token(self) -> None:
"""
Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed.
"""
@ -487,18 +526,21 @@ class KeycloakOpenIDConnection(ConnectionManager):
b"Token is not active",
b"Session not active",
]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
if e.response_code == HTTP_BAD_REQUEST and any(
err in e.response_body for err in list_errors
):
await self.a_get_token()
else:
raise
async def a__refresh_if_required(self):
async def a__refresh_if_required(self) -> None:
"""Refresh the token if it is expired."""
if datetime.now() >= self.expires_at:
if datetime.now(tz=timezone.utc) >= self.expires_at:
await self.a_refresh_token()
async def a_raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
async def a_raw_get(self, *args: list, **kwargs: dict) -> AsyncResponse:
"""
Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more.
@ -512,14 +554,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
"""
await self.a__refresh_if_required()
r = await super().a_raw_get(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
await self.a_refresh_token()
r = await super().a_raw_get(*args, **kwargs)
return r
async def a_raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
async def a_raw_post(self, *args: list, **kwargs: dict) -> AsyncResponse:
"""
Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more.
@ -533,14 +576,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
"""
await self.a__refresh_if_required()
r = await super().a_raw_post(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
await self.a_refresh_token()
r = await super().a_raw_post(*args, **kwargs)
return r
async def a_raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
async def a_raw_put(self, *args: list, **kwargs: dict) -> AsyncResponse:
"""
Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more.
@ -554,14 +598,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
"""
await self.a__refresh_if_required()
r = await super().a_raw_put(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
await self.a_refresh_token()
r = await super().a_raw_put(*args, **kwargs)
return r
async def a_raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
async def a_raw_delete(self, *args: list, **kwargs: dict) -> AsyncResponse:
"""
Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more.
@ -575,7 +620,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
"""
await self.a__refresh_if_required()
r = await super().a_raw_delete(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
await self.a_refresh_token()
r = await super().a_raw_delete(*args, **kwargs)

121
src/keycloak/uma_permissions.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -23,11 +22,14 @@
"""User-managed access permissions module."""
from __future__ import annotations
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
class UMAPermission:
"""A class to conveniently assemble permissions.
"""
A class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.
@ -47,8 +49,14 @@ class UMAPermission:
:type scope: str
"""
def __init__(self, permission=None, resource="", scope=""):
"""Init method.
def __init__(
self,
permission: UMAPermission | None = None,
resource: str = "",
scope: str = "",
) -> None:
"""
Init method.
:param permission: Permission
:type permission: UMAPermission
@ -63,16 +71,16 @@ class UMAPermission:
if permission:
if not isinstance(permission, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(permission)
)
msg = f"can't determine if '{permission}' is a resource or scope"
raise PermissionDefinitionError(msg)
if permission.resource:
self.resource = str(permission.resource)
if permission.scope:
self.scope = str(permission.scope)
def __str__(self):
"""Str method.
def __str__(self) -> str:
"""
Str method.
:returns: String representation
:rtype: str
@ -80,20 +88,22 @@ class UMAPermission:
scope = self.scope
if scope:
scope = "#" + scope
return "{}{}".format(self.resource, scope)
return f"{self.resource}{scope}"
def __eq__(self, __o: object) -> bool:
"""Eq method.
def __eq__(self, other: object) -> bool:
"""
Eq method.
:param __o: The other object
:type __o: object
:returns: Equality boolean
:rtype: bool
"""
return str(self) == str(__o)
return str(self) == str(other)
def __repr__(self) -> str:
"""Repr method.
"""
Repr method.
:returns: The object representation
:rtype: str
@ -101,15 +111,22 @@ class UMAPermission:
return self.__str__()
def __hash__(self) -> int:
"""Hash method.
"""
Hash method.
:returns: Hash of the object
:rtype: int
"""
return hash(str(self))
def __call__(self, permission=None, resource="", scope="") -> "UMAPermission":
"""Call method.
def __call__(
self,
permission: UMAPermission | None = None,
resource: str = "",
scope: str = "",
) -> UMAPermission:
"""
Call method.
:param permission: Permission
:type permission: UMAPermission
@ -131,9 +148,8 @@ class UMAPermission:
if permission:
if not isinstance(permission, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(permission)
)
msg = f"can't determine if '{permission}' is a resource or scope"
raise PermissionDefinitionError(msg)
if permission.resource:
result_resource = str(permission.resource)
if permission.scope:
@ -143,7 +159,8 @@ class UMAPermission:
class Resource(UMAPermission):
"""A UMAPermission Resource class to conveniently assemble permissions.
"""
A UMAPermission Resource class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.
@ -151,8 +168,9 @@ class Resource(UMAPermission):
:type resource: str
"""
def __init__(self, resource):
"""Init method.
def __init__(self, resource: Resource) -> None:
"""
Init method.
:param resource: Resource
:type resource: str
@ -161,7 +179,8 @@ class Resource(UMAPermission):
class Scope(UMAPermission):
"""A UMAPermission Scope class to conveniently assemble permissions.
"""
A UMAPermission Scope class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.
@ -169,8 +188,9 @@ class Scope(UMAPermission):
:type scope: str
"""
def __init__(self, scope):
"""Init method.
def __init__(self, scope: Scope) -> None:
"""
Init method.
:param scope: Scope
:type scope: str
@ -179,7 +199,8 @@ class Scope(UMAPermission):
class AuthStatus:
"""A class that represents the authorization/login status of a user associated with a token.
"""
A class that represents the authorization/login status of a user associated with a token.
This has to evaluate to True if and only if the user is properly authorized
for the requested resource.
@ -192,8 +213,9 @@ class AuthStatus:
:type missing_permissions: set
"""
def __init__(self, is_logged_in, is_authorized, missing_permissions):
"""Init method.
def __init__(self, is_logged_in: bool, is_authorized: bool, missing_permissions: set) -> None:
"""
Init method.
:param is_logged_in: Is logged in indicator
:type is_logged_in: bool
@ -206,16 +228,18 @@ class AuthStatus:
self.is_authorized = is_authorized
self.missing_permissions = missing_permissions
def __bool__(self):
"""Bool method.
def __bool__(self) -> bool:
"""
Bool method.
:returns: Boolean representation
:rtype: bool
"""
return self.is_authorized
def __repr__(self):
"""Repr method.
def __repr__(self) -> str:
"""
Repr method.
:returns: The object representation
:rtype: str
@ -228,8 +252,9 @@ class AuthStatus:
)
def build_permission_param(permissions):
"""Transform permissions to a set, so they are usable for requests.
def build_permission_param(permissions: str | list | dict) -> set:
"""
Transform permissions to a set, so they are usable for requests.
:param permissions: Permissions
:type permissions: str | Iterable[str] | dict[str, str] | dict[str, Iterabble[str]]
@ -240,9 +265,9 @@ def build_permission_param(permissions):
if permissions is None or permissions == "":
return set()
if isinstance(permissions, str):
return set((permissions,))
return {permissions}
if isinstance(permissions, UMAPermission):
return set((str(permissions),))
return {str(permissions)}
try: # treat as dictionary of permissions
result = set()
@ -250,26 +275,26 @@ def build_permission_param(permissions):
if scopes is None:
result.add(resource)
elif isinstance(scopes, str):
result.add("{}#{}".format(resource, scopes))
result.add(f"{resource}#{scopes}")
else:
try:
for scope in scopes:
if not isinstance(scope, str):
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
result.add("{}#{}".format(resource, scope))
except TypeError:
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
return result
msg = f"misbuilt permission {permissions}"
raise KeycloakPermissionFormatError(msg)
result.add(f"{resource}#{scope}")
except TypeError as e:
msg = f"misbuilt permission {permissions}"
raise KeycloakPermissionFormatError(msg) from e
except AttributeError:
pass
else:
return result
result = set()
for permission in permissions:
if not isinstance(permission, (str, UMAPermission)):
raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions))
msg = f"misbuilt permission {permissions}"
raise KeycloakPermissionFormatError(msg)
result.add(str(permission))
return result

5
src/keycloak/urls_patterns.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
@ -27,7 +26,7 @@
URL_REALM = "realms/{realm-name}"
URL_WELL_KNOWN_BASE = "realms/{realm-name}/.well-known"
URL_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/openid-configuration"
URL_TOKEN = "realms/{realm-name}/protocol/openid-connect/token"
URL_TOKEN = "realms/{realm-name}/protocol/openid-connect/token" # noqa: S105
URL_USERINFO = "realms/{realm-name}/protocol/openid-connect/userinfo"
URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout"
URL_CERTS = "realms/{realm-name}/protocol/openid-connect/certs"
@ -49,7 +48,7 @@ URL_ADMIN_USER = "admin/realms/{realm-name}/users/{id}"
URL_ADMIN_USER_CONSENTS = "admin/realms/{realm-name}/users/{id}/consents"
URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-actions-email"
URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password" # noqa: S105
URL_ADMIN_GET_SESSIONS = "admin/realms/{realm-name}/users/{id}/sessions"
URL_ADMIN_USER_ALL_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings"
URL_ADMIN_USER_CLIENT_ROLES = (

222
tests/conftest.py

@ -3,8 +3,8 @@
import ipaddress
import os
import uuid
from datetime import datetime, timedelta
from typing import Generator, Tuple
from collections.abc import Generator
from datetime import datetime, timedelta, timezone
import freezegun
import pytest
@ -17,8 +17,9 @@ from cryptography.x509.oid import NameOID
from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakOpenIDConnection, KeycloakUMA
class KeycloakTestEnv(object):
"""Wrapper for test Keycloak connection configuration.
class KeycloakTestEnv:
"""
Wrapper for test Keycloak connection configuration.
:param host: Hostname
:type host: str
@ -36,8 +37,9 @@ class KeycloakTestEnv(object):
port: str = os.environ["KEYCLOAK_PORT"],
username: str = os.environ["KEYCLOAK_ADMIN"],
password: str = os.environ["KEYCLOAK_ADMIN_PASSWORD"],
):
"""Init method.
) -> None:
"""
Init method.
:param host: Hostname
:type host: str
@ -48,87 +50,96 @@ class KeycloakTestEnv(object):
:param password: Admin password
:type password: str
"""
self.KEYCLOAK_HOST = host
self.KEYCLOAK_PORT = port
self.KEYCLOAK_ADMIN = username
self.KEYCLOAK_ADMIN_PASSWORD = password
self.keycloak_host = host
self.keycloak_port = port
self.keycloak_admin = username
self.keycloak_admin_password = password
@property
def KEYCLOAK_HOST(self):
"""Hostname getter.
def keycloak_host(self) -> str:
"""
Hostname getter.
:returns: Keycloak host
:rtype: str
"""
return self._KEYCLOAK_HOST
return self._keycloak_host
@KEYCLOAK_HOST.setter
def KEYCLOAK_HOST(self, value: str):
"""Hostname setter.
@keycloak_host.setter
def keycloak_host(self, value: str) -> None:
"""
Hostname setter.
:param value: Keycloak host
:type value: str
"""
self._KEYCLOAK_HOST = value
self._keycloak_host = value
@property
def KEYCLOAK_PORT(self):
"""Port getter.
def keycloak_port(self) -> str:
"""
Port getter.
:returns: Keycloak port
:rtype: str
"""
return self._KEYCLOAK_PORT
return self._keycloak_port
@KEYCLOAK_PORT.setter
def KEYCLOAK_PORT(self, value: str):
"""Port setter.
@keycloak_port.setter
def keycloak_port(self, value: str) -> None:
"""
Port setter.
:param value: Keycloak port
:type value: str
"""
self._KEYCLOAK_PORT = value
self._keycloak_port = value
@property
def KEYCLOAK_ADMIN(self):
"""Admin username getter.
def keycloak_admin(self) -> str:
"""
Admin username getter.
:returns: Admin username
:rtype: str
"""
return self._KEYCLOAK_ADMIN
return self._keycloak_admin
@KEYCLOAK_ADMIN.setter
def KEYCLOAK_ADMIN(self, value: str):
"""Admin username setter.
@keycloak_admin.setter
def keycloak_admin(self, value: str) -> None:
"""
Admin username setter.
:param value: Admin username
:type value: str
"""
self._KEYCLOAK_ADMIN = value
self._keycloak_admin = value
@property
def KEYCLOAK_ADMIN_PASSWORD(self):
"""Admin password getter.
def keycloak_admin_password(self) -> str:
"""
Admin password getter.
:returns: Admin password
:rtype: str
"""
return self._KEYCLOAK_ADMIN_PASSWORD
return self._keycloak_admin_password
@KEYCLOAK_ADMIN_PASSWORD.setter
def KEYCLOAK_ADMIN_PASSWORD(self, value: str):
"""Admin password setter.
@keycloak_admin_password.setter
def keycloak_admin_password(self, value: str) -> None:
"""
Admin password setter.
:param value: Admin password
:type value: str
"""
self._KEYCLOAK_ADMIN_PASSWORD = value
self._keycloak_admin_password = value
@pytest.fixture
def env():
"""Fixture for getting the test environment configuration object.
def env() -> KeycloakTestEnv:
"""
Fixture for getting the test environment configuration object.
:returns: Keycloak test environment object
:rtype: KeycloakTestEnv
@ -137,8 +148,9 @@ def env():
@pytest.fixture
def admin(env: KeycloakTestEnv):
"""Fixture for initialized KeycloakAdmin class.
def admin(env: KeycloakTestEnv) -> KeycloakAdmin:
"""
Fixture for initialized KeycloakAdmin class.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
@ -146,16 +158,17 @@ def admin(env: KeycloakTestEnv):
:rtype: KeycloakAdmin
"""
return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
username=env.keycloak_admin,
password=env.keycloak_admin_password,
)
@pytest.fixture
@freezegun.freeze_time("2023-02-25 10:00:00")
def admin_frozen(env: KeycloakTestEnv):
"""Fixture for initialized KeycloakAdmin class, with time frozen.
def admin_frozen(env: KeycloakTestEnv) -> KeycloakAdmin:
"""
Fixture for initialized KeycloakAdmin class, with time frozen.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
@ -163,15 +176,20 @@ def admin_frozen(env: KeycloakTestEnv):
:rtype: KeycloakAdmin
"""
return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
username=env.keycloak_admin,
password=env.keycloak_admin_password,
)
@pytest.fixture
def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for initialized KeycloakOpenID class.
def oid(
env: KeycloakTestEnv,
realm: str,
admin: KeycloakAdmin,
) -> Generator[KeycloakOpenID, None, None]:
"""
Fixture for initialized KeycloakOpenID class.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
@ -193,11 +211,11 @@ def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"enabled": True,
"publicClient": True,
"protocol": "openid-connect",
}
},
)
# Return OID
yield KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name=realm,
client_id=client,
)
@ -206,8 +224,13 @@ def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
@pytest.fixture
def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for an initialized KeycloakOpenID class and a random user credentials.
def oid_with_credentials(
env: KeycloakTestEnv,
realm: str,
admin: KeycloakAdmin,
) -> Generator[tuple[KeycloakOpenID, str, str], None, None]:
"""
Fixture for an initialized KeycloakOpenID class and a random user credentials.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
@ -232,7 +255,7 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
"protocol": "openid-connect",
"secret": secret,
"clientAuthenticatorType": "client-secret",
}
},
)
# Create user
username = str(uuid.uuid4())
@ -247,12 +270,12 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
"emailVerified": True,
"requiredActions": [],
"credentials": [{"type": "password", "value": password, "temporary": False}],
}
},
)
yield (
KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name=realm,
client_id=client,
client_secret_key=secret,
@ -267,8 +290,13 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
@pytest.fixture
def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for an initialized KeycloakOpenID class and a random user credentials.
def oid_with_credentials_authz(
env: KeycloakTestEnv,
realm: str,
admin: KeycloakAdmin,
) -> Generator[tuple[KeycloakOpenID, str, str], None, None]:
"""
Fixture for an initialized KeycloakOpenID class and a random user credentials.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
@ -295,7 +323,7 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
"clientAuthenticatorType": "client-secret",
"authorizationServicesEnabled": True,
"serviceAccountsEnabled": True,
}
},
)
admin.create_client_authz_role_based_policy(
client_id=client_id,
@ -317,12 +345,12 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
"lastName": "last",
"requiredActions": [],
"credentials": [{"type": "password", "value": password, "temporary": False}],
}
},
)
yield (
KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name=realm,
client_id=client,
client_secret_key=secret,
@ -337,8 +365,13 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
@pytest.fixture
def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for an initialized KeycloakOpenID class and a random user credentials.
def oid_with_credentials_device(
env: KeycloakTestEnv,
realm: str,
admin: KeycloakAdmin,
) -> Generator[tuple[KeycloakOpenID, str, str], None, None]:
"""
Fixture for an initialized KeycloakOpenID class and a random user credentials.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
@ -364,7 +397,7 @@ def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: Keycloa
"secret": secret,
"clientAuthenticatorType": "client-secret",
"attributes": {"oauth2.device.authorization.grant.enabled": True},
}
},
)
# Create user
username = str(uuid.uuid4())
@ -379,12 +412,12 @@ def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: Keycloa
"emailVerified": True,
"requiredActions": [],
"credentials": [{"type": "password", "value": password, "temporary": False}],
}
},
)
yield (
KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name=realm,
client_id=client,
client_secret_key=secret,
@ -400,7 +433,8 @@ def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: Keycloa
@pytest.fixture
def realm(admin: KeycloakAdmin) -> Generator[str, None, None]:
"""Fixture for a new random realm.
"""
Fixture for a new random realm.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
@ -415,7 +449,8 @@ def realm(admin: KeycloakAdmin) -> Generator[str, None, None]:
@pytest.fixture
def user(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
"""Fixture for a new random user.
"""
Fixture for a new random user.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
@ -433,7 +468,8 @@ def user(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
@pytest.fixture
def group(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
"""Fixture for a new random group.
"""
Fixture for a new random group.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
@ -451,7 +487,8 @@ def group(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
@pytest.fixture
def client(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
"""Fixture for a new random client.
"""
Fixture for a new random client.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
@ -469,7 +506,8 @@ def client(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
@pytest.fixture
def client_role(admin: KeycloakAdmin, realm: str, client: str) -> Generator[str, None, None]:
"""Fixture for a new random client role.
"""
Fixture for a new random client role.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
@ -489,9 +527,13 @@ def client_role(admin: KeycloakAdmin, realm: str, client: str) -> Generator[str,
@pytest.fixture
def composite_client_role(
admin: KeycloakAdmin, realm: str, client: str, client_role: str
admin: KeycloakAdmin,
realm: str,
client: str,
client_role: str,
) -> Generator[str, None, None]:
"""Fixture for a new random composite client role.
"""
Fixture for a new random composite client role.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
@ -514,8 +556,9 @@ def composite_client_role(
@pytest.fixture
def selfsigned_cert():
"""Generate self signed certificate for a hostname, and optional IP addresses.
def selfsigned_cert() -> tuple[str, str]:
"""
Generate self signed certificate for a hostname, and optional IP addresses.
:returns: Selfsigned certificate
:rtype: Tuple[str, str]
@ -526,7 +569,9 @@ def selfsigned_cert():
# Generate our key
if key is None:
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
public_exponent=65537,
key_size=2048,
backend=default_backend(),
)
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
@ -545,7 +590,7 @@ def selfsigned_cert():
# path_len=0 means this cert can only sign itself, not other certs.
basic_contraints = x509.BasicConstraints(ca=True, path_length=0)
now = datetime.utcnow()
now = datetime.now(tz=timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(name)
@ -569,8 +614,11 @@ def selfsigned_cert():
@pytest.fixture
def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Fixture for initialized KeycloakUMA class.
def oid_connection_with_authz(
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> KeycloakOpenIDConnection:
"""
Fixture for initialized KeycloakUMA class.
:param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
@ -578,19 +626,19 @@ def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID,
:rtype: KeycloakOpenIDConnection
"""
oid, _, _ = oid_with_credentials_authz
connection = KeycloakOpenIDConnection(
return KeycloakOpenIDConnection(
server_url=oid.connection.base_url,
realm_name=oid.realm_name,
client_id=oid.client_id,
client_secret_key=oid.client_secret_key,
timeout=60,
)
yield connection
@pytest.fixture
def uma(oid_connection_with_authz: KeycloakOpenIDConnection):
"""Fixture for initialized KeycloakUMA class.
def uma(oid_connection_with_authz: KeycloakOpenIDConnection) -> KeycloakUMA:
"""
Fixture for initialized KeycloakUMA class.
:param oid_connection_with_authz: Keycloak open id connection with pre-configured authz client
:type oid_connection_with_authz: KeycloakOpenIDConnection
@ -599,4 +647,4 @@ def uma(oid_connection_with_authz: KeycloakOpenIDConnection):
"""
connection = oid_connection_with_authz
# Return UMA
yield KeycloakUMA(connection=connection)
return KeycloakUMA(connection=connection)

9
tests/test_authorization.py

@ -6,7 +6,7 @@ from keycloak.authorization import Permission, Policy, Role
from keycloak.exceptions import KeycloakAuthorizationConfigError
def test_authorization_objects():
def test_authorization_objects() -> None:
"""Test authorization objects."""
# Test permission
p = Permission(name="test", type="test", logic="test", decision_strategy="test")
@ -39,5 +39,10 @@ def test_authorization_objects():
assert r.name == "test"
assert not r.required
assert r.get_name() == "test"
assert r == r
assert r == r # noqa: PLR0124
assert r == "test"
with pytest.raises(NotImplementedError) as err:
assert r == 1
assert str(err.value) == "Cannot compare Role with <class 'int'>"

15
tests/test_connection.py

@ -8,21 +8,22 @@ from keycloak.connection import ConnectionManager
from keycloak.exceptions import KeycloakConnectionError
def test_connection_proxy():
def test_connection_proxy() -> None:
"""Test proxies of connection manager."""
cm = ConnectionManager(
base_url="http://test.test", proxies={"http://test.test": "http://localhost:8080"}
base_url="http://test.test",
proxies={"http://test.test": "http://localhost:8080"},
)
assert cm._s.proxies == {"http://test.test": "http://localhost:8080"}
def test_headers():
def test_headers() -> None:
"""Test headers manipulation."""
cm = ConnectionManager(base_url="http://test.test", headers={"H": "A"})
assert cm.param_headers(key="H") == "A"
assert cm.param_headers(key="A") is None
cm.clean_headers()
assert cm.headers == dict()
assert cm.headers == {}
cm.add_param_headers(key="H", value="B")
assert cm.exist_param_headers(key="H")
assert not cm.exist_param_headers(key="B")
@ -30,7 +31,7 @@ def test_headers():
assert not cm.exist_param_headers(key="H")
def test_bad_connection():
def test_bad_connection() -> None:
"""Test bad connection."""
cm = ConnectionManager(base_url="http://not.real.domain")
with pytest.raises(KeycloakConnectionError):
@ -44,7 +45,7 @@ def test_bad_connection():
@pytest.mark.asyncio
async def a_test_bad_connection():
async def a_test_bad_connection() -> None:
"""Test bad connection."""
cm = ConnectionManager(base_url="http://not.real.domain")
with pytest.raises(KeycloakConnectionError):
@ -57,7 +58,7 @@ async def a_test_bad_connection():
await cm.a_raw_put(path="bad", data={})
def test_counter_part():
def test_counter_part() -> None:
"""Test that each function has its async counter part."""
con_methods = [
func for func in dir(ConnectionManager) if callable(getattr(ConnectionManager, func))

7
tests/test_exceptions.py

@ -7,7 +7,7 @@ import pytest
from keycloak.exceptions import KeycloakOperationError, raise_error_from_response
def test_raise_error_from_response_from_dict():
def test_raise_error_from_response_from_dict() -> None:
"""Test raise error from response using a dictionary."""
response = Mock()
response.json.return_value = {"key": "value"}
@ -16,5 +16,8 @@ def test_raise_error_from_response_from_dict():
with pytest.raises(KeycloakOperationError):
raise_error_from_response(
response=response, error=dict(), expected_codes=[200], skip_exists=False
response=response,
error={},
expected_codes=[200],
skip_exists=False,
)

1867
tests/test_keycloak_admin.py
File diff suppressed because it is too large
View File

357
tests/test_keycloak_openid.py

@ -1,7 +1,6 @@
"""Test module for KeycloakOpenID."""
from inspect import iscoroutinefunction, signature
from typing import Tuple
from unittest import mock
import jwcrypto.jwk
@ -22,16 +21,18 @@ from keycloak.exceptions import (
KeycloakPostError,
KeycloakRPTNotFound,
)
from tests.conftest import KeycloakTestEnv
def test_keycloak_openid_init(env):
"""Test KeycloakOpenId's init method.
def test_keycloak_openid_init(env: KeycloakTestEnv) -> None:
"""
Test KeycloakOpenId's init method.
:param env: Environment fixture
:type env: KeycloakTestEnv
"""
oid = KeycloakOpenID(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}",
realm_name="master",
client_id="admin-cli",
)
@ -43,15 +44,16 @@ def test_keycloak_openid_init(env):
assert isinstance(oid.authorization, Authorization)
def test_well_known(oid: KeycloakOpenID):
"""Test the well_known method.
def test_well_known(oid: KeycloakOpenID) -> None:
"""
Test the well_known method.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
res = oid.well_known()
assert res is not None
assert res != dict()
assert res != {}
for key in [
"acr_values_supported",
"authorization_encryption_alg_values_supported",
@ -110,8 +112,9 @@ def test_well_known(oid: KeycloakOpenID):
assert key in res
def test_auth_url(env, oid: KeycloakOpenID):
"""Test the auth_url method.
def test_auth_url(env: KeycloakTestEnv, oid: KeycloakOpenID) -> None:
"""
Test the auth_url method.
:param env: Environment fixture
:type env: KeycloakTestEnv
@ -120,15 +123,15 @@ def test_auth_url(env, oid: KeycloakOpenID):
"""
res = oid.auth_url(redirect_uri="http://test.test/*")
assert (
res
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
+ f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
+ "&redirect_uri=http://test.test/*&scope=email&state=&nonce="
res == f"http://{env.keycloak_host}:{env.keycloak_port}/realms/{oid.realm_name}"
f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
"&redirect_uri=http://test.test/*&scope=email&state=&nonce="
)
def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test the token method.
def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test the token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -177,9 +180,11 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
def test_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test the exchange token method.
oid_with_credentials: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test the exchange token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -198,7 +203,7 @@ def test_exchange_token(
admin.get_client_role(
client_id=admin.get_client_id(client_id="realm-management"),
role_name="impersonation",
)
),
],
)
@ -215,7 +220,9 @@ def test_exchange_token(
# Exchange token with the new user
new_token = oid.exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username
token=token["access_token"],
audience=oid.client_id,
subject=username,
)
assert oid.userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test",
@ -229,8 +236,9 @@ def test_exchange_token(
assert token != new_token
def test_logout(oid_with_credentials):
"""Test logout.
def test_logout(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test logout.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -238,15 +246,16 @@ def test_logout(oid_with_credentials):
oid, username, password = oid_with_credentials
token = oid.token(username=username, password=password)
assert oid.userinfo(token=token["access_token"]) != dict()
assert oid.logout(refresh_token=token["refresh_token"]) == dict()
assert oid.userinfo(token=token["access_token"]) != {}
assert oid.logout(refresh_token=token["refresh_token"]) == {}
with pytest.raises(KeycloakAuthenticationError):
oid.userinfo(token=token["access_token"])
def test_certs(oid: KeycloakOpenID):
"""Test certificates.
def test_certs(oid: KeycloakOpenID) -> None:
"""
Test certificates.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
@ -254,8 +263,9 @@ def test_certs(oid: KeycloakOpenID):
assert len(oid.certs()["keys"]) == 2
def test_public_key(oid: KeycloakOpenID):
"""Test public key.
def test_public_key(oid: KeycloakOpenID) -> None:
"""
Test public key.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
@ -264,9 +274,11 @@ def test_public_key(oid: KeycloakOpenID):
def test_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test entitlement.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test entitlement.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -277,15 +289,16 @@ def test_entitlement(
oid, username, password = oid_with_credentials_authz
token = oid.token(username=username, password=password)
resource_server_id = admin.get_client_authz_resources(
client_id=admin.get_client_id(oid.client_id)
client_id=admin.get_client_id(oid.client_id),
)[0]["_id"]
with pytest.raises(KeycloakDeprecationError):
oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test introspect.
def test_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test introspect.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -295,15 +308,18 @@ def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
assert oid.introspect(token=token["access_token"])["active"]
assert oid.introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
token=token["access_token"],
rpt="some",
token_type_hint="requesting_party_token", # noqa: S106
) == {"active": False}
with pytest.raises(KeycloakRPTNotFound):
oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token") # noqa: S106
def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token.
def test_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test decode token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -319,8 +335,9 @@ def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
def test_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token with an invalid token.
def test_decode_token_invalid_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test decode token with an invalid token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -340,20 +357,27 @@ def test_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID,
with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = oid.decode_token(
token=invalid_access_token, validate=True, key=key
token=invalid_access_token,
validate=True,
key=key,
)
decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False)
assert decoded_access_token == decoded_invalid_access_token
decoded_invalid_access_token = oid.decode_token(
token=invalid_access_token, validate=False, key=key
token=invalid_access_token,
validate=False,
key=key,
)
assert decoded_access_token == decoded_invalid_access_token
def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test load authorization config.
def test_load_authorization_config(
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -368,12 +392,14 @@ def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpe
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
assert isinstance(
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
oid.authorization.policies["test-authz-rb-policy"].permissions[0],
Permission,
)
def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
def test_get_policies(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -390,15 +416,17 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str
orig_client_id = oid.client_id
oid.client_id = "account"
assert oid.get_policies(token=token["access_token"], method_token_info="decode") == []
assert oid.get_policies(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy
assert [
str(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
str(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["Policy: test (role)"]
assert [
repr(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
repr(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id
@ -407,8 +435,9 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str
oid.get_policies(token=token["access_token"])
def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
def test_get_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -425,22 +454,25 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
orig_client_id = oid.client_id
oid.client_id = "account"
assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == []
assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
policy.add_permission(
permission=Permission(
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
)
name="test-perm",
type="resource",
logic="POSITIVE",
decision_strategy="UNANIMOUS",
),
)
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["Permission: test-perm (resource)"]
assert [
repr(x)
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id
@ -449,8 +481,9 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
oid.get_permissions(token=token["access_token"])
def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test UMA permissions.
def test_uma_permissions(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test UMA permissions.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -464,9 +497,11 @@ def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
def test_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test has UMA access.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test has UMA access.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -497,16 +532,18 @@ def test_has_uma_access(
assert (
str(
oid.has_uma_access(
token=admin.connection.token["access_token"], permissions="Default Resource"
)
token=admin.connection.token["access_token"],
permissions="Default Resource",
),
)
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})"
"{'Default Resource'})"
)
def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"""Test device authorization flow.
def test_device(oid_with_credentials_device: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test device authorization flow.
:param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
credentials and device authorization flow enabled
@ -519,7 +556,7 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"user_code": mock.ANY,
"verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
"verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
+ f"device?user_code={res['user_code']}",
f"device?user_code={res['user_code']}",
"expires_in": 600,
"interval": 5,
}
@ -529,15 +566,16 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio
async def test_a_well_known(oid: KeycloakOpenID):
"""Test the well_known method.
async def test_a_well_known(oid: KeycloakOpenID) -> None:
"""
Test the well_known method.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
res = await oid.a_well_known()
assert res is not None
assert res != dict()
assert res != {}
for key in [
"acr_values_supported",
"authorization_encryption_alg_values_supported",
@ -597,8 +635,9 @@ async def test_a_well_known(oid: KeycloakOpenID):
@pytest.mark.asyncio
async def test_a_auth_url(env, oid: KeycloakOpenID):
"""Test the auth_url method.
async def test_a_auth_url(env: KeycloakTestEnv, oid: KeycloakOpenID) -> None:
"""
Test the auth_url method.
:param env: Environment fixture
:type env: KeycloakTestEnv
@ -607,16 +646,16 @@ async def test_a_auth_url(env, oid: KeycloakOpenID):
"""
res = await oid.a_auth_url(redirect_uri="http://test.test/*")
assert (
res
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
+ f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
+ "&redirect_uri=http://test.test/*&scope=email&state=&nonce="
res == f"http://{env.keycloak_host}:{env.keycloak_port}/realms/{oid.realm_name}"
f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
"&redirect_uri=http://test.test/*&scope=email&state=&nonce="
)
@pytest.mark.asyncio
async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test the token method.
async def test_a_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test the token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -666,9 +705,11 @@ async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio
async def test_a_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test the exchange token method.
oid_with_credentials: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test the exchange token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -687,7 +728,7 @@ async def test_a_exchange_token(
await admin.a_get_client_role(
client_id=admin.get_client_id(client_id="realm-management"),
role_name="impersonation",
)
),
],
)
@ -704,7 +745,9 @@ async def test_a_exchange_token(
# Exchange token with the new user
new_token = oid.exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username
token=token["access_token"],
audience=oid.client_id,
subject=username,
)
assert await oid.a_userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test",
@ -719,8 +762,9 @@ async def test_a_exchange_token(
@pytest.mark.asyncio
async def test_a_logout(oid_with_credentials):
"""Test logout.
async def test_a_logout(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test logout.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -728,16 +772,17 @@ async def test_a_logout(oid_with_credentials):
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
assert await oid.a_userinfo(token=token["access_token"]) != dict()
assert await oid.a_logout(refresh_token=token["refresh_token"]) == dict()
assert await oid.a_userinfo(token=token["access_token"]) != {}
assert await oid.a_logout(refresh_token=token["refresh_token"]) == {}
with pytest.raises(KeycloakAuthenticationError):
await oid.a_userinfo(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_certs(oid: KeycloakOpenID):
"""Test certificates.
async def test_a_certs(oid: KeycloakOpenID) -> None:
"""
Test certificates.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
@ -746,8 +791,9 @@ async def test_a_certs(oid: KeycloakOpenID):
@pytest.mark.asyncio
async def test_a_public_key(oid: KeycloakOpenID):
"""Test public key.
async def test_a_public_key(oid: KeycloakOpenID) -> None:
"""
Test public key.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
@ -757,9 +803,11 @@ async def test_a_public_key(oid: KeycloakOpenID):
@pytest.mark.asyncio
async def test_a_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test entitlement.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test entitlement.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -770,7 +818,7 @@ async def test_a_entitlement(
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
resource_server_id = admin.get_client_authz_resources(
client_id=admin.get_client_id(oid.client_id)
client_id=admin.get_client_id(oid.client_id),
)[0]["_id"]
with pytest.raises(KeycloakDeprecationError):
@ -778,8 +826,9 @@ async def test_a_entitlement(
@pytest.mark.asyncio
async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test introspect.
async def test_a_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test introspect.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -789,18 +838,22 @@ async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str
assert (await oid.a_introspect(token=token["access_token"]))["active"]
assert await oid.a_introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
token=token["access_token"],
rpt="some",
token_type_hint="requesting_party_token", # noqa: S106
) == {"active": False}
with pytest.raises(KeycloakRPTNotFound):
await oid.a_introspect(
token=token["access_token"], token_type_hint="requesting_party_token"
token=token["access_token"],
token_type_hint="requesting_party_token", # noqa: S106
)
@pytest.mark.asyncio
async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token asynchronously.
async def test_a_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test decode token asynchronously.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -817,8 +870,11 @@ async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, s
@pytest.mark.asyncio
async def test_a_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token asynchronously an invalid token.
async def test_a_decode_token_invalid_token(
oid_with_credentials: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test decode token asynchronously an invalid token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
@ -835,30 +891,37 @@ async def test_a_decode_token_invalid_token(oid_with_credentials: Tuple[Keycloak
invalid_access_token = access_token + "a"
with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=True
token=invalid_access_token,
validate=True,
)
with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=True, key=key
token=invalid_access_token,
validate=True,
key=key,
)
decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=False
token=invalid_access_token,
validate=False,
)
assert decoded_access_token == decoded_invalid_access_token
decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=False, key=key
token=invalid_access_token,
validate=False,
key=key,
)
assert decoded_access_token == decoded_invalid_access_token
@pytest.mark.asyncio
async def test_a_load_authorization_config(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
):
"""Test load authorization config.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -873,15 +936,18 @@ async def test_a_load_authorization_config(
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
assert isinstance(
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
oid.authorization.policies["test-authz-rb-policy"].permissions[0],
Permission,
)
@pytest.mark.asyncio
async def test_a_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test has UMA access.
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
admin: KeycloakAdmin,
) -> None:
"""
Test has UMA access.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -898,7 +964,10 @@ async def test_a_has_uma_access(
)
assert (
str(
await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource")
await oid.a_has_uma_access(
token=token["access_token"],
permissions="Default Resource",
),
)
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
@ -914,17 +983,19 @@ async def test_a_has_uma_access(
assert (
str(
await oid.a_has_uma_access(
token=admin.connection.token["access_token"], permissions="Default Resource"
)
token=admin.connection.token["access_token"],
permissions="Default Resource",
),
)
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})"
"{'Default Resource'})"
)
@pytest.mark.asyncio
async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
async def test_a_get_policies(oid_with_credentials_authz: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -941,17 +1012,17 @@ async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID,
orig_client_id = oid.client_id
oid.client_id = "account"
assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == []
assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["Policy: test (role)"]
assert [
repr(x)
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") # noqa: S106
] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id
@ -961,8 +1032,11 @@ async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID,
@pytest.mark.asyncio
async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
async def test_a_get_permissions(
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -980,26 +1054,31 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
orig_client_id = oid.client_id
oid.client_id = "account"
assert (
await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == []
await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == [] # noqa: S106
)
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
policy.add_permission(
permission=Permission(
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
)
name="test-perm",
type="resource",
logic="POSITIVE",
decision_strategy="UNANIMOUS",
),
)
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode"
token=token["access_token"],
method_token_info="decode", # noqa: S106
)
] == ["Permission: test-perm (resource)"]
assert [
repr(x)
for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode"
token=token["access_token"],
method_token_info="decode", # noqa: S106
)
] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id
@ -1010,8 +1089,11 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
@pytest.mark.asyncio
async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test UMA permissions.
async def test_a_uma_permissions(
oid_with_credentials_authz: tuple[KeycloakOpenID, str, str],
) -> None:
"""
Test UMA permissions.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
@ -1027,8 +1109,9 @@ async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
@pytest.mark.asyncio
async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"""Test device authorization flow.
async def test_a_device(oid_with_credentials_device: tuple[KeycloakOpenID, str, str]) -> None:
"""
Test device authorization flow.
:param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
credentials and device authorization flow enabled
@ -1041,13 +1124,13 @@ async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str,
"user_code": mock.ANY,
"verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
"verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
+ f"device?user_code={res['user_code']}",
f"device?user_code={res['user_code']}",
"expires_in": 600,
"interval": 5,
}
def test_counter_part():
def test_counter_part() -> None:
"""Test that each function has its async counter part."""
openid_methods = [
func for func in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, func))

103
tests/test_keycloak_uma.py

@ -15,8 +15,9 @@ from keycloak.exceptions import (
from keycloak.uma_permissions import UMAPermission
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
"""Test KeycloakUMA's init method.
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection) -> None:
"""
Test KeycloakUMA's init method.
:param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
:type oid_connection_with_authz: KeycloakOpenIDConnection
@ -32,21 +33,23 @@ def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
assert uma._well_known is not None
def test_uma_well_known(uma: KeycloakUMA):
"""Test the well_known method.
def test_uma_well_known(uma: KeycloakUMA) -> None:
"""
Test the well_known method.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
res = uma.uma_well_known
assert res is not None
assert res != dict()
assert res != {}
for key in ["resource_registration_endpoint"]:
assert key in res
def test_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets.
def test_uma_resource_sets(uma: KeycloakUMA) -> None:
"""
Test resource sets.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
@ -109,7 +112,8 @@ def test_uma_resource_sets(uma: KeycloakUMA):
assert len(resource_set_list_ids) == 0
# With matchingUri query option
resource_set_list_ids = uma.resource_set_list_ids(
uri="/some_resources/resource", matchingUri=True
uri="/some_resources/resource",
matchingUri=True,
)
assert len(resource_set_list_ids) == 1
@ -119,8 +123,8 @@ def test_uma_resource_sets(uma: KeycloakUMA):
assert err.match(
re.escape(
'409: b\'{"error":"invalid_request","error_description":'
'"Resource with name [mytest] already exists."}\''
)
'"Resource with name [mytest] already exists."}\'',
),
)
# Test get resource set
@ -130,18 +134,18 @@ def test_uma_resource_sets(uma: KeycloakUMA):
# Test update resource set
latest_resource["name"] = "New Resource Name"
res = uma.resource_set_update(created_resource["_id"], latest_resource)
assert res == dict(), res
assert res == {}, res
updated_resource = uma.resource_set_read(created_resource["_id"])
assert updated_resource["name"] == "New Resource Name"
# Test update resource set fail
with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
assert err.match('400: b\'{"error":"Unrecognized field')
assert err.match("Unrecognized field")
# Test delete resource set
res = uma.resource_set_delete(resource_id=created_resource["_id"])
assert res == dict(), res
assert res == {}, res
with pytest.raises(KeycloakGetError) as err:
uma.resource_set_read(created_resource["_id"])
err.match("404: b''")
@ -152,8 +156,9 @@ def test_uma_resource_sets(uma: KeycloakUMA):
assert err.match("404: b''")
def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
"""Test policies.
def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin) -> None:
"""
Test policies.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
@ -215,7 +220,8 @@ def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
with pytest.raises(KeycloakDeleteError) as err:
uma.policy_delete(policy_id)
assert err.match(
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
'404: b\'{"error":"invalid_request","error_description":'
'"Policy with .* does not exist"}\'',
)
policies = uma.policy_query()
@ -251,8 +257,9 @@ def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
admin.delete_group(group_id)
def test_uma_access(uma: KeycloakUMA):
"""Test permission access checks.
def test_uma_access(uma: KeycloakUMA) -> None:
"""
Test permission access checks.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
@ -274,7 +281,7 @@ def test_uma_access(uma: KeycloakUMA):
uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
token = uma.connection.token
permissions = list()
permissions = []
assert uma.permissions_check(token["access_token"], permissions)
permissions.append(UMAPermission(resource=resource_to_create["name"]))
@ -285,8 +292,9 @@ def test_uma_access(uma: KeycloakUMA):
uma.resource_set_delete(resource["_id"])
def test_uma_permission_ticket(uma: KeycloakUMA):
"""Test permission ticket generation.
def test_uma_permission_ticket(uma: KeycloakUMA) -> None:
"""
Test permission ticket generation.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
@ -312,7 +320,8 @@ def test_uma_permission_ticket(uma: KeycloakUMA):
response = uma.permission_ticket_create(permissions)
rpt = uma.connection.keycloak_openid.token(
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket",
ticket=response["ticket"],
)
assert rpt
assert "access_token" in rpt
@ -328,22 +337,24 @@ def test_uma_permission_ticket(uma: KeycloakUMA):
@pytest.mark.asyncio
async def test_a_uma_well_known(uma: KeycloakUMA):
"""Test the well_known method.
async def test_a_uma_well_known(uma: KeycloakUMA) -> None:
"""
Test the well_known method.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
res = uma.uma_well_known
assert res is not None
assert res != dict()
assert res != {}
for key in ["resource_registration_endpoint"]:
assert key in res
@pytest.mark.asyncio
async def test_a_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets.
async def test_a_uma_resource_sets(uma: KeycloakUMA) -> None:
"""
Test resource sets.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
@ -406,7 +417,8 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
assert len(resource_set_list_ids) == 0
# With matchingUri query option
resource_set_list_ids = await uma.a_resource_set_list_ids(
uri="/some_resources/resource", matchingUri=True
uri="/some_resources/resource",
matchingUri=True,
)
assert len(resource_set_list_ids) == 1
@ -416,8 +428,8 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
assert err.match(
re.escape(
'409: b\'{"error":"invalid_request","error_description":'
'"Resource with name [mytest] already exists."}\''
)
'"Resource with name [mytest] already exists."}\'',
),
)
# Test get resource set
@ -427,18 +439,18 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
# Test update resource set
latest_resource["name"] = "New Resource Name"
res = await uma.a_resource_set_update(created_resource["_id"], latest_resource)
assert res == dict(), res
assert res == {}, res
updated_resource = await uma.a_resource_set_read(created_resource["_id"])
assert updated_resource["name"] == "New Resource Name"
# Test update resource set fail
with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
assert err.match('400: b\'{"error":"Unrecognized field')
assert err.match("Unrecognized field")
# Test delete resource set
res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
assert res == dict(), res
assert res == {}, res
with pytest.raises(KeycloakGetError) as err:
await uma.a_resource_set_read(created_resource["_id"])
err.match("404: b''")
@ -450,8 +462,9 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
@pytest.mark.asyncio
async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
"""Test policies.
async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin) -> None:
"""
Test policies.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
@ -513,7 +526,8 @@ async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
with pytest.raises(KeycloakDeleteError) as err:
await uma.a_policy_delete(policy_id)
assert err.match(
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
'404: b\'{"error":"invalid_request","error_description":'
'"Policy with .* does not exist"}\'',
)
policies = await uma.a_policy_query()
@ -550,8 +564,9 @@ async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
@pytest.mark.asyncio
async def test_a_uma_access(uma: KeycloakUMA):
"""Test permission access checks.
async def test_a_uma_access(uma: KeycloakUMA) -> None:
"""
Test permission access checks.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
@ -573,7 +588,7 @@ async def test_a_uma_access(uma: KeycloakUMA):
await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
token = uma.connection.token
permissions = list()
permissions = []
assert await uma.a_permissions_check(token["access_token"], permissions)
permissions.append(UMAPermission(resource=resource_to_create["name"]))
@ -585,8 +600,9 @@ async def test_a_uma_access(uma: KeycloakUMA):
@pytest.mark.asyncio
async def test_a_uma_permission_ticket(uma: KeycloakUMA):
"""Test permission ticket generation.
async def test_a_uma_permission_ticket(uma: KeycloakUMA) -> None:
"""
Test permission ticket generation.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
@ -612,7 +628,8 @@ async def test_a_uma_permission_ticket(uma: KeycloakUMA):
response = await uma.a_permission_ticket_create(permissions)
rpt = await uma.connection.keycloak_openid.a_token(
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket",
ticket=response["ticket"],
)
assert rpt
assert "access_token" in rpt
@ -624,7 +641,7 @@ async def test_a_uma_permission_ticket(uma: KeycloakUMA):
await uma.a_resource_set_delete(resource["_id"])
def test_counter_part():
def test_counter_part() -> None:
"""Test that each function has its async counter part."""
uma_methods = [func for func in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, func))]
sync_methods = [

7
tests/test_license.py

@ -1,15 +1,16 @@
"""Tests for license."""
import os
import pathlib
def test_license_present():
def test_license_present() -> None:
"""Test that the MIT license is present in the header of each module file."""
for path, _, files in os.walk("src/keycloak"):
for _file in files:
if _file.endswith(".py"):
with open(os.path.join(path, _file), "r") as fp:
with pathlib.Path(pathlib.Path(path) / _file).open("r") as fp:
content = fp.read()
assert content.startswith(
"# -*- coding: utf-8 -*-\n#\n# The MIT License (MIT)\n#\n#"
"#\n# The MIT License (MIT)\n#\n#",
)

63
tests/test_uma_permissions.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
@ -31,7 +30,7 @@ from keycloak.uma_permissions import (
)
def test_uma_permission_obj():
def test_uma_permission_obj() -> None:
"""Test generic UMA permission."""
with pytest.raises(PermissionDefinitionError):
UMAPermission(permission="bad")
@ -50,35 +49,35 @@ def test_uma_permission_obj():
assert {p1, p1} != {p2, p2}
def test_resource_with_scope_obj():
def test_resource_with_scope_obj() -> None:
"""Test resource with scope."""
r = Resource("Resource1")
s = Scope("Scope1")
assert r(s) == "Resource1#Scope1"
def test_scope_with_resource_obj():
def test_scope_with_resource_obj() -> None:
"""Test scope with resource."""
r = Resource("Resource1")
s = Scope("Scope1")
assert s(r) == "Resource1#Scope1"
def test_resource_scope_str():
def test_resource_scope_str() -> None:
"""Test resource scope as string."""
r = Resource("Resource1")
s = "Scope1"
assert r(scope=s) == "Resource1#Scope1"
def test_scope_resource_str():
def test_scope_resource_str() -> None:
"""Test scope resource as string."""
r = "Resource1"
s = Scope("Scope1")
assert s(resource=r) == "Resource1#Scope1"
def test_resource_scope_list():
def test_resource_scope_list() -> None:
"""Test resource scope as list."""
r = Resource("Resource1")
s = ["Scope1"]
@ -87,126 +86,126 @@ def test_resource_scope_list():
assert err.match(re.escape("can't determine if '['Scope1']' is a resource or scope"))
def test_build_permission_none():
def test_build_permission_none() -> None:
"""Test build permission param with None."""
assert build_permission_param(None) == set()
def test_build_permission_empty_str():
def test_build_permission_empty_str() -> None:
"""Test build permission param with an empty string."""
assert build_permission_param("") == set()
def test_build_permission_empty_list():
def test_build_permission_empty_list() -> None:
"""Test build permission param with an empty list."""
assert build_permission_param([]) == set()
def test_build_permission_empty_tuple():
def test_build_permission_empty_tuple() -> None:
"""Test build permission param with an empty tuple."""
assert build_permission_param(()) == set()
def test_build_permission_empty_set():
def test_build_permission_empty_set() -> None:
"""Test build permission param with an empty set."""
assert build_permission_param(set()) == set()
def test_build_permission_empty_dict():
def test_build_permission_empty_dict() -> None:
"""Test build permission param with an empty dict."""
assert build_permission_param({}) == set()
def test_build_permission_str():
def test_build_permission_str() -> None:
"""Test build permission param as string."""
assert build_permission_param("resource1") == {"resource1"}
def test_build_permission_list_str():
def test_build_permission_list_str() -> None:
"""Test build permission param with list of strings."""
assert build_permission_param(["res1#scope1", "res1#scope2"]) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_str():
def test_build_permission_tuple_str() -> None:
"""Test build permission param with tuple of strings."""
assert build_permission_param(("res1#scope1", "res1#scope2")) == {"res1#scope1", "res1#scope2"}
def test_build_permission_set_str():
def test_build_permission_set_str() -> None:
"""Test build permission param with set of strings."""
assert build_permission_param({"res1#scope1", "res1#scope2"}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_str():
def test_build_permission_tuple_dict_str_str() -> None:
"""Test build permission param with dictionary."""
assert build_permission_param({"res1": "scope1"}) == {"res1#scope1"}
def test_build_permission_tuple_dict_str_list_str():
def test_build_permission_tuple_dict_str_list_str() -> None:
"""Test build permission param with dictionary of list."""
assert build_permission_param({"res1": ["scope1", "scope2"]}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_list_str2():
def test_build_permission_tuple_dict_str_list_str2() -> None:
"""Test build permission param with mutliple-keyed dictionary."""
assert build_permission_param(
{"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]}
{"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]},
) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}
def test_build_permission_uma():
def test_build_permission_uma() -> None:
"""Test build permission param with UMA."""
assert build_permission_param(Resource("res1")(Scope("scope1"))) == {"res1#scope1"}
def test_build_permission_uma_list():
def test_build_permission_uma_list() -> None:
"""Test build permission param with list of UMAs."""
assert build_permission_param(
[Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))]
[Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))],
) == {"res1#scope1", "res1#scope2"}
def test_build_permission_misbuilt_dict_str_list_list_str():
def test_build_permission_misbuilt_dict_str_list_list_str() -> None:
"""Test bad build of permission param from dictionary."""
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": [["scope1", "scope2"]]})
assert err.match(re.escape("misbuilt permission {'res1': [['scope1', 'scope2']]}"))
def test_build_permission_misbuilt_list_list_str():
def test_build_permission_misbuilt_list_list_str() -> None:
"""Test bad build of permission param from list."""
with pytest.raises(KeycloakPermissionFormatError) as err:
print(build_permission_param([["scope1", "scope2"]]))
build_permission_param([["scope1", "scope2"]])
assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]"))
def test_build_permission_misbuilt_list_set_str():
def test_build_permission_misbuilt_list_set_str() -> None:
"""Test bad build of permission param from set."""
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1", "scope2"}])
assert err.match("misbuilt permission.*")
def test_build_permission_misbuilt_set_set_str():
def test_build_permission_misbuilt_set_set_str() -> None:
"""Test bad build of permission param from list of set."""
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1"}])
assert err.match(re.escape("misbuilt permission [{'scope1'}]"))
def test_build_permission_misbuilt_dict_non_iterable():
def test_build_permission_misbuilt_dict_non_iterable() -> None:
"""Test bad build of permission param from non-iterable."""
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": 5})
assert err.match(re.escape("misbuilt permission {'res1': 5}"))
def test_auth_status_bool():
def test_auth_status_bool() -> None:
"""Test bool method of AuthStatus."""
assert not bool(AuthStatus(is_logged_in=True, is_authorized=False, missing_permissions=""))
assert bool(AuthStatus(is_logged_in=True, is_authorized=True, missing_permissions=""))
def test_build_permission_without_scopes():
def test_build_permission_without_scopes() -> None:
"""Test build permission param with scopes."""
assert build_permission_param(permissions={"Resource": None}) == {"Resource"}

12
tests/test_urls_patterns.py

@ -5,7 +5,7 @@ import inspect
from keycloak import urls_patterns
def test_correctness_of_patterns():
def test_correctness_of_patterns() -> None:
"""Test that there are no duplicate url patterns."""
# Test that the patterns are present
urls = [x for x in dir(urls_patterns) if not x.startswith("__")]
@ -16,7 +16,7 @@ def test_correctness_of_patterns():
assert url.startswith("URL_"), f"The url pattern {url} does not begin with URL_"
# Test that the patterns have unique names
seen_urls = list()
seen_urls = []
urls_from_src = [
x.split("=")[0].strip()
for x in inspect.getsource(urls_patterns).splitlines()
@ -27,11 +27,11 @@ def test_correctness_of_patterns():
seen_urls.append(url)
# Test that the pattern values are unique
seen_url_values = list()
seen_url_values = []
for url in urls:
url_value = urls_patterns.__dict__[url]
assert url_value not in seen_url_values, f"The url {url} has a duplicate value {url_value}"
assert (
url_value == url_value.strip()
), f"The url {url} with value '{url_value}' has whitespace values"
assert url_value == url_value.strip(), (
f"The url {url} with value '{url_value}' has whitespace values"
)
seen_url_values.append(url_value)

10
tox.ini

@ -6,20 +6,16 @@ envlist = check, apply-check, docs, tests, build, changelog
[testenv]
allowlist_externals = poetry, ./test_keycloak_init.sh
commands_pre =
poetry install --sync
poetry sync
[testenv:check]
commands =
black --check --diff src/keycloak tests docs
isort -c --df src/keycloak tests docs
flake8 src/keycloak tests docs
ruff check src/keycloak tests docs
codespell src tests docs
[testenv:apply-check]
commands =
black -C src/keycloak tests docs
black src/keycloak tests docs
isort src/keycloak tests docs
ruff check --fix src/keycloak tests docs
[testenv:docs]
commands =

Loading…
Cancel
Save