Browse Source

create async wrapper function

pull/565/head
David 12 months ago
parent
commit
edc8b27ca0
  1. 2
      src/keycloak/asynchronous/__init__.py
  2. 318
      src/keycloak/asynchronous/connection.py
  3. 195
      src/keycloak/asynchronous/exceptions.py
  4. 4257
      src/keycloak/asynchronous/keycloak_admin.py
  5. 786
      src/keycloak/asynchronous/keycloak_openid.py
  6. 417
      src/keycloak/asynchronous/keycloak_uma.py
  7. 420
      src/keycloak/asynchronous/openid_connection.py
  8. 690
      src/keycloak/keycloak_admin.py

2
src/keycloak/asynchronous/__init__.py

@ -1,2 +0,0 @@
from .openid_connection import KeycloakOpenIDConnection
from .keycloak_admin import KeycloakAdmin

318
src/keycloak/asynchronous/connection.py

@ -1,318 +0,0 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Connection manager module."""
try:
from urllib.parse import urljoin
except ImportError: # pragma: no cover
from urlparse import urljoin
import httpx
from requests.adapters import HTTPAdapter
import requests
from .exceptions import KeycloakConnectionError
class ConnectionManager(object):
"""Represents a simple server connection.
:param base_url: The server URL.
:type base_url: str
:param headers: The header parameters of the requests to the server.
:type headers: dict
:param timeout: Timeout to use for requests to the server.
:type timeout: int
:param verify: Boolean value to enable or disable certificate validation or a string
containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param proxies: The proxies servers requests is sent by.
:type proxies: dict
"""
def __init__(self, base_url, headers={}, timeout=60, verify=True, proxies=None):
"""Init method.
:param base_url: The server URL.
:type base_url: str
:param headers: The header parameters of the requests to the server.
:type headers: dict
:param timeout: Timeout to use for requests to the server.
:type timeout: int
:param verify: Boolean value to enable or disable certificate validation or a string
containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param proxies: The proxies servers requests is sent by.
:type proxies: dict
"""
print(base_url)
self.base_url = base_url
print(self.base_url)
self.headers = headers
self.timeout = timeout
self.verify = verify
self._s = httpx.AsyncClient(verify=verify, proxies=proxies)
self.sync_s = requests.Session()
self.sync_s.auth = lambda x: x # don't let requests add auth headers
# retry once to reset connection with Keycloak after tomcat's ConnectionTimeout
# see https://github.com/marcospereirampj/python-keycloak/issues/36
self._s.transport = httpx.AsyncHTTPTransport(retries=2)
for protocol in ("https://", "http://"):
adapter = HTTPAdapter(max_retries=1)
# adds POST to retry whitelist
allowed_methods = set(adapter.max_retries.allowed_methods)
allowed_methods.add("POST")
adapter.max_retries.allowed_methods = frozenset(allowed_methods)
self.sync_s.mount(protocol, adapter)
if proxies:
self.sync_s.proxies.update(proxies)
async def aclose(self):
if hasattr(self, "_s"):
await self._s.aclose()
def __del__(self):
"""Del method."""
return
if hasattr(self, "_s"):
self._s.close()
@property
def base_url(self):
"""Return base url in use for requests to the server.
:returns: Base URL
:rtype: str
"""
return self._base_url
@base_url.setter
def base_url(self, value):
self._base_url = value
@property
def timeout(self):
"""Return timeout in use for request to the server.
:returns: Timeout
:rtype: int
"""
return self._timeout
@timeout.setter
def timeout(self, value):
self._timeout = value
@property
def verify(self):
"""Return verify in use for request to the server.
:returns: Verify indicator
:rtype: bool
"""
return self._verify
@verify.setter
def verify(self, value):
self._verify = value
@property
def headers(self):
"""Return header request to the server.
:returns: Request headers
:rtype: dict
"""
return self._headers
@headers.setter
def headers(self, value):
self._headers = value
def param_headers(self, key):
"""Return a specific header parameter.
:param key: Header parameters key.
:type key: str
:returns: If the header parameters exist, return its value.
:rtype: str
"""
return self.headers.get(key)
def clean_headers(self):
"""Clear header parameters."""
self.headers = {}
def exist_param_headers(self, key):
"""Check if the parameter exists in the header.
:param key: Header parameters key.
:type key: str
:returns: If the header parameters exist, return True.
:rtype: bool
"""
return self.param_headers(key) is not None
def add_param_headers(self, key, value):
"""Add a single parameter inside the header.
:param key: Header parameters key.
:type key: str
:param value: Value to be added.
:type value: str
"""
self.headers[key] = value
def del_param_headers(self, key):
"""Remove a specific parameter.
:param key: Key of the header parameters.
:type key: str
"""
self.headers.pop(key, None)
def raw_get(self, path, **kwargs):
"""Submit get request to the path.
:param path: Path for request.
:type path: str
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return self._s.get(
urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
async def raw_post(self, path, data, **kwargs):
"""Submit post request to the path.
:param path: Path for request.
:type path: str
:param data: Payload for request.
:type data: dict
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return await self._s.post(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def sync_raw_post(self, path, data, **kwargs):
"""Submit post request to the path.
:param path: Path for request.
:type path: str
:param data: Payload for request.
:type data: dict
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return self.sync_s.post(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_put(self, path, data, **kwargs):
"""Submit put request to the path.
:param path: Path for request.
:type path: str
:param data: Payload for request.
:type data: dict
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return self._s.put(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_delete(self, path, data=None, **kwargs):
"""Submit delete request to the path.
:param path: Path for request.
:type path: str
:param data: Payload for request.
:type data: dict | None
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return self._s.delete(
urljoin(self.base_url, path),
params=kwargs,
data=data or dict(),
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)

195
src/keycloak/asynchronous/exceptions.py

@ -1,195 +0,0 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak custom exceptions module."""
import requests
class KeycloakError(Exception):
"""Base class for custom Keycloak errors.
:param error_message: The error message
:type error_message: str
:param response_code: The response status code
:type response_code: int
"""
def __init__(self, error_message="", response_code=None, response_body=None):
"""Init method.
:param error_message: The error message
:type error_message: str
:param response_code: The code of the response
:type response_code: int
:param response_body: Body of the response
:type response_body: bytes
"""
Exception.__init__(self, error_message)
self.response_code = response_code
self.response_body = response_body
self.error_message = error_message
def __str__(self):
"""Str method.
:returns: String representation of the object
:rtype: str
"""
if self.response_code is not None:
return "{0}: {1}".format(self.response_code, self.error_message)
else:
return "{0}".format(self.error_message)
class KeycloakAuthenticationError(KeycloakError):
"""Keycloak authentication error exception."""
pass
class KeycloakConnectionError(KeycloakError):
"""Keycloak connection error exception."""
pass
class KeycloakOperationError(KeycloakError):
"""Keycloak operation error exception."""
pass
class KeycloakDeprecationError(KeycloakError):
"""Keycloak deprecation error exception."""
pass
class KeycloakGetError(KeycloakOperationError):
"""Keycloak request get error exception."""
pass
class KeycloakPostError(KeycloakOperationError):
"""Keycloak request post error exception."""
pass
class KeycloakPutError(KeycloakOperationError):
"""Keycloak request put error exception."""
pass
class KeycloakDeleteError(KeycloakOperationError):
"""Keycloak request delete error exception."""
pass
class KeycloakSecretNotFound(KeycloakOperationError):
"""Keycloak secret not found exception."""
pass
class KeycloakRPTNotFound(KeycloakOperationError):
"""Keycloak RPT not found exception."""
pass
class KeycloakAuthorizationConfigError(KeycloakOperationError):
"""Keycloak authorization config exception."""
pass
class KeycloakInvalidTokenError(KeycloakOperationError):
"""Keycloak invalid token exception."""
pass
class KeycloakPermissionFormatError(KeycloakOperationError):
"""Keycloak permission format exception."""
pass
class PermissionDefinitionError(Exception):
"""Keycloak permission definition exception."""
pass
def raise_error_from_response(response, error, expected_codes=None, skip_exists=False):
"""Raise an exception for the response.
:param response: The response object
:type response: Response
:param error: Error object to raise
:type error: dict or Exception
:param expected_codes: Set of expected codes, which should not raise the exception
:type expected_codes: Sequence[int]
:param skip_exists: Indicates whether the response on already existing object should be ignored
:type skip_exists: bool
:returns: Content of the response message
:type: bytes or dict
:raises KeycloakError: In case of unexpected status codes
""" # noqa: DAR401,DAR402
if expected_codes is None:
expected_codes = [200, 201, 204]
if hasattr(response, 'status_code'):
if response.status_code in expected_codes:
if response.status_code == requests.codes.no_content:
return {}
try:
return response.json()
except ValueError:
return response.content
if skip_exists and response.status_code == 409:
return {"msg": "Already exists"}
try:
message = response.json()["message"]
except (KeyError, ValueError):
message = response.content
if isinstance(error, dict):
error = error.get(response.status_code, KeycloakOperationError)
else:
if response.status_code == 401:
error = KeycloakAuthenticationError
raise error(
error_message=message, response_code=response.status_code, response_body=response.content
)

4257
src/keycloak/asynchronous/keycloak_admin.py
File diff suppressed because it is too large
View File

786
src/keycloak/asynchronous/keycloak_openid.py

@ -1,786 +0,0 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak OpenID module.
The module contains mainly the implementation of KeycloakOpenID class, the main
class to handle authentication and token manipulation.
"""
import json
from typing import Optional
from jwcrypto import jwk, jwt
from ..authorization import Authorization
from .connection import ConnectionManager
from .exceptions import (
KeycloakAuthenticationError,
KeycloakAuthorizationConfigError,
KeycloakDeprecationError,
KeycloakGetError,
KeycloakInvalidTokenError,
KeycloakPostError,
KeycloakPutError,
KeycloakRPTNotFound,
raise_error_from_response,
)
from ..uma_permissions import AuthStatus, build_permission_param
from ..urls_patterns import (
URL_AUTH,
URL_CERTS,
URL_CLIENT_REGISTRATION,
URL_CLIENT_UPDATE,
URL_DEVICE,
URL_ENTITLEMENT,
URL_INTROSPECT,
URL_LOGOUT,
URL_REALM,
URL_TOKEN,
URL_USERINFO,
URL_WELL_KNOWN,
)
class KeycloakOpenID:
"""Keycloak OpenID client.
:param server_url: Keycloak server url
:param client_id: client id
:param realm_name: realm name
:param client_secret_key: client secret key
:param verify: Boolean value to enable or disable certificate validation or a string
containing a path to a CA bundle to use
:param custom_headers: dict of custom header to pass to each HTML request
:param proxies: dict of proxies to sent the request by.
:param timeout: connection timeout in seconds
"""
def __init__(
self,
server_url,
realm_name,
client_id,
client_secret_key=None,
verify=True,
custom_headers=None,
proxies=None,
timeout=60,
):
"""Init method.
:param server_url: Keycloak server url
:type server_url: str
:param client_id: client id
:type client_id: str
:param realm_name: realm name
:type realm_name: str
:param client_secret_key: client secret key
:type client_secret_key: str
:param verify: Boolean value to enable or disable certificate validation or a string
containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param proxies: dict of proxies to sent the request by.
:type proxies: dict
:param timeout: connection timeout in seconds
:type timeout: int
"""
self.client_id = client_id
self.client_secret_key = client_secret_key
self.realm_name = realm_name
headers = custom_headers if custom_headers is not None else dict()
self.connection = ConnectionManager(
base_url=server_url, headers=headers, timeout=timeout, verify=verify, proxies=proxies
)
self.authorization = Authorization()
@property
def client_id(self):
"""Get client id.
:returns: Client id
:rtype: str
"""
return self._client_id
@client_id.setter
def client_id(self, value):
self._client_id = value
@property
def client_secret_key(self):
"""Get the client secret key.
:returns: Client secret key
:rtype: str
"""
return self._client_secret_key
@client_secret_key.setter
def client_secret_key(self, value):
self._client_secret_key = value
@property
def realm_name(self):
"""Get the realm name.
:returns: Realm name
:rtype: str
"""
return self._realm_name
@realm_name.setter
def realm_name(self, value):
self._realm_name = value
@property
def connection(self):
"""Get connection.
:returns: Connection manager object
:rtype: ConnectionManager
"""
return self._connection
@connection.setter
def connection(self, value):
self._connection = value
@property
def authorization(self):
"""Get authorization.
:returns: The authorization manager
:rtype: Authorization
"""
return self._authorization
@authorization.setter
def authorization(self, value):
self._authorization = value
def _add_secret_key(self, payload):
"""Add secret key if exists.
:param payload: Payload
:type payload: dict
:returns: Payload with the secret key
:rtype: dict
"""
if self.client_secret_key:
payload.update({"client_secret": self.client_secret_key})
return payload
def _build_name_role(self, role):
"""Build name of a role.
:param role: Role name
:type role: str
:returns: Role path
:rtype: str
"""
return self.client_id + "/" + role
def _token_info(self, token, method_token_info, **kwargs):
"""Getter for the token data.
:param token: Token
:type token: str
:param method_token_info: Token info method to use
:type method_token_info: str
:param kwargs: Additional keyword arguments passed to the decode_token method
:type kwargs: dict
:returns: Token info
:rtype: dict
"""
if method_token_info == "introspect":
token_info = self.introspect(token)
else:
token_info = self.decode_token(token, **kwargs)
return token_info
def well_known(self):
"""Get the well_known object.
The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def auth_url(self, redirect_uri, scope="email", state=""):
"""Get authorization URL endpoint.
:param redirect_uri: Redirect url to receive oauth code
:type redirect_uri: str
:param scope: Scope of authorization request, split with the blank space
:type scope: str
:param state: State will be returned to the redirect_uri
:type state: str
:returns: Authorization URL Full Build
:rtype: str
"""
params_path = {
"authorization-endpoint": self.well_known()["authorization_endpoint"],
"client-id": self.client_id,
"redirect-uri": redirect_uri,
"scope": scope,
"state": state,
}
return URL_AUTH.format(**params_path)
def token(
self,
username="",
password="",
grant_type=["password"],
code="",
redirect_uri="",
totp=None,
scope="openid",
**extra
):
"""Retrieve user token.
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
what flow is used. The token endpoint is also used to obtain new access tokens
when they expire.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param username: Username
:type username: str
:param password: Password
:type password: str
:param grant_type: Grant type
:type grant_type: str
:param code: Code
:type code: str
:param redirect_uri: Redirect URI
:type redirect_uri: str
:param totp: Time-based one-time password
:type totp: int
:param scope: Scope, defaults to openid
:type scope: str
:param extra: Additional extra arguments
:type extra: dict
:returns: Keycloak token
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {
"username": username,
"password": password,
"client_id": self.client_id,
"grant_type": grant_type,
"code": code,
"redirect_uri": redirect_uri,
"scope": scope,
}
if extra:
payload.update(extra)
if totp:
payload["totp"] = totp
payload = self._add_secret_key(payload)
data_raw = self.connection.sync_raw_post(URL_TOKEN.format(**params_path), data=payload)
print(data_raw)
return raise_error_from_response(data_raw, KeycloakPostError)
def refresh_token(self, refresh_token, grant_type=["refresh_token"]):
"""Refresh the user token.
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
what flow is used. The token endpoint is also used to obtain new access tokens
when they expire.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param refresh_token: Refresh token from Keycloak
:type refresh_token: str
:param grant_type: Grant type
:type grant_type: str
:returns: New token
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {
"client_id": self.client_id,
"grant_type": grant_type,
"refresh_token": refresh_token,
}
payload = self._add_secret_key(payload)
data_raw = self.connection.sync_raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
def exchange_token(
self,
token: str,
audience: Optional[str] = None,
subject: Optional[str] = None,
subject_token_type: Optional[str] = None,
subject_issuer: Optional[str] = None,
requested_issuer: Optional[str] = None,
requested_token_type: str = "urn:ietf:params:oauth:token-type:refresh_token",
scope: str = "openid",
) -> dict:
"""Exchange user token.
Use a token to obtain an entirely different token. See
https://www.keycloak.org/docs/latest/securing_apps/index.html#_token-exchange
:param token: Access token
:type token: str
:param audience: Audience
:type audience: str
:param subject: Subject
:type subject: str
:param subject_token_type: Token Type specification
:type subject_token_type: Optional[str]
:param subject_issuer: Issuer
:type subject_issuer: Optional[str]
:param requested_issuer: Issuer
:type requested_issuer: Optional[str]
:param requested_token_type: Token type specification
:type requested_token_type: str
:param scope: Scope, defaults to openid
:type scope: str
:returns: Exchanged token
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {
"grant_type": ["urn:ietf:params:oauth:grant-type:token-exchange"],
"client_id": self.client_id,
"subject_token": token,
"subject_token_type": subject_token_type,
"subject_issuer": subject_issuer,
"requested_token_type": requested_token_type,
"audience": audience,
"requested_subject": subject,
"requested_issuer": requested_issuer,
"scope": scope,
}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
def userinfo(self, token):
"""Get the user info object.
The userinfo endpoint returns standard claims about the authenticated user,
and is protected by a bearer token.
http://openid.net/specs/openid-connect-core-1_0.html#UserInfo
:param token: Access token
:type token: str
:returns: Userinfo object
:rtype: dict
"""
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def logout(self, refresh_token):
"""Log out the authenticated user.
:param refresh_token: Refresh token from Keycloak
:type refresh_token: str
:returns: Keycloak server response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "refresh_token": refresh_token}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
def certs(self):
"""Get certificates.
The certificate endpoint returns the public keys enabled by the realm, encoded as a
JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled
for verifying tokens.
https://tools.ietf.org/html/rfc7517
:returns: Certificates
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_CERTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def public_key(self):
"""Retrieve the public key.
The public key is exposed by the realm page directly.
:returns: The public key
:rtype: str
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"]
def entitlement(self, token, resource_server_id):
"""Get entitlements from the token.
Client applications can use a specific endpoint to obtain a special security token
called a requesting party token (RPT). This token consists of all the entitlements
(or permissions) for a user as a result of the evaluation of the permissions and
authorization policies associated with the resources being requested. With an RPT,
client applications can gain access to protected resources at the resource server.
:param token: Access token
:type token: str
:param resource_server_id: Resource server ID
:type resource_server_id: str
:returns: Entitlements
:rtype: dict
"""
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path))
if data_raw.status_code == 404 or data_raw.status_code == 405:
return raise_error_from_response(data_raw, KeycloakDeprecationError)
return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover
def introspect(self, token, rpt=None, token_type_hint=None):
"""Introspect the user token.
The introspection endpoint is used to retrieve the active state of a token.
It is can only be invoked by confidential clients.
https://tools.ietf.org/html/rfc7662
:param token: Access token
:type token: str
:param rpt: Requesting party token
:type rpt: str
:param token_type_hint: Token type hint
:type token_type_hint: str
:returns: Token info
:rtype: dict
:raises KeycloakRPTNotFound: In case of RPT not specified
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "token": token}
if token_type_hint == "requesting_party_token":
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
self.connection.add_param_headers("Authorization", "Bearer " + token)
else:
raise KeycloakRPTNotFound("Can't found RPT.")
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
def decode_token(self, token, validate: bool = True, **kwargs):
"""Decode user token.
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
structure that represents a cryptographic key. This specification
also defines a JWK Set JSON data structure that represents a set of
JWKs. Cryptographic algorithms and identifiers for use with this
specification are described in the separate JSON Web Algorithms (JWA)
specification and IANA registries established by that specification.
https://tools.ietf.org/html/rfc7517
:param token: Keycloak token
:type token: str
:param validate: Determines whether the token should be validated with the public key.
Defaults to True.
:type validate: bool
:param kwargs: Additional keyword arguments for jwcrypto's JWT object
:type kwargs: dict
:returns: Decoded token
:rtype: dict
"""
if validate:
if "key" not in kwargs:
key = (
"-----BEGIN PUBLIC KEY-----\n"
+ self.public_key()
+ "\n-----END PUBLIC KEY-----"
)
key = jwk.JWK.from_pem(key.encode("utf-8"))
kwargs["key"] = key
full_jwt = jwt.JWT(jwt=token, **kwargs)
return jwt.json_decode(full_jwt.claims)
else:
full_jwt = jwt.JWT(jwt=token, **kwargs)
full_jwt.token.objects["valid"] = True
return json.loads(full_jwt.token.payload.decode("utf-8"))
def load_authorization_config(self, path):
"""Load Keycloak settings (authorization).
:param path: settings file (json)
:type path: str
"""
with open(path, "r") as fp:
authorization_json = json.load(fp)
self.authorization.load_config(authorization_json)
def get_policies(self, token, method_token_info="introspect", **kwargs):
"""Get policies by user token.
:param token: User token
:type token: str
:param method_token_info: Method for token info decoding
:type method_token_info: str
:param kwargs: Additional keyword arguments
:type kwargs: dict
:return: Policies
:rtype: dict
:raises KeycloakAuthorizationConfigError: In case of bad authorization configuration
:raises KeycloakInvalidTokenError: In case of bad token
"""
if not self.authorization.policies:
raise KeycloakAuthorizationConfigError(
"Keycloak settings not found. Load Authorization Keycloak settings."
)
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
policies = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
policies.append(policy)
return list(set(policies))
def get_permissions(self, token, method_token_info="introspect", **kwargs):
"""Get permission by user token.
:param token: user token
:type token: str
:param method_token_info: Decode token method
:type method_token_info: str
:param kwargs: parameters for decode
:type kwargs: dict
:returns: permissions list
:rtype: list
:raises KeycloakAuthorizationConfigError: In case of bad authorization configuration
:raises KeycloakInvalidTokenError: In case of bad token
"""
if not self.authorization.policies:
raise KeycloakAuthorizationConfigError(
"Keycloak settings not found. Load Authorization Keycloak settings."
)
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
permissions = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
permissions += policy.permissions
return list(set(permissions))
def uma_permissions(self, token, permissions=""):
"""Get UMA permissions by user token with requested permissions.
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param token: user token
:type token: str
:param permissions: list of uma permissions list(resource:scope) requested by the user
:type permissions: str
:returns: Keycloak server response
:rtype: dict
"""
permission = build_permission_param(permissions)
params_path = {"realm-name": self.realm_name}
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": permission,
"response_mode": "permissions",
"audience": self.client_id,
}
self.connection.add_param_headers("Authorization", "Bearer " + token)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
def has_uma_access(self, token, permissions):
"""Determine whether user has uma permissions with specified user token.
:param token: user token
:type token: str
:param permissions: list of uma permissions (resource:scope)
:type permissions: str
:return: Authentication status
:rtype: AuthStatus
:raises KeycloakAuthenticationError: In case of failed authentication
:raises KeycloakPostError: In case of failed request to Keycloak
"""
needed = build_permission_param(permissions)
try:
granted = self.uma_permissions(token, permissions)
except (KeycloakPostError, KeycloakAuthenticationError) as e:
if e.response_code == 403: # pragma: no cover
return AuthStatus(
is_logged_in=True, is_authorized=False, missing_permissions=needed
)
elif e.response_code == 401:
return AuthStatus(
is_logged_in=False, is_authorized=False, missing_permissions=needed
)
raise
for resource_struct in granted:
resource = resource_struct["rsname"]
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes: # pragma: no cover
needed.discard("{}#{}".format(resource, scope))
return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
)
def register_client(self, token: str, payload: dict):
"""Create a client.
ClientRepresentation:
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation
:param token: Initial access token
:type token: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
data_raw = self.connection.raw_post(
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError)
def device(self):
"""Get device authorization grant.
The device endpoint is used to obtain a user code verification and user authentication.
The response contains a device_code, user_code, verification_uri,
verification_uri_complete, expires_in (lifetime in seconds for device_code
and user_code), and polling interval.
Users can either follow the verification_uri and enter the user_code or
follow the verification_uri_complete.
After authenticating with valid credentials, users can obtain tokens using the
"urn:ietf:params:oauth:grant-type:device_code" grant_type and the device_code.
https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow
https://github.com/keycloak/keycloak-community/blob/main/design/oauth2-device-authorization-grant.md#how-to-try-it
:returns: Device Authorization Response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_DEVICE.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
def update_client(self, token: str, client_id: str, payload: dict):
"""Update a client.
ClientRepresentation:
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation
:param token: registration access token
:type token: str
:param client_id: Keycloak client id
:type client_id: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "client-id": client_id}
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
# Keycloak complains if the clientId is not set in the payload
if "clientId" not in payload:
payload["clientId"] = client_id
data_raw = self.connection.raw_put(
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPutError)

417
src/keycloak/asynchronous/keycloak_uma.py

@ -1,417 +0,0 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak UMA module.
The module contains a UMA compatible client for keycloak:
https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html
"""
import json
from typing import Iterable
from urllib.parse import quote_plus
from .connection import ConnectionManager
from .exceptions import (
KeycloakDeleteError,
KeycloakGetError,
KeycloakPostError,
KeycloakPutError,
raise_error_from_response,
)
from .openid_connection import KeycloakOpenIDConnection
from .uma_permissions import UMAPermission
from .urls_patterns import URL_UMA_WELL_KNOWN
class KeycloakUMA:
"""Keycloak UMA client.
:param connection: OpenID connection manager
"""
def __init__(self, connection: KeycloakOpenIDConnection):
"""Init method.
:param connection: OpenID connection manager
:type connection: KeycloakOpenIDConnection
"""
self.connection = connection
custom_headers = self.connection.custom_headers or {}
custom_headers.update({"Content-Type": "application/json"})
self.connection.custom_headers = custom_headers
self._well_known = None
def _fetch_well_known(self):
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@staticmethod
def format_url(url, **kwargs):
"""Substitute url path parameters.
Given a parameterized url string, returns the string after url encoding and substituting
the given params. For example,
`format_url("https://myserver/{my_resource}/{id}", my_resource="hello world", id="myid")`
would produce `https://myserver/hello+world/myid`.
:param url: url string to format
:type url: str
:param kwargs: dict containing kwargs to substitute
:type kwargs: dict
:return: formatted string
:rtype: str
"""
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
@property
def uma_well_known(self):
"""Get the well_known UMA2 config.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
"""
# per instance cache
if not self._well_known:
self._well_known = self._fetch_well_known()
return self._well_known
def resource_set_create(self, payload):
"""Create a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
ResourceRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation
:param payload: ResourceRepresentation
:type payload: dict
:return: ResourceRepresentation with the _id property assigned
:rtype: dict
"""
data_raw = self.connection.raw_post(
self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def resource_set_update(self, resource_id, payload):
"""Update a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
ResourceRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation
:param resource_id: id of the resource
:type resource_id: str
:param payload: ResourceRepresentation
:type payload: dict
:return: Response dict (empty)
:rtype: dict
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
def resource_set_read(self, resource_id):
"""Read a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
ResourceRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation
:param resource_id: id of the resource
:type resource_id: str
:return: ResourceRepresentation
:rtype: dict
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_delete(self, resource_id):
"""Delete a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
:param resource_id: id of the resource
:type resource_id: str
:return: Response dict (empty)
:rtype: dict
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def resource_set_list_ids(
self,
name: str = "",
exact_name: bool = False,
uri: str = "",
owner: str = "",
resource_type: str = "",
scope: str = "",
first: int = 0,
maximum: int = -1,
):
"""Query for list of resource set ids.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
:param name: query resource name
:type name: str
:param exact_name: query exact match for resource name
:type exact_name: bool
:param uri: query resource uri
:type uri: str
:param owner: query resource owner
:type owner: str
:param resource_type: query resource type
:type resource_type: str
:param scope: query resource scope
:type scope: str
:param first: index of first matching resource to return
:type first: int
:param maximum: maximum number of resources to return (-1 for all)
:type maximum: int
:return: List of ids
:rtype: List[str]
"""
query = dict()
if name:
query["name"] = name
if exact_name:
query["exactName"] = "true"
if uri:
query["uri"] = uri
if owner:
query["owner"] = owner
if resource_type:
query["type"] = resource_type
if scope:
query["scope"] = scope
if first > 0:
query["first"] = first
if maximum >= 0:
query["max"] = maximum
data_raw = self.connection.raw_get(
self.uma_well_known["resource_registration_endpoint"], **query
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_list(self):
"""List all resource sets.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
ResourceRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation
:yields: Iterator over a list of ResourceRepresentations
:rtype: Iterator[dict]
"""
for resource_id in self.resource_set_list_ids():
resource = self.resource_set_read(resource_id)
yield resource
def permission_ticket_create(self, permissions: Iterable[UMAPermission]):
"""Create a permission ticket.
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
:returns: Keycloak decision
:rtype: boolean
:raises KeycloakPostError: In case permission resource not found
"""
resources = dict()
for permission in permissions:
resource_id = getattr(permission, "resource_id", None)
if resource_id is None:
resource_ids = self.resource_set_list_ids(
exact_name=True, name=permission.resource, first=0, maximum=1
)
if not resource_ids:
raise KeycloakPostError("Invalid resource specified")
setattr(permission, "resource_id", resource_ids[0])
resources.setdefault(resource_id, set())
if permission.scope:
resources[resource_id].add(permission.scope)
payload = [
{"resource_id": resource_id, "resource_scopes": list(scopes)}
for resource_id, scopes in resources.items()
]
data_raw = self.connection.raw_post(
self.uma_well_known["permission_endpoint"], data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError)
def permissions_check(self, token, permissions: Iterable[UMAPermission]):
"""Check UMA permissions by user token with requested permissions.
The token endpoint is used to check UMA permissions from Keycloak. It can only be
invoked by confidential clients.
https://www.keycloak.org/docs/latest/authorization_services/#_service_authorization_api
:param token: user token
:type token: str
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
:returns: Keycloak decision
:rtype: boolean
"""
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": ",".join(str(permission) for permission in permissions),
"response_mode": "decision",
"audience": self.connection.client_id,
}
# Everyone always has the null set of permissions
# However keycloak cannot evaluate the null set
if len(payload["permission"]) == 0:
return True
connection = ConnectionManager(self.connection.base_url)
connection.add_param_headers("Authorization", "Bearer " + token)
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = connection.raw_post(self.uma_well_known["token_endpoint"], data=payload)
try:
data = raise_error_from_response(data_raw, KeycloakPostError)
except KeycloakPostError:
return False
return data.get("result", False)
def policy_resource_create(self, resource_id, payload):
"""Create permission policy for resource.
Supports name, description, scopes, roles, groups, clients
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
:param resource_id: _id of resource
:type resource_id: str
:param payload: permission configuration
:type payload: dict
:return: PermissionRepresentation
:rtype: dict
"""
data_raw = self.connection.raw_post(
self.uma_well_known["policy_endpoint"] + f"/{resource_id}", data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError)
def policy_update(self, policy_id, payload):
"""Update permission policy.
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
:param policy_id: id of policy permission
:type policy_id: str
:param payload: policy permission configuration
:type payload: dict
:return: PermissionRepresentation
:rtype: dict
"""
data_raw = self.connection.raw_put(
self.uma_well_known["policy_endpoint"] + f"/{policy_id}", data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPutError)
def policy_delete(self, policy_id):
"""Delete permission policy.
https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
:param policy_id: id of permission policy
:type policy_id: str
:return: PermissionRepresentation
:rtype: dict
"""
data_raw = self.connection.raw_delete(
self.uma_well_known["policy_endpoint"] + f"/{policy_id}"
)
return raise_error_from_response(data_raw, KeycloakDeleteError)
def policy_query(
self,
resource: str = "",
name: str = "",
scope: str = "",
first: int = 0,
maximum: int = -1,
):
"""Query permission policies.
https://www.keycloak.org/docs/latest/authorization_services/#querying-permission
:param resource: query resource id
:type resource: str
:param name: query resource name
:type name: str
:param scope: query resource scope
:type scope: str
:param first: index of first matching resource to return
:type first: int
:param maximum: maximum number of resources to return (-1 for all)
:type maximum: int
:return: List of ids
:return: List of ids
:rtype: List[str]
"""
query = dict()
if name:
query["name"] = name
if resource:
query["resource"] = resource
if scope:
query["scope"] = scope
if first > 0:
query["first"] = first
if maximum >= 0:
query["max"] = maximum
data_raw = self.connection.raw_get(self.uma_well_known["policy_endpoint"], **query)
return raise_error_from_response(data_raw, KeycloakGetError)

420
src/keycloak/asynchronous/openid_connection.py

@ -1,420 +0,0 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak OpenID Connection Manager module.
The module contains mainly the implementation of KeycloakOpenIDConnection class.
This is an extension of the ConnectionManager class, and handles the automatic refresh
of openid tokens when required.
"""
from datetime import datetime, timedelta
from .connection import ConnectionManager
from keycloak import KeycloakPostError
from .keycloak_openid import KeycloakOpenID
class KeycloakOpenIDConnection(ConnectionManager):
"""A class to help with OpenID connections which can auto refresh tokens.
:param object: _description_
:type object: _type_
"""
_server_url = None
_username = None
_password = None
_totp = None
_realm_name = None
_client_id = None
_verify = None
_client_secret_key = None
_connection = None
_custom_headers = None
_user_realm_name = None
_expires_at = None
_keycloak_openid = None
def __init__(
self,
server_url,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
verify=True,
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
timeout=60,
):
"""Init method.
:param server_url: Keycloak server url
:type server_url: str
:param username: admin username
:type username: str
:param password: admin password
:type password: str
:param token: access and refresh tokens
:type token: dict
:param totp: Time based OTP
:type totp: str
:param realm_name: realm name
:type realm_name: str
:param client_id: client id
:type client_id: str
:param verify: Boolean value to enable or disable certificate validation or a string
containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param client_secret_key: client secret key
(optional, required only for access type confidential)
:type client_secret_key: str
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param user_realm_name: The realm name of the user, if different from realm_name
:type user_realm_name: str
:param timeout: connection timeout in seconds
:type timeout: int
"""
# token is renewed when it hits 90% of its lifetime. This is to account for any possible
# clock skew.
self.token_lifetime_fraction = 0.9
self.server_url = server_url
self.username = username
self.password = password
self.token = token
self.totp = totp
self.realm_name = realm_name
self.client_id = client_id
self.verify = verify
self.client_secret_key = client_secret_key
self.user_realm_name = user_realm_name
self.timeout = timeout
self.headers = {}
self.custom_headers = custom_headers
super().__init__(
base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify
)
if self.token is None:
self.get_token()
if self.token is not None:
self.headers = {
**self.headers,
"Authorization": "Bearer " + self.token.get("access_token"),
"Content-Type": "application/json",
}
@property
def server_url(self):
"""Get server url.
:returns: Keycloak server url
:rtype: str
"""
return self.base_url
@server_url.setter
def server_url(self, value):
self.base_url = value
@property
def realm_name(self):
"""Get realm name.
:returns: Realm name
:rtype: str
"""
return self._realm_name
@realm_name.setter
def realm_name(self, value):
self._realm_name = value
@property
def client_id(self):
"""Get client id.
:returns: Client id
:rtype: str
"""
return self._client_id
@client_id.setter
def client_id(self, value):
self._client_id = value
@property
def client_secret_key(self):
"""Get client secret key.
:returns: Client secret key
:rtype: str
"""
return self._client_secret_key
@client_secret_key.setter
def client_secret_key(self, value):
self._client_secret_key = value
@property
def username(self):
"""Get username.
:returns: Admin username
:rtype: str
"""
return self._username
@username.setter
def username(self, value):
self._username = value
@property
def password(self):
"""Get password.
:returns: Admin password
:rtype: str
"""
return self._password
@password.setter
def password(self, value):
self._password = value
@property
def totp(self):
"""Get totp.
:returns: TOTP
:rtype: str
"""
return self._totp
@totp.setter
def totp(self, value):
self._totp = value
@property
def token(self):
"""Get token.
:returns: Access and refresh token
:rtype: dict
"""
return self._token
@token.setter
def token(self, value):
self._token = value
self._expires_at = datetime.now() + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
)
@property
def expires_at(self):
"""Get token expiry time.
:returns: Datetime at which the current token will expire
:rtype: datetime
"""
return self._expires_at
@property
def user_realm_name(self):
"""Get user realm name.
:returns: User realm name
:rtype: str
"""
return self._user_realm_name
@user_realm_name.setter
def user_realm_name(self, value):
self._user_realm_name = value
@property
def custom_headers(self):
"""Get custom headers.
:returns: Custom headers
:rtype: dict
"""
return self._custom_headers
@custom_headers.setter
def custom_headers(self, value):
self._custom_headers = value
if self.custom_headers is not None:
# merge custom headers to main headers
self.headers.update(self.custom_headers)
@property
def keycloak_openid(self) -> KeycloakOpenID:
"""Get the KeycloakOpenID object.
The KeycloakOpenID is used to refresh tokens
:returns: KeycloakOpenID
:rtype: KeycloakOpenID
"""
if self._keycloak_openid is None:
if self.user_realm_name:
token_realm_name = self.user_realm_name
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
self._keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=token_realm_name,
verify=self.verify,
client_secret_key=self.client_secret_key,
timeout=self.timeout,
custom_headers=self.custom_headers,
)
return self._keycloak_openid
def get_token(self):
"""Get admin token.
The admin token is then set in the `token` attribute.
"""
grant_type = []
if self.username and self.password:
grant_type.append("password")
elif self.client_secret_key:
grant_type.append("client_credentials")
if grant_type:
self.token = self.keycloak_openid.token(
self.username, self.password, grant_type=grant_type, totp=self.totp
)
else:
self.token = {}
def refresh_token(self):
"""Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed.
"""
refresh_token = self.token.get("refresh_token", None) if self.token else None
if refresh_token is None:
self.get_token()
else:
try:
self.token = self.keycloak_openid.refresh_token(refresh_token)
except KeycloakPostError as e:
list_errors = [
b"Refresh token expired",
b"Token is not active",
b"Session not active",
]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
self.get_token()
else:
raise
self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token"))
def _refresh_if_required(self):
if datetime.now() >= self.expires_at:
self.refresh_token()
def raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_get(*args, **kwargs)
return r
def raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_post(*args, **kwargs)
return r
def raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_put(*args, **kwargs)
return r
def raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_delete(*args, **kwargs)
return r

690
src/keycloak/keycloak_admin.py

@ -31,6 +31,8 @@ import json
from builtins import isinstance
from typing import Optional
import threading
from requests_toolbelt import MultipartEncoder
from . import urls_patterns
@ -202,6 +204,18 @@ class KeycloakAdmin:
query = query or {}
return raise_error_from_response(self.connection.raw_get(url, **query), KeycloakGetError)
def async_call(self, func_name, args):
func = getattr(self, func_name)
if not callable(func):
return None
#create a thread using threading and run func in that thread
thread = threading.Thread(target=func, args=args, daemon=True)
thread.start()
return thread
def get_current_realm(self) -> str:
"""Return the currently configured realm.
@ -361,7 +375,7 @@ class KeycloakAdmin:
data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def get_users(self, query=None):
def get_users(self, query=None, results=None):
"""Get all users.
Return a list of users, filtered according to query parameters
@ -378,10 +392,17 @@ class KeycloakAdmin:
params_path = {"realm-name": self.connection.realm_name}
url = urls_patterns.URL_ADMIN_USERS.format(**params_path)
res = None
if "first" in query or "max" in query:
return self.__fetch_paginated(url, query)
res = self.__fetch_paginated(url, query)
else:
res = self.__fetch_all(url, query)
return self.__fetch_all(url, query)
if results is not None:
results.append(res)
return res
def create_idp(self, payload):
"""Create an ID Provider.
@ -4247,3 +4268,666 @@ class KeycloakAdmin:
urls_patterns.URL_ADMIN_CLEAR_USER_CACHE.format(**params_path), data=""
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
def a_get_users(self, query=None, results=None):
return self.async_call('get_users',(query,results))
def a_create_idp(self, payload):
self.async_call('create_idp',(payload,))
def a_update_idp(self, idp_alias, payload):
self.async_call('update_idp',(idp_alias, payload,))
def a_add_mapper_to_idp(self, idp_alias, payload):
self.async_call('add_mapper_to_idp',(idp_alias, payload,))
def a_update_mapper_in_idp(self, idp_alias, mapper_id, payload):
self.async_call('update_mapper_in_idp',(idp_alias,mapper_id, payload,))
def a_get_idp_mappers(self, idp_alias):
self.async_call('get_idp_mappers',(idp_alias,))
def a_get_idps(self):
self.async_call('get_idps',())
def a_get_idp(self, idp_alias):
self.async_call('get_idp',(idp_alias,))
def a_delete_idp(self, idp_alias):
self.async_call('delete_idp',(idp_alias,))
def a_create_user(self, payload, exist_ok=False):
self.async_call('create_user',(payload, exist_ok,))
def a_users_count(self, query=None):
self.async_call('users_count',(query,))
def a_get_user_id(self, username):
self.async_call('get_user_id',(username,))
def a_get_user(self, user_id):
self.async_call('get_user',(user_id,))
def a_get_user_groups(self, user_id, query=None, brief_representation=True):
self.async_call('get_user_groups',(user_id, query, brief_representation))
def a_update_user(self, user_id, payload):
self.async_call('update_user',(user_id, payload,))
def a_disable_user(self, user_id):
self.async_call('disable_user',(user_id,))
def a_enable_user(self, user_id):
self.async_call('enable_user',(user_id,))
def a_disable_all_users(self):
self.async_call('disable_all_users',())
def a_enable_all_users(self):
self.async_call('enable_all_users',())
def a_delete_user(self, user_id):
self.async_call('delete_user',(user_id,))
def a_set_user_password(self, user_id, password, temporary=True):
self.async_call('set_user_password',(user_id,password,temporary, ))
def a_get_credentials(self, user_id):
self.async_call('get_credentials',(user_id,))
def a_delete_credential(self, user_id, credential_id):
self.async_call('get_credentials',(user_id, credential_id,))
def a_user_logout(self, user_id):
self.async_call('user_logout',(user_id,))
def a_user_consents(self, user_id):
self.async_call('user_consents',(user_id,))
def a_get_user_social_logins(self, user_id):
self.async_call('get_user_social_logins',(user_id,))
def a_add_user_social_login(self, user_id, provider_id, provider_userid, provider_username):
self.async_call('add_user_social_login',(user_id, provider_id, provider_userid, provider_username))
def a_delete_user_social_login(self, user_id, provider_id):
self.async_call('delete_user_social_login',(user_id,provider_id,))
def a_send_update_account(self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None):
self.async_call('send_update_account',(user_id, payload, client_id, lifespan, redirect_uri,))
def a_send_verify_email(self, user_id, client_id=None, redirect_uri=None):
self.async_call('send_verify_email',(user_id, client_id, redirect_uri,))
def a_get_sessions(self, user_id):
self.async_call('get_sessions',(user_id,))
def a_get_server_info(self):
self.async_call('get_server_info',())
def a_get_groups(self, query=None, full_hierarchy=False):
self.async_call('get_groups',(query, full_hierarchy,))
def a_get_group(self, group_id, full_hierarchy=False):
self.async_call('get_group',(group_id, full_hierarchy,))
def a_get_subgroups(self, group, path):
self.async_call('get_subgroups',(group, path,))
def a_get_group_children(self, group_id, query=None, full_hierarchy=False):
self.async_call('get_group_children',( group_id, query, full_hierarchy,))
def a_get_group_members(self, group_id, query=None):
self.async_call('get_group_members',(group_id, query,))
def a_get_group_by_path(self, path):
self.async_call('get_group_by_path',(path,))
def a_create_group(self, payload, parent=None, skip_exists=False):
self.async_call('get_group_by_path',(path,))
def a_update_group(self, group_id, payload):
self.async_call('update_group',(group_id, payload,))
def a_groups_count(self, query=None):
self.async_call('groups_count',(query,))
def a_group_set_permissions(self, group_id, enabled=True):
self.async_call('group_set_permissions',( group_id, enabled ,))
def a_group_user_add(self, user_id, group_id):
self.async_call('group_user_add',( user_id, group_id,))
def a_group_user_remove(self, user_id, group_id):
self.async_call('group_user_remove',(user_id, group_id,))
def a_delete_group(self, group_id):
self.async_call('delete_group',(group_id,))
def a_get_clients(self):
self.async_call('get_clients',())
def a_get_client(self, client_id):
self.async_call('get_client',(client_id,))
def a_get_client_id(self, client_id):
self.async_call('get_client_id',(client_id,))
def a_get_client_authz_settings(self, client_id):
self.async_call('get_client_authz_settings',(client_id,))
def a_create_client_authz_resource(self, client_id, payload, skip_exists=False):
self.async_call('create_client_authz_resource',(client_id, payload, skip_exists,))
def a_update_client_authz_resource(self, client_id, resource_id, payload):
self.async_call('update_client_authz_resource',(client_id, resource_id, payload,))
def a_delete_client_authz_resource(self, client_id: str, resource_id: str):
self.async_call('delete_client_authz_resource',(client_id, resource_id,))
def a_get_client_authz_resource(self, client_id: str, resource_id: str):
self.async_call('get_client_authz_resource',(client_id, resource_id,))
def a_create_client_authz_role_based_policy(self, client_id, payload, skip_exists=False):
self.async_call('create_client_authz_role_based_policy',( client_id, payload, skip_exists,))
def a_create_client_authz_policy(self, client_id, payload, skip_exists=False):
self.async_call('create_client_authz_policy',( client_id, payload, skip_exists,))
def a_create_client_authz_resource_based_permission(self, client_id, payload, skip_exists=False):
self.async_call('create_client_authz_resource_based_permission',( client_id, payload, skip_exists,))
def a_get_client_authz_scopes(self, client_id):
self.async_call('get_client_authz_scopes',( client_id,))
def a_create_client_authz_scopes(self, client_id, payload):
self.async_call('create_client_authz_scopes',( client_id,payload))
def a_get_client_authz_permissions(self, client_id):
self.async_call('get_client_authz_permissions',( client_id,))
def a_get_client_authz_policies(self, client_id):
self.async_call('get_client_authz_policies',( client_id,))
def a_delete_client_authz_policy(self, client_id, policy_id):
self.async_call('delete_client_authz_policy',( client_id,policy_id,))
def a_get_client_authz_policy(self, client_id, policy_id):
self.async_call('get_client_authz_policy',( client_id,policy_id,))
def a_get_client_service_account_user(self, client_id):
self.async_call('get_client_service_account_user',( client_id,))
def a_get_client_default_client_scopes(self, client_id):
self.async_call('get_client_default_client_scopes',( client_id,))
def a_add_client_default_client_scope(self, client_id, client_scope_id, payload):
self.async_call('add_client_default_client_scope',( client_id,client_scope_id, payload))
def a_delete_client_default_client_scope(self, client_id, client_scope_id):
self.async_call('delete_client_default_client_scope',( client_id,client_scope_id,))
def a_get_client_optional_client_scopes(self, client_id):
self.async_call('get_client_optional_client_scopes',( client_id,))
def a_add_client_optional_client_scope(self, client_id, client_scope_id, payload):
self.async_call('add_client_optional_client_scope',( client_id,client_scope_id, payload))
def delete_client_optional_client_scope(self, client_id, client_scope_id,):
self.async_call('delete_client_optional_client_scope',( client_id,client_scope_id,))
def a_create_initial_access_token(self, count: int = 1, expiration: int = 1):
self.async_call('create_initial_access_token',( count,expiration,))
def a_create_client(self, payload, skip_exists=False):
self.async_call('create_client',( payload,skip_exists,))
def a_update_client(self, client_id, payload):
self.async_call('update_client',( client_id,payload,))
def a_delete_client(self, client_id):
self.async_call('delete_client',( client_id,))
def a_get_client_installation_provider(self, client_id, provider_id):
self.async_call('get_client_installation_provider',( client_id,provider_id,))
def a_get_realm_roles(self, brief_representation=True, search_text=""):
self.async_call('get_realm_roles',(brief_representation, search_text,))
def a_get_realm_role_groups(self, role_name, query=None, brief_representation=True):
self.async_call('get_realm_role_groups',(role_name, query, brief_representation,))
def a_get_realm_role_members(self, role_name, query=None):
self.async_call('get_realm_role_members',(role_name, query,))
def a_get_default_realm_role_id(self):
self.async_call('get_default_realm_role_id',())
def a_get_realm_default_roles(self):
self.async_call('get_realm_default_roles',())
def a_remove_realm_default_roles(self, payload):
self.async_call('remove_realm_default_roles',(payload,))
def a_add_realm_default_roles(self, payload):
self.async_call('add_realm_default_roles',(payload,))
def a_get_client_roles(self, client_id, brief_representation=True):
self.async_call('get_client_roles',(client_id, brief_representation,))
def a_get_client_role(self, client_id, role_name):
self.async_call('get_client_role',( client_id, role_name,))
def a_get_client_role_id(self, client_id, role_name):
self.async_call('get_client_role_id',( client_id, role_name,))
def a_create_client_role(self, client_role_id, payload, skip_exists=False):
self.async_call('create_client_role',(client_role_id, payload, skip_exists,))
def a_add_composite_client_roles_to_role(self, client_role_id, role_name, roles):
self.async_call('add_composite_client_roles_to_role',(client_role_id, role_name, roles,))
def a_update_client_role(self, client_id, role_name, payload):
self.async_call('update_client_role',(client_id, role_name, payload,))
def a_delete_client_role(self, client_role_id, role_name):
self.async_call('delete_client_role',( client_role_id, role_name,))
def a_assign_client_role(self, user_id, client_id, roles):
self.async_call('assign_client_role',(user_id, client_id, roles,))
def a_get_client_role_members(self, client_id, role_name, **query):
self.async_call('get_client_role_members',(client_id, role_name, query,))
def a_get_client_role_groups(self, client_id, role_name, **query):
self.async_call('get_client_role_groups',(client_id, role_name, query,))
def a_get_role_by_id(self, role_id):
self.async_call('get_role_by_id',(role_id,))
def a_update_role_by_id(self, role_id, payload):
self.async_call('update_role_by_id',(role_id, payload,))
def a_delete_role_by_id(self, role_id):
self.async_call('delete_role_by_id',(role_id,))
def a_create_realm_role(self, payload, skip_exists=False):
self.async_call('create_realm_role',(payload, skip_exists,))
def a_get_realm_role(self, role_name):
self.async_call('get_realm_role',(role_name,))
def a_get_realm_role_by_id(self, role_id: str):
self.async_call('get_realm_role_by_id',(role_id,))
def update_realm_role(self, role_name, payload):
self.async_call('update_realm_role',(role_name, payload,))
def a_delete_realm_role(self, role_name):
self.async_call('delete_realm_role',(role_name,))
def a_add_composite_realm_roles_to_role(self, role_name, roles):
self.async_call('add_composite_realm_roles_to_role',(role_name,roles))
def a_remove_composite_realm_roles_to_role(self, role_name, roles):
self.async_call('remove_composite_realm_roles_to_role',(role_name,roles))
def a_get_composite_realm_roles_of_role(self, role_name):
self.async_call('get_composite_realm_roles_of_role',(role_name,))
def a_assign_realm_roles_to_client_scope(self, client_id, roles):
self.async_call('assign_realm_roles_to_client_scope',( client_id, roles,))
def a_delete_realm_roles_of_client_scope(self, client_id, roles):
self.async_call('delete_realm_roles_of_client_scope',( client_id, roles,))
def a_get_realm_roles_of_client_scope(self, client_id):
self.async_call('get_realm_roles_of_client_scope',( client_id,))
def a_assign_client_roles_to_client_scope(self, client_id, client_roles_owner_id, roles):
self.async_call('assign_client_roles_to_client_scope',(client_id, client_roles_owner_id, roles,))
def a_delete_client_roles_of_client_scope(self, client_id, client_roles_owner_id, roles):
self.async_call('delete_client_roles_of_client_scope',(client_id, client_roles_owner_id, roles,))
def a_get_client_roles_of_client_scope(self, client_id, client_roles_owner_id):
self.async_call('get_client_roles_of_client_scope',(client_id, client_roles_owner_id,))
def a_assign_realm_roles(self, user_id, roles):
self.async_call('assign_realm_roles',(client_id,roles,))
def a_delete_realm_roles_of_user(self, user_id, roles):
self.async_call('delete_realm_roles_of_user',(user_id, roles,))
def a_get_realm_roles_of_user(self, user_id):
self.async_call('get_realm_roles_of_user',(user_id,))
def a_get_available_realm_roles_of_user(self, user_id):
self.async_call('get_available_realm_roles_of_user',(user_id,))
def a_get_composite_realm_roles_of_user(self, user_id, brief_representation=True):
self.async_call('get_composite_realm_roles_of_user',(user_id, brief_representation,))
def a_assign_group_realm_roles(self, group_id, roles):
self.async_call('assign_group_realm_roles',(group_id, roles,))
def a_delete_group_realm_roles(self, group_id, roles):
self.async_call('delete_group_realm_roles',(group_id, roles,))
def a_get_group_realm_roles(self, group_id, brief_representation=True):
self.async_call('get_group_realm_roles',(group_id, brief_representation,))
def a_assign_group_client_roles(self, group_id, client_id, roles):
pass
def a_get_group_client_roles(self, group_id, client_id):
pass
def a_delete_group_client_roles(self, group_id, client_id, roles):
pass
def a_get_all_roles_of_user(self, user_id):
pass
def a_get_client_roles_of_user(self, user_id, client_id):
pass
def a_get_available_client_roles_of_user(self, user_id, client_id):
pass
def a_get_composite_client_roles_of_user(self, user_id, client_id, brief_representation=False):
pass
def a__get_client_roles_of_user(
self, client_level_role_mapping_url, user_id, client_id, **params
):
pass
def a_delete_client_roles_of_user(self, user_id, client_id, roles):
pass
def a_get_authentication_flows(self):
pass
def a_get_authentication_flow_for_id(self, flow_id):
pass
def a_create_authentication_flow(self, payload, skip_exists=False):
pass
def a_copy_authentication_flow(self, payload, flow_alias):
pass
def a_delete_authentication_flow(self, flow_id):
pass
def a_get_authentication_flow_executions(self, flow_alias):
pass
def a_update_authentication_flow_executions(self, payload, flow_alias):
pass
def a_get_authentication_flow_execution(self, execution_id):
pass
def a_create_authentication_flow_execution(self, payload, flow_alias):
pass
def a_delete_authentication_flow_execution(self, execution_id):
pass
def a_create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False):
pass
def a_get_authenticator_providers(self):
pass
def a_get_authenticator_provider_config_description(self, provider_id):
pass
def a_get_authenticator_config(self, config_id):
pass
def a_update_authenticator_config(self, payload, config_id):
pass
def a_delete_authenticator_config(self, config_id):
pass
def a_sync_users(self, storage_id, action):
pass
def a_get_client_scopes(self):
pass
def a_get_client_scope(self, client_scope_id):
pass
def a_get_client_scope_by_name(self, client_scope_name):
pass
def a_create_client_scope(self, payload, skip_exists=False):
pass
def a_update_client_scope(self, client_scope_id, payload):
pass
def a_delete_client_scope(self, client_scope_id):
pass
def a_get_mappers_from_client_scope(self, client_scope_id):
pass
def a_add_mapper_to_client_scope(self, client_scope_id, payload):
pass
def a_delete_mapper_from_client_scope(self, client_scope_id, protocol_mapper_id):
pass
def a_update_mapper_in_client_scope(self, client_scope_id, protocol_mapper_id, payload):
pass
def a_get_default_default_client_scopes(self):
pass
def a_delete_default_default_client_scope(self, scope_id):
pass
def a_add_default_default_client_scope(self, scope_id):
pass
def a_delete_default_optional_client_scope(self, scope_id):
pass
def a_add_default_optional_client_scope(self, scope_id):
pass
def a_get_mappers_from_client(self, client_id):
pass
def a_add_mapper_to_client(self, client_id, payload):
pass
def a_update_client_mapper(self, client_id, mapper_id, payload):
pass
def a_remove_client_mapper(self, client_id, client_mapper_id):
pass
def a_generate_client_secrets(self, client_id):
pass
def a_get_client_secrets(self, client_id):
pass
def a_get_components(self, query=None):
pass
def a_create_component(self, payload):
pass
def a_get_component(self, component_id):
pass
def a_update_component(self, component_id, payload):
pass
def a_delete_component(self, component_id):
pass
def a_get_keys(self):
pass
def a_get_admin_events(self, query=None):
pass
def a_get_events(self, query=None):
pass
def a_set_events(self, payload):
pass
def a_get_client_all_sessions(self, client_id):
pass
def a_get_client_sessions_stats(self):
pass
def a_get_client_management_permissions(self, client_id):
pass
def a_update_client_management_permissions(self, payload, client_id):
pass
def a_get_client_authz_policy_scopes(self, client_id, policy_id):
pass
def a_get_client_authz_policy_resources(self, client_id, policy_id):
pass
def a_get_client_authz_scope_permission(self, client_id, scope_id):
pass
def a_create_client_authz_scope_permission(self, payload, client_id):
pass
def a_update_client_authz_scope_permission(self, payload, client_id, scope_id):
pass
def a_get_client_authz_client_policies(self, client_id):
pass
def a_create_client_authz_client_policy(self, payload, client_id):
pass
def a_get_composite_client_roles_of_group(self, client_id, group_id, brief_representation=True):
pass
def a_get_role_client_level_children(self, client_id, role_id):
pass
def a_upload_certificate(self, client_id, certcont):
pass
def a_get_required_action_by_alias(self, action_alias):
pass
def a_get_required_actions(self):
pass
def a_update_required_action(self, action_alias, payload):
pass
def a_get_bruteforce_detection_status(self, user_id):
pass
def a_clear_bruteforce_attempts_for_user(self, user_id):
pass
def a_clear_all_bruteforce_attempts(self):
pass
def a_clear_keys_cache(self):
pass
def a_clear_realm_cache(self):
pass
def a_clear_user_cache(self):
pass
Loading…
Cancel
Save