9 changed files with 6399 additions and 0 deletions
-
3.gitignore
-
2src/keycloak/asynchronous/__init__.py
-
318src/keycloak/asynchronous/connection.py
-
195src/keycloak/asynchronous/exceptions.py
-
4257src/keycloak/asynchronous/keycloak_admin.py
-
786src/keycloak/asynchronous/keycloak_openid.py
-
417src/keycloak/asynchronous/keycloak_uma.py
-
420src/keycloak/asynchronous/openid_connection.py
-
1src/keycloak/openid_connection.py
@ -0,0 +1,2 @@ |
|||
from .openid_connection import KeycloakOpenIDConnection |
|||
from .keycloak_admin import KeycloakAdmin |
@ -0,0 +1,318 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# |
|||
# The MIT License (MIT) |
|||
# |
|||
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com> |
|||
# |
|||
# Permission is hereby granted, free of charge, to any person obtaining a copy of |
|||
# this software and associated documentation files (the "Software"), to deal in |
|||
# the Software without restriction, including without limitation the rights to |
|||
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of |
|||
# the Software, and to permit persons to whom the Software is furnished to do so, |
|||
# subject to the following conditions: |
|||
# |
|||
# The above copyright notice and this permission notice shall be included in all |
|||
# copies or substantial portions of the Software. |
|||
# |
|||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS |
|||
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
|||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER |
|||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
|||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|||
|
|||
"""Connection manager module.""" |
|||
|
|||
try: |
|||
from urllib.parse import urljoin |
|||
except ImportError: # pragma: no cover |
|||
from urlparse import urljoin |
|||
|
|||
import httpx |
|||
from requests.adapters import HTTPAdapter |
|||
import requests |
|||
|
|||
from .exceptions import KeycloakConnectionError |
|||
|
|||
|
|||
class ConnectionManager(object): |
|||
"""Represents a simple server connection. |
|||
|
|||
:param base_url: The server URL. |
|||
:type base_url: str |
|||
:param headers: The header parameters of the requests to the server. |
|||
:type headers: dict |
|||
:param timeout: Timeout to use for requests to the server. |
|||
:type timeout: int |
|||
:param verify: Boolean value to enable or disable certificate validation or a string |
|||
containing a path to a CA bundle to use |
|||
:type verify: Union[bool,str] |
|||
:param proxies: The proxies servers requests is sent by. |
|||
:type proxies: dict |
|||
""" |
|||
|
|||
def __init__(self, base_url, headers={}, timeout=60, verify=True, proxies=None): |
|||
"""Init method. |
|||
|
|||
:param base_url: The server URL. |
|||
:type base_url: str |
|||
:param headers: The header parameters of the requests to the server. |
|||
:type headers: dict |
|||
:param timeout: Timeout to use for requests to the server. |
|||
:type timeout: int |
|||
:param verify: Boolean value to enable or disable certificate validation or a string |
|||
containing a path to a CA bundle to use |
|||
:type verify: Union[bool,str] |
|||
:param proxies: The proxies servers requests is sent by. |
|||
:type proxies: dict |
|||
""" |
|||
print(base_url) |
|||
self.base_url = base_url |
|||
print(self.base_url) |
|||
self.headers = headers |
|||
self.timeout = timeout |
|||
self.verify = verify |
|||
self._s = httpx.AsyncClient(verify=verify, proxies=proxies) |
|||
self.sync_s = requests.Session() |
|||
self.sync_s.auth = lambda x: x # don't let requests add auth headers |
|||
|
|||
# retry once to reset connection with Keycloak after tomcat's ConnectionTimeout |
|||
# see https://github.com/marcospereirampj/python-keycloak/issues/36 |
|||
self._s.transport = httpx.AsyncHTTPTransport(retries=2) |
|||
for protocol in ("https://", "http://"): |
|||
adapter = HTTPAdapter(max_retries=1) |
|||
# adds POST to retry whitelist |
|||
allowed_methods = set(adapter.max_retries.allowed_methods) |
|||
allowed_methods.add("POST") |
|||
adapter.max_retries.allowed_methods = frozenset(allowed_methods) |
|||
|
|||
self.sync_s.mount(protocol, adapter) |
|||
|
|||
if proxies: |
|||
self.sync_s.proxies.update(proxies) |
|||
|
|||
async def aclose(self): |
|||
if hasattr(self, "_s"): |
|||
await self._s.aclose() |
|||
|
|||
def __del__(self): |
|||
"""Del method.""" |
|||
return |
|||
if hasattr(self, "_s"): |
|||
self._s.close() |
|||
|
|||
@property |
|||
def base_url(self): |
|||
"""Return base url in use for requests to the server. |
|||
|
|||
:returns: Base URL |
|||
:rtype: str |
|||
""" |
|||
return self._base_url |
|||
|
|||
@base_url.setter |
|||
def base_url(self, value): |
|||
self._base_url = value |
|||
|
|||
@property |
|||
def timeout(self): |
|||
"""Return timeout in use for request to the server. |
|||
|
|||
:returns: Timeout |
|||
:rtype: int |
|||
""" |
|||
return self._timeout |
|||
|
|||
@timeout.setter |
|||
def timeout(self, value): |
|||
self._timeout = value |
|||
|
|||
@property |
|||
def verify(self): |
|||
"""Return verify in use for request to the server. |
|||
|
|||
:returns: Verify indicator |
|||
:rtype: bool |
|||
""" |
|||
return self._verify |
|||
|
|||
@verify.setter |
|||
def verify(self, value): |
|||
self._verify = value |
|||
|
|||
@property |
|||
def headers(self): |
|||
"""Return header request to the server. |
|||
|
|||
:returns: Request headers |
|||
:rtype: dict |
|||
""" |
|||
return self._headers |
|||
|
|||
@headers.setter |
|||
def headers(self, value): |
|||
self._headers = value |
|||
|
|||
def param_headers(self, key): |
|||
"""Return a specific header parameter. |
|||
|
|||
:param key: Header parameters key. |
|||
:type key: str |
|||
:returns: If the header parameters exist, return its value. |
|||
:rtype: str |
|||
""" |
|||
return self.headers.get(key) |
|||
|
|||
def clean_headers(self): |
|||
"""Clear header parameters.""" |
|||
self.headers = {} |
|||
|
|||
def exist_param_headers(self, key): |
|||
"""Check if the parameter exists in the header. |
|||
|
|||
:param key: Header parameters key. |
|||
:type key: str |
|||
:returns: If the header parameters exist, return True. |
|||
:rtype: bool |
|||
""" |
|||
return self.param_headers(key) is not None |
|||
|
|||
def add_param_headers(self, key, value): |
|||
"""Add a single parameter inside the header. |
|||
|
|||
:param key: Header parameters key. |
|||
:type key: str |
|||
:param value: Value to be added. |
|||
:type value: str |
|||
""" |
|||
self.headers[key] = value |
|||
|
|||
def del_param_headers(self, key): |
|||
"""Remove a specific parameter. |
|||
|
|||
:param key: Key of the header parameters. |
|||
:type key: str |
|||
""" |
|||
self.headers.pop(key, None) |
|||
|
|||
def raw_get(self, path, **kwargs): |
|||
"""Submit get request to the path. |
|||
|
|||
:param path: Path for request. |
|||
:type path: str |
|||
:param kwargs: Additional arguments |
|||
:type kwargs: dict |
|||
:returns: Response the request. |
|||
:rtype: Response |
|||
:raises KeycloakConnectionError: HttpError Can't connect to server. |
|||
""" |
|||
try: |
|||
return self._s.get( |
|||
urljoin(self.base_url, path), |
|||
params=kwargs, |
|||
headers=self.headers, |
|||
timeout=self.timeout, |
|||
verify=self.verify, |
|||
) |
|||
except Exception as e: |
|||
raise KeycloakConnectionError("Can't connect to server (%s)" % e) |
|||
|
|||
async def raw_post(self, path, data, **kwargs): |
|||
"""Submit post request to the path. |
|||
|
|||
:param path: Path for request. |
|||
:type path: str |
|||
:param data: Payload for request. |
|||
:type data: dict |
|||
:param kwargs: Additional arguments |
|||
:type kwargs: dict |
|||
:returns: Response the request. |
|||
:rtype: Response |
|||
:raises KeycloakConnectionError: HttpError Can't connect to server. |
|||
""" |
|||
try: |
|||
return await self._s.post( |
|||
urljoin(self.base_url, path), |
|||
params=kwargs, |
|||
data=data, |
|||
headers=self.headers, |
|||
timeout=self.timeout, |
|||
) |
|||
except Exception as e: |
|||
raise KeycloakConnectionError("Can't connect to server (%s)" % e) |
|||
|
|||
|
|||
def sync_raw_post(self, path, data, **kwargs): |
|||
"""Submit post request to the path. |
|||
|
|||
:param path: Path for request. |
|||
:type path: str |
|||
:param data: Payload for request. |
|||
:type data: dict |
|||
:param kwargs: Additional arguments |
|||
:type kwargs: dict |
|||
:returns: Response the request. |
|||
:rtype: Response |
|||
:raises KeycloakConnectionError: HttpError Can't connect to server. |
|||
""" |
|||
try: |
|||
return self.sync_s.post( |
|||
urljoin(self.base_url, path), |
|||
params=kwargs, |
|||
data=data, |
|||
headers=self.headers, |
|||
timeout=self.timeout, |
|||
verify=self.verify, |
|||
) |
|||
except Exception as e: |
|||
raise KeycloakConnectionError("Can't connect to server (%s)" % e) |
|||
|
|||
def raw_put(self, path, data, **kwargs): |
|||
"""Submit put request to the path. |
|||
|
|||
:param path: Path for request. |
|||
:type path: str |
|||
:param data: Payload for request. |
|||
:type data: dict |
|||
:param kwargs: Additional arguments |
|||
:type kwargs: dict |
|||
:returns: Response the request. |
|||
:rtype: Response |
|||
:raises KeycloakConnectionError: HttpError Can't connect to server. |
|||
""" |
|||
try: |
|||
return self._s.put( |
|||
urljoin(self.base_url, path), |
|||
params=kwargs, |
|||
data=data, |
|||
headers=self.headers, |
|||
timeout=self.timeout, |
|||
verify=self.verify, |
|||
) |
|||
except Exception as e: |
|||
raise KeycloakConnectionError("Can't connect to server (%s)" % e) |
|||
|
|||
def raw_delete(self, path, data=None, **kwargs): |
|||
"""Submit delete request to the path. |
|||
|
|||
:param path: Path for request. |
|||
:type path: str |
|||
:param data: Payload for request. |
|||
:type data: dict | None |
|||
:param kwargs: Additional arguments |
|||
:type kwargs: dict |
|||
:returns: Response the request. |
|||
:rtype: Response |
|||
:raises KeycloakConnectionError: HttpError Can't connect to server. |
|||
""" |
|||
try: |
|||
return self._s.delete( |
|||
urljoin(self.base_url, path), |
|||
params=kwargs, |
|||
data=data or dict(), |
|||
headers=self.headers, |
|||
timeout=self.timeout, |
|||
verify=self.verify, |
|||
) |
|||
except Exception as e: |
|||
raise KeycloakConnectionError("Can't connect to server (%s)" % e) |
@ -0,0 +1,195 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# |
|||
# The MIT License (MIT) |
|||
# |
|||
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com> |
|||
# |
|||
# Permission is hereby granted, free of charge, to any person obtaining a copy of |
|||
# this software and associated documentation files (the "Software"), to deal in |
|||
# the Software without restriction, including without limitation the rights to |
|||
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of |
|||
# the Software, and to permit persons to whom the Software is furnished to do so, |
|||
# subject to the following conditions: |
|||
# |
|||
# The above copyright notice and this permission notice shall be included in all |
|||
# copies or substantial portions of the Software. |
|||
# |
|||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS |
|||
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
|||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER |
|||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
|||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|||
|
|||
"""Keycloak custom exceptions module.""" |
|||
|
|||
import requests |
|||
|
|||
|
|||
class KeycloakError(Exception): |
|||
"""Base class for custom Keycloak errors. |
|||
|
|||
:param error_message: The error message |
|||
:type error_message: str |
|||
:param response_code: The response status code |
|||
:type response_code: int |
|||
""" |
|||
|
|||
def __init__(self, error_message="", response_code=None, response_body=None): |
|||
"""Init method. |
|||
|
|||
:param error_message: The error message |
|||
:type error_message: str |
|||
:param response_code: The code of the response |
|||
:type response_code: int |
|||
:param response_body: Body of the response |
|||
:type response_body: bytes |
|||
""" |
|||
Exception.__init__(self, error_message) |
|||
|
|||
self.response_code = response_code |
|||
self.response_body = response_body |
|||
self.error_message = error_message |
|||
|
|||
def __str__(self): |
|||
"""Str method. |
|||
|
|||
:returns: String representation of the object |
|||
:rtype: str |
|||
""" |
|||
if self.response_code is not None: |
|||
return "{0}: {1}".format(self.response_code, self.error_message) |
|||
else: |
|||
return "{0}".format(self.error_message) |
|||
|
|||
|
|||
class KeycloakAuthenticationError(KeycloakError): |
|||
"""Keycloak authentication error exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakConnectionError(KeycloakError): |
|||
"""Keycloak connection error exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakOperationError(KeycloakError): |
|||
"""Keycloak operation error exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakDeprecationError(KeycloakError): |
|||
"""Keycloak deprecation error exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakGetError(KeycloakOperationError): |
|||
"""Keycloak request get error exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakPostError(KeycloakOperationError): |
|||
"""Keycloak request post error exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakPutError(KeycloakOperationError): |
|||
"""Keycloak request put error exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakDeleteError(KeycloakOperationError): |
|||
"""Keycloak request delete error exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakSecretNotFound(KeycloakOperationError): |
|||
"""Keycloak secret not found exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakRPTNotFound(KeycloakOperationError): |
|||
"""Keycloak RPT not found exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakAuthorizationConfigError(KeycloakOperationError): |
|||
"""Keycloak authorization config exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakInvalidTokenError(KeycloakOperationError): |
|||
"""Keycloak invalid token exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class KeycloakPermissionFormatError(KeycloakOperationError): |
|||
"""Keycloak permission format exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
class PermissionDefinitionError(Exception): |
|||
"""Keycloak permission definition exception.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
def raise_error_from_response(response, error, expected_codes=None, skip_exists=False): |
|||
"""Raise an exception for the response. |
|||
|
|||
:param response: The response object |
|||
:type response: Response |
|||
:param error: Error object to raise |
|||
:type error: dict or Exception |
|||
:param expected_codes: Set of expected codes, which should not raise the exception |
|||
:type expected_codes: Sequence[int] |
|||
:param skip_exists: Indicates whether the response on already existing object should be ignored |
|||
:type skip_exists: bool |
|||
|
|||
:returns: Content of the response message |
|||
:type: bytes or dict |
|||
:raises KeycloakError: In case of unexpected status codes |
|||
""" # noqa: DAR401,DAR402 |
|||
if expected_codes is None: |
|||
expected_codes = [200, 201, 204] |
|||
|
|||
if hasattr(response, 'status_code'): |
|||
if response.status_code in expected_codes: |
|||
if response.status_code == requests.codes.no_content: |
|||
return {} |
|||
|
|||
try: |
|||
return response.json() |
|||
except ValueError: |
|||
return response.content |
|||
|
|||
if skip_exists and response.status_code == 409: |
|||
return {"msg": "Already exists"} |
|||
|
|||
try: |
|||
message = response.json()["message"] |
|||
except (KeyError, ValueError): |
|||
message = response.content |
|||
|
|||
if isinstance(error, dict): |
|||
error = error.get(response.status_code, KeycloakOperationError) |
|||
else: |
|||
if response.status_code == 401: |
|||
error = KeycloakAuthenticationError |
|||
|
|||
raise error( |
|||
error_message=message, response_code=response.status_code, response_body=response.content |
|||
) |
4257
src/keycloak/asynchronous/keycloak_admin.py
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,786 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# |
|||
# The MIT License (MIT) |
|||
# |
|||
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com> |
|||
# |
|||
# Permission is hereby granted, free of charge, to any person obtaining a copy of |
|||
# this software and associated documentation files (the "Software"), to deal in |
|||
# the Software without restriction, including without limitation the rights to |
|||
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of |
|||
# the Software, and to permit persons to whom the Software is furnished to do so, |
|||
# subject to the following conditions: |
|||
# |
|||
# The above copyright notice and this permission notice shall be included in all |
|||
# copies or substantial portions of the Software. |
|||
# |
|||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS |
|||
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
|||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER |
|||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
|||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|||
|
|||
"""Keycloak OpenID module. |
|||
|
|||
The module contains mainly the implementation of KeycloakOpenID class, the main |
|||
class to handle authentication and token manipulation. |
|||
""" |
|||
|
|||
import json |
|||
from typing import Optional |
|||
|
|||
from jwcrypto import jwk, jwt |
|||
|
|||
from ..authorization import Authorization |
|||
from .connection import ConnectionManager |
|||
from .exceptions import ( |
|||
KeycloakAuthenticationError, |
|||
KeycloakAuthorizationConfigError, |
|||
KeycloakDeprecationError, |
|||
KeycloakGetError, |
|||
KeycloakInvalidTokenError, |
|||
KeycloakPostError, |
|||
KeycloakPutError, |
|||
KeycloakRPTNotFound, |
|||
raise_error_from_response, |
|||
) |
|||
from ..uma_permissions import AuthStatus, build_permission_param |
|||
from ..urls_patterns import ( |
|||
URL_AUTH, |
|||
URL_CERTS, |
|||
URL_CLIENT_REGISTRATION, |
|||
URL_CLIENT_UPDATE, |
|||
URL_DEVICE, |
|||
URL_ENTITLEMENT, |
|||
URL_INTROSPECT, |
|||
URL_LOGOUT, |
|||
URL_REALM, |
|||
URL_TOKEN, |
|||
URL_USERINFO, |
|||
URL_WELL_KNOWN, |
|||
) |
|||
|
|||
|
|||
class KeycloakOpenID: |
|||
"""Keycloak OpenID client. |
|||
|
|||
:param server_url: Keycloak server url |
|||
:param client_id: client id |
|||
:param realm_name: realm name |
|||
:param client_secret_key: client secret key |
|||
:param verify: Boolean value to enable or disable certificate validation or a string |
|||
containing a path to a CA bundle to use |
|||
:param custom_headers: dict of custom header to pass to each HTML request |
|||
:param proxies: dict of proxies to sent the request by. |
|||
:param timeout: connection timeout in seconds |
|||
""" |
|||
|
|||
def __init__( |
|||
self, |
|||
server_url, |
|||
realm_name, |
|||
client_id, |
|||
client_secret_key=None, |
|||
verify=True, |
|||
custom_headers=None, |
|||
proxies=None, |
|||
timeout=60, |
|||
): |
|||
"""Init method. |
|||
|
|||
:param server_url: Keycloak server url |
|||
:type server_url: str |
|||
:param client_id: client id |
|||
:type client_id: str |
|||
:param realm_name: realm name |
|||
:type realm_name: str |
|||
:param client_secret_key: client secret key |
|||
:type client_secret_key: str |
|||
:param verify: Boolean value to enable or disable certificate validation or a string |
|||
containing a path to a CA bundle to use |
|||
:type verify: Union[bool,str] |
|||
:param custom_headers: dict of custom header to pass to each HTML request |
|||
:type custom_headers: dict |
|||
:param proxies: dict of proxies to sent the request by. |
|||
:type proxies: dict |
|||
:param timeout: connection timeout in seconds |
|||
:type timeout: int |
|||
""" |
|||
self.client_id = client_id |
|||
self.client_secret_key = client_secret_key |
|||
self.realm_name = realm_name |
|||
headers = custom_headers if custom_headers is not None else dict() |
|||
self.connection = ConnectionManager( |
|||
base_url=server_url, headers=headers, timeout=timeout, verify=verify, proxies=proxies |
|||
) |
|||
|
|||
self.authorization = Authorization() |
|||
|
|||
@property |
|||
def client_id(self): |
|||
"""Get client id. |
|||
|
|||
:returns: Client id |
|||
:rtype: str |
|||
""" |
|||
return self._client_id |
|||
|
|||
@client_id.setter |
|||
def client_id(self, value): |
|||
self._client_id = value |
|||
|
|||
@property |
|||
def client_secret_key(self): |
|||
"""Get the client secret key. |
|||
|
|||
:returns: Client secret key |
|||
:rtype: str |
|||
""" |
|||
return self._client_secret_key |
|||
|
|||
@client_secret_key.setter |
|||
def client_secret_key(self, value): |
|||
self._client_secret_key = value |
|||
|
|||
@property |
|||
def realm_name(self): |
|||
"""Get the realm name. |
|||
|
|||
:returns: Realm name |
|||
:rtype: str |
|||
""" |
|||
return self._realm_name |
|||
|
|||
@realm_name.setter |
|||
def realm_name(self, value): |
|||
self._realm_name = value |
|||
|
|||
@property |
|||
def connection(self): |
|||
"""Get connection. |
|||
|
|||
:returns: Connection manager object |
|||
:rtype: ConnectionManager |
|||
""" |
|||
return self._connection |
|||
|
|||
@connection.setter |
|||
def connection(self, value): |
|||
self._connection = value |
|||
|
|||
@property |
|||
def authorization(self): |
|||
"""Get authorization. |
|||
|
|||
:returns: The authorization manager |
|||
:rtype: Authorization |
|||
""" |
|||
return self._authorization |
|||
|
|||
@authorization.setter |
|||
def authorization(self, value): |
|||
self._authorization = value |
|||
|
|||
def _add_secret_key(self, payload): |
|||
"""Add secret key if exists. |
|||
|
|||
:param payload: Payload |
|||
:type payload: dict |
|||
:returns: Payload with the secret key |
|||
:rtype: dict |
|||
""" |
|||
if self.client_secret_key: |
|||
payload.update({"client_secret": self.client_secret_key}) |
|||
|
|||
return payload |
|||
|
|||
def _build_name_role(self, role): |
|||
"""Build name of a role. |
|||
|
|||
:param role: Role name |
|||
:type role: str |
|||
:returns: Role path |
|||
:rtype: str |
|||
""" |
|||
return self.client_id + "/" + role |
|||
|
|||
def _token_info(self, token, method_token_info, **kwargs): |
|||
"""Getter for the token data. |
|||
|
|||
:param token: Token |
|||
:type token: str |
|||
:param method_token_info: Token info method to use |
|||
:type method_token_info: str |
|||
:param kwargs: Additional keyword arguments passed to the decode_token method |
|||
:type kwargs: dict |
|||
:returns: Token info |
|||
:rtype: dict |
|||
""" |
|||
if method_token_info == "introspect": |
|||
token_info = self.introspect(token) |
|||
else: |
|||
token_info = self.decode_token(token, **kwargs) |
|||
|
|||
return token_info |
|||
|
|||
def well_known(self): |
|||
"""Get the well_known object. |
|||
|
|||
The most important endpoint to understand is the well-known configuration |
|||
endpoint. It lists endpoints and other configuration options relevant to |
|||
the OpenID Connect implementation in Keycloak. |
|||
|
|||
:returns: It lists endpoints and other configuration options relevant |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
data_raw = self.connection.raw_get(URL_WELL_KNOWN.format(**params_path)) |
|||
return raise_error_from_response(data_raw, KeycloakGetError) |
|||
|
|||
def auth_url(self, redirect_uri, scope="email", state=""): |
|||
"""Get authorization URL endpoint. |
|||
|
|||
:param redirect_uri: Redirect url to receive oauth code |
|||
:type redirect_uri: str |
|||
:param scope: Scope of authorization request, split with the blank space |
|||
:type scope: str |
|||
:param state: State will be returned to the redirect_uri |
|||
:type state: str |
|||
:returns: Authorization URL Full Build |
|||
:rtype: str |
|||
""" |
|||
params_path = { |
|||
"authorization-endpoint": self.well_known()["authorization_endpoint"], |
|||
"client-id": self.client_id, |
|||
"redirect-uri": redirect_uri, |
|||
"scope": scope, |
|||
"state": state, |
|||
} |
|||
return URL_AUTH.format(**params_path) |
|||
|
|||
def token( |
|||
self, |
|||
username="", |
|||
password="", |
|||
grant_type=["password"], |
|||
code="", |
|||
redirect_uri="", |
|||
totp=None, |
|||
scope="openid", |
|||
**extra |
|||
): |
|||
"""Retrieve user token. |
|||
|
|||
The token endpoint is used to obtain tokens. Tokens can either be obtained by |
|||
exchanging an authorization code or by supplying credentials directly depending on |
|||
what flow is used. The token endpoint is also used to obtain new access tokens |
|||
when they expire. |
|||
|
|||
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint |
|||
|
|||
:param username: Username |
|||
:type username: str |
|||
:param password: Password |
|||
:type password: str |
|||
:param grant_type: Grant type |
|||
:type grant_type: str |
|||
:param code: Code |
|||
:type code: str |
|||
:param redirect_uri: Redirect URI |
|||
:type redirect_uri: str |
|||
:param totp: Time-based one-time password |
|||
:type totp: int |
|||
:param scope: Scope, defaults to openid |
|||
:type scope: str |
|||
:param extra: Additional extra arguments |
|||
:type extra: dict |
|||
:returns: Keycloak token |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
payload = { |
|||
"username": username, |
|||
"password": password, |
|||
"client_id": self.client_id, |
|||
"grant_type": grant_type, |
|||
"code": code, |
|||
"redirect_uri": redirect_uri, |
|||
"scope": scope, |
|||
} |
|||
if extra: |
|||
payload.update(extra) |
|||
|
|||
if totp: |
|||
payload["totp"] = totp |
|||
|
|||
payload = self._add_secret_key(payload) |
|||
data_raw = self.connection.sync_raw_post(URL_TOKEN.format(**params_path), data=payload) |
|||
print(data_raw) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def refresh_token(self, refresh_token, grant_type=["refresh_token"]): |
|||
"""Refresh the user token. |
|||
|
|||
The token endpoint is used to obtain tokens. Tokens can either be obtained by |
|||
exchanging an authorization code or by supplying credentials directly depending on |
|||
what flow is used. The token endpoint is also used to obtain new access tokens |
|||
when they expire. |
|||
|
|||
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint |
|||
|
|||
:param refresh_token: Refresh token from Keycloak |
|||
:type refresh_token: str |
|||
:param grant_type: Grant type |
|||
:type grant_type: str |
|||
:returns: New token |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
payload = { |
|||
"client_id": self.client_id, |
|||
"grant_type": grant_type, |
|||
"refresh_token": refresh_token, |
|||
} |
|||
payload = self._add_secret_key(payload) |
|||
data_raw = self.connection.sync_raw_post(URL_TOKEN.format(**params_path), data=payload) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def exchange_token( |
|||
self, |
|||
token: str, |
|||
audience: Optional[str] = None, |
|||
subject: Optional[str] = None, |
|||
subject_token_type: Optional[str] = None, |
|||
subject_issuer: Optional[str] = None, |
|||
requested_issuer: Optional[str] = None, |
|||
requested_token_type: str = "urn:ietf:params:oauth:token-type:refresh_token", |
|||
scope: str = "openid", |
|||
) -> dict: |
|||
"""Exchange user token. |
|||
|
|||
Use a token to obtain an entirely different token. See |
|||
https://www.keycloak.org/docs/latest/securing_apps/index.html#_token-exchange |
|||
|
|||
:param token: Access token |
|||
:type token: str |
|||
:param audience: Audience |
|||
:type audience: str |
|||
:param subject: Subject |
|||
:type subject: str |
|||
:param subject_token_type: Token Type specification |
|||
:type subject_token_type: Optional[str] |
|||
:param subject_issuer: Issuer |
|||
:type subject_issuer: Optional[str] |
|||
:param requested_issuer: Issuer |
|||
:type requested_issuer: Optional[str] |
|||
:param requested_token_type: Token type specification |
|||
:type requested_token_type: str |
|||
:param scope: Scope, defaults to openid |
|||
:type scope: str |
|||
:returns: Exchanged token |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
payload = { |
|||
"grant_type": ["urn:ietf:params:oauth:grant-type:token-exchange"], |
|||
"client_id": self.client_id, |
|||
"subject_token": token, |
|||
"subject_token_type": subject_token_type, |
|||
"subject_issuer": subject_issuer, |
|||
"requested_token_type": requested_token_type, |
|||
"audience": audience, |
|||
"requested_subject": subject, |
|||
"requested_issuer": requested_issuer, |
|||
"scope": scope, |
|||
} |
|||
payload = self._add_secret_key(payload) |
|||
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def userinfo(self, token): |
|||
"""Get the user info object. |
|||
|
|||
The userinfo endpoint returns standard claims about the authenticated user, |
|||
and is protected by a bearer token. |
|||
|
|||
http://openid.net/specs/openid-connect-core-1_0.html#UserInfo |
|||
|
|||
:param token: Access token |
|||
:type token: str |
|||
:returns: Userinfo object |
|||
:rtype: dict |
|||
""" |
|||
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|||
params_path = {"realm-name": self.realm_name} |
|||
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path)) |
|||
return raise_error_from_response(data_raw, KeycloakGetError) |
|||
|
|||
def logout(self, refresh_token): |
|||
"""Log out the authenticated user. |
|||
|
|||
:param refresh_token: Refresh token from Keycloak |
|||
:type refresh_token: str |
|||
:returns: Keycloak server response |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
payload = {"client_id": self.client_id, "refresh_token": refresh_token} |
|||
payload = self._add_secret_key(payload) |
|||
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload) |
|||
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) |
|||
|
|||
def certs(self): |
|||
"""Get certificates. |
|||
|
|||
The certificate endpoint returns the public keys enabled by the realm, encoded as a |
|||
JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled |
|||
for verifying tokens. |
|||
|
|||
https://tools.ietf.org/html/rfc7517 |
|||
|
|||
:returns: Certificates |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
data_raw = self.connection.raw_get(URL_CERTS.format(**params_path)) |
|||
return raise_error_from_response(data_raw, KeycloakGetError) |
|||
|
|||
def public_key(self): |
|||
"""Retrieve the public key. |
|||
|
|||
The public key is exposed by the realm page directly. |
|||
|
|||
:returns: The public key |
|||
:rtype: str |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
data_raw = self.connection.raw_get(URL_REALM.format(**params_path)) |
|||
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"] |
|||
|
|||
def entitlement(self, token, resource_server_id): |
|||
"""Get entitlements from the token. |
|||
|
|||
Client applications can use a specific endpoint to obtain a special security token |
|||
called a requesting party token (RPT). This token consists of all the entitlements |
|||
(or permissions) for a user as a result of the evaluation of the permissions and |
|||
authorization policies associated with the resources being requested. With an RPT, |
|||
client applications can gain access to protected resources at the resource server. |
|||
|
|||
:param token: Access token |
|||
:type token: str |
|||
:param resource_server_id: Resource server ID |
|||
:type resource_server_id: str |
|||
:returns: Entitlements |
|||
:rtype: dict |
|||
""" |
|||
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|||
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id} |
|||
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path)) |
|||
|
|||
if data_raw.status_code == 404 or data_raw.status_code == 405: |
|||
return raise_error_from_response(data_raw, KeycloakDeprecationError) |
|||
|
|||
return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover |
|||
|
|||
def introspect(self, token, rpt=None, token_type_hint=None): |
|||
"""Introspect the user token. |
|||
|
|||
The introspection endpoint is used to retrieve the active state of a token. |
|||
It is can only be invoked by confidential clients. |
|||
|
|||
https://tools.ietf.org/html/rfc7662 |
|||
|
|||
:param token: Access token |
|||
:type token: str |
|||
:param rpt: Requesting party token |
|||
:type rpt: str |
|||
:param token_type_hint: Token type hint |
|||
:type token_type_hint: str |
|||
|
|||
:returns: Token info |
|||
:rtype: dict |
|||
:raises KeycloakRPTNotFound: In case of RPT not specified |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
payload = {"client_id": self.client_id, "token": token} |
|||
|
|||
if token_type_hint == "requesting_party_token": |
|||
if rpt: |
|||
payload.update({"token": rpt, "token_type_hint": token_type_hint}) |
|||
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|||
else: |
|||
raise KeycloakRPTNotFound("Can't found RPT.") |
|||
|
|||
payload = self._add_secret_key(payload) |
|||
|
|||
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def decode_token(self, token, validate: bool = True, **kwargs): |
|||
"""Decode user token. |
|||
|
|||
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data |
|||
structure that represents a cryptographic key. This specification |
|||
also defines a JWK Set JSON data structure that represents a set of |
|||
JWKs. Cryptographic algorithms and identifiers for use with this |
|||
specification are described in the separate JSON Web Algorithms (JWA) |
|||
specification and IANA registries established by that specification. |
|||
|
|||
https://tools.ietf.org/html/rfc7517 |
|||
|
|||
:param token: Keycloak token |
|||
:type token: str |
|||
:param validate: Determines whether the token should be validated with the public key. |
|||
Defaults to True. |
|||
:type validate: bool |
|||
:param kwargs: Additional keyword arguments for jwcrypto's JWT object |
|||
:type kwargs: dict |
|||
:returns: Decoded token |
|||
:rtype: dict |
|||
""" |
|||
if validate: |
|||
if "key" not in kwargs: |
|||
key = ( |
|||
"-----BEGIN PUBLIC KEY-----\n" |
|||
+ self.public_key() |
|||
+ "\n-----END PUBLIC KEY-----" |
|||
) |
|||
key = jwk.JWK.from_pem(key.encode("utf-8")) |
|||
kwargs["key"] = key |
|||
|
|||
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|||
return jwt.json_decode(full_jwt.claims) |
|||
else: |
|||
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|||
full_jwt.token.objects["valid"] = True |
|||
return json.loads(full_jwt.token.payload.decode("utf-8")) |
|||
|
|||
def load_authorization_config(self, path): |
|||
"""Load Keycloak settings (authorization). |
|||
|
|||
:param path: settings file (json) |
|||
:type path: str |
|||
""" |
|||
with open(path, "r") as fp: |
|||
authorization_json = json.load(fp) |
|||
|
|||
self.authorization.load_config(authorization_json) |
|||
|
|||
def get_policies(self, token, method_token_info="introspect", **kwargs): |
|||
"""Get policies by user token. |
|||
|
|||
:param token: User token |
|||
:type token: str |
|||
:param method_token_info: Method for token info decoding |
|||
:type method_token_info: str |
|||
:param kwargs: Additional keyword arguments |
|||
:type kwargs: dict |
|||
:return: Policies |
|||
:rtype: dict |
|||
:raises KeycloakAuthorizationConfigError: In case of bad authorization configuration |
|||
:raises KeycloakInvalidTokenError: In case of bad token |
|||
""" |
|||
if not self.authorization.policies: |
|||
raise KeycloakAuthorizationConfigError( |
|||
"Keycloak settings not found. Load Authorization Keycloak settings." |
|||
) |
|||
|
|||
token_info = self._token_info(token, method_token_info, **kwargs) |
|||
|
|||
if method_token_info == "introspect" and not token_info["active"]: |
|||
raise KeycloakInvalidTokenError("Token expired or invalid.") |
|||
|
|||
user_resources = token_info["resource_access"].get(self.client_id) |
|||
|
|||
if not user_resources: |
|||
return None |
|||
|
|||
policies = [] |
|||
|
|||
for policy_name, policy in self.authorization.policies.items(): |
|||
for role in user_resources["roles"]: |
|||
if self._build_name_role(role) in policy.roles: |
|||
policies.append(policy) |
|||
|
|||
return list(set(policies)) |
|||
|
|||
def get_permissions(self, token, method_token_info="introspect", **kwargs): |
|||
"""Get permission by user token. |
|||
|
|||
:param token: user token |
|||
:type token: str |
|||
:param method_token_info: Decode token method |
|||
:type method_token_info: str |
|||
:param kwargs: parameters for decode |
|||
:type kwargs: dict |
|||
:returns: permissions list |
|||
:rtype: list |
|||
:raises KeycloakAuthorizationConfigError: In case of bad authorization configuration |
|||
:raises KeycloakInvalidTokenError: In case of bad token |
|||
""" |
|||
if not self.authorization.policies: |
|||
raise KeycloakAuthorizationConfigError( |
|||
"Keycloak settings not found. Load Authorization Keycloak settings." |
|||
) |
|||
|
|||
token_info = self._token_info(token, method_token_info, **kwargs) |
|||
|
|||
if method_token_info == "introspect" and not token_info["active"]: |
|||
raise KeycloakInvalidTokenError("Token expired or invalid.") |
|||
|
|||
user_resources = token_info["resource_access"].get(self.client_id) |
|||
|
|||
if not user_resources: |
|||
return None |
|||
|
|||
permissions = [] |
|||
|
|||
for policy_name, policy in self.authorization.policies.items(): |
|||
for role in user_resources["roles"]: |
|||
if self._build_name_role(role) in policy.roles: |
|||
permissions += policy.permissions |
|||
|
|||
return list(set(permissions)) |
|||
|
|||
def uma_permissions(self, token, permissions=""): |
|||
"""Get UMA permissions by user token with requested permissions. |
|||
|
|||
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be |
|||
invoked by confidential clients. |
|||
|
|||
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint |
|||
|
|||
:param token: user token |
|||
:type token: str |
|||
:param permissions: list of uma permissions list(resource:scope) requested by the user |
|||
:type permissions: str |
|||
:returns: Keycloak server response |
|||
:rtype: dict |
|||
""" |
|||
permission = build_permission_param(permissions) |
|||
|
|||
params_path = {"realm-name": self.realm_name} |
|||
payload = { |
|||
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", |
|||
"permission": permission, |
|||
"response_mode": "permissions", |
|||
"audience": self.client_id, |
|||
} |
|||
|
|||
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|||
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def has_uma_access(self, token, permissions): |
|||
"""Determine whether user has uma permissions with specified user token. |
|||
|
|||
:param token: user token |
|||
:type token: str |
|||
:param permissions: list of uma permissions (resource:scope) |
|||
:type permissions: str |
|||
:return: Authentication status |
|||
:rtype: AuthStatus |
|||
:raises KeycloakAuthenticationError: In case of failed authentication |
|||
:raises KeycloakPostError: In case of failed request to Keycloak |
|||
""" |
|||
needed = build_permission_param(permissions) |
|||
try: |
|||
granted = self.uma_permissions(token, permissions) |
|||
except (KeycloakPostError, KeycloakAuthenticationError) as e: |
|||
if e.response_code == 403: # pragma: no cover |
|||
return AuthStatus( |
|||
is_logged_in=True, is_authorized=False, missing_permissions=needed |
|||
) |
|||
elif e.response_code == 401: |
|||
return AuthStatus( |
|||
is_logged_in=False, is_authorized=False, missing_permissions=needed |
|||
) |
|||
raise |
|||
|
|||
for resource_struct in granted: |
|||
resource = resource_struct["rsname"] |
|||
scopes = resource_struct.get("scopes", None) |
|||
if not scopes: |
|||
needed.discard(resource) |
|||
continue |
|||
for scope in scopes: # pragma: no cover |
|||
needed.discard("{}#{}".format(resource, scope)) |
|||
|
|||
return AuthStatus( |
|||
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed |
|||
) |
|||
|
|||
def register_client(self, token: str, payload: dict): |
|||
"""Create a client. |
|||
|
|||
ClientRepresentation: |
|||
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation |
|||
|
|||
:param token: Initial access token |
|||
:type token: str |
|||
:param payload: ClientRepresentation |
|||
:type payload: dict |
|||
:return: Client Representation |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|||
self.connection.add_param_headers("Content-Type", "application/json") |
|||
data_raw = self.connection.raw_post( |
|||
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload) |
|||
) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def device(self): |
|||
"""Get device authorization grant. |
|||
|
|||
The device endpoint is used to obtain a user code verification and user authentication. |
|||
The response contains a device_code, user_code, verification_uri, |
|||
verification_uri_complete, expires_in (lifetime in seconds for device_code |
|||
and user_code), and polling interval. |
|||
Users can either follow the verification_uri and enter the user_code or |
|||
follow the verification_uri_complete. |
|||
After authenticating with valid credentials, users can obtain tokens using the |
|||
"urn:ietf:params:oauth:grant-type:device_code" grant_type and the device_code. |
|||
|
|||
https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow |
|||
https://github.com/keycloak/keycloak-community/blob/main/design/oauth2-device-authorization-grant.md#how-to-try-it |
|||
|
|||
:returns: Device Authorization Response |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name} |
|||
payload = {"client_id": self.client_id} |
|||
|
|||
payload = self._add_secret_key(payload) |
|||
data_raw = self.connection.raw_post(URL_DEVICE.format(**params_path), data=payload) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def update_client(self, token: str, client_id: str, payload: dict): |
|||
"""Update a client. |
|||
|
|||
ClientRepresentation: |
|||
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation |
|||
|
|||
:param token: registration access token |
|||
:type token: str |
|||
:param client_id: Keycloak client id |
|||
:type client_id: str |
|||
:param payload: ClientRepresentation |
|||
:type payload: dict |
|||
:return: Client Representation |
|||
:rtype: dict |
|||
""" |
|||
params_path = {"realm-name": self.realm_name, "client-id": client_id} |
|||
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|||
self.connection.add_param_headers("Content-Type", "application/json") |
|||
|
|||
# Keycloak complains if the clientId is not set in the payload |
|||
if "clientId" not in payload: |
|||
payload["clientId"] = client_id |
|||
|
|||
data_raw = self.connection.raw_put( |
|||
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload) |
|||
) |
|||
return raise_error_from_response(data_raw, KeycloakPutError) |
@ -0,0 +1,417 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# |
|||
# The MIT License (MIT) |
|||
# |
|||
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com> |
|||
# |
|||
# Permission is hereby granted, free of charge, to any person obtaining a copy of |
|||
# this software and associated documentation files (the "Software"), to deal in |
|||
# the Software without restriction, including without limitation the rights to |
|||
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of |
|||
# the Software, and to permit persons to whom the Software is furnished to do so, |
|||
# subject to the following conditions: |
|||
# |
|||
# The above copyright notice and this permission notice shall be included in all |
|||
# copies or substantial portions of the Software. |
|||
# |
|||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS |
|||
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
|||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER |
|||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
|||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|||
|
|||
"""Keycloak UMA module. |
|||
|
|||
The module contains a UMA compatible client for keycloak: |
|||
https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html |
|||
""" |
|||
import json |
|||
from typing import Iterable |
|||
from urllib.parse import quote_plus |
|||
|
|||
from .connection import ConnectionManager |
|||
from .exceptions import ( |
|||
KeycloakDeleteError, |
|||
KeycloakGetError, |
|||
KeycloakPostError, |
|||
KeycloakPutError, |
|||
raise_error_from_response, |
|||
) |
|||
from .openid_connection import KeycloakOpenIDConnection |
|||
from .uma_permissions import UMAPermission |
|||
from .urls_patterns import URL_UMA_WELL_KNOWN |
|||
|
|||
|
|||
class KeycloakUMA: |
|||
"""Keycloak UMA client. |
|||
|
|||
:param connection: OpenID connection manager |
|||
""" |
|||
|
|||
def __init__(self, connection: KeycloakOpenIDConnection): |
|||
"""Init method. |
|||
|
|||
:param connection: OpenID connection manager |
|||
:type connection: KeycloakOpenIDConnection |
|||
""" |
|||
self.connection = connection |
|||
custom_headers = self.connection.custom_headers or {} |
|||
custom_headers.update({"Content-Type": "application/json"}) |
|||
self.connection.custom_headers = custom_headers |
|||
self._well_known = None |
|||
|
|||
def _fetch_well_known(self): |
|||
params_path = {"realm-name": self.connection.realm_name} |
|||
data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path)) |
|||
return raise_error_from_response(data_raw, KeycloakGetError) |
|||
|
|||
@staticmethod |
|||
def format_url(url, **kwargs): |
|||
"""Substitute url path parameters. |
|||
|
|||
Given a parameterized url string, returns the string after url encoding and substituting |
|||
the given params. For example, |
|||
`format_url("https://myserver/{my_resource}/{id}", my_resource="hello world", id="myid")` |
|||
would produce `https://myserver/hello+world/myid`. |
|||
|
|||
:param url: url string to format |
|||
:type url: str |
|||
:param kwargs: dict containing kwargs to substitute |
|||
:type kwargs: dict |
|||
:return: formatted string |
|||
:rtype: str |
|||
""" |
|||
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()}) |
|||
|
|||
@property |
|||
def uma_well_known(self): |
|||
"""Get the well_known UMA2 config. |
|||
|
|||
:returns: It lists endpoints and other configuration options relevant |
|||
:rtype: dict |
|||
""" |
|||
# per instance cache |
|||
if not self._well_known: |
|||
self._well_known = self._fetch_well_known() |
|||
return self._well_known |
|||
|
|||
def resource_set_create(self, payload): |
|||
"""Create a resource set. |
|||
|
|||
Spec |
|||
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1 |
|||
|
|||
ResourceRepresentation |
|||
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation |
|||
|
|||
:param payload: ResourceRepresentation |
|||
:type payload: dict |
|||
:return: ResourceRepresentation with the _id property assigned |
|||
:rtype: dict |
|||
""" |
|||
data_raw = self.connection.raw_post( |
|||
self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload) |
|||
) |
|||
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) |
|||
|
|||
def resource_set_update(self, resource_id, payload): |
|||
"""Update a resource set. |
|||
|
|||
Spec |
|||
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set |
|||
|
|||
ResourceRepresentation |
|||
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation |
|||
|
|||
:param resource_id: id of the resource |
|||
:type resource_id: str |
|||
:param payload: ResourceRepresentation |
|||
:type payload: dict |
|||
:return: Response dict (empty) |
|||
:rtype: dict |
|||
""" |
|||
url = self.format_url( |
|||
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id |
|||
) |
|||
data_raw = self.connection.raw_put(url, data=json.dumps(payload)) |
|||
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) |
|||
|
|||
def resource_set_read(self, resource_id): |
|||
"""Read a resource set. |
|||
|
|||
Spec |
|||
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set |
|||
|
|||
ResourceRepresentation |
|||
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation |
|||
|
|||
:param resource_id: id of the resource |
|||
:type resource_id: str |
|||
:return: ResourceRepresentation |
|||
:rtype: dict |
|||
""" |
|||
url = self.format_url( |
|||
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id |
|||
) |
|||
data_raw = self.connection.raw_get(url) |
|||
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) |
|||
|
|||
def resource_set_delete(self, resource_id): |
|||
"""Delete a resource set. |
|||
|
|||
Spec |
|||
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set |
|||
|
|||
:param resource_id: id of the resource |
|||
:type resource_id: str |
|||
:return: Response dict (empty) |
|||
:rtype: dict |
|||
""" |
|||
url = self.format_url( |
|||
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id |
|||
) |
|||
data_raw = self.connection.raw_delete(url) |
|||
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) |
|||
|
|||
def resource_set_list_ids( |
|||
self, |
|||
name: str = "", |
|||
exact_name: bool = False, |
|||
uri: str = "", |
|||
owner: str = "", |
|||
resource_type: str = "", |
|||
scope: str = "", |
|||
first: int = 0, |
|||
maximum: int = -1, |
|||
): |
|||
"""Query for list of resource set ids. |
|||
|
|||
Spec |
|||
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets |
|||
|
|||
:param name: query resource name |
|||
:type name: str |
|||
:param exact_name: query exact match for resource name |
|||
:type exact_name: bool |
|||
:param uri: query resource uri |
|||
:type uri: str |
|||
:param owner: query resource owner |
|||
:type owner: str |
|||
:param resource_type: query resource type |
|||
:type resource_type: str |
|||
:param scope: query resource scope |
|||
:type scope: str |
|||
:param first: index of first matching resource to return |
|||
:type first: int |
|||
:param maximum: maximum number of resources to return (-1 for all) |
|||
:type maximum: int |
|||
:return: List of ids |
|||
:rtype: List[str] |
|||
""" |
|||
query = dict() |
|||
if name: |
|||
query["name"] = name |
|||
if exact_name: |
|||
query["exactName"] = "true" |
|||
if uri: |
|||
query["uri"] = uri |
|||
if owner: |
|||
query["owner"] = owner |
|||
if resource_type: |
|||
query["type"] = resource_type |
|||
if scope: |
|||
query["scope"] = scope |
|||
if first > 0: |
|||
query["first"] = first |
|||
if maximum >= 0: |
|||
query["max"] = maximum |
|||
|
|||
data_raw = self.connection.raw_get( |
|||
self.uma_well_known["resource_registration_endpoint"], **query |
|||
) |
|||
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) |
|||
|
|||
def resource_set_list(self): |
|||
"""List all resource sets. |
|||
|
|||
Spec |
|||
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets |
|||
|
|||
ResourceRepresentation |
|||
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation |
|||
|
|||
:yields: Iterator over a list of ResourceRepresentations |
|||
:rtype: Iterator[dict] |
|||
""" |
|||
for resource_id in self.resource_set_list_ids(): |
|||
resource = self.resource_set_read(resource_id) |
|||
yield resource |
|||
|
|||
def permission_ticket_create(self, permissions: Iterable[UMAPermission]): |
|||
"""Create a permission ticket. |
|||
|
|||
:param permissions: Iterable of uma permissions to validate the token against |
|||
:type permissions: Iterable[UMAPermission] |
|||
:returns: Keycloak decision |
|||
:rtype: boolean |
|||
:raises KeycloakPostError: In case permission resource not found |
|||
""" |
|||
resources = dict() |
|||
for permission in permissions: |
|||
resource_id = getattr(permission, "resource_id", None) |
|||
|
|||
if resource_id is None: |
|||
resource_ids = self.resource_set_list_ids( |
|||
exact_name=True, name=permission.resource, first=0, maximum=1 |
|||
) |
|||
|
|||
if not resource_ids: |
|||
raise KeycloakPostError("Invalid resource specified") |
|||
|
|||
setattr(permission, "resource_id", resource_ids[0]) |
|||
|
|||
resources.setdefault(resource_id, set()) |
|||
if permission.scope: |
|||
resources[resource_id].add(permission.scope) |
|||
|
|||
payload = [ |
|||
{"resource_id": resource_id, "resource_scopes": list(scopes)} |
|||
for resource_id, scopes in resources.items() |
|||
] |
|||
|
|||
data_raw = self.connection.raw_post( |
|||
self.uma_well_known["permission_endpoint"], data=json.dumps(payload) |
|||
) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def permissions_check(self, token, permissions: Iterable[UMAPermission]): |
|||
"""Check UMA permissions by user token with requested permissions. |
|||
|
|||
The token endpoint is used to check UMA permissions from Keycloak. It can only be |
|||
invoked by confidential clients. |
|||
|
|||
https://www.keycloak.org/docs/latest/authorization_services/#_service_authorization_api |
|||
|
|||
:param token: user token |
|||
:type token: str |
|||
:param permissions: Iterable of uma permissions to validate the token against |
|||
:type permissions: Iterable[UMAPermission] |
|||
:returns: Keycloak decision |
|||
:rtype: boolean |
|||
""" |
|||
payload = { |
|||
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", |
|||
"permission": ",".join(str(permission) for permission in permissions), |
|||
"response_mode": "decision", |
|||
"audience": self.connection.client_id, |
|||
} |
|||
|
|||
# Everyone always has the null set of permissions |
|||
# However keycloak cannot evaluate the null set |
|||
if len(payload["permission"]) == 0: |
|||
return True |
|||
|
|||
connection = ConnectionManager(self.connection.base_url) |
|||
connection.add_param_headers("Authorization", "Bearer " + token) |
|||
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|||
data_raw = connection.raw_post(self.uma_well_known["token_endpoint"], data=payload) |
|||
try: |
|||
data = raise_error_from_response(data_raw, KeycloakPostError) |
|||
except KeycloakPostError: |
|||
return False |
|||
return data.get("result", False) |
|||
|
|||
def policy_resource_create(self, resource_id, payload): |
|||
"""Create permission policy for resource. |
|||
|
|||
Supports name, description, scopes, roles, groups, clients |
|||
|
|||
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource |
|||
|
|||
:param resource_id: _id of resource |
|||
:type resource_id: str |
|||
:param payload: permission configuration |
|||
:type payload: dict |
|||
:return: PermissionRepresentation |
|||
:rtype: dict |
|||
""" |
|||
data_raw = self.connection.raw_post( |
|||
self.uma_well_known["policy_endpoint"] + f"/{resource_id}", data=json.dumps(payload) |
|||
) |
|||
return raise_error_from_response(data_raw, KeycloakPostError) |
|||
|
|||
def policy_update(self, policy_id, payload): |
|||
"""Update permission policy. |
|||
|
|||
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource |
|||
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation |
|||
|
|||
:param policy_id: id of policy permission |
|||
:type policy_id: str |
|||
:param payload: policy permission configuration |
|||
:type payload: dict |
|||
:return: PermissionRepresentation |
|||
:rtype: dict |
|||
""" |
|||
data_raw = self.connection.raw_put( |
|||
self.uma_well_known["policy_endpoint"] + f"/{policy_id}", data=json.dumps(payload) |
|||
) |
|||
return raise_error_from_response(data_raw, KeycloakPutError) |
|||
|
|||
def policy_delete(self, policy_id): |
|||
"""Delete permission policy. |
|||
|
|||
https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission |
|||
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation |
|||
|
|||
:param policy_id: id of permission policy |
|||
:type policy_id: str |
|||
:return: PermissionRepresentation |
|||
:rtype: dict |
|||
""" |
|||
data_raw = self.connection.raw_delete( |
|||
self.uma_well_known["policy_endpoint"] + f"/{policy_id}" |
|||
) |
|||
return raise_error_from_response(data_raw, KeycloakDeleteError) |
|||
|
|||
def policy_query( |
|||
self, |
|||
resource: str = "", |
|||
name: str = "", |
|||
scope: str = "", |
|||
first: int = 0, |
|||
maximum: int = -1, |
|||
): |
|||
"""Query permission policies. |
|||
|
|||
https://www.keycloak.org/docs/latest/authorization_services/#querying-permission |
|||
|
|||
:param resource: query resource id |
|||
:type resource: str |
|||
:param name: query resource name |
|||
:type name: str |
|||
:param scope: query resource scope |
|||
:type scope: str |
|||
:param first: index of first matching resource to return |
|||
:type first: int |
|||
:param maximum: maximum number of resources to return (-1 for all) |
|||
:type maximum: int |
|||
:return: List of ids |
|||
:return: List of ids |
|||
:rtype: List[str] |
|||
""" |
|||
query = dict() |
|||
if name: |
|||
query["name"] = name |
|||
if resource: |
|||
query["resource"] = resource |
|||
if scope: |
|||
query["scope"] = scope |
|||
if first > 0: |
|||
query["first"] = first |
|||
if maximum >= 0: |
|||
query["max"] = maximum |
|||
|
|||
data_raw = self.connection.raw_get(self.uma_well_known["policy_endpoint"], **query) |
|||
return raise_error_from_response(data_raw, KeycloakGetError) |
@ -0,0 +1,420 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# |
|||
# The MIT License (MIT) |
|||
# |
|||
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com> |
|||
# |
|||
# Permission is hereby granted, free of charge, to any person obtaining a copy of |
|||
# this software and associated documentation files (the "Software"), to deal in |
|||
# the Software without restriction, including without limitation the rights to |
|||
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of |
|||
# the Software, and to permit persons to whom the Software is furnished to do so, |
|||
# subject to the following conditions: |
|||
# |
|||
# The above copyright notice and this permission notice shall be included in all |
|||
# copies or substantial portions of the Software. |
|||
# |
|||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS |
|||
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
|||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER |
|||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
|||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|||
|
|||
"""Keycloak OpenID Connection Manager module. |
|||
|
|||
The module contains mainly the implementation of KeycloakOpenIDConnection class. |
|||
This is an extension of the ConnectionManager class, and handles the automatic refresh |
|||
of openid tokens when required. |
|||
""" |
|||
|
|||
from datetime import datetime, timedelta |
|||
|
|||
from .connection import ConnectionManager |
|||
from keycloak import KeycloakPostError |
|||
from .keycloak_openid import KeycloakOpenID |
|||
|
|||
|
|||
class KeycloakOpenIDConnection(ConnectionManager): |
|||
"""A class to help with OpenID connections which can auto refresh tokens. |
|||
|
|||
:param object: _description_ |
|||
:type object: _type_ |
|||
""" |
|||
|
|||
_server_url = None |
|||
_username = None |
|||
_password = None |
|||
_totp = None |
|||
_realm_name = None |
|||
_client_id = None |
|||
_verify = None |
|||
_client_secret_key = None |
|||
_connection = None |
|||
_custom_headers = None |
|||
_user_realm_name = None |
|||
_expires_at = None |
|||
_keycloak_openid = None |
|||
|
|||
def __init__( |
|||
self, |
|||
server_url, |
|||
username=None, |
|||
password=None, |
|||
token=None, |
|||
totp=None, |
|||
realm_name="master", |
|||
client_id="admin-cli", |
|||
verify=True, |
|||
client_secret_key=None, |
|||
custom_headers=None, |
|||
user_realm_name=None, |
|||
timeout=60, |
|||
): |
|||
"""Init method. |
|||
|
|||
:param server_url: Keycloak server url |
|||
:type server_url: str |
|||
:param username: admin username |
|||
:type username: str |
|||
:param password: admin password |
|||
:type password: str |
|||
:param token: access and refresh tokens |
|||
:type token: dict |
|||
:param totp: Time based OTP |
|||
:type totp: str |
|||
:param realm_name: realm name |
|||
:type realm_name: str |
|||
:param client_id: client id |
|||
:type client_id: str |
|||
:param verify: Boolean value to enable or disable certificate validation or a string |
|||
containing a path to a CA bundle to use |
|||
:type verify: Union[bool,str] |
|||
:param client_secret_key: client secret key |
|||
(optional, required only for access type confidential) |
|||
:type client_secret_key: str |
|||
:param custom_headers: dict of custom header to pass to each HTML request |
|||
:type custom_headers: dict |
|||
:param user_realm_name: The realm name of the user, if different from realm_name |
|||
:type user_realm_name: str |
|||
:param timeout: connection timeout in seconds |
|||
:type timeout: int |
|||
""" |
|||
# token is renewed when it hits 90% of its lifetime. This is to account for any possible |
|||
# clock skew. |
|||
self.token_lifetime_fraction = 0.9 |
|||
self.server_url = server_url |
|||
self.username = username |
|||
self.password = password |
|||
self.token = token |
|||
self.totp = totp |
|||
self.realm_name = realm_name |
|||
self.client_id = client_id |
|||
self.verify = verify |
|||
self.client_secret_key = client_secret_key |
|||
self.user_realm_name = user_realm_name |
|||
self.timeout = timeout |
|||
self.headers = {} |
|||
self.custom_headers = custom_headers |
|||
|
|||
super().__init__( |
|||
base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify |
|||
) |
|||
if self.token is None: |
|||
self.get_token() |
|||
|
|||
if self.token is not None: |
|||
self.headers = { |
|||
**self.headers, |
|||
"Authorization": "Bearer " + self.token.get("access_token"), |
|||
"Content-Type": "application/json", |
|||
} |
|||
|
|||
|
|||
@property |
|||
def server_url(self): |
|||
"""Get server url. |
|||
|
|||
:returns: Keycloak server url |
|||
:rtype: str |
|||
""" |
|||
return self.base_url |
|||
|
|||
@server_url.setter |
|||
def server_url(self, value): |
|||
self.base_url = value |
|||
|
|||
@property |
|||
def realm_name(self): |
|||
"""Get realm name. |
|||
|
|||
:returns: Realm name |
|||
:rtype: str |
|||
""" |
|||
return self._realm_name |
|||
|
|||
@realm_name.setter |
|||
def realm_name(self, value): |
|||
self._realm_name = value |
|||
|
|||
@property |
|||
def client_id(self): |
|||
"""Get client id. |
|||
|
|||
:returns: Client id |
|||
:rtype: str |
|||
""" |
|||
return self._client_id |
|||
|
|||
@client_id.setter |
|||
def client_id(self, value): |
|||
self._client_id = value |
|||
|
|||
@property |
|||
def client_secret_key(self): |
|||
"""Get client secret key. |
|||
|
|||
:returns: Client secret key |
|||
:rtype: str |
|||
""" |
|||
return self._client_secret_key |
|||
|
|||
@client_secret_key.setter |
|||
def client_secret_key(self, value): |
|||
self._client_secret_key = value |
|||
|
|||
@property |
|||
def username(self): |
|||
"""Get username. |
|||
|
|||
:returns: Admin username |
|||
:rtype: str |
|||
""" |
|||
return self._username |
|||
|
|||
@username.setter |
|||
def username(self, value): |
|||
self._username = value |
|||
|
|||
@property |
|||
def password(self): |
|||
"""Get password. |
|||
|
|||
:returns: Admin password |
|||
:rtype: str |
|||
""" |
|||
return self._password |
|||
|
|||
@password.setter |
|||
def password(self, value): |
|||
self._password = value |
|||
|
|||
@property |
|||
def totp(self): |
|||
"""Get totp. |
|||
|
|||
:returns: TOTP |
|||
:rtype: str |
|||
""" |
|||
return self._totp |
|||
|
|||
@totp.setter |
|||
def totp(self, value): |
|||
self._totp = value |
|||
|
|||
@property |
|||
def token(self): |
|||
"""Get token. |
|||
|
|||
:returns: Access and refresh token |
|||
:rtype: dict |
|||
""" |
|||
return self._token |
|||
|
|||
@token.setter |
|||
def token(self, value): |
|||
self._token = value |
|||
self._expires_at = datetime.now() + timedelta( |
|||
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0) |
|||
) |
|||
|
|||
@property |
|||
def expires_at(self): |
|||
"""Get token expiry time. |
|||
|
|||
:returns: Datetime at which the current token will expire |
|||
:rtype: datetime |
|||
""" |
|||
return self._expires_at |
|||
|
|||
@property |
|||
def user_realm_name(self): |
|||
"""Get user realm name. |
|||
|
|||
:returns: User realm name |
|||
:rtype: str |
|||
""" |
|||
return self._user_realm_name |
|||
|
|||
@user_realm_name.setter |
|||
def user_realm_name(self, value): |
|||
self._user_realm_name = value |
|||
|
|||
@property |
|||
def custom_headers(self): |
|||
"""Get custom headers. |
|||
|
|||
:returns: Custom headers |
|||
:rtype: dict |
|||
""" |
|||
return self._custom_headers |
|||
|
|||
@custom_headers.setter |
|||
def custom_headers(self, value): |
|||
self._custom_headers = value |
|||
if self.custom_headers is not None: |
|||
# merge custom headers to main headers |
|||
self.headers.update(self.custom_headers) |
|||
|
|||
@property |
|||
def keycloak_openid(self) -> KeycloakOpenID: |
|||
"""Get the KeycloakOpenID object. |
|||
|
|||
The KeycloakOpenID is used to refresh tokens |
|||
|
|||
:returns: KeycloakOpenID |
|||
:rtype: KeycloakOpenID |
|||
""" |
|||
if self._keycloak_openid is None: |
|||
if self.user_realm_name: |
|||
token_realm_name = self.user_realm_name |
|||
elif self.realm_name: |
|||
token_realm_name = self.realm_name |
|||
else: |
|||
token_realm_name = "master" |
|||
|
|||
self._keycloak_openid = KeycloakOpenID( |
|||
server_url=self.server_url, |
|||
client_id=self.client_id, |
|||
realm_name=token_realm_name, |
|||
verify=self.verify, |
|||
client_secret_key=self.client_secret_key, |
|||
timeout=self.timeout, |
|||
custom_headers=self.custom_headers, |
|||
) |
|||
|
|||
return self._keycloak_openid |
|||
|
|||
def get_token(self): |
|||
"""Get admin token. |
|||
|
|||
The admin token is then set in the `token` attribute. |
|||
""" |
|||
grant_type = [] |
|||
if self.username and self.password: |
|||
grant_type.append("password") |
|||
elif self.client_secret_key: |
|||
grant_type.append("client_credentials") |
|||
|
|||
if grant_type: |
|||
self.token = self.keycloak_openid.token( |
|||
self.username, self.password, grant_type=grant_type, totp=self.totp |
|||
) |
|||
else: |
|||
self.token = {} |
|||
|
|||
def refresh_token(self): |
|||
"""Refresh the token. |
|||
|
|||
:raises KeycloakPostError: In case the refresh token request failed. |
|||
""" |
|||
refresh_token = self.token.get("refresh_token", None) if self.token else None |
|||
if refresh_token is None: |
|||
self.get_token() |
|||
else: |
|||
try: |
|||
self.token = self.keycloak_openid.refresh_token(refresh_token) |
|||
except KeycloakPostError as e: |
|||
list_errors = [ |
|||
b"Refresh token expired", |
|||
b"Token is not active", |
|||
b"Session not active", |
|||
] |
|||
if e.response_code == 400 and any(err in e.response_body for err in list_errors): |
|||
self.get_token() |
|||
else: |
|||
raise |
|||
|
|||
self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token")) |
|||
|
|||
def _refresh_if_required(self): |
|||
if datetime.now() >= self.expires_at: |
|||
self.refresh_token() |
|||
|
|||
def raw_get(self, *args, **kwargs): |
|||
"""Call connection.raw_get. |
|||
|
|||
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token |
|||
and try *get* once more. |
|||
|
|||
:param args: Additional arguments |
|||
:type args: tuple |
|||
:param kwargs: Additional keyword arguments |
|||
:type kwargs: dict |
|||
:returns: Response |
|||
:rtype: Response |
|||
""" |
|||
self._refresh_if_required() |
|||
r = super().raw_get(*args, **kwargs) |
|||
return r |
|||
|
|||
def raw_post(self, *args, **kwargs): |
|||
"""Call connection.raw_post. |
|||
|
|||
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token |
|||
and try *post* once more. |
|||
|
|||
:param args: Additional arguments |
|||
:type args: tuple |
|||
:param kwargs: Additional keyword arguments |
|||
:type kwargs: dict |
|||
:returns: Response |
|||
:rtype: Response |
|||
""" |
|||
self._refresh_if_required() |
|||
r = super().raw_post(*args, **kwargs) |
|||
return r |
|||
|
|||
def raw_put(self, *args, **kwargs): |
|||
"""Call connection.raw_put. |
|||
|
|||
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token |
|||
and try *put* once more. |
|||
|
|||
:param args: Additional arguments |
|||
:type args: tuple |
|||
:param kwargs: Additional keyword arguments |
|||
:type kwargs: dict |
|||
:returns: Response |
|||
:rtype: Response |
|||
""" |
|||
self._refresh_if_required() |
|||
r = super().raw_put(*args, **kwargs) |
|||
return r |
|||
|
|||
def raw_delete(self, *args, **kwargs): |
|||
"""Call connection.raw_delete. |
|||
|
|||
If auto_refresh is set for *delete* and *access_token* is expired, |
|||
it will refresh the token and try *delete* once more. |
|||
|
|||
:param args: Additional arguments |
|||
:type args: tuple |
|||
:param kwargs: Additional keyword arguments |
|||
:type kwargs: dict |
|||
:returns: Response |
|||
:rtype: Response |
|||
""" |
|||
self._refresh_if_required() |
|||
r = super().raw_delete(*args, **kwargs) |
|||
return r |
Write
Preview
Loading…
Cancel
Save
Reference in new issue