Browse Source

fix: ruffing up

ci/fix-tests-26-1
Richard Nemeth 2 weeks ago
parent
commit
17d7d9cf37
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 5
      docs/source/conf.py
  2. 949
      poetry.lock
  3. 28
      pyproject.toml
  4. 9
      src/keycloak/__init__.py
  5. 1
      src/keycloak/_version.py
  6. 18
      src/keycloak/authorization/__init__.py
  7. 65
      src/keycloak/authorization/permission.py
  8. 82
      src/keycloak/authorization/policy.py
  9. 24
      src/keycloak/authorization/role.py
  10. 170
      src/keycloak/connection.py
  11. 95
      src/keycloak/exceptions.py
  12. 4561
      src/keycloak/keycloak_admin.py
  13. 570
      src/keycloak/keycloak_openid.py
  14. 311
      src/keycloak/keycloak_uma.py
  15. 247
      src/keycloak/openid_connection.py
  16. 121
      src/keycloak/uma_permissions.py
  17. 5
      src/keycloak/urls_patterns.py
  18. 32
      tests/conftest.py
  19. 2
      tests/test_connection.py
  20. 2
      tests/test_exceptions.py
  21. 524
      tests/test_keycloak_admin.py
  22. 70
      tests/test_keycloak_openid.py
  23. 24
      tests/test_keycloak_uma.py
  24. 4
      tests/test_license.py
  25. 5
      tests/test_uma_permissions.py
  26. 6
      tests/test_urls_patterns.py

5
docs/source/conf.py

@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*-
# #
# python-keycloak documentation build configuration file, created by # python-keycloak documentation build configuration file, created by
# sphinx-quickstart on Tue Aug 15 11:02:59 2017. # sphinx-quickstart on Tue Aug 15 11:02:59 2017.
@ -169,7 +168,7 @@ latex_documents = [
"python-keycloak Documentation", "python-keycloak Documentation",
"Marcos Pereira", "Marcos Pereira",
"manual", "manual",
)
),
] ]
@ -194,5 +193,5 @@ texinfo_documents = [
"python-keycloak", "python-keycloak",
"One line description of project.", "One line description of project.",
"Miscellaneous", "Miscellaneous",
)
),
] ]

949
poetry.lock
File diff suppressed because it is too large
View File

28
pyproject.toml

@ -37,6 +37,7 @@ deprecation = ">=2.1.0"
jwcrypto = ">=1.5.4" jwcrypto = ">=1.5.4"
httpx = ">=0.23.2" httpx = ">=0.23.2"
async-property = ">=0.2.2" async-property = ">=0.2.2"
aiofiles = ">=24.1.0"
[tool.poetry.group.docs.dependencies] [tool.poetry.group.docs.dependencies]
alabaster = ">=0.7.0" alabaster = ">=0.7.0"
@ -56,10 +57,6 @@ pytest-cov = ">=3.0.0"
pytest-asyncio = ">=0.23.7" pytest-asyncio = ">=0.23.7"
wheel = ">=0.38.4" wheel = ">=0.38.4"
pre-commit = ">=3.5.0" pre-commit = ">=3.5.0"
isort = ">=5.10.1"
black = ">=22.3.0"
flake8 = ">=7.0.0"
flake8-docstrings = ">=1.6.0"
commitizen = ">=2.28.0" commitizen = ">=2.28.0"
cryptography = ">=42.0.0" cryptography = ">=42.0.0"
codespell = ">=2.1.0" codespell = ">=2.1.0"
@ -67,6 +64,7 @@ darglint = ">=1.8.1"
twine = ">=4.0.2" twine = ">=4.0.2"
freezegun = ">=1.2.2" freezegun = ">=1.2.2"
docutils = "<0.21" docutils = "<0.21"
ruff = ">=0.9.3"
[[tool.poetry.source]] [[tool.poetry.source]]
name = "PyPI" name = "PyPI"
@ -76,15 +74,23 @@ priority = "primary"
requires = ["poetry-core>=1.0.0"] requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"
[tool.black]
[tool.ruff]
line-length = 99 line-length = 99
[tool.isort]
line_length = 99
profile = "black"
[tool.darglint]
enable = "DAR104"
[tool.ruff.lint]
select = ["ALL"]
ignore = [
"BLE001",
"C901",
"D203",
"D212",
"FBT001",
"FBT002",
"N818",
"PLR0912",
"PLR0913",
"TRY003",
]
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_default_fixture_loop_scope = "function" asyncio_default_fixture_loop_scope = "function"

9
src/keycloak/__init__.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -46,8 +45,8 @@ from .keycloak_uma import KeycloakUMA
from .openid_connection import KeycloakOpenIDConnection from .openid_connection import KeycloakOpenIDConnection
__all__ = [ __all__ = [
"__version__",
"ConnectionManager", "ConnectionManager",
"KeycloakAdmin",
"KeycloakAuthenticationError", "KeycloakAuthenticationError",
"KeycloakAuthorizationConfigError", "KeycloakAuthorizationConfigError",
"KeycloakConnectionError", "KeycloakConnectionError",
@ -56,13 +55,13 @@ __all__ = [
"KeycloakError", "KeycloakError",
"KeycloakGetError", "KeycloakGetError",
"KeycloakInvalidTokenError", "KeycloakInvalidTokenError",
"KeycloakOpenID",
"KeycloakOpenIDConnection",
"KeycloakOperationError", "KeycloakOperationError",
"KeycloakPostError", "KeycloakPostError",
"KeycloakPutError", "KeycloakPutError",
"KeycloakRPTNotFound", "KeycloakRPTNotFound",
"KeycloakSecretNotFound", "KeycloakSecretNotFound",
"KeycloakAdmin",
"KeycloakOpenID",
"KeycloakOpenIDConnection",
"KeycloakUMA", "KeycloakUMA",
"__version__",
] ]

1
src/keycloak/_version.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #

18
src/keycloak/authorization/__init__.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -32,19 +31,21 @@ from .role import Role
class Authorization: class Authorization:
"""Keycloak Authorization (policies, roles, scopes and resources).
"""
Keycloak Authorization (policies, roles, scopes and resources).
https://keycloak.gitbooks.io/documentation/authorization_services/index.html https://keycloak.gitbooks.io/documentation/authorization_services/index.html
""" """
def __init__(self):
def __init__(self) -> None:
"""Init method.""" """Init method."""
self.policies = {} self.policies = {}
@property @property
def policies(self):
"""Get policies.
def policies(self) -> dict:
"""
Get policies.
:returns: Policies :returns: Policies
:rtype: dict :rtype: dict
@ -52,11 +53,12 @@ class Authorization:
return self._policies return self._policies
@policies.setter @policies.setter
def policies(self, value):
def policies(self, value: dict) -> None:
self._policies = value self._policies = value
def load_config(self, data):
"""Load policies, roles and permissions (scope/resources).
def load_config(self, data: dict) -> None:
"""
Load policies, roles and permissions (scope/resources).
:param data: keycloak authorization data (dict) :param data: keycloak authorization data (dict)
:type data: dict :type data: dict

65
src/keycloak/authorization/permission.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -25,7 +24,8 @@
class Permission: class Permission:
"""Base permission class.
"""
Base permission class.
Consider this simple and very common permission: Consider this simple and very common permission:
@ -56,8 +56,9 @@ class Permission:
""" """
def __init__(self, name, type, logic, decision_strategy):
"""Init method.
def __init__(self, name: str, type: str, logic: str, decision_strategy: str) -> None: # noqa: A002
"""
Init method.
:param name: Name :param name: Name
:type name: str :type name: str
@ -75,25 +76,28 @@ class Permission:
self.resources = [] self.resources = []
self.scopes = [] self.scopes = []
def __repr__(self):
"""Repr method.
def __repr__(self) -> str:
"""
Repr method.
:returns: Class representation :returns: Class representation
:rtype: str :rtype: str
""" """
return "<Permission: %s (%s)>" % (self.name, self.type)
return f"<Permission: {self.name} ({self.type})>"
def __str__(self):
"""Str method.
def __str__(self) -> str:
"""
Str method.
:returns: Class string representation :returns: Class string representation
:rtype: str :rtype: str
""" """
return "Permission: %s (%s)" % (self.name, self.type)
return f"Permission: {self.name} ({self.type})"
@property @property
def name(self):
"""Get name.
def name(self) -> str:
"""
Get name.
:returns: name :returns: name
:rtype: str :rtype: str
@ -101,12 +105,13 @@ class Permission:
return self._name return self._name
@name.setter @name.setter
def name(self, value):
def name(self, value: str) -> None:
self._name = value self._name = value
@property @property
def type(self):
"""Get type.
def type(self) -> str:
"""
Get type.
:returns: type :returns: type
:rtype: str :rtype: str
@ -114,12 +119,13 @@ class Permission:
return self._type return self._type
@type.setter @type.setter
def type(self, value):
def type(self, value: str) -> None:
self._type = value self._type = value
@property @property
def logic(self):
"""Get logic.
def logic(self) -> str:
"""
Get logic.
:returns: Logic :returns: Logic
:rtype: str :rtype: str
@ -127,12 +133,13 @@ class Permission:
return self._logic return self._logic
@logic.setter @logic.setter
def logic(self, value):
def logic(self, value: str) -> str:
self._logic = value self._logic = value
@property @property
def decision_strategy(self):
"""Get decision strategy.
def decision_strategy(self) -> str:
"""
Get decision strategy.
:returns: Decision strategy :returns: Decision strategy
:rtype: str :rtype: str
@ -140,12 +147,13 @@ class Permission:
return self._decision_strategy return self._decision_strategy
@decision_strategy.setter @decision_strategy.setter
def decision_strategy(self, value):
def decision_strategy(self, value: str) -> None:
self._decision_strategy = value self._decision_strategy = value
@property @property
def resources(self):
"""Get resources.
def resources(self) -> list:
"""
Get resources.
:returns: Resources :returns: Resources
:rtype: list :rtype: list
@ -153,12 +161,13 @@ class Permission:
return self._resources return self._resources
@resources.setter @resources.setter
def resources(self, value):
def resources(self, value: list) -> None:
self._resources = value self._resources = value
@property @property
def scopes(self):
"""Get scopes.
def scopes(self) -> list:
"""
Get scopes.
:returns: Scopes :returns: Scopes
:rtype: list :rtype: list
@ -166,5 +175,5 @@ class Permission:
return self._scopes return self._scopes
@scopes.setter @scopes.setter
def scopes(self, value):
def scopes(self, value: list) -> None:
self._scopes = value self._scopes = value

82
src/keycloak/authorization/policy.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -23,11 +22,12 @@
"""Keycloak authorization Policy module.""" """Keycloak authorization Policy module."""
from ..exceptions import KeycloakAuthorizationConfigError
from keycloak.exceptions import KeycloakAuthorizationConfigError
class Policy: class Policy:
"""Base policy class.
"""
Base policy class.
A policy defines the conditions that must be satisfied to grant access to an object. A policy defines the conditions that must be satisfied to grant access to an object.
Unlike permissions, you do not specify the object being protected but rather the conditions Unlike permissions, you do not specify the object being protected but rather the conditions
@ -50,8 +50,9 @@ class Policy:
""" """
def __init__(self, name, type, logic, decision_strategy):
"""Init method.
def __init__(self, name: str, type: str, logic: str, decision_strategy: str) -> None: # noqa: A002
"""
Init method.
:param name: Name :param name: Name
:type name: str :type name: str
@ -69,25 +70,28 @@ class Policy:
self.roles = [] self.roles = []
self.permissions = [] self.permissions = []
def __repr__(self):
"""Repr method.
def __repr__(self) -> str:
"""
Repr method.
:returns: Class representation :returns: Class representation
:rtype: str :rtype: str
""" """
return "<Policy: %s (%s)>" % (self.name, self.type)
return f"<Policy: {self.name} ({self.type})>"
def __str__(self):
"""Str method.
def __str__(self) -> str:
"""
Str method.
:returns: Class string representation :returns: Class string representation
:rtype: str :rtype: str
""" """
return "Policy: %s (%s)" % (self.name, self.type)
return f"Policy: {self.name} ({self.type})"
@property @property
def name(self):
"""Get name.
def name(self) -> str:
"""
Get name.
:returns: Name :returns: Name
:rtype: str :rtype: str
@ -95,12 +99,13 @@ class Policy:
return self._name return self._name
@name.setter @name.setter
def name(self, value):
def name(self, value: str) -> None:
self._name = value self._name = value
@property @property
def type(self):
"""Get type.
def type(self) -> str:
"""
Get type.
:returns: Type :returns: Type
:rtype: str :rtype: str
@ -108,12 +113,13 @@ class Policy:
return self._type return self._type
@type.setter @type.setter
def type(self, value):
def type(self, value: str) -> None:
self._type = value self._type = value
@property @property
def logic(self):
"""Get logic.
def logic(self) -> str:
"""
Get logic.
:returns: Logic :returns: Logic
:rtype: str :rtype: str
@ -121,12 +127,13 @@ class Policy:
return self._logic return self._logic
@logic.setter @logic.setter
def logic(self, value):
def logic(self, value: str) -> None:
self._logic = value self._logic = value
@property @property
def decision_strategy(self):
"""Get decision strategy.
def decision_strategy(self) -> str:
"""
Get decision strategy.
:returns: Decision strategy :returns: Decision strategy
:rtype: str :rtype: str
@ -134,12 +141,13 @@ class Policy:
return self._decision_strategy return self._decision_strategy
@decision_strategy.setter @decision_strategy.setter
def decision_strategy(self, value):
def decision_strategy(self, value: str) -> None:
self._decision_strategy = value self._decision_strategy = value
@property @property
def roles(self):
"""Get roles.
def roles(self) -> list:
"""
Get roles.
:returns: Roles :returns: Roles
:rtype: list :rtype: list
@ -147,12 +155,13 @@ class Policy:
return self._roles return self._roles
@roles.setter @roles.setter
def roles(self, value):
def roles(self, value: list) -> None:
self._roles = value self._roles = value
@property @property
def permissions(self):
"""Get permissions.
def permissions(self) -> list:
"""
Get permissions.
:returns: Permissions :returns: Permissions
:rtype: list :rtype: list
@ -160,24 +169,25 @@ class Policy:
return self._permissions return self._permissions
@permissions.setter @permissions.setter
def permissions(self, value):
def permissions(self, value: list) -> None:
self._permissions = value self._permissions = value
def add_role(self, role):
"""Add keycloak role in policy.
def add_role(self, role: dict) -> None:
"""
Add keycloak role in policy.
:param role: Keycloak role :param role: Keycloak role
:type role: keycloak.authorization.Role :type role: keycloak.authorization.Role
:raises KeycloakAuthorizationConfigError: In case of misconfigured policy type :raises KeycloakAuthorizationConfigError: In case of misconfigured policy type
""" """
if self.type != "role": if self.type != "role":
raise KeycloakAuthorizationConfigError(
"Can't add role. Policy type is different of role"
)
error_msg = "Can't add role. Policy type is different of role"
raise KeycloakAuthorizationConfigError(error_msg)
self._roles.append(role) self._roles.append(role)
def add_permission(self, permission):
"""Add keycloak permission in policy.
def add_permission(self, permission: dict) -> None:
"""
Add keycloak permission in policy.
:param permission: Keycloak permission :param permission: Keycloak permission
:type permission: keycloak.authorization.Permission :type permission: keycloak.authorization.Permission

24
src/keycloak/authorization/role.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -25,7 +24,8 @@
class Role: class Role:
"""Authorization Role base class.
"""
Authorization Role base class.
Roles identify a type or category of user. Admin, user, Roles identify a type or category of user. Admin, user,
manager, and employee are all typical roles that may exist in an organization. manager, and employee are all typical roles that may exist in an organization.
@ -38,8 +38,9 @@ class Role:
:type required: bool :type required: bool
""" """
def __init__(self, name, required=False):
"""Init method.
def __init__(self, name: str, required: bool = False) -> None:
"""
Init method.
:param name: Name :param name: Name
:type name: str :type name: str
@ -49,22 +50,25 @@ class Role:
self.name = name self.name = name
self.required = required self.required = required
def get_name(self):
"""Get name.
def get_name(self) -> str:
"""
Get name.
:returns: Name :returns: Name
:rtype: str :rtype: str
""" """
return self.name return self.name
def __eq__(self, other):
"""Eq method.
def __eq__(self, other: str) -> bool:
"""
Eq method.
:param other: The other object :param other: The other object
:type other: str :type other: str
:returns: Equality bool :returns: Equality bool
:rtype: bool | NotImplemented
:rtype: bool
""" """
if isinstance(other, str): if isinstance(other, str):
return self.name == other return self.name == other
return NotImplemented
raise NotImplementedError

170
src/keycloak/connection.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -23,6 +22,8 @@
"""Connection manager module.""" """Connection manager module."""
from __future__ import annotations
try: try:
from urllib.parse import urljoin from urllib.parse import urljoin
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
@ -30,13 +31,16 @@ except ImportError: # pragma: no cover
import httpx import httpx
import requests import requests
from httpx import Response as AsyncResponse
from requests import Response
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from .exceptions import KeycloakConnectionError from .exceptions import KeycloakConnectionError
class ConnectionManager(object):
"""Represents a simple server connection.
class ConnectionManager:
"""
Represents a simple server connection.
:param base_url: The server URL. :param base_url: The server URL.
:type base_url: str :type base_url: str
@ -58,9 +62,17 @@ class ConnectionManager(object):
""" """
def __init__( def __init__(
self, base_url, headers={}, timeout=60, verify=True, proxies=None, cert=None, max_retries=1
):
"""Init method.
self,
base_url: str,
headers: dict | None = None,
timeout: int = 60,
verify: bool = True,
proxies: dict | None = None,
cert: str | tuple | None = None,
max_retries: int = 1,
) -> None:
"""
Init method.
:param base_url: The server URL. :param base_url: The server URL.
:type base_url: str :type base_url: str
@ -106,19 +118,20 @@ class ConnectionManager(object):
self.async_s.auth = None # don't let requests add auth headers self.async_s.auth = None # don't let requests add auth headers
self.async_s.transport = httpx.AsyncHTTPTransport(retries=1) self.async_s.transport = httpx.AsyncHTTPTransport(retries=1)
async def aclose(self):
async def aclose(self) -> None:
"""Close the async connection on delete.""" """Close the async connection on delete."""
if hasattr(self, "_s"): if hasattr(self, "_s"):
await self.async_s.aclose() await self.async_s.aclose()
def __del__(self):
def __del__(self) -> None:
"""Del method.""" """Del method."""
if hasattr(self, "_s"): if hasattr(self, "_s"):
self._s.close() self._s.close()
@property @property
def base_url(self):
"""Return base url in use for requests to the server.
def base_url(self) -> str:
"""
Return base url in use for requests to the server.
:returns: Base URL :returns: Base URL
:rtype: str :rtype: str
@ -126,12 +139,13 @@ class ConnectionManager(object):
return self._base_url return self._base_url
@base_url.setter @base_url.setter
def base_url(self, value):
def base_url(self, value: str) -> None:
self._base_url = value self._base_url = value
@property @property
def timeout(self):
"""Return timeout in use for request to the server.
def timeout(self) -> int:
"""
Return timeout in use for request to the server.
:returns: Timeout :returns: Timeout
:rtype: int :rtype: int
@ -139,12 +153,13 @@ class ConnectionManager(object):
return self._timeout return self._timeout
@timeout.setter @timeout.setter
def timeout(self, value):
def timeout(self, value: int) -> None:
self._timeout = value self._timeout = value
@property @property
def verify(self):
"""Return verify in use for request to the server.
def verify(self) -> bool:
"""
Return verify in use for request to the server.
:returns: Verify indicator :returns: Verify indicator
:rtype: bool :rtype: bool
@ -152,12 +167,13 @@ class ConnectionManager(object):
return self._verify return self._verify
@verify.setter @verify.setter
def verify(self, value):
def verify(self, value: bool) -> None:
self._verify = value self._verify = value
@property @property
def cert(self):
"""Return client certificates in use for request to the server.
def cert(self) -> str | tuple:
"""
Return client certificates in use for request to the server.
:returns: Client certificate :returns: Client certificate
:rtype: Union[str,Tuple[str,str]] :rtype: Union[str,Tuple[str,str]]
@ -165,12 +181,13 @@ class ConnectionManager(object):
return self._cert return self._cert
@cert.setter @cert.setter
def cert(self, value):
def cert(self, value: str | tuple) -> None:
self._cert = value self._cert = value
@property @property
def headers(self):
"""Return header request to the server.
def headers(self) -> dict:
"""
Return header request to the server.
:returns: Request headers :returns: Request headers
:rtype: dict :rtype: dict
@ -178,11 +195,12 @@ class ConnectionManager(object):
return self._headers return self._headers
@headers.setter @headers.setter
def headers(self, value):
self._headers = value
def headers(self, value: dict) -> None:
self._headers = value or {}
def param_headers(self, key):
"""Return a specific header parameter.
def param_headers(self, key: str) -> str | None:
"""
Return a specific header parameter.
:param key: Header parameters key. :param key: Header parameters key.
:type key: str :type key: str
@ -191,12 +209,13 @@ class ConnectionManager(object):
""" """
return self.headers.get(key) return self.headers.get(key)
def clean_headers(self):
def clean_headers(self) -> None:
"""Clear header parameters.""" """Clear header parameters."""
self.headers = {} self.headers = {}
def exist_param_headers(self, key):
"""Check if the parameter exists in the header.
def exist_param_headers(self, key: str) -> bool:
"""
Check if the parameter exists in the header.
:param key: Header parameters key. :param key: Header parameters key.
:type key: str :type key: str
@ -205,8 +224,9 @@ class ConnectionManager(object):
""" """
return self.param_headers(key) is not None return self.param_headers(key) is not None
def add_param_headers(self, key, value):
"""Add a single parameter inside the header.
def add_param_headers(self, key: str, value: str) -> None:
"""
Add a single parameter inside the header.
:param key: Header parameters key. :param key: Header parameters key.
:type key: str :type key: str
@ -215,16 +235,18 @@ class ConnectionManager(object):
""" """
self.headers[key] = value self.headers[key] = value
def del_param_headers(self, key):
"""Remove a specific parameter.
def del_param_headers(self, key: str) -> None:
"""
Remove a specific parameter.
:param key: Key of the header parameters. :param key: Key of the header parameters.
:type key: str :type key: str
""" """
self.headers.pop(key, None) self.headers.pop(key, None)
def raw_get(self, path, **kwargs):
"""Submit get request to the path.
def raw_get(self, path: str, **kwargs: dict) -> Response:
"""
Submit get request to the path.
:param path: Path for request. :param path: Path for request.
:type path: str :type path: str
@ -244,10 +266,12 @@ class ConnectionManager(object):
cert=self.cert, cert=self.cert,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
def raw_post(self, path, data, **kwargs):
"""Submit post request to the path.
def raw_post(self, path: str, data: dict, **kwargs: dict) -> Response:
"""
Submit post request to the path.
:param path: Path for request. :param path: Path for request.
:type path: str :type path: str
@ -270,10 +294,12 @@ class ConnectionManager(object):
cert=self.cert, cert=self.cert,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
def raw_put(self, path, data, **kwargs):
"""Submit put request to the path.
def raw_put(self, path: str, data: dict, **kwargs: dict) -> Response:
"""
Submit put request to the path.
:param path: Path for request. :param path: Path for request.
:type path: str :type path: str
@ -296,10 +322,12 @@ class ConnectionManager(object):
cert=self.cert, cert=self.cert,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
def raw_delete(self, path, data=None, **kwargs):
"""Submit delete request to the path.
def raw_delete(self, path: str, data: dict | None = None, **kwargs: dict) -> Response:
"""
Submit delete request to the path.
:param path: Path for request. :param path: Path for request.
:type path: str :type path: str
@ -312,21 +340,22 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server. :raises KeycloakConnectionError: HttpError Can't connect to server.
""" """
try: try:
r = self._s.delete(
return self._s.delete(
urljoin(self.base_url, path), urljoin(self.base_url, path),
params=kwargs, params=kwargs,
data=data or dict(),
data=data or {},
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
verify=self.verify, verify=self.verify,
cert=self.cert, cert=self.cert,
) )
return r
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
async def a_raw_get(self, path, **kwargs):
"""Submit get request to the path.
async def a_raw_get(self, path: str, **kwargs: dict) -> AsyncResponse:
"""
Submit get request to the path.
:param path: Path for request. :param path: Path for request.
:type path: str :type path: str
@ -344,10 +373,12 @@ class ConnectionManager(object):
timeout=self.timeout, timeout=self.timeout,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
async def a_raw_post(self, path, data, **kwargs):
"""Submit post request to the path.
async def a_raw_post(self, path: str, data: dict, **kwargs: dict) -> AsyncResponse:
"""
Submit post request to the path.
:param path: Path for request. :param path: Path for request.
:type path: str :type path: str
@ -369,10 +400,12 @@ class ConnectionManager(object):
timeout=self.timeout, timeout=self.timeout,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
async def a_raw_put(self, path, data, **kwargs):
"""Submit put request to the path.
async def a_raw_put(self, path: str, data: dict, **kwargs: dict) -> AsyncResponse:
"""
Submit put request to the path.
:param path: Path for request. :param path: Path for request.
:type path: str :type path: str
@ -393,10 +426,17 @@ class ConnectionManager(object):
timeout=self.timeout, timeout=self.timeout,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
async def a_raw_delete(self, path, data=None, **kwargs):
"""Submit delete request to the path.
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
async def a_raw_delete(
self,
path: str,
data: dict | None = None,
**kwargs: dict,
) -> AsyncResponse:
"""
Submit delete request to the path.
:param path: Path for request. :param path: Path for request.
:type path: str :type path: str
@ -412,17 +452,19 @@ class ConnectionManager(object):
return await self.async_s.request( return await self.async_s.request(
method="DELETE", method="DELETE",
url=urljoin(self.base_url, path), url=urljoin(self.base_url, path),
data=data or dict(),
data=data or {},
params=self._filter_query_params(kwargs), params=self._filter_query_params(kwargs),
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e
@staticmethod @staticmethod
def _filter_query_params(query_params):
"""Explicitly filter query params with None values for compatibility.
def _filter_query_params(query_params: dict) -> dict:
"""
Explicitly filter query params with None values for compatibility.
Httpx and requests differ in the way they handle query params with the value None, Httpx and requests differ in the way they handle query params with the value None,
requests does not include params with the value None while httpx includes them as-is. requests does not include params with the value None while httpx includes them as-is.

95
src/keycloak/exceptions.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -23,11 +22,32 @@
"""Keycloak custom exceptions module.""" """Keycloak custom exceptions module."""
from __future__ import annotations
from typing import TYPE_CHECKING
import requests import requests
if TYPE_CHECKING:
from httpx import Response as AsyncResponse
from requests import Response
HTTP_OK = 200
HTTP_CREATED = 201
HTTP_ACCEPTED = 202
HTTP_NO_CONTENT = 204
HTTP_BAD_REQUEST = 400
HTTP_UNAUTHORIZED = 401
HTTP_FORBIDDEN = 403
HTTP_NOT_FOUND = 404
HTTP_NOT_ALLOWED = 405
HTTP_CONFLICT = 409
class KeycloakError(Exception): class KeycloakError(Exception):
"""Base class for custom Keycloak errors.
"""
Base class for custom Keycloak errors.
:param error_message: The error message :param error_message: The error message
:type error_message: str :type error_message: str
@ -35,8 +55,14 @@ class KeycloakError(Exception):
:type response_code: int :type response_code: int
""" """
def __init__(self, error_message="", response_code=None, response_body=None):
"""Init method.
def __init__(
self,
error_message: str = "",
response_code: int | None = None,
response_body: bytes | None = None,
) -> None:
"""
Init method.
:param error_message: The error message :param error_message: The error message
:type error_message: str :type error_message: str
@ -51,104 +77,82 @@ class KeycloakError(Exception):
self.response_body = response_body self.response_body = response_body
self.error_message = error_message self.error_message = error_message
def __str__(self):
"""Str method.
def __str__(self) -> str:
"""
Str method.
:returns: String representation of the object :returns: String representation of the object
:rtype: str :rtype: str
""" """
if self.response_code is not None: if self.response_code is not None:
return "{0}: {1}".format(self.response_code, self.error_message)
else:
return "{0}".format(self.error_message)
return f"{self.response_code}: {self.error_message}"
return f"{self.error_message}"
class KeycloakAuthenticationError(KeycloakError): class KeycloakAuthenticationError(KeycloakError):
"""Keycloak authentication error exception.""" """Keycloak authentication error exception."""
pass
class KeycloakConnectionError(KeycloakError): class KeycloakConnectionError(KeycloakError):
"""Keycloak connection error exception.""" """Keycloak connection error exception."""
pass
class KeycloakOperationError(KeycloakError): class KeycloakOperationError(KeycloakError):
"""Keycloak operation error exception.""" """Keycloak operation error exception."""
pass
class KeycloakDeprecationError(KeycloakError): class KeycloakDeprecationError(KeycloakError):
"""Keycloak deprecation error exception.""" """Keycloak deprecation error exception."""
pass
class KeycloakGetError(KeycloakOperationError): class KeycloakGetError(KeycloakOperationError):
"""Keycloak request get error exception.""" """Keycloak request get error exception."""
pass
class KeycloakPostError(KeycloakOperationError): class KeycloakPostError(KeycloakOperationError):
"""Keycloak request post error exception.""" """Keycloak request post error exception."""
pass
class KeycloakPutError(KeycloakOperationError): class KeycloakPutError(KeycloakOperationError):
"""Keycloak request put error exception.""" """Keycloak request put error exception."""
pass
class KeycloakDeleteError(KeycloakOperationError): class KeycloakDeleteError(KeycloakOperationError):
"""Keycloak request delete error exception.""" """Keycloak request delete error exception."""
pass
class KeycloakSecretNotFound(KeycloakOperationError): class KeycloakSecretNotFound(KeycloakOperationError):
"""Keycloak secret not found exception.""" """Keycloak secret not found exception."""
pass
class KeycloakRPTNotFound(KeycloakOperationError): class KeycloakRPTNotFound(KeycloakOperationError):
"""Keycloak RPT not found exception.""" """Keycloak RPT not found exception."""
pass
class KeycloakAuthorizationConfigError(KeycloakOperationError): class KeycloakAuthorizationConfigError(KeycloakOperationError):
"""Keycloak authorization config exception.""" """Keycloak authorization config exception."""
pass
class KeycloakInvalidTokenError(KeycloakOperationError): class KeycloakInvalidTokenError(KeycloakOperationError):
"""Keycloak invalid token exception.""" """Keycloak invalid token exception."""
pass
class KeycloakPermissionFormatError(KeycloakOperationError): class KeycloakPermissionFormatError(KeycloakOperationError):
"""Keycloak permission format exception.""" """Keycloak permission format exception."""
pass
class PermissionDefinitionError(Exception): class PermissionDefinitionError(Exception):
"""Keycloak permission definition exception.""" """Keycloak permission definition exception."""
pass
def raise_error_from_response(response, error, expected_codes=None, skip_exists=False):
"""Raise an exception for the response.
def raise_error_from_response(
response: Response | AsyncResponse,
error: dict | Exception,
expected_codes: list[int] | None = None,
skip_exists: bool = False,
) -> bytes | dict | list:
"""
Raise an exception for the response.
:param response: The response object :param response: The response object
:type response: Response :type response: Response
@ -162,9 +166,9 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
:returns: Content of the response message :returns: Content of the response message
:type: bytes or dict :type: bytes or dict
:raises KeycloakError: In case of unexpected status codes :raises KeycloakError: In case of unexpected status codes
""" # noqa: DAR401,DAR402
"""
if expected_codes is None: if expected_codes is None:
expected_codes = [200, 201, 204]
expected_codes = [HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT]
if response.status_code in expected_codes: if response.status_code in expected_codes:
if response.status_code == requests.codes.no_content: if response.status_code == requests.codes.no_content:
@ -175,7 +179,7 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
except ValueError: except ValueError:
return response.content return response.content
if skip_exists and response.status_code == 409:
if skip_exists and response.status_code == HTTP_CONFLICT:
return {"msg": "Already exists"} return {"msg": "Already exists"}
try: try:
@ -185,10 +189,11 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
if isinstance(error, dict): if isinstance(error, dict):
error = error.get(response.status_code, KeycloakOperationError) error = error.get(response.status_code, KeycloakOperationError)
else:
if response.status_code == 401:
elif response.status_code == HTTP_UNAUTHORIZED:
error = KeycloakAuthenticationError error = KeycloakAuthenticationError
raise error( raise error(
error_message=message, response_code=response.status_code, response_body=response.content
error_message=message,
response_code=response.status_code,
response_body=response.content,
) )

4561
src/keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

570
src/keycloak/keycloak_openid.py
File diff suppressed because it is too large
View File

311
src/keycloak/keycloak_uma.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -21,38 +20,51 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak UMA module.
"""
Keycloak UMA module.
The module contains a UMA compatible client for keycloak: The module contains a UMA compatible client for keycloak:
https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html
""" """
from __future__ import annotations
import json import json
from typing import Iterable
from typing import TYPE_CHECKING
from urllib.parse import quote_plus from urllib.parse import quote_plus
from async_property import async_property from async_property import async_property
from .connection import ConnectionManager from .connection import ConnectionManager
from .exceptions import ( from .exceptions import (
HTTP_CREATED,
HTTP_NO_CONTENT,
HTTP_OK,
KeycloakDeleteError, KeycloakDeleteError,
KeycloakGetError, KeycloakGetError,
KeycloakPostError, KeycloakPostError,
KeycloakPutError, KeycloakPutError,
raise_error_from_response, raise_error_from_response,
) )
from .urls_patterns import URL_UMA_WELL_KNOWN
if TYPE_CHECKING:
from collections.abc import Iterable
from .openid_connection import KeycloakOpenIDConnection from .openid_connection import KeycloakOpenIDConnection
from .uma_permissions import UMAPermission from .uma_permissions import UMAPermission
from .urls_patterns import URL_UMA_WELL_KNOWN
class KeycloakUMA: class KeycloakUMA:
"""Keycloak UMA client.
"""
Keycloak UMA client.
:param connection: OpenID connection manager :param connection: OpenID connection manager
""" """
def __init__(self, connection: KeycloakOpenIDConnection):
"""Init method.
def __init__(self, connection: KeycloakOpenIDConnection) -> None:
"""
Init method.
:param connection: OpenID connection manager :param connection: OpenID connection manager
:type connection: KeycloakOpenIDConnection :type connection: KeycloakOpenIDConnection
@ -60,14 +72,15 @@ class KeycloakUMA:
self.connection = connection self.connection = connection
self._well_known = None self._well_known = None
def _fetch_well_known(self):
def _fetch_well_known(self) -> dict:
params_path = {"realm-name": self.connection.realm_name} params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path)) data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
@staticmethod @staticmethod
def format_url(url, **kwargs):
"""Substitute url path parameters.
def format_url(url: str, **kwargs: dict) -> str:
"""
Substitute url path parameters.
Given a parameterized url string, returns the string after url encoding and substituting Given a parameterized url string, returns the string after url encoding and substituting
the given params. For example, the given params. For example,
@ -84,8 +97,9 @@ class KeycloakUMA:
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()}) return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
@staticmethod @staticmethod
async def a_format_url(url, **kwargs):
"""Substitute url path parameters.
async def a_format_url(url: str, **kwargs: dict) -> str:
"""
Substitute url path parameters.
Given a parameterized url string, returns the string after url encoding and substituting Given a parameterized url string, returns the string after url encoding and substituting
the given params. For example, the given params. For example,
@ -102,8 +116,9 @@ class KeycloakUMA:
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()}) return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
@property @property
def uma_well_known(self):
"""Get the well_known UMA2 config.
def uma_well_known(self) -> dict:
"""
Get the well_known UMA2 config.
:returns: It lists endpoints and other configuration options relevant :returns: It lists endpoints and other configuration options relevant
:rtype: dict :rtype: dict
@ -111,21 +126,25 @@ class KeycloakUMA:
# per instance cache # per instance cache
if not self._well_known: if not self._well_known:
self._well_known = self._fetch_well_known() self._well_known = self._fetch_well_known()
return self._well_known return self._well_known
@async_property @async_property
async def a_uma_well_known(self):
"""Get the well_known UMA2 config async.
async def a_uma_well_known(self) -> dict:
"""
Get the well_known UMA2 config async.
:returns: It lists endpoints and other configuration options relevant :returns: It lists endpoints and other configuration options relevant
:rtype: dict :rtype: dict
""" """
if not self._well_known: if not self._well_known:
self._well_known = await self.a__fetch_well_known() self._well_known = await self.a__fetch_well_known()
return self._well_known return self._well_known
def resource_set_create(self, payload):
"""Create a resource set.
def resource_set_create(self, payload: dict) -> dict | bytes:
"""
Create a resource set.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1 https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
@ -139,12 +158,18 @@ class KeycloakUMA:
:rtype: dict :rtype: dict
""" """
data_raw = self.connection.raw_post( data_raw = self.connection.raw_post(
self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload)
self.uma_well_known["resource_registration_endpoint"],
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_CREATED],
) )
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def resource_set_update(self, resource_id, payload):
"""Update a resource set.
def resource_set_update(self, resource_id: str, payload: dict) -> bytes:
"""
Update a resource set.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
@ -157,16 +182,22 @@ class KeycloakUMA:
:param payload: ResourceRepresentation :param payload: ResourceRepresentation
:type payload: dict :type payload: dict
:return: Response dict (empty) :return: Response dict (empty)
:rtype: dict
:rtype: bytes
""" """
url = self.format_url( url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
self.uma_well_known["resource_registration_endpoint"] + "/{id}",
id=resource_id,
) )
data_raw = self.connection.raw_put(url, data=json.dumps(payload)) data_raw = self.connection.raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
return raise_error_from_response(
data_raw,
KeycloakPutError,
expected_codes=[HTTP_NO_CONTENT],
)
def resource_set_read(self, resource_id):
"""Read a resource set.
def resource_set_read(self, resource_id: str) -> dict:
"""
Read a resource set.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
@ -180,13 +211,15 @@ class KeycloakUMA:
:rtype: dict :rtype: dict
""" """
url = self.format_url( url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
self.uma_well_known["resource_registration_endpoint"] + "/{id}",
id=resource_id,
) )
data_raw = self.connection.raw_get(url) data_raw = self.connection.raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
def resource_set_delete(self, resource_id):
"""Delete a resource set.
def resource_set_delete(self, resource_id: str) -> bytes:
"""
Delete a resource set.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
@ -197,10 +230,15 @@ class KeycloakUMA:
:rtype: dict :rtype: dict
""" """
url = self.format_url( url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
self.uma_well_known["resource_registration_endpoint"] + "/{id}",
id=resource_id,
) )
data_raw = self.connection.raw_delete(url) data_raw = self.connection.raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
return raise_error_from_response(
data_raw,
KeycloakDeleteError,
expected_codes=[HTTP_NO_CONTENT],
)
def resource_set_list_ids( def resource_set_list_ids(
self, self,
@ -210,11 +248,12 @@ class KeycloakUMA:
owner: str = "", owner: str = "",
resource_type: str = "", resource_type: str = "",
scope: str = "", scope: str = "",
matchingUri: bool = False,
matchingUri: bool = False, # noqa: N803
first: int = 0, first: int = 0,
maximum: int = -1, maximum: int = -1,
):
"""Query for list of resource set ids.
) -> list:
"""
Query for list of resource set ids.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -240,7 +279,7 @@ class KeycloakUMA:
:return: List of ids :return: List of ids
:rtype: List[str] :rtype: List[str]
""" """
query = dict()
query = {}
if name: if name:
query["name"] = name query["name"] = name
if exact_name: if exact_name:
@ -261,12 +300,14 @@ class KeycloakUMA:
query["max"] = maximum query["max"] = maximum
data_raw = self.connection.raw_get( data_raw = self.connection.raw_get(
self.uma_well_known["resource_registration_endpoint"], **query
self.uma_well_known["resource_registration_endpoint"],
**query,
) )
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
def resource_set_list(self):
"""List all resource sets.
def resource_set_list(self) -> list:
"""
List all resource sets.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -281,8 +322,9 @@ class KeycloakUMA:
resource = self.resource_set_read(resource_id) resource = self.resource_set_read(resource_id)
yield resource yield resource
def permission_ticket_create(self, permissions: Iterable[UMAPermission]):
"""Create a permission ticket.
def permission_ticket_create(self, permissions: Iterable[UMAPermission]) -> dict:
"""
Create a permission ticket.
:param permissions: Iterable of uma permissions to validate the token against :param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission] :type permissions: Iterable[UMAPermission]
@ -290,19 +332,23 @@ class KeycloakUMA:
:rtype: boolean :rtype: boolean
:raises KeycloakPostError: In case permission resource not found :raises KeycloakPostError: In case permission resource not found
""" """
resources = dict()
resources = {}
for permission in permissions: for permission in permissions:
resource_id = getattr(permission, "resource_id", None) resource_id = getattr(permission, "resource_id", None)
if resource_id is None: if resource_id is None:
resource_ids = self.resource_set_list_ids( resource_ids = self.resource_set_list_ids(
exact_name=True, name=permission.resource, first=0, maximum=1
exact_name=True,
name=permission.resource,
first=0,
maximum=1,
) )
if not resource_ids: if not resource_ids:
raise KeycloakPostError("Invalid resource specified")
msg = "Invalid resource specified"
raise KeycloakPostError(msg)
setattr(permission, "resource_id", resource_ids[0])
permission.resource_id = resource_ids[0]
resources.setdefault(resource_id, set()) resources.setdefault(resource_id, set())
if permission.scope: if permission.scope:
@ -314,12 +360,19 @@ class KeycloakUMA:
] ]
data_raw = self.connection.raw_post( data_raw = self.connection.raw_post(
self.uma_well_known["permission_endpoint"], data=json.dumps(payload)
self.uma_well_known["permission_endpoint"],
data=json.dumps(payload),
) )
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def permissions_check(self, token, permissions: Iterable[UMAPermission], **extra_payload):
"""Check UMA permissions by user token with requested permissions.
def permissions_check(
self,
token: str,
permissions: Iterable[UMAPermission],
**extra_payload: dict,
) -> bool:
"""
Check UMA permissions by user token with requested permissions.
The token endpoint is used to check UMA permissions from Keycloak. It can only be The token endpoint is used to check UMA permissions from Keycloak. It can only be
invoked by confidential clients. invoked by confidential clients.
@ -358,8 +411,9 @@ class KeycloakUMA:
return False return False
return data.get("result", False) return data.get("result", False)
def policy_resource_create(self, resource_id, payload):
"""Create permission policy for resource.
def policy_resource_create(self, resource_id: str, payload: dict) -> dict:
"""
Create permission policy for resource.
Supports name, description, scopes, roles, groups, clients Supports name, description, scopes, roles, groups, clients
@ -373,12 +427,14 @@ class KeycloakUMA:
:rtype: dict :rtype: dict
""" """
data_raw = self.connection.raw_post( data_raw = self.connection.raw_post(
self.uma_well_known["policy_endpoint"] + f"/{resource_id}", data=json.dumps(payload)
self.uma_well_known["policy_endpoint"] + f"/{resource_id}",
data=json.dumps(payload),
) )
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def policy_update(self, policy_id, payload):
"""Update permission policy.
def policy_update(self, policy_id: str, payload: dict) -> dict:
"""
Update permission policy.
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -391,12 +447,14 @@ class KeycloakUMA:
:rtype: dict :rtype: dict
""" """
data_raw = self.connection.raw_put( data_raw = self.connection.raw_put(
self.uma_well_known["policy_endpoint"] + f"/{policy_id}", data=json.dumps(payload)
self.uma_well_known["policy_endpoint"] + f"/{policy_id}",
data=json.dumps(payload),
) )
return raise_error_from_response(data_raw, KeycloakPutError) return raise_error_from_response(data_raw, KeycloakPutError)
def policy_delete(self, policy_id):
"""Delete permission policy.
def policy_delete(self, policy_id: str) -> dict:
"""
Delete permission policy.
https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -407,7 +465,7 @@ class KeycloakUMA:
:rtype: dict :rtype: dict
""" """
data_raw = self.connection.raw_delete( data_raw = self.connection.raw_delete(
self.uma_well_known["policy_endpoint"] + f"/{policy_id}"
self.uma_well_known["policy_endpoint"] + f"/{policy_id}",
) )
return raise_error_from_response(data_raw, KeycloakDeleteError) return raise_error_from_response(data_raw, KeycloakDeleteError)
@ -418,8 +476,9 @@ class KeycloakUMA:
scope: str = "", scope: str = "",
first: int = 0, first: int = 0,
maximum: int = -1, maximum: int = -1,
):
"""Query permission policies.
) -> list:
"""
Query permission policies.
https://www.keycloak.org/docs/latest/authorization_services/#querying-permission https://www.keycloak.org/docs/latest/authorization_services/#querying-permission
@ -437,7 +496,7 @@ class KeycloakUMA:
:return: List of ids :return: List of ids
:rtype: List[str] :rtype: List[str]
""" """
query = dict()
query = {}
if name: if name:
query["name"] = name query["name"] = name
if resource: if resource:
@ -452,8 +511,9 @@ class KeycloakUMA:
data_raw = self.connection.raw_get(self.uma_well_known["policy_endpoint"], **query) data_raw = self.connection.raw_get(self.uma_well_known["policy_endpoint"], **query)
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
async def a__fetch_well_known(self):
"""Get the well_known UMA2 config async.
async def a__fetch_well_known(self) -> dict:
"""
Get the well_known UMA2 config async.
:returns: It lists endpoints and other configuration options relevant :returns: It lists endpoints and other configuration options relevant
:rtype: dict :rtype: dict
@ -462,8 +522,9 @@ class KeycloakUMA:
data_raw = await self.connection.a_raw_get(URL_UMA_WELL_KNOWN.format(**params_path)) data_raw = await self.connection.a_raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
async def a_resource_set_create(self, payload):
"""Create a resource set asynchronously.
async def a_resource_set_create(self, payload: dict) -> dict:
"""
Create a resource set asynchronously.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1 https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
@ -480,10 +541,15 @@ class KeycloakUMA:
(await self.a_uma_well_known)["resource_registration_endpoint"], (await self.a_uma_well_known)["resource_registration_endpoint"],
data=json.dumps(payload), data=json.dumps(payload),
) )
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
return raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_CREATED],
)
async def a_resource_set_update(self, resource_id, payload):
"""Update a resource set asynchronously.
async def a_resource_set_update(self, resource_id: str, payload: dict) -> bytes:
"""
Update a resource set asynchronously.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
@ -496,17 +562,22 @@ class KeycloakUMA:
:param payload: ResourceRepresentation :param payload: ResourceRepresentation
:type payload: dict :type payload: dict
:return: Response dict (empty) :return: Response dict (empty)
:rtype: dict
:rtype: bytes
""" """
url = self.format_url( url = self.format_url(
(await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}", (await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}",
id=resource_id, id=resource_id,
) )
data_raw = await self.connection.a_raw_put(url, data=json.dumps(payload)) data_raw = await self.connection.a_raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
return raise_error_from_response(
data_raw,
KeycloakPutError,
expected_codes=[HTTP_NO_CONTENT],
)
async def a_resource_set_read(self, resource_id):
"""Read a resource set asynchronously.
async def a_resource_set_read(self, resource_id: str) -> dict:
"""
Read a resource set asynchronously.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
@ -524,10 +595,11 @@ class KeycloakUMA:
id=resource_id, id=resource_id,
) )
data_raw = await self.connection.a_raw_get(url) data_raw = await self.connection.a_raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
async def a_resource_set_delete(self, resource_id):
"""Delete a resource set asynchronously.
async def a_resource_set_delete(self, resource_id: str) -> bytes:
"""
Delete a resource set asynchronously.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
@ -535,14 +607,18 @@ class KeycloakUMA:
:param resource_id: id of the resource :param resource_id: id of the resource
:type resource_id: str :type resource_id: str
:return: Response dict (empty) :return: Response dict (empty)
:rtype: dict
:rtype: bytes
""" """
url = self.format_url( url = self.format_url(
(await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}", (await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}",
id=resource_id, id=resource_id,
) )
data_raw = await self.connection.a_raw_delete(url) data_raw = await self.connection.a_raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
return raise_error_from_response(
data_raw,
KeycloakDeleteError,
expected_codes=[HTTP_NO_CONTENT],
)
async def a_resource_set_list_ids( async def a_resource_set_list_ids(
self, self,
@ -552,11 +628,12 @@ class KeycloakUMA:
owner: str = "", owner: str = "",
resource_type: str = "", resource_type: str = "",
scope: str = "", scope: str = "",
matchingUri: bool = False,
matchingUri: bool = False, # noqa: N803
first: int = 0, first: int = 0,
maximum: int = -1, maximum: int = -1,
):
"""Query for list of resource set ids asynchronously.
) -> list:
"""
Query for list of resource set ids asynchronously.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -582,7 +659,7 @@ class KeycloakUMA:
:return: List of ids :return: List of ids
:rtype: List[str] :rtype: List[str]
""" """
query = dict()
query = {}
if name: if name:
query["name"] = name query["name"] = name
if exact_name: if exact_name:
@ -603,12 +680,14 @@ class KeycloakUMA:
query["max"] = maximum query["max"] = maximum
data_raw = await self.connection.a_raw_get( data_raw = await self.connection.a_raw_get(
(await self.a_uma_well_known)["resource_registration_endpoint"], **query
(await self.a_uma_well_known)["resource_registration_endpoint"],
**query,
) )
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
async def a_resource_set_list(self):
"""List all resource sets asynchronously.
async def a_resource_set_list(self) -> list:
"""
List all resource sets asynchronously.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
@ -623,8 +702,9 @@ class KeycloakUMA:
resource = await self.a_resource_set_read(resource_id) resource = await self.a_resource_set_read(resource_id)
yield resource yield resource
async def a_permission_ticket_create(self, permissions: Iterable[UMAPermission]):
"""Create a permission ticket asynchronously.
async def a_permission_ticket_create(self, permissions: Iterable[UMAPermission]) -> bool:
"""
Create a permission ticket asynchronously.
:param permissions: Iterable of uma permissions to validate the token against :param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission] :type permissions: Iterable[UMAPermission]
@ -632,19 +712,23 @@ class KeycloakUMA:
:rtype: boolean :rtype: boolean
:raises KeycloakPostError: In case permission resource not found :raises KeycloakPostError: In case permission resource not found
""" """
resources = dict()
resources = {}
for permission in permissions: for permission in permissions:
resource_id = getattr(permission, "resource_id", None) resource_id = getattr(permission, "resource_id", None)
if resource_id is None: if resource_id is None:
resource_ids = await self.a_resource_set_list_ids( resource_ids = await self.a_resource_set_list_ids(
exact_name=True, name=permission.resource, first=0, maximum=1
exact_name=True,
name=permission.resource,
first=0,
maximum=1,
) )
if not resource_ids: if not resource_ids:
raise KeycloakPostError("Invalid resource specified")
msg = "Invalid resource specified"
raise KeycloakPostError(msg)
setattr(permission, "resource_id", resource_ids[0])
permission.resource_id = resource_ids[0]
resources.setdefault(resource_id, set()) resources.setdefault(resource_id, set())
if permission.scope: if permission.scope:
@ -656,14 +740,19 @@ class KeycloakUMA:
] ]
data_raw = await self.connection.a_raw_post( data_raw = await self.connection.a_raw_post(
(await self.a_uma_well_known)["permission_endpoint"], data=json.dumps(payload)
(await self.a_uma_well_known)["permission_endpoint"],
data=json.dumps(payload),
) )
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
async def a_permissions_check( async def a_permissions_check(
self, token, permissions: Iterable[UMAPermission], **extra_payload
):
"""Check UMA permissions by user token with requested permissions asynchronously.
self,
token: str,
permissions: Iterable[UMAPermission],
**extra_payload: dict,
) -> bool:
"""
Check UMA permissions by user token with requested permissions asynchronously.
The token endpoint is used to check UMA permissions from Keycloak. It can only be The token endpoint is used to check UMA permissions from Keycloak. It can only be
invoked by confidential clients. invoked by confidential clients.
@ -696,7 +785,8 @@ class KeycloakUMA:
connection.add_param_headers("Authorization", "Bearer " + token) connection.add_param_headers("Authorization", "Bearer " + token)
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = await connection.a_raw_post( data_raw = await connection.a_raw_post(
(await self.a_uma_well_known)["token_endpoint"], data=payload
(await self.a_uma_well_known)["token_endpoint"],
data=payload,
) )
try: try:
data = raise_error_from_response(data_raw, KeycloakPostError) data = raise_error_from_response(data_raw, KeycloakPostError)
@ -704,8 +794,9 @@ class KeycloakUMA:
return False return False
return data.get("result", False) return data.get("result", False)
async def a_policy_resource_create(self, resource_id, payload):
"""Create permission policy for resource asynchronously.
async def a_policy_resource_create(self, resource_id: str, payload: dict) -> dict:
"""
Create permission policy for resource asynchronously.
Supports name, description, scopes, roles, groups, clients Supports name, description, scopes, roles, groups, clients
@ -724,8 +815,9 @@ class KeycloakUMA:
) )
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
async def a_policy_update(self, policy_id, payload):
"""Update permission policy asynchronously.
async def a_policy_update(self, policy_id: str, payload: dict) -> dict:
"""
Update permission policy asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -743,8 +835,9 @@ class KeycloakUMA:
) )
return raise_error_from_response(data_raw, KeycloakPutError) return raise_error_from_response(data_raw, KeycloakPutError)
async def a_policy_delete(self, policy_id):
"""Delete permission policy asynchronously.
async def a_policy_delete(self, policy_id: str) -> dict:
"""
Delete permission policy asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
@ -755,7 +848,7 @@ class KeycloakUMA:
:rtype: dict :rtype: dict
""" """
data_raw = await self.connection.a_raw_delete( data_raw = await self.connection.a_raw_delete(
(await self.a_uma_well_known)["policy_endpoint"] + f"/{policy_id}"
(await self.a_uma_well_known)["policy_endpoint"] + f"/{policy_id}",
) )
return raise_error_from_response(data_raw, KeycloakDeleteError) return raise_error_from_response(data_raw, KeycloakDeleteError)
@ -766,8 +859,9 @@ class KeycloakUMA:
scope: str = "", scope: str = "",
first: int = 0, first: int = 0,
maximum: int = -1, maximum: int = -1,
):
"""Query permission policies asynchronously.
) -> list:
"""
Query permission policies asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#querying-permission https://www.keycloak.org/docs/latest/authorization_services/#querying-permission
@ -785,7 +879,7 @@ class KeycloakUMA:
:return: List of ids :return: List of ids
:rtype: List[str] :rtype: List[str]
""" """
query = dict()
query = {}
if name: if name:
query["name"] = name query["name"] = name
if resource: if resource:
@ -798,6 +892,7 @@ class KeycloakUMA:
query["max"] = maximum query["max"] = maximum
data_raw = await self.connection.a_raw_get( data_raw = await self.connection.a_raw_get(
(await self.a_uma_well_known)["policy_endpoint"], **query
(await self.a_uma_well_known)["policy_endpoint"],
**query,
) )
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)

247
src/keycloak/openid_connection.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -21,22 +20,31 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak OpenID Connection Manager module.
"""
Keycloak OpenID Connection Manager module.
The module contains mainly the implementation of KeycloakOpenIDConnection class. The module contains mainly the implementation of KeycloakOpenIDConnection class.
This is an extension of the ConnectionManager class, and handles the automatic refresh This is an extension of the ConnectionManager class, and handles the automatic refresh
of openid tokens when required. of openid tokens when required.
""" """
from datetime import datetime, timedelta
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from httpx import Response as AsyncResponse
from requests import Response
from .connection import ConnectionManager from .connection import ConnectionManager
from .exceptions import KeycloakPostError
from .exceptions import HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED, KeycloakPostError
from .keycloak_openid import KeycloakOpenID from .keycloak_openid import KeycloakOpenID
class KeycloakOpenIDConnection(ConnectionManager): class KeycloakOpenIDConnection(ConnectionManager):
"""A class to help with OpenID connections which can auto refresh tokens.
"""
A class to help with OpenID connections which can auto refresh tokens.
:param object: _description_ :param object: _description_
:type object: _type_ :type object: _type_
@ -59,23 +67,24 @@ class KeycloakOpenIDConnection(ConnectionManager):
def __init__( def __init__(
self, self,
server_url,
grant_type=None,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
verify=True,
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
timeout=60,
cert=None,
max_retries=1,
):
"""Init method.
server_url: str,
grant_type: str | None = None,
username: str | None = None,
password: str | None = None,
token: str | None = None,
totp: str | None = None,
realm_name: str = "master",
client_id: str = "admin-cli",
verify: str | bool = True,
client_secret_key: str | None = None,
custom_headers: dict | None = None,
user_realm_name: str | None = None,
timeout: int | None = 60,
cert: str | tuple | None = None,
max_retries: int = 1,
) -> None:
"""
Init method.
:param server_url: Keycloak server url :param server_url: Keycloak server url
:type server_url: str :type server_url: str
@ -144,11 +153,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
timeout=self.timeout, timeout=self.timeout,
verify=self.verify, verify=self.verify,
cert=cert, cert=cert,
max_retries=max_retries,
) )
@property @property
def server_url(self):
"""Get server url.
def server_url(self) -> str:
"""
Get server url.
:returns: Keycloak server url :returns: Keycloak server url
:rtype: str :rtype: str
@ -156,12 +167,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self.base_url return self.base_url
@server_url.setter @server_url.setter
def server_url(self, value):
def server_url(self, value: str) -> None:
self.base_url = value self.base_url = value
@property @property
def grant_type(self):
"""Get grant type.
def grant_type(self) -> str:
"""
Get grant type.
:returns: Grant type :returns: Grant type
:rtype: str :rtype: str
@ -169,12 +181,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._grant_type return self._grant_type
@grant_type.setter @grant_type.setter
def grant_type(self, value):
def grant_type(self, value: str) -> None:
self._grant_type = value self._grant_type = value
@property @property
def realm_name(self):
"""Get realm name.
def realm_name(self) -> str:
"""
Get realm name.
:returns: Realm name :returns: Realm name
:rtype: str :rtype: str
@ -182,12 +195,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._realm_name return self._realm_name
@realm_name.setter @realm_name.setter
def realm_name(self, value):
def realm_name(self, value: str) -> None:
self._realm_name = value self._realm_name = value
@property @property
def client_id(self):
"""Get client id.
def client_id(self) -> str:
"""
Get client id.
:returns: Client id :returns: Client id
:rtype: str :rtype: str
@ -195,12 +209,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._client_id return self._client_id
@client_id.setter @client_id.setter
def client_id(self, value):
def client_id(self, value: str) -> None:
self._client_id = value self._client_id = value
@property @property
def client_secret_key(self):
"""Get client secret key.
def client_secret_key(self) -> str:
"""
Get client secret key.
:returns: Client secret key :returns: Client secret key
:rtype: str :rtype: str
@ -208,12 +223,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._client_secret_key return self._client_secret_key
@client_secret_key.setter @client_secret_key.setter
def client_secret_key(self, value):
def client_secret_key(self, value: str) -> None:
self._client_secret_key = value self._client_secret_key = value
@property @property
def username(self):
"""Get username.
def username(self) -> str:
"""
Get username.
:returns: Admin username :returns: Admin username
:rtype: str :rtype: str
@ -221,12 +237,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._username return self._username
@username.setter @username.setter
def username(self, value):
def username(self, value: str) -> None:
self._username = value self._username = value
@property @property
def password(self):
"""Get password.
def password(self) -> str:
"""
Get password.
:returns: Admin password :returns: Admin password
:rtype: str :rtype: str
@ -234,12 +251,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._password return self._password
@password.setter @password.setter
def password(self, value):
def password(self, value: str) -> None:
self._password = value self._password = value
@property @property
def totp(self):
"""Get totp.
def totp(self) -> str:
"""
Get totp.
:returns: TOTP :returns: TOTP
:rtype: str :rtype: str
@ -247,12 +265,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._totp return self._totp
@totp.setter @totp.setter
def totp(self, value):
def totp(self, value: str) -> None:
self._totp = value self._totp = value
@property @property
def token(self):
"""Get token.
def token(self) -> dict:
"""
Get token.
:returns: Access and refresh token :returns: Access and refresh token
:rtype: dict :rtype: dict
@ -260,17 +279,18 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._token return self._token
@token.setter @token.setter
def token(self, value):
def token(self, value: dict) -> None:
self._token = value self._token = value
self._expires_at = datetime.now() + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
self._expires_at = datetime.now(tz=timezone.utc) + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0),
) )
if value is not None: if value is not None:
self.add_param_headers("Authorization", "Bearer " + value.get("access_token")) self.add_param_headers("Authorization", "Bearer " + value.get("access_token"))
@property @property
def expires_at(self):
"""Get token expiry time.
def expires_at(self) -> datetime:
"""
Get token expiry time.
:returns: Datetime at which the current token will expire :returns: Datetime at which the current token will expire
:rtype: datetime :rtype: datetime
@ -278,8 +298,9 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._expires_at return self._expires_at
@property @property
def user_realm_name(self):
"""Get user realm name.
def user_realm_name(self) -> str:
"""
Get user realm name.
:returns: User realm name :returns: User realm name
:rtype: str :rtype: str
@ -287,12 +308,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._user_realm_name return self._user_realm_name
@user_realm_name.setter @user_realm_name.setter
def user_realm_name(self, value):
def user_realm_name(self, value: str) -> None:
self._user_realm_name = value self._user_realm_name = value
@property @property
def custom_headers(self):
"""Get custom headers.
def custom_headers(self) -> dict:
"""
Get custom headers.
:returns: Custom headers :returns: Custom headers
:rtype: dict :rtype: dict
@ -300,7 +322,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._custom_headers return self._custom_headers
@custom_headers.setter @custom_headers.setter
def custom_headers(self, value):
def custom_headers(self, value: dict) -> None:
self._custom_headers = value self._custom_headers = value
if self.custom_headers is not None: if self.custom_headers is not None:
# merge custom headers to main headers # merge custom headers to main headers
@ -308,7 +330,8 @@ class KeycloakOpenIDConnection(ConnectionManager):
@property @property
def keycloak_openid(self) -> KeycloakOpenID: def keycloak_openid(self) -> KeycloakOpenID:
"""Get the KeycloakOpenID object.
"""
Get the KeycloakOpenID object.
The KeycloakOpenID is used to refresh tokens The KeycloakOpenID is used to refresh tokens
@ -321,7 +344,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
elif self.realm_name: elif self.realm_name:
token_realm_name = self.realm_name token_realm_name = self.realm_name
else: else:
token_realm_name = "master"
token_realm_name = "master" # noqa: S105
self._keycloak_openid = KeycloakOpenID( self._keycloak_openid = KeycloakOpenID(
server_url=self.server_url, server_url=self.server_url,
@ -336,20 +359,25 @@ class KeycloakOpenIDConnection(ConnectionManager):
return self._keycloak_openid return self._keycloak_openid
def get_token(self):
"""Get admin token.
def get_token(self) -> None:
"""
Get admin token.
The admin token is then set in the `token` attribute. The admin token is then set in the `token` attribute.
""" """
if self.grant_type: if self.grant_type:
self.token = self.keycloak_openid.token( self.token = self.keycloak_openid.token(
self.username, self.password, grant_type=self.grant_type, totp=self.totp
self.username,
self.password,
grant_type=self.grant_type,
totp=self.totp,
) )
else: else:
self.token = None self.token = None
def refresh_token(self):
"""Refresh the token.
def refresh_token(self) -> None:
"""
Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed. :raises KeycloakPostError: In case the refresh token request failed.
""" """
@ -365,17 +393,20 @@ class KeycloakOpenIDConnection(ConnectionManager):
b"Token is not active", b"Token is not active",
b"Session not active", b"Session not active",
] ]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
if e.response_code == HTTP_BAD_REQUEST and any(
err in e.response_body for err in list_errors
):
self.get_token() self.get_token()
else: else:
raise raise
def _refresh_if_required(self):
if datetime.now() >= self.expires_at:
def _refresh_if_required(self) -> None:
if datetime.now(tz=timezone.utc) >= self.expires_at:
self.refresh_token() self.refresh_token()
def raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
def raw_get(self, *args: list, **kwargs: dict) -> Response:
"""
Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more. and try *get* once more.
@ -389,14 +420,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
""" """
self._refresh_if_required() self._refresh_if_required()
r = super().raw_get(*args, **kwargs) r = super().raw_get(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
self.refresh_token() self.refresh_token()
r = super().raw_get(*args, **kwargs) r = super().raw_get(*args, **kwargs)
return r return r
def raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
def raw_post(self, *args: list, **kwargs: dict) -> Response:
"""
Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more. and try *post* once more.
@ -410,14 +442,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
""" """
self._refresh_if_required() self._refresh_if_required()
r = super().raw_post(*args, **kwargs) r = super().raw_post(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
self.refresh_token() self.refresh_token()
r = super().raw_post(*args, **kwargs) r = super().raw_post(*args, **kwargs)
return r return r
def raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
def raw_put(self, *args: list, **kwargs: dict) -> Response:
"""
Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more. and try *put* once more.
@ -431,14 +464,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
""" """
self._refresh_if_required() self._refresh_if_required()
r = super().raw_put(*args, **kwargs) r = super().raw_put(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
self.refresh_token() self.refresh_token()
r = super().raw_put(*args, **kwargs) r = super().raw_put(*args, **kwargs)
return r return r
def raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
def raw_delete(self, *args: list, **kwargs: dict) -> Response:
"""
Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired, If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more. it will refresh the token and try *delete* once more.
@ -452,26 +486,31 @@ class KeycloakOpenIDConnection(ConnectionManager):
""" """
self._refresh_if_required() self._refresh_if_required()
r = super().raw_delete(*args, **kwargs) r = super().raw_delete(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
self.refresh_token() self.refresh_token()
r = super().raw_delete(*args, **kwargs) r = super().raw_delete(*args, **kwargs)
return r return r
async def a_get_token(self):
"""Get admin token.
async def a_get_token(self) -> None:
"""
Get admin token.
The admin token is then set in the `token` attribute. The admin token is then set in the `token` attribute.
""" """
if self.grant_type: if self.grant_type:
self.token = await self.keycloak_openid.a_token( self.token = await self.keycloak_openid.a_token(
self.username, self.password, grant_type=self.grant_type, totp=self.totp
self.username,
self.password,
grant_type=self.grant_type,
totp=self.totp,
) )
else: else:
self.token = None self.token = None
async def a_refresh_token(self):
"""Refresh the token.
async def a_refresh_token(self) -> None:
"""
Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed. :raises KeycloakPostError: In case the refresh token request failed.
""" """
@ -487,18 +526,21 @@ class KeycloakOpenIDConnection(ConnectionManager):
b"Token is not active", b"Token is not active",
b"Session not active", b"Session not active",
] ]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
if e.response_code == HTTP_BAD_REQUEST and any(
err in e.response_body for err in list_errors
):
await self.a_get_token() await self.a_get_token()
else: else:
raise raise
async def a__refresh_if_required(self):
async def a__refresh_if_required(self) -> None:
"""Refresh the token if it is expired.""" """Refresh the token if it is expired."""
if datetime.now() >= self.expires_at:
if datetime.now(tz=timezone.utc) >= self.expires_at:
await self.a_refresh_token() await self.a_refresh_token()
async def a_raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
async def a_raw_get(self, *args: list, **kwargs: dict) -> AsyncResponse:
"""
Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more. and try *get* once more.
@ -512,14 +554,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
""" """
await self.a__refresh_if_required() await self.a__refresh_if_required()
r = await super().a_raw_get(*args, **kwargs) r = await super().a_raw_get(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
await self.a_refresh_token() await self.a_refresh_token()
r = await super().a_raw_get(*args, **kwargs) r = await super().a_raw_get(*args, **kwargs)
return r return r
async def a_raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
async def a_raw_post(self, *args: list, **kwargs: dict) -> AsyncResponse:
"""
Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more. and try *post* once more.
@ -533,14 +576,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
""" """
await self.a__refresh_if_required() await self.a__refresh_if_required()
r = await super().a_raw_post(*args, **kwargs) r = await super().a_raw_post(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
await self.a_refresh_token() await self.a_refresh_token()
r = await super().a_raw_post(*args, **kwargs) r = await super().a_raw_post(*args, **kwargs)
return r return r
async def a_raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
async def a_raw_put(self, *args: list, **kwargs: dict) -> AsyncResponse:
"""
Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more. and try *put* once more.
@ -554,14 +598,15 @@ class KeycloakOpenIDConnection(ConnectionManager):
""" """
await self.a__refresh_if_required() await self.a__refresh_if_required()
r = await super().a_raw_put(*args, **kwargs) r = await super().a_raw_put(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
await self.a_refresh_token() await self.a_refresh_token()
r = await super().a_raw_put(*args, **kwargs) r = await super().a_raw_put(*args, **kwargs)
return r return r
async def a_raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
async def a_raw_delete(self, *args: list, **kwargs: dict) -> AsyncResponse:
"""
Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired, If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more. it will refresh the token and try *delete* once more.
@ -575,7 +620,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
""" """
await self.a__refresh_if_required() await self.a__refresh_if_required()
r = await super().a_raw_delete(*args, **kwargs) r = await super().a_raw_delete(*args, **kwargs)
if r.status_code == 401:
if r.status_code == HTTP_UNAUTHORIZED:
await self.a_refresh_token() await self.a_refresh_token()
r = await super().a_raw_delete(*args, **kwargs) r = await super().a_raw_delete(*args, **kwargs)

121
src/keycloak/uma_permissions.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -23,11 +22,14 @@
"""User-managed access permissions module.""" """User-managed access permissions module."""
from __future__ import annotations
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
class UMAPermission: class UMAPermission:
"""A class to conveniently assemble permissions.
"""
A class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission. The class itself is callable, and will return the assembled permission.
@ -47,8 +49,14 @@ class UMAPermission:
:type scope: str :type scope: str
""" """
def __init__(self, permission=None, resource="", scope=""):
"""Init method.
def __init__(
self,
permission: UMAPermission | None = None,
resource: str = "",
scope: str = "",
) -> None:
"""
Init method.
:param permission: Permission :param permission: Permission
:type permission: UMAPermission :type permission: UMAPermission
@ -63,16 +71,16 @@ class UMAPermission:
if permission: if permission:
if not isinstance(permission, UMAPermission): if not isinstance(permission, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(permission)
)
msg = f"can't determine if '{permission}' is a resource or scope"
raise PermissionDefinitionError(msg)
if permission.resource: if permission.resource:
self.resource = str(permission.resource) self.resource = str(permission.resource)
if permission.scope: if permission.scope:
self.scope = str(permission.scope) self.scope = str(permission.scope)
def __str__(self):
"""Str method.
def __str__(self) -> str:
"""
Str method.
:returns: String representation :returns: String representation
:rtype: str :rtype: str
@ -80,20 +88,22 @@ class UMAPermission:
scope = self.scope scope = self.scope
if scope: if scope:
scope = "#" + scope scope = "#" + scope
return "{}{}".format(self.resource, scope)
return f"{self.resource}{scope}"
def __eq__(self, __o: object) -> bool:
"""Eq method.
def __eq__(self, other: object) -> bool:
"""
Eq method.
:param __o: The other object :param __o: The other object
:type __o: object :type __o: object
:returns: Equality boolean :returns: Equality boolean
:rtype: bool :rtype: bool
""" """
return str(self) == str(__o)
return str(self) == str(other)
def __repr__(self) -> str: def __repr__(self) -> str:
"""Repr method.
"""
Repr method.
:returns: The object representation :returns: The object representation
:rtype: str :rtype: str
@ -101,15 +111,22 @@ class UMAPermission:
return self.__str__() return self.__str__()
def __hash__(self) -> int: def __hash__(self) -> int:
"""Hash method.
"""
Hash method.
:returns: Hash of the object :returns: Hash of the object
:rtype: int :rtype: int
""" """
return hash(str(self)) return hash(str(self))
def __call__(self, permission=None, resource="", scope="") -> "UMAPermission":
"""Call method.
def __call__(
self,
permission: UMAPermission | None = None,
resource: str = "",
scope: str = "",
) -> UMAPermission:
"""
Call method.
:param permission: Permission :param permission: Permission
:type permission: UMAPermission :type permission: UMAPermission
@ -131,9 +148,8 @@ class UMAPermission:
if permission: if permission:
if not isinstance(permission, UMAPermission): if not isinstance(permission, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(permission)
)
msg = f"can't determine if '{permission}' is a resource or scope"
raise PermissionDefinitionError(msg)
if permission.resource: if permission.resource:
result_resource = str(permission.resource) result_resource = str(permission.resource)
if permission.scope: if permission.scope:
@ -143,7 +159,8 @@ class UMAPermission:
class Resource(UMAPermission): class Resource(UMAPermission):
"""A UMAPermission Resource class to conveniently assemble permissions.
"""
A UMAPermission Resource class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission. The class itself is callable, and will return the assembled permission.
@ -151,8 +168,9 @@ class Resource(UMAPermission):
:type resource: str :type resource: str
""" """
def __init__(self, resource):
"""Init method.
def __init__(self, resource: Resource) -> None:
"""
Init method.
:param resource: Resource :param resource: Resource
:type resource: str :type resource: str
@ -161,7 +179,8 @@ class Resource(UMAPermission):
class Scope(UMAPermission): class Scope(UMAPermission):
"""A UMAPermission Scope class to conveniently assemble permissions.
"""
A UMAPermission Scope class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission. The class itself is callable, and will return the assembled permission.
@ -169,8 +188,9 @@ class Scope(UMAPermission):
:type scope: str :type scope: str
""" """
def __init__(self, scope):
"""Init method.
def __init__(self, scope: Scope) -> None:
"""
Init method.
:param scope: Scope :param scope: Scope
:type scope: str :type scope: str
@ -179,7 +199,8 @@ class Scope(UMAPermission):
class AuthStatus: class AuthStatus:
"""A class that represents the authorization/login status of a user associated with a token.
"""
A class that represents the authorization/login status of a user associated with a token.
This has to evaluate to True if and only if the user is properly authorized This has to evaluate to True if and only if the user is properly authorized
for the requested resource. for the requested resource.
@ -192,8 +213,9 @@ class AuthStatus:
:type missing_permissions: set :type missing_permissions: set
""" """
def __init__(self, is_logged_in, is_authorized, missing_permissions):
"""Init method.
def __init__(self, is_logged_in: bool, is_authorized: bool, missing_permissions: set) -> None:
"""
Init method.
:param is_logged_in: Is logged in indicator :param is_logged_in: Is logged in indicator
:type is_logged_in: bool :type is_logged_in: bool
@ -206,16 +228,18 @@ class AuthStatus:
self.is_authorized = is_authorized self.is_authorized = is_authorized
self.missing_permissions = missing_permissions self.missing_permissions = missing_permissions
def __bool__(self):
"""Bool method.
def __bool__(self) -> bool:
"""
Bool method.
:returns: Boolean representation :returns: Boolean representation
:rtype: bool :rtype: bool
""" """
return self.is_authorized return self.is_authorized
def __repr__(self):
"""Repr method.
def __repr__(self) -> str:
"""
Repr method.
:returns: The object representation :returns: The object representation
:rtype: str :rtype: str
@ -228,8 +252,9 @@ class AuthStatus:
) )
def build_permission_param(permissions):
"""Transform permissions to a set, so they are usable for requests.
def build_permission_param(permissions: str | list | dict) -> set:
"""
Transform permissions to a set, so they are usable for requests.
:param permissions: Permissions :param permissions: Permissions
:type permissions: str | Iterable[str] | dict[str, str] | dict[str, Iterabble[str]] :type permissions: str | Iterable[str] | dict[str, str] | dict[str, Iterabble[str]]
@ -240,9 +265,9 @@ def build_permission_param(permissions):
if permissions is None or permissions == "": if permissions is None or permissions == "":
return set() return set()
if isinstance(permissions, str): if isinstance(permissions, str):
return set((permissions,))
return set(permissions)
if isinstance(permissions, UMAPermission): if isinstance(permissions, UMAPermission):
return set((str(permissions),))
return set(str(permissions))
try: # treat as dictionary of permissions try: # treat as dictionary of permissions
result = set() result = set()
@ -250,26 +275,26 @@ def build_permission_param(permissions):
if scopes is None: if scopes is None:
result.add(resource) result.add(resource)
elif isinstance(scopes, str): elif isinstance(scopes, str):
result.add("{}#{}".format(resource, scopes))
result.add(f"{resource}#{scopes}")
else: else:
try: try:
for scope in scopes: for scope in scopes:
if not isinstance(scope, str): if not isinstance(scope, str):
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
result.add("{}#{}".format(resource, scope))
except TypeError:
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
return result
msg = f"misbuilt permission {permissions}"
raise KeycloakPermissionFormatError(msg)
result.add(f"{resource}#{scope}")
except TypeError as e:
msg = f"misbuilt permission {permissions}"
raise KeycloakPermissionFormatError(msg) from e
except AttributeError: except AttributeError:
pass pass
else:
return result
result = set() result = set()
for permission in permissions: for permission in permissions:
if not isinstance(permission, (str, UMAPermission)): if not isinstance(permission, (str, UMAPermission)):
raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions))
msg = f"misbuilt permission {permissions}"
raise KeycloakPermissionFormatError(msg)
result.add(str(permission)) result.add(str(permission))
return result return result

5
src/keycloak/urls_patterns.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# The MIT License (MIT) # The MIT License (MIT)
# #
@ -27,7 +26,7 @@
URL_REALM = "realms/{realm-name}" URL_REALM = "realms/{realm-name}"
URL_WELL_KNOWN_BASE = "realms/{realm-name}/.well-known" URL_WELL_KNOWN_BASE = "realms/{realm-name}/.well-known"
URL_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/openid-configuration" URL_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/openid-configuration"
URL_TOKEN = "realms/{realm-name}/protocol/openid-connect/token"
URL_TOKEN = "realms/{realm-name}/protocol/openid-connect/token" # noqa: S105
URL_USERINFO = "realms/{realm-name}/protocol/openid-connect/userinfo" URL_USERINFO = "realms/{realm-name}/protocol/openid-connect/userinfo"
URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout" URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout"
URL_CERTS = "realms/{realm-name}/protocol/openid-connect/certs" URL_CERTS = "realms/{realm-name}/protocol/openid-connect/certs"
@ -49,7 +48,7 @@ URL_ADMIN_USER = "admin/realms/{realm-name}/users/{id}"
URL_ADMIN_USER_CONSENTS = "admin/realms/{realm-name}/users/{id}/consents" URL_ADMIN_USER_CONSENTS = "admin/realms/{realm-name}/users/{id}/consents"
URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-actions-email" URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-actions-email"
URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email" URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password" # noqa: S105
URL_ADMIN_GET_SESSIONS = "admin/realms/{realm-name}/users/{id}/sessions" URL_ADMIN_GET_SESSIONS = "admin/realms/{realm-name}/users/{id}/sessions"
URL_ADMIN_USER_ALL_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings" URL_ADMIN_USER_ALL_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings"
URL_ADMIN_USER_CLIENT_ROLES = ( URL_ADMIN_USER_CLIENT_ROLES = (

32
tests/conftest.py

@ -3,8 +3,9 @@
import ipaddress import ipaddress
import os import os
import uuid import uuid
from collections.abc import Generator
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Generator, Tuple
from typing import Tuple
import freezegun import freezegun
import pytest import pytest
@ -17,7 +18,7 @@ from cryptography.x509.oid import NameOID
from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakOpenIDConnection, KeycloakUMA from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakOpenIDConnection, KeycloakUMA
class KeycloakTestEnv(object):
class KeycloakTestEnv:
"""Wrapper for test Keycloak connection configuration. """Wrapper for test Keycloak connection configuration.
:param host: Hostname :param host: Hostname
@ -193,7 +194,7 @@ def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"enabled": True, "enabled": True,
"publicClient": True, "publicClient": True,
"protocol": "openid-connect", "protocol": "openid-connect",
}
},
) )
# Return OID # Return OID
yield KeycloakOpenID( yield KeycloakOpenID(
@ -232,7 +233,7 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
"protocol": "openid-connect", "protocol": "openid-connect",
"secret": secret, "secret": secret,
"clientAuthenticatorType": "client-secret", "clientAuthenticatorType": "client-secret",
}
},
) )
# Create user # Create user
username = str(uuid.uuid4()) username = str(uuid.uuid4())
@ -247,7 +248,7 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
"emailVerified": True, "emailVerified": True,
"requiredActions": [], "requiredActions": [],
"credentials": [{"type": "password", "value": password, "temporary": False}], "credentials": [{"type": "password", "value": password, "temporary": False}],
}
},
) )
yield ( yield (
@ -295,7 +296,7 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
"clientAuthenticatorType": "client-secret", "clientAuthenticatorType": "client-secret",
"authorizationServicesEnabled": True, "authorizationServicesEnabled": True,
"serviceAccountsEnabled": True, "serviceAccountsEnabled": True,
}
},
) )
admin.create_client_authz_role_based_policy( admin.create_client_authz_role_based_policy(
client_id=client_id, client_id=client_id,
@ -317,7 +318,7 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
"lastName": "last", "lastName": "last",
"requiredActions": [], "requiredActions": [],
"credentials": [{"type": "password", "value": password, "temporary": False}], "credentials": [{"type": "password", "value": password, "temporary": False}],
}
},
) )
yield ( yield (
@ -364,7 +365,7 @@ def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: Keycloa
"secret": secret, "secret": secret,
"clientAuthenticatorType": "client-secret", "clientAuthenticatorType": "client-secret",
"attributes": {"oauth2.device.authorization.grant.enabled": True}, "attributes": {"oauth2.device.authorization.grant.enabled": True},
}
},
) )
# Create user # Create user
username = str(uuid.uuid4()) username = str(uuid.uuid4())
@ -379,7 +380,7 @@ def oid_with_credentials_device(env: KeycloakTestEnv, realm: str, admin: Keycloa
"emailVerified": True, "emailVerified": True,
"requiredActions": [], "requiredActions": [],
"credentials": [{"type": "password", "value": password, "temporary": False}], "credentials": [{"type": "password", "value": password, "temporary": False}],
}
},
) )
yield ( yield (
@ -489,7 +490,10 @@ def client_role(admin: KeycloakAdmin, realm: str, client: str) -> Generator[str,
@pytest.fixture @pytest.fixture
def composite_client_role( def composite_client_role(
admin: KeycloakAdmin, realm: str, client: str, client_role: str
admin: KeycloakAdmin,
realm: str,
client: str,
client_role: str,
) -> Generator[str, None, None]: ) -> Generator[str, None, None]:
"""Fixture for a new random composite client role. """Fixture for a new random composite client role.
@ -526,7 +530,9 @@ def selfsigned_cert():
# Generate our key # Generate our key
if key is None: if key is None:
key = rsa.generate_private_key( key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
public_exponent=65537,
key_size=2048,
backend=default_backend(),
) )
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)]) name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hostname)])
@ -585,7 +591,7 @@ def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID,
client_secret_key=oid.client_secret_key, client_secret_key=oid.client_secret_key,
timeout=60, timeout=60,
) )
yield connection
return connection
@pytest.fixture @pytest.fixture
@ -599,4 +605,4 @@ def uma(oid_connection_with_authz: KeycloakOpenIDConnection):
""" """
connection = oid_connection_with_authz connection = oid_connection_with_authz
# Return UMA # Return UMA
yield KeycloakUMA(connection=connection)
return KeycloakUMA(connection=connection)

2
tests/test_connection.py

@ -11,7 +11,7 @@ from keycloak.exceptions import KeycloakConnectionError
def test_connection_proxy(): def test_connection_proxy():
"""Test proxies of connection manager.""" """Test proxies of connection manager."""
cm = ConnectionManager( cm = ConnectionManager(
base_url="http://test.test", proxies={"http://test.test": "http://localhost:8080"}
base_url="http://test.test", proxies={"http://test.test": "http://localhost:8080"},
) )
assert cm._s.proxies == {"http://test.test": "http://localhost:8080"} assert cm._s.proxies == {"http://test.test": "http://localhost:8080"}

2
tests/test_exceptions.py

@ -16,5 +16,5 @@ def test_raise_error_from_response_from_dict():
with pytest.raises(KeycloakOperationError): with pytest.raises(KeycloakOperationError):
raise_error_from_response( raise_error_from_response(
response=response, error=dict(), expected_codes=[200], skip_exists=False
response=response, error=dict(), expected_codes=[200], skip_exists=False,
) )

524
tests/test_keycloak_admin.py
File diff suppressed because it is too large
View File

70
tests/test_keycloak_openid.py

@ -177,7 +177,7 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
def test_exchange_token( def test_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
): ):
"""Test the exchange token method. """Test the exchange token method.
@ -198,7 +198,7 @@ def test_exchange_token(
admin.get_client_role( admin.get_client_role(
client_id=admin.get_client_id(client_id="realm-management"), client_id=admin.get_client_id(client_id="realm-management"),
role_name="impersonation", role_name="impersonation",
)
),
], ],
) )
@ -215,7 +215,7 @@ def test_exchange_token(
# Exchange token with the new user # Exchange token with the new user
new_token = oid.exchange_token( new_token = oid.exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username
token=token["access_token"], audience=oid.client_id, subject=username,
) )
assert oid.userinfo(token=new_token["access_token"]) == { assert oid.userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test", "email": f"{username}@test.test",
@ -264,7 +264,7 @@ def test_public_key(oid: KeycloakOpenID):
def test_entitlement( def test_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
): ):
"""Test entitlement. """Test entitlement.
@ -277,7 +277,7 @@ def test_entitlement(
oid, username, password = oid_with_credentials_authz oid, username, password = oid_with_credentials_authz
token = oid.token(username=username, password=password) token = oid.token(username=username, password=password)
resource_server_id = admin.get_client_authz_resources( resource_server_id = admin.get_client_authz_resources(
client_id=admin.get_client_id(oid.client_id)
client_id=admin.get_client_id(oid.client_id),
)[0]["_id"] )[0]["_id"]
with pytest.raises(KeycloakDeprecationError): with pytest.raises(KeycloakDeprecationError):
@ -295,7 +295,7 @@ def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
assert oid.introspect(token=token["access_token"])["active"] assert oid.introspect(token=token["access_token"])["active"]
assert oid.introspect( assert oid.introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token",
) == {"active": False} ) == {"active": False}
with pytest.raises(KeycloakRPTNotFound): with pytest.raises(KeycloakRPTNotFound):
@ -340,14 +340,14 @@ def test_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID,
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = oid.decode_token( decoded_invalid_access_token = oid.decode_token(
token=invalid_access_token, validate=True, key=key
token=invalid_access_token, validate=True, key=key,
) )
decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False) decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False)
assert decoded_access_token == decoded_invalid_access_token assert decoded_access_token == decoded_invalid_access_token
decoded_invalid_access_token = oid.decode_token( decoded_invalid_access_token = oid.decode_token(
token=invalid_access_token, validate=False, key=key
token=invalid_access_token, validate=False, key=key,
) )
assert decoded_access_token == decoded_invalid_access_token assert decoded_access_token == decoded_invalid_access_token
@ -368,7 +368,7 @@ def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpe
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role) assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2 assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
assert isinstance( assert isinstance(
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission,
) )
@ -430,8 +430,8 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
policy.add_permission( policy.add_permission(
permission=Permission( permission=Permission(
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
)
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS",
),
) )
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
@ -464,7 +464,7 @@ def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
def test_has_uma_access( def test_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
): ):
"""Test has UMA access. """Test has UMA access.
@ -497,8 +497,8 @@ def test_has_uma_access(
assert ( assert (
str( str(
oid.has_uma_access( oid.has_uma_access(
token=admin.connection.token["access_token"], permissions="Default Resource"
)
token=admin.connection.token["access_token"], permissions="Default Resource",
),
) )
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})" + "{'Default Resource'})"
@ -666,7 +666,7 @@ async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_exchange_token( async def test_a_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
): ):
"""Test the exchange token method. """Test the exchange token method.
@ -687,7 +687,7 @@ async def test_a_exchange_token(
await admin.a_get_client_role( await admin.a_get_client_role(
client_id=admin.get_client_id(client_id="realm-management"), client_id=admin.get_client_id(client_id="realm-management"),
role_name="impersonation", role_name="impersonation",
)
),
], ],
) )
@ -704,7 +704,7 @@ async def test_a_exchange_token(
# Exchange token with the new user # Exchange token with the new user
new_token = oid.exchange_token( new_token = oid.exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username
token=token["access_token"], audience=oid.client_id, subject=username,
) )
assert await oid.a_userinfo(token=new_token["access_token"]) == { assert await oid.a_userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test", "email": f"{username}@test.test",
@ -757,7 +757,7 @@ async def test_a_public_key(oid: KeycloakOpenID):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_entitlement( async def test_a_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
): ):
"""Test entitlement. """Test entitlement.
@ -770,7 +770,7 @@ async def test_a_entitlement(
oid, username, password = oid_with_credentials_authz oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password) token = await oid.a_token(username=username, password=password)
resource_server_id = admin.get_client_authz_resources( resource_server_id = admin.get_client_authz_resources(
client_id=admin.get_client_id(oid.client_id)
client_id=admin.get_client_id(oid.client_id),
)[0]["_id"] )[0]["_id"]
with pytest.raises(KeycloakDeprecationError): with pytest.raises(KeycloakDeprecationError):
@ -789,12 +789,12 @@ async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str
assert (await oid.a_introspect(token=token["access_token"]))["active"] assert (await oid.a_introspect(token=token["access_token"]))["active"]
assert await oid.a_introspect( assert await oid.a_introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token",
) == {"active": False} ) == {"active": False}
with pytest.raises(KeycloakRPTNotFound): with pytest.raises(KeycloakRPTNotFound):
await oid.a_introspect( await oid.a_introspect(
token=token["access_token"], token_type_hint="requesting_party_token"
token=token["access_token"], token_type_hint="requesting_party_token",
) )
@ -835,28 +835,28 @@ async def test_a_decode_token_invalid_token(oid_with_credentials: Tuple[Keycloak
invalid_access_token = access_token + "a" invalid_access_token = access_token + "a"
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = await oid.a_decode_token( decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=True
token=invalid_access_token, validate=True,
) )
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): with pytest.raises(jwcrypto.jws.InvalidJWSSignature):
decoded_invalid_access_token = await oid.a_decode_token( decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=True, key=key
token=invalid_access_token, validate=True, key=key,
) )
decoded_invalid_access_token = await oid.a_decode_token( decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=False
token=invalid_access_token, validate=False,
) )
assert decoded_access_token == decoded_invalid_access_token assert decoded_access_token == decoded_invalid_access_token
decoded_invalid_access_token = await oid.a_decode_token( decoded_invalid_access_token = await oid.a_decode_token(
token=invalid_access_token, validate=False, key=key
token=invalid_access_token, validate=False, key=key,
) )
assert decoded_access_token == decoded_invalid_access_token assert decoded_access_token == decoded_invalid_access_token
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_load_authorization_config( async def test_a_load_authorization_config(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str],
): ):
"""Test load authorization config. """Test load authorization config.
@ -873,13 +873,13 @@ async def test_a_load_authorization_config(
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role) assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2 assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
assert isinstance( assert isinstance(
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission,
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_a_has_uma_access( async def test_a_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin,
): ):
"""Test has UMA access. """Test has UMA access.
@ -898,7 +898,7 @@ async def test_a_has_uma_access(
) )
assert ( assert (
str( str(
await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource")
await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource"),
) )
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
) )
@ -914,8 +914,8 @@ async def test_a_has_uma_access(
assert ( assert (
str( str(
await oid.a_has_uma_access( await oid.a_has_uma_access(
token=admin.connection.token["access_token"], permissions="Default Resource"
)
token=admin.connection.token["access_token"], permissions="Default Resource",
),
) )
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})" + "{'Default Resource'})"
@ -986,20 +986,20 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
policy.add_permission( policy.add_permission(
permission=Permission( permission=Permission(
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
)
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS",
),
) )
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x) str(x)
for x in await oid.a_get_permissions( for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode"
token=token["access_token"], method_token_info="decode",
) )
] == ["Permission: test-perm (resource)"] ] == ["Permission: test-perm (resource)"]
assert [ assert [
repr(x) repr(x)
for x in await oid.a_get_permissions( for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode"
token=token["access_token"], method_token_info="decode",
) )
] == ["<Permission: test-perm (resource)>"] ] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id

24
tests/test_keycloak_uma.py

@ -109,7 +109,7 @@ def test_uma_resource_sets(uma: KeycloakUMA):
assert len(resource_set_list_ids) == 0 assert len(resource_set_list_ids) == 0
# With matchingUri query option # With matchingUri query option
resource_set_list_ids = uma.resource_set_list_ids( resource_set_list_ids = uma.resource_set_list_ids(
uri="/some_resources/resource", matchingUri=True
uri="/some_resources/resource", matchingUri=True,
) )
assert len(resource_set_list_ids) == 1 assert len(resource_set_list_ids) == 1
@ -119,8 +119,8 @@ def test_uma_resource_sets(uma: KeycloakUMA):
assert err.match( assert err.match(
re.escape( re.escape(
'409: b\'{"error":"invalid_request","error_description":' '409: b\'{"error":"invalid_request","error_description":'
'"Resource with name [mytest] already exists."}\''
)
'"Resource with name [mytest] already exists."}\'',
),
) )
# Test get resource set # Test get resource set
@ -137,7 +137,7 @@ def test_uma_resource_sets(uma: KeycloakUMA):
# Test update resource set fail # Test update resource set fail
with pytest.raises(KeycloakPutError) as err: with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"}) uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
assert err.match('400: b\'{"error":"Unrecognized field')
assert err.match("Unrecognized field")
# Test delete resource set # Test delete resource set
res = uma.resource_set_delete(resource_id=created_resource["_id"]) res = uma.resource_set_delete(resource_id=created_resource["_id"])
@ -215,7 +215,7 @@ def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
with pytest.raises(KeycloakDeleteError) as err: with pytest.raises(KeycloakDeleteError) as err:
uma.policy_delete(policy_id) uma.policy_delete(policy_id)
assert err.match( assert err.match(
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\'',
) )
policies = uma.policy_query() policies = uma.policy_query()
@ -312,7 +312,7 @@ def test_uma_permission_ticket(uma: KeycloakUMA):
response = uma.permission_ticket_create(permissions) response = uma.permission_ticket_create(permissions)
rpt = uma.connection.keycloak_openid.token( rpt = uma.connection.keycloak_openid.token(
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"],
) )
assert rpt assert rpt
assert "access_token" in rpt assert "access_token" in rpt
@ -406,7 +406,7 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
assert len(resource_set_list_ids) == 0 assert len(resource_set_list_ids) == 0
# With matchingUri query option # With matchingUri query option
resource_set_list_ids = await uma.a_resource_set_list_ids( resource_set_list_ids = await uma.a_resource_set_list_ids(
uri="/some_resources/resource", matchingUri=True
uri="/some_resources/resource", matchingUri=True,
) )
assert len(resource_set_list_ids) == 1 assert len(resource_set_list_ids) == 1
@ -416,8 +416,8 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
assert err.match( assert err.match(
re.escape( re.escape(
'409: b\'{"error":"invalid_request","error_description":' '409: b\'{"error":"invalid_request","error_description":'
'"Resource with name [mytest] already exists."}\''
)
'"Resource with name [mytest] already exists."}\'',
),
) )
# Test get resource set # Test get resource set
@ -434,7 +434,7 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA):
# Test update resource set fail # Test update resource set fail
with pytest.raises(KeycloakPutError) as err: with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"}) uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
assert err.match('400: b\'{"error":"Unrecognized field')
assert err.match("Unrecognized field")
# Test delete resource set # Test delete resource set
res = await uma.a_resource_set_delete(resource_id=created_resource["_id"]) res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
@ -513,7 +513,7 @@ async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
with pytest.raises(KeycloakDeleteError) as err: with pytest.raises(KeycloakDeleteError) as err:
await uma.a_policy_delete(policy_id) await uma.a_policy_delete(policy_id)
assert err.match( assert err.match(
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\'',
) )
policies = await uma.a_policy_query() policies = await uma.a_policy_query()
@ -612,7 +612,7 @@ async def test_a_uma_permission_ticket(uma: KeycloakUMA):
response = await uma.a_permission_ticket_create(permissions) response = await uma.a_permission_ticket_create(permissions)
rpt = await uma.connection.keycloak_openid.a_token( rpt = await uma.connection.keycloak_openid.a_token(
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"],
) )
assert rpt assert rpt
assert "access_token" in rpt assert "access_token" in rpt

4
tests/test_license.py

@ -8,8 +8,8 @@ def test_license_present():
for path, _, files in os.walk("src/keycloak"): for path, _, files in os.walk("src/keycloak"):
for _file in files: for _file in files:
if _file.endswith(".py"): if _file.endswith(".py"):
with open(os.path.join(path, _file), "r") as fp:
with open(os.path.join(path, _file)) as fp:
content = fp.read() content = fp.read()
assert content.startswith( assert content.startswith(
"# -*- coding: utf-8 -*-\n#\n# The MIT License (MIT)\n#\n#"
"# -*- coding: utf-8 -*-\n#\n# The MIT License (MIT)\n#\n#",
) )

5
tests/test_uma_permissions.py

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# #
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com> # Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
# #
@ -150,7 +149,7 @@ def test_build_permission_tuple_dict_str_list_str():
def test_build_permission_tuple_dict_str_list_str2(): def test_build_permission_tuple_dict_str_list_str2():
"""Test build permission param with mutliple-keyed dictionary.""" """Test build permission param with mutliple-keyed dictionary."""
assert build_permission_param( assert build_permission_param(
{"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]}
{"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]},
) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"} ) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}
@ -162,7 +161,7 @@ def test_build_permission_uma():
def test_build_permission_uma_list(): def test_build_permission_uma_list():
"""Test build permission param with list of UMAs.""" """Test build permission param with list of UMAs."""
assert build_permission_param( assert build_permission_param(
[Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))]
[Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))],
) == {"res1#scope1", "res1#scope2"} ) == {"res1#scope1", "res1#scope2"}

6
tests/test_urls_patterns.py

@ -31,7 +31,7 @@ def test_correctness_of_patterns():
for url in urls: for url in urls:
url_value = urls_patterns.__dict__[url] url_value = urls_patterns.__dict__[url]
assert url_value not in seen_url_values, f"The url {url} has a duplicate value {url_value}" assert url_value not in seen_url_values, f"The url {url} has a duplicate value {url_value}"
assert (
url_value == url_value.strip()
), f"The url {url} with value '{url_value}' has whitespace values"
assert url_value == url_value.strip(), (
f"The url {url} with value '{url_value}' has whitespace values"
)
seen_url_values.append(url_value) seen_url_values.append(url_value)
Loading…
Cancel
Save