diff --git a/.gitignore b/.gitignore index 7ea9902..4c8d46d 100644 --- a/.gitignore +++ b/.gitignore @@ -103,4 +103,5 @@ ENV/ .idea/ main.py main2.py -s3air-authz-config.json \ No newline at end of file +s3air-authz-config.json +.vscode \ No newline at end of file diff --git a/keycloak/__init__.py b/keycloak/__init__.py index 987ce1c..9ccb527 100644 --- a/keycloak/__init__.py +++ b/keycloak/__init__.py @@ -21,5 +21,7 @@ # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -from .keycloak_admin import * -from .keycloak_openid import * +from .keycloak_admin import KeycloakAdmin +from .keycloak_openid import KeycloakOpenID + +__all__ = ["KeycloakAdmin", "KeycloakOpenID"] diff --git a/keycloak/authorization/__init__.py b/keycloak/authorization/__init__.py index 219687b..dad1078 100644 --- a/keycloak/authorization/__init__.py +++ b/keycloak/authorization/__init__.py @@ -55,39 +55,44 @@ class Authorization: :param data: keycloak authorization data (dict) :return: """ - for pol in data['policies']: - if pol['type'] == 'role': - policy = Policy(name=pol['name'], - type=pol['type'], - logic=pol['logic'], - decision_strategy=pol['decisionStrategy']) - - config_roles = json.loads(pol['config']['roles']) + for pol in data["policies"]: + if pol["type"] == "role": + policy = Policy( + name=pol["name"], + type=pol["type"], + logic=pol["logic"], + decision_strategy=pol["decisionStrategy"], + ) + + config_roles = json.loads(pol["config"]["roles"]) for role in config_roles: - policy.add_role(Role(name=role['id'], - required=role['required'])) + policy.add_role(Role(name=role["id"], required=role["required"])) self.policies[policy.name] = policy - if pol['type'] == 'scope': - permission = Permission(name=pol['name'], - type=pol['type'], - logic=pol['logic'], - decision_strategy=pol['decisionStrategy']) + if pol["type"] == "scope": + permission = Permission( + name=pol["name"], + type=pol["type"], + logic=pol["logic"], + decision_strategy=pol["decisionStrategy"], + ) - permission.scopes = ast.literal_eval(pol['config']['scopes']) + permission.scopes = ast.literal_eval(pol["config"]["scopes"]) - for policy_name in ast.literal_eval(pol['config']['applyPolicies']): + for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]): self.policies[policy_name].add_permission(permission) - if pol['type'] == 'resource': - permission = Permission(name=pol['name'], - type=pol['type'], - logic=pol['logic'], - decision_strategy=pol['decisionStrategy']) + if pol["type"] == "resource": + permission = Permission( + name=pol["name"], + type=pol["type"], + logic=pol["logic"], + decision_strategy=pol["decisionStrategy"], + ) - permission.resources = ast.literal_eval(pol['config'].get('resources', "[]")) + permission.resources = ast.literal_eval(pol["config"].get("resources", "[]")) - for policy_name in ast.literal_eval(pol['config']['applyPolicies']): + for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]): if self.policies.get(policy_name) is not None: self.policies[policy_name].add_permission(permission) diff --git a/keycloak/authorization/permission.py b/keycloak/authorization/permission.py index 9988730..edbfe01 100644 --- a/keycloak/authorization/permission.py +++ b/keycloak/authorization/permission.py @@ -26,7 +26,8 @@ class Permission: """ Consider this simple and very common permission: - A permission associates the object being protected with the policies that must be evaluated to determine whether access is granted. + A permission associates the object being protected with the policies that must be evaluated to + determine whether access is granted. X CAN DO Y ON RESOURCE Z diff --git a/keycloak/authorization/policy.py b/keycloak/authorization/policy.py index 9f688f7..4014b7a 100644 --- a/keycloak/authorization/policy.py +++ b/keycloak/authorization/policy.py @@ -29,9 +29,10 @@ class Policy: A policy defines the conditions that must be satisfied to grant access to an object. Unlike permissions, you do not specify the object being protected but rather the conditions that must be satisfied for access to a given object (for example, resource, scope, or both). - Policies are strongly related to the different access control mechanisms (ACMs) that you can use to - protect your resources. With policies, you can implement strategies for attribute-based access control - (ABAC), role-based access control (RBAC), context-based access control, or any combination of these. + Policies are strongly related to the different access control mechanisms (ACMs) that you can + use to protect your resources. With policies, you can implement strategies for attribute-based + access control (ABAC), role-based access control (RBAC), context-based access control, or any + combination of these. https://keycloak.gitbooks.io/documentation/authorization_services/topics/policy/overview.html @@ -98,9 +99,10 @@ class Policy: :param role: keycloak role. :return: """ - if self.type != 'role': + if self.type != "role": raise KeycloakAuthorizationConfigError( - "Can't add role. Policy type is different of role") + "Can't add role. Policy type is different of role" + ) self._roles.append(role) def add_permission(self, permission): diff --git a/keycloak/connection.py b/keycloak/connection.py index bdecfce..8ef45b1 100644 --- a/keycloak/connection.py +++ b/keycloak/connection.py @@ -29,11 +29,11 @@ except ImportError: import requests from requests.adapters import HTTPAdapter -from .exceptions import (KeycloakConnectionError) +from .exceptions import KeycloakConnectionError class ConnectionManager(object): - """ Represents a simple server connection. + """Represents a simple server connection. Args: base_url (str): The server URL. headers (dict): The header parameters of the requests to the server. @@ -52,15 +52,15 @@ class ConnectionManager(object): # retry once to reset connection with Keycloak after tomcat's ConnectionTimeout # see https://github.com/marcospereirampj/python-keycloak/issues/36 - for protocol in ('https://', 'http://'): + for protocol in ("https://", "http://"): adapter = HTTPAdapter(max_retries=1) # adds POST to retry whitelist allowed_methods = set(adapter.max_retries.allowed_methods) - allowed_methods.add('POST') + allowed_methods.add("POST") adapter.max_retries.allowed_methods = frozenset(allowed_methods) self._s.mount(protocol, adapter) - + if proxies: self._s.proxies.update(proxies) @@ -69,7 +69,7 @@ class ConnectionManager(object): @property def base_url(self): - """ Return base url in use for requests to the server. """ + """Return base url in use for requests to the server.""" return self._base_url @base_url.setter @@ -79,7 +79,7 @@ class ConnectionManager(object): @property def timeout(self): - """ Return timeout in use for request to the server. """ + """Return timeout in use for request to the server.""" return self._timeout @timeout.setter @@ -89,7 +89,7 @@ class ConnectionManager(object): @property def verify(self): - """ Return verify in use for request to the server. """ + """Return verify in use for request to the server.""" return self._verify @verify.setter @@ -99,7 +99,7 @@ class ConnectionManager(object): @property def headers(self): - """ Return header request to the server. """ + """Return header request to the server.""" return self._headers @headers.setter @@ -108,7 +108,7 @@ class ConnectionManager(object): self._headers = value def param_headers(self, key): - """ Return a specific header parameter. + """Return a specific header parameter. :arg key (str): Header parameters key. :return: @@ -117,11 +117,11 @@ class ConnectionManager(object): return self.headers.get(key) def clean_headers(self): - """ Clear header parameters. """ + """Clear header parameters.""" self.headers = {} def exist_param_headers(self, key): - """ Check if the parameter exists in the header. + """Check if the parameter exists in the header. :arg key (str): Header parameters key. :return: @@ -130,7 +130,7 @@ class ConnectionManager(object): return self.param_headers(key) is not None def add_param_headers(self, key, value): - """ Add a single parameter inside the header. + """Add a single parameter inside the header. :arg key (str): Header parameters key. value (str): Value to be added. @@ -138,14 +138,14 @@ class ConnectionManager(object): self.headers[key] = value def del_param_headers(self, key): - """ Remove a specific parameter. + """Remove a specific parameter. :arg key (str): Key of the header parameters. """ self.headers.pop(key, None) def raw_get(self, path, **kwargs): - """ Submit get request to the path. + """Submit get request to the path. :arg path (str): Path for request. :return @@ -155,17 +155,18 @@ class ConnectionManager(object): """ try: - return self._s.get(urljoin(self.base_url, path), - params=kwargs, - headers=self.headers, - timeout=self.timeout, - verify=self.verify) + return self._s.get( + urljoin(self.base_url, path), + params=kwargs, + headers=self.headers, + timeout=self.timeout, + verify=self.verify, + ) except Exception as e: - raise KeycloakConnectionError( - "Can't connect to server (%s)" % e) + raise KeycloakConnectionError("Can't connect to server (%s)" % e) def raw_post(self, path, data, **kwargs): - """ Submit post request to the path. + """Submit post request to the path. :arg path (str): Path for request. data (dict): Payload for request. @@ -175,18 +176,19 @@ class ConnectionManager(object): HttpError: Can't connect to server. """ try: - return self._s.post(urljoin(self.base_url, path), - params=kwargs, - data=data, - headers=self.headers, - timeout=self.timeout, - verify=self.verify) + return self._s.post( + urljoin(self.base_url, path), + params=kwargs, + data=data, + headers=self.headers, + timeout=self.timeout, + verify=self.verify, + ) except Exception as e: - raise KeycloakConnectionError( - "Can't connect to server (%s)" % e) + raise KeycloakConnectionError("Can't connect to server (%s)" % e) def raw_put(self, path, data, **kwargs): - """ Submit put request to the path. + """Submit put request to the path. :arg path (str): Path for request. data (dict): Payload for request. @@ -196,18 +198,19 @@ class ConnectionManager(object): HttpError: Can't connect to server. """ try: - return self._s.put(urljoin(self.base_url, path), - params=kwargs, - data=data, - headers=self.headers, - timeout=self.timeout, - verify=self.verify) + return self._s.put( + urljoin(self.base_url, path), + params=kwargs, + data=data, + headers=self.headers, + timeout=self.timeout, + verify=self.verify, + ) except Exception as e: - raise KeycloakConnectionError( - "Can't connect to server (%s)" % e) + raise KeycloakConnectionError("Can't connect to server (%s)" % e) def raw_delete(self, path, data={}, **kwargs): - """ Submit delete request to the path. + """Submit delete request to the path. :arg path (str): Path for request. @@ -218,12 +221,13 @@ class ConnectionManager(object): HttpError: Can't connect to server. """ try: - return self._s.delete(urljoin(self.base_url, path), - params=kwargs, - data=data, - headers=self.headers, - timeout=self.timeout, - verify=self.verify) + return self._s.delete( + urljoin(self.base_url, path), + params=kwargs, + data=data, + headers=self.headers, + timeout=self.timeout, + verify=self.verify, + ) except Exception as e: - raise KeycloakConnectionError( - "Can't connect to server (%s)" % e) + raise KeycloakConnectionError("Can't connect to server (%s)" % e) diff --git a/keycloak/exceptions.py b/keycloak/exceptions.py index 67da62a..d310fff 100644 --- a/keycloak/exceptions.py +++ b/keycloak/exceptions.py @@ -25,8 +25,7 @@ import requests class KeycloakError(Exception): - def __init__(self, error_message="", response_code=None, - response_body=None): + def __init__(self, error_message="", response_code=None, response_body=None): Exception.__init__(self, error_message) @@ -56,6 +55,7 @@ class KeycloakOperationError(KeycloakError): class KeycloakDeprecationError(KeycloakError): pass + class KeycloakGetError(KeycloakOperationError): pass @@ -93,7 +93,7 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists= return {"Already exists"} try: - message = response.json()['message'] + message = response.json()["message"] except (KeyError, ValueError): message = response.content @@ -103,6 +103,6 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists= if response.status_code == 401: error = KeycloakAuthenticationError - raise error(error_message=message, - response_code=response.status_code, - response_body=response.content) + raise error( + error_message=message, response_code=response.status_code, response_body=response.content + ) diff --git a/keycloak/keycloak_admin.py b/keycloak/keycloak_admin.py index 4890c3c..7707990 100644 --- a/keycloak/keycloak_admin.py +++ b/keycloak/keycloak_admin.py @@ -29,33 +29,92 @@ from builtins import isinstance from typing import Iterable from .connection import ConnectionManager -from .exceptions import raise_error_from_response, KeycloakGetError +from .exceptions import KeycloakGetError, raise_error_from_response from .keycloak_openid import KeycloakOpenID - -from .urls_patterns import URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS, URL_ADMIN_CLIENT_AUTHZ_POLICIES, \ - URL_ADMIN_CLIENT_AUTHZ_SCOPES, URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURCES, URL_ADMIN_CLIENT_ROLES, \ - URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY, URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION, \ - URL_ADMIN_GET_SESSIONS, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_GROUPS_REALM_ROLES, \ - URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE, URL_ADMIN_CLIENT_INSTALLATION_PROVIDER, \ - URL_ADMIN_REALM_ROLES_ROLE_BY_NAME, URL_ADMIN_GROUPS_CLIENT_ROLES, \ - URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE, URL_ADMIN_USER_GROUP, URL_ADMIN_REALM_ROLES, URL_ADMIN_GROUP_CHILD, \ - URL_ADMIN_USER_CONSENTS, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_CLIENT, URL_ADMIN_USER, URL_ADMIN_CLIENT_ROLE, \ - URL_ADMIN_USER_GROUPS, URL_ADMIN_CLIENTS, URL_ADMIN_FLOWS_EXECUTIONS, URL_ADMIN_GROUPS, URL_ADMIN_USER_CLIENT_ROLES, \ - URL_ADMIN_REALMS, URL_ADMIN_USERS_COUNT, URL_ADMIN_FLOWS, URL_ADMIN_GROUP, URL_ADMIN_CLIENT_AUTHZ_SETTINGS, \ - URL_ADMIN_GROUP_MEMBERS, URL_ADMIN_USER_STORAGE, URL_ADMIN_GROUP_PERMISSIONS, URL_ADMIN_IDPS, URL_ADMIN_IDP, \ - URL_ADMIN_IDP_MAPPERS, URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE, URL_ADMIN_USERS, URL_ADMIN_CLIENT_SCOPES, \ - URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER, URL_ADMIN_CLIENT_SCOPE, URL_ADMIN_CLIENT_SECRETS, \ - URL_ADMIN_USER_REALM_ROLES, URL_ADMIN_USER_REALM_ROLES_AVAILABLE, URL_ADMIN_USER_REALM_ROLES_COMPOSITE, \ - URL_ADMIN_REALM, URL_ADMIN_COMPONENTS, URL_ADMIN_COMPONENT, URL_ADMIN_KEYS, \ - URL_ADMIN_USER_FEDERATED_IDENTITY, URL_ADMIN_USER_FEDERATED_IDENTITIES, URL_ADMIN_CLIENT_ROLE_MEMBERS, \ - URL_ADMIN_REALM_ROLES_MEMBERS, URL_ADMIN_CLIENT_PROTOCOL_MAPPER, URL_ADMIN_CLIENT_SCOPES_MAPPERS, \ - URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION, URL_ADMIN_FLOWS_EXECUTIONS_FLOW, URL_ADMIN_FLOWS_COPY, \ - URL_ADMIN_FLOWS_ALIAS, URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER, URL_ADMIN_AUTHENTICATOR_CONFIG, \ - URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE, URL_ADMIN_CLIENT_ALL_SESSIONS, URL_ADMIN_EVENTS, \ - URL_ADMIN_REALM_EXPORT, URL_ADMIN_DELETE_USER_ROLE, URL_ADMIN_USER_LOGOUT, URL_ADMIN_FLOWS_EXECUTION, \ - URL_ADMIN_FLOW, URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES, URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE, \ - URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES, URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE, \ - URL_ADMIN_USER_CREDENTIALS, URL_ADMIN_USER_CREDENTIAL, URL_ADMIN_CLIENT_PROTOCOL_MAPPERS +from .urls_patterns import ( + URL_ADMIN_AUTHENTICATOR_CONFIG, + URL_ADMIN_CLIENT, + URL_ADMIN_CLIENT_ALL_SESSIONS, + URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS, + URL_ADMIN_CLIENT_AUTHZ_POLICIES, + URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION, + URL_ADMIN_CLIENT_AUTHZ_RESOURCES, + URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY, + URL_ADMIN_CLIENT_AUTHZ_SCOPES, + URL_ADMIN_CLIENT_AUTHZ_SETTINGS, + URL_ADMIN_CLIENT_INSTALLATION_PROVIDER, + URL_ADMIN_CLIENT_PROTOCOL_MAPPER, + URL_ADMIN_CLIENT_PROTOCOL_MAPPERS, + URL_ADMIN_CLIENT_ROLE, + URL_ADMIN_CLIENT_ROLE_GROUPS, + URL_ADMIN_CLIENT_ROLE_MEMBERS, + URL_ADMIN_CLIENT_ROLES, + URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE, + URL_ADMIN_CLIENT_SCOPE, + URL_ADMIN_CLIENT_SCOPES, + URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER, + URL_ADMIN_CLIENT_SCOPES_MAPPERS, + URL_ADMIN_CLIENT_SECRETS, + URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER, + URL_ADMIN_CLIENTS, + URL_ADMIN_COMPONENT, + URL_ADMIN_COMPONENTS, + URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE, + URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES, + URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE, + URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES, + URL_ADMIN_DELETE_USER_ROLE, + URL_ADMIN_EVENTS, + URL_ADMIN_FLOW, + URL_ADMIN_FLOWS, + URL_ADMIN_FLOWS_ALIAS, + URL_ADMIN_FLOWS_COPY, + URL_ADMIN_FLOWS_EXECUTION, + URL_ADMIN_FLOWS_EXECUTIONS, + URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION, + URL_ADMIN_FLOWS_EXECUTIONS_FLOW, + URL_ADMIN_GET_SESSIONS, + URL_ADMIN_GROUP, + URL_ADMIN_GROUP_CHILD, + URL_ADMIN_GROUP_MEMBERS, + URL_ADMIN_GROUP_PERMISSIONS, + URL_ADMIN_GROUPS, + URL_ADMIN_GROUPS_CLIENT_ROLES, + URL_ADMIN_GROUPS_REALM_ROLES, + URL_ADMIN_IDP, + URL_ADMIN_IDP_MAPPERS, + URL_ADMIN_IDPS, + URL_ADMIN_KEYS, + URL_ADMIN_REALM, + URL_ADMIN_REALM_EXPORT, + URL_ADMIN_REALM_ROLES, + URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE, + URL_ADMIN_REALM_ROLES_MEMBERS, + URL_ADMIN_REALM_ROLES_ROLE_BY_NAME, + URL_ADMIN_REALMS, + URL_ADMIN_RESET_PASSWORD, + URL_ADMIN_SEND_UPDATE_ACCOUNT, + URL_ADMIN_SEND_VERIFY_EMAIL, + URL_ADMIN_SERVER_INFO, + URL_ADMIN_USER, + URL_ADMIN_USER_CLIENT_ROLES, + URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE, + URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE, + URL_ADMIN_USER_CONSENTS, + URL_ADMIN_USER_CREDENTIAL, + URL_ADMIN_USER_CREDENTIALS, + URL_ADMIN_USER_FEDERATED_IDENTITIES, + URL_ADMIN_USER_FEDERATED_IDENTITY, + URL_ADMIN_USER_GROUP, + URL_ADMIN_USER_GROUPS, + URL_ADMIN_USER_LOGOUT, + URL_ADMIN_USER_REALM_ROLES, + URL_ADMIN_USER_REALM_ROLES_AVAILABLE, + URL_ADMIN_USER_REALM_ROLES_COMPOSITE, + URL_ADMIN_USER_STORAGE, + URL_ADMIN_USERS, + URL_ADMIN_USERS_COUNT, +) class KeycloakAdmin: @@ -75,8 +134,19 @@ class KeycloakAdmin: _custom_headers = None _user_realm_name = None - def __init__(self, server_url, username=None, password=None, realm_name='master', client_id='admin-cli', verify=True, - client_secret_key=None, custom_headers=None, user_realm_name=None, auto_refresh_token=None): + def __init__( + self, + server_url, + username=None, + password=None, + realm_name="master", + client_id="admin-cli", + verify=True, + client_secret_key=None, + custom_headers=None, + user_realm_name=None, + auto_refresh_token=None, + ): """ :param server_url: Keycloak server url @@ -85,10 +155,12 @@ class KeycloakAdmin: :param realm_name: realm name :param client_id: client id :param verify: True if want check connection SSL - :param client_secret_key: client secret key (optional, required only for access type confidential) + :param client_secret_key: client secret key + (optional, required only for access type confidential) :param custom_headers: dict of custom header to pass to each HTML request :param user_realm_name: The realm name of the user, if different from realm_name - :param auto_refresh_token: list of methods that allows automatic token refresh. ex: ['get', 'put', 'post', 'delete'] + :param auto_refresh_token: list of methods that allows automatic token refresh. + ex: ['get', 'put', 'post', 'delete'] """ self.server_url = server_url self.username = username @@ -198,40 +270,46 @@ class KeycloakAdmin: @auto_refresh_token.setter def auto_refresh_token(self, value): - allowed_methods = {'get', 'post', 'put', 'delete'} + allowed_methods = {"get", "post", "put", "delete"} if not isinstance(value, Iterable): - raise TypeError('Expected a list of strings among {allowed}'.format(allowed=allowed_methods)) + raise TypeError( + "Expected a list of strings among {allowed}".format(allowed=allowed_methods) + ) if not all(method in allowed_methods for method in value): - raise TypeError('Unexpected method in auto_refresh_token, accepted methods are {allowed}'.format(allowed=allowed_methods)) + raise TypeError( + "Unexpected method in auto_refresh_token, accepted methods are {allowed}".format( + allowed=allowed_methods + ) + ) self._auto_refresh_token = value def __fetch_all(self, url, query=None): - '''Wrapper function to paginate GET requests + """Wrapper function to paginate GET requests :param url: The url on which the query is executed :param query: Existing query parameters (optional) :return: Combined results of paginated queries - ''' + """ results = [] # initalize query if it was called with None if not query: query = {} page = 0 - query['max'] = self.PAGE_SIZE + query["max"] = self.PAGE_SIZE # fetch until we can while True: - query['first'] = page*self.PAGE_SIZE + query["first"] = page * self.PAGE_SIZE partial_results = raise_error_from_response( - self.raw_get(url, **query), - KeycloakGetError) + self.raw_get(url, **query), KeycloakGetError + ) if not partial_results: break results.extend(partial_results) - if len(partial_results) < query['max']: + if len(partial_results) < query["max"]: break page += 1 return results @@ -239,9 +317,7 @@ class KeycloakAdmin: def __fetch_paginated(self, url, query=None): query = query or {} - return raise_error_from_response( - self.raw_get(url, **query), - KeycloakGetError) + return raise_error_from_response(self.raw_get(url, **query), KeycloakGetError) def import_realm(self, payload): """ @@ -255,8 +331,7 @@ class KeycloakAdmin: :return: RealmRepresentation """ - data_raw = self.raw_post(URL_ADMIN_REALMS, - data=json.dumps(payload)) + data_raw = self.raw_post(URL_ADMIN_REALMS, data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) def export_realm(self, export_clients=False, export_groups_and_role=False): @@ -271,7 +346,11 @@ class KeycloakAdmin: :return: realm configurations JSON """ - params_path = {"realm-name": self.realm_name, "export-clients": export_clients, "export-groups-and-roles": export_groups_and_role } + params_path = { + "realm-name": self.realm_name, + "export-clients": export_clients, + "export-groups-and-roles": export_groups_and_role, + } data_raw = self.raw_post(URL_ADMIN_REALM_EXPORT.format(**params_path), data="") return raise_error_from_response(data_raw, KeycloakGetError) @@ -296,9 +375,10 @@ class KeycloakAdmin: :return: Keycloak server response (RealmRepresentation) """ - data_raw = self.raw_post(URL_ADMIN_REALMS, - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post(URL_ADMIN_REALMS, data=json.dumps(payload)) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def update_realm(self, realm_name, payload): """ @@ -314,8 +394,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": realm_name} - data_raw = self.raw_put(URL_ADMIN_REALM.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put(URL_ADMIN_REALM.format(**params_path), data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_realm(self, realm_name): @@ -359,8 +438,7 @@ class KeycloakAdmin: :param: payload: IdentityProviderRepresentation """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post(URL_ADMIN_IDPS.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post(URL_ADMIN_IDPS.format(**params_path), data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) def add_mapper_to_idp(self, idp_alias, payload): @@ -374,8 +452,9 @@ class KeycloakAdmin: :param: payload: IdentityProviderMapperRepresentation """ params_path = {"realm-name": self.realm_name, "idp-alias": idp_alias} - data_raw = self.raw_post(URL_ADMIN_IDP_MAPPERS.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_IDP_MAPPERS.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) def get_idps(self): @@ -409,23 +488,23 @@ class KeycloakAdmin: https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_userrepresentation :param payload: UserRepresentation - :param exist_ok: If False, raise KeycloakGetError if username already exists. Otherwise, return existing user ID. + :param exist_ok: If False, raise KeycloakGetError if username already exists. + Otherwise, return existing user ID. :return: UserRepresentation """ params_path = {"realm-name": self.realm_name} if exist_ok: - exists = self.get_user_id(username=payload['username']) + exists = self.get_user_id(username=payload["username"]) if exists is not None: return str(exists) - data_raw = self.raw_post(URL_ADMIN_USERS.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post(URL_ADMIN_USERS.format(**params_path), data=json.dumps(payload)) raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) - _last_slash_idx = data_raw.headers['Location'].rindex('/') - return data_raw.headers['Location'][_last_slash_idx + 1:] + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 def users_count(self): """ @@ -490,8 +569,7 @@ class KeycloakAdmin: :return: Http response """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_put(URL_ADMIN_USER.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put(URL_ADMIN_USER.format(**params_path), data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_user(self, user_id): @@ -522,8 +600,9 @@ class KeycloakAdmin: """ payload = {"type": "password", "temporary": temporary, "value": password} params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_put(URL_ADMIN_RESET_PASSWORD.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put( + URL_ADMIN_RESET_PASSWORD.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_credentials(self, user_id): @@ -551,7 +630,11 @@ class KeycloakAdmin: :param: credential_id: credential id :return: Keycloak server response (ClientRepresentation) """ - params_path = {"realm-name": self.realm_name, "id": user_id, "credential_id": credential_id} + params_path = { + "realm-name": self.realm_name, + "id": user_id, + "credential_id": credential_id, + } data_raw = self.raw_get(URL_ADMIN_USER_CREDENTIAL.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) @@ -566,7 +649,11 @@ class KeycloakAdmin: :param: credential_id: credential id :return: Keycloak server response (ClientRepresentation) """ - params_path = {"realm-name": self.realm_name, "id": user_id, "credential_id": credential_id} + params_path = { + "realm-name": self.realm_name, + "id": user_id, + "credential_id": credential_id, + } data_raw = self.raw_delete(URL_ADMIN_USER_CREDENTIAL.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) @@ -597,7 +684,8 @@ class KeycloakAdmin: def get_user_social_logins(self, user_id): """ - Returns a list of federated identities/social logins of which the user has been associated with + Returns a list of federated identities/social logins of which the user has been associated + with :param user_id: User id :return: federated identities list """ @@ -615,9 +703,15 @@ class KeycloakAdmin: :param provider_username: username specified by the provider :return: """ - payload = {"identityProvider": provider_id, "userId": provider_userid, "userName": provider_username} + payload = { + "identityProvider": provider_id, + "userId": provider_userid, + "userName": provider_username, + } params_path = {"realm-name": self.realm_name, "id": user_id, "provider": provider_id} - data_raw = self.raw_post(URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path), data=json.dumps(payload)) + self.raw_post( + URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path), data=json.dumps(payload) + ) def delete_user_social_login(self, user_id, provider_id): @@ -631,7 +725,9 @@ class KeycloakAdmin: data_raw = self.raw_delete(URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) - def send_update_account(self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None): + def send_update_account( + self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None + ): """ Send an update account email to the user. An email contains a link the user can click to perform a set of required actions. @@ -646,8 +742,11 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": user_id} params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri} - data_raw = self.raw_put(URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), - data=json.dumps(payload), **params_query) + data_raw = self.raw_put( + URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), + data=json.dumps(payload), + **params_query + ) return raise_error_from_response(data_raw, KeycloakGetError) def send_verify_email(self, user_id, client_id=None, redirect_uri=None): @@ -663,8 +762,9 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": user_id} params_query = {"client_id": client_id, "redirect_uri": redirect_uri} - data_raw = self.raw_put(URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path), - data={}, **params_query) + data_raw = self.raw_put( + URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path), data={}, **params_query + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_sessions(self, user_id): @@ -705,7 +805,7 @@ class KeycloakAdmin: """ query = query or {} params_path = {"realm-name": self.realm_name} - url = URL_ADMIN_USERS.format(**params_path) + url = URL_ADMIN_GROUPS.format(**params_path) if "first" in query or "max" in query: return self.__fetch_paginated(url, query) @@ -740,7 +840,7 @@ class KeycloakAdmin: """ for subgroup in group["subGroups"]: - if subgroup['path'] == path: + if subgroup["path"] == path: return subgroup elif subgroup["subGroups"]: for subgroup in group["subGroups"]: @@ -758,11 +858,12 @@ class KeycloakAdmin: https://www.keycloak.org/docs-api/8.0/rest-api/#_userrepresentation :param group_id: The group id - :param query: Additional query parameters (see https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_getmembers) + :param query: Additional query parameters + (see https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_getmembers) :return: Keycloak server response (UserRepresentation) """ params_path = {"realm-name": self.realm_name, "id": group_id} - url = URL_ADMIN_USERS.format(**params_path) + url = URL_ADMIN_GROUP_MEMBERS.format(**params_path) if "first" in query or "max" in query: return self.__fetch_paginated(url, query) @@ -787,14 +888,14 @@ class KeycloakAdmin: # TODO: Review this code is necessary for group in groups: - if group['path'] == path: + if group["path"] == path: return group elif search_in_subgroups and group["subGroups"]: for group in group["subGroups"]: - if group['path'] == path: + if group["path"] == path: return group res = self.get_subgroups(group, path) - if res != None: + if res is not None: return res return None @@ -814,14 +915,21 @@ class KeycloakAdmin: if parent is None: params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post(URL_ADMIN_GROUPS.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_GROUPS.format(**params_path), data=json.dumps(payload) + ) else: - params_path = {"realm-name": self.realm_name, "id": parent, } - data_raw = self.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path), - data=json.dumps(payload)) + params_path = { + "realm-name": self.realm_name, + "id": parent, + } + data_raw = self.raw_post( + URL_ADMIN_GROUP_CHILD.format(**params_path), data=json.dumps(payload) + ) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def update_group(self, group_id, payload): """ @@ -837,8 +945,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_put(URL_ADMIN_GROUP.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put(URL_ADMIN_GROUP.format(**params_path), data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def group_set_permissions(self, group_id, enabled=True): @@ -851,8 +958,10 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_put(URL_ADMIN_GROUP_PERMISSIONS.format(**params_path), - data=json.dumps({"enabled": enabled})) + data_raw = self.raw_put( + URL_ADMIN_GROUP_PERMISSIONS.format(**params_path), + data=json.dumps({"enabled": enabled}), + ) return raise_error_from_response(data_raw, KeycloakGetError) def group_user_add(self, user_id, group_id): @@ -935,7 +1044,7 @@ class KeycloakAdmin: clients = self.get_clients() for client in clients: - if client_name == client.get('name') or client_name == client.get('clientId'): + if client_name == client.get("name") or client_name == client.get("clientId"): return client["id"] return None @@ -965,12 +1074,14 @@ class KeycloakAdmin: :return: Keycloak server response """ - params_path = {"realm-name": self.realm_name, - "id": client_id} + params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post(URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post( + URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def get_client_authz_resources(self, client_id): """ @@ -1008,12 +1119,15 @@ class KeycloakAdmin: :return: Keycloak server response """ - params_path = {"realm-name": self.realm_name, - "id": client_id} + params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post(URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post( + URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def create_client_authz_resource_based_permission(self, client_id, payload, skip_exists=False): """ @@ -1039,12 +1153,15 @@ class KeycloakAdmin: :return: Keycloak server response """ - params_path = {"realm-name": self.realm_name, - "id": client_id} + params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post(URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post( + URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def get_client_authz_scopes(self, client_id): """ @@ -1102,7 +1219,8 @@ class KeycloakAdmin: """ Create a client - ClientRepresentation: https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_clientrepresentation + ClientRepresentation: + https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_clientrepresentation :param skip_exists: If true then do not raise an error if client already exists :param payload: ClientRepresentation @@ -1110,9 +1228,10 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post(URL_ADMIN_CLIENTS.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post(URL_ADMIN_CLIENTS.format(**params_path), data=json.dumps(payload)) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def update_client(self, client_id, payload): """ @@ -1124,8 +1243,7 @@ class KeycloakAdmin: :return: Http response """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_put(URL_ADMIN_CLIENT.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put(URL_ADMIN_CLIENT.format(**params_path), data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_client(self, client_id): @@ -1179,10 +1297,11 @@ class KeycloakAdmin: """ Get role members of realm by role name. :param role_name: Name of the role. - :param query: Additional Query parameters (see https://www.keycloak.org/docs-api/11.0/rest-api/index.html#_roles_resource) + :param query: Additional Query parameters + (see https://www.keycloak.org/docs-api/11.0/rest-api/index.html#_roles_resource) :return: Keycloak Server Response (UserRepresentation) """ - params_path = {"realm-name": self.realm_name, "role-name":role_name} + params_path = {"realm-name": self.realm_name, "role-name": role_name} return self.__fetch_all(URL_ADMIN_REALM_ROLES_MEMBERS.format(**params_path), query) def get_client_roles(self, client_id): @@ -1250,9 +1369,12 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_role_id} - data_raw = self.raw_post(URL_ADMIN_CLIENT_ROLES.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post( + URL_ADMIN_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def add_composite_client_roles_to_role(self, client_role_id, role_name, roles): """ @@ -1266,8 +1388,10 @@ class KeycloakAdmin: payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} - data_raw = self.raw_post(URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE.format(**params_path), + data=json.dumps(payload), + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_client_role(self, client_role_id, role_name): @@ -1296,8 +1420,9 @@ class KeycloakAdmin: payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} - data_raw = self.raw_post(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_client_role_members(self, client_id, role_name, **query): @@ -1305,12 +1430,24 @@ class KeycloakAdmin: Get members by client role . :param client_id: The client id :param role_name: the name of role to be queried. - :param query: Additional query parameters ( see https://www.keycloak.org/docs-api/11.0/rest-api/index.html#_clients_resource) + :param query: Additional query parameters + (see https://www.keycloak.org/docs-api/11.0/rest-api/index.html#_clients_resource) :return: Keycloak server response (UserRepresentation) """ - params_path = {"realm-name": self.realm_name, "id":client_id, "role-name":role_name} - return self.__fetch_all(URL_ADMIN_CLIENT_ROLE_MEMBERS.format(**params_path) , query) + params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name} + return self.__fetch_all(URL_ADMIN_CLIENT_ROLE_MEMBERS.format(**params_path), query) + def get_client_role_groups(self, client_id, role_name, **query): + """ + Get group members by client role . + :param client_id: The client id + :param role_name: the name of role to be queried. + :param query: Additional query parameters + (see https://www.keycloak.org/docs-api/11.0/rest-api/index.html#_clients_resource) + :return: Keycloak server response + """ + params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name} + return self.__fetch_all(URL_ADMIN_CLIENT_ROLE_GROUPS.format(**params_path), query) def create_realm_role(self, payload, skip_exists=False): """ @@ -1322,9 +1459,12 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post(URL_ADMIN_REALM_ROLES.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post( + URL_ADMIN_REALM_ROLES.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def get_realm_role(self, role_name): """ @@ -1348,8 +1488,9 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.connection.raw_put(URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path), - data=json.dumps(payload)) + data_raw = self.connection.raw_put( + URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_realm_role(self, role_name): @@ -1361,7 +1502,8 @@ class KeycloakAdmin: params_path = {"realm-name": self.realm_name, "role-name": role_name} data_raw = self.connection.raw_delete( - URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path)) + URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def add_composite_realm_roles_to_role(self, role_name, roles): @@ -1377,9 +1519,9 @@ class KeycloakAdmin: params_path = {"realm-name": self.realm_name, "role-name": role_name} data_raw = self.raw_post( URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, - expected_codes=[204]) + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def remove_composite_realm_roles_to_role(self, role_name, roles): """ @@ -1394,9 +1536,9 @@ class KeycloakAdmin: params_path = {"realm-name": self.realm_name, "role-name": role_name} data_raw = self.raw_delete( URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, - expected_codes=[204]) + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_composite_realm_roles_of_role(self, role_name): """ @@ -1407,8 +1549,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_get( - URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path)) + data_raw = self.raw_get(URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def assign_realm_roles(self, user_id, roles): @@ -1422,8 +1563,9 @@ class KeycloakAdmin: payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_post(URL_ADMIN_USER_REALM_ROLES.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_USER_REALM_ROLES.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_realm_roles_of_user(self, user_id, roles): @@ -1437,8 +1579,9 @@ class KeycloakAdmin: payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_delete(URL_ADMIN_USER_REALM_ROLES.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_delete( + URL_ADMIN_USER_REALM_ROLES.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_realm_roles_of_user(self, user_id): @@ -1484,8 +1627,9 @@ class KeycloakAdmin: payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_post(URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_group_realm_roles(self, group_id, roles): @@ -1499,8 +1643,9 @@ class KeycloakAdmin: payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_delete(URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_delete( + URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_group_realm_roles(self, group_id): @@ -1526,8 +1671,9 @@ class KeycloakAdmin: payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} - data_raw = self.raw_post(URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_group_client_roles(self, group_id, client_id): @@ -1555,8 +1701,9 @@ class KeycloakAdmin: payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} - data_raw = self.raw_delete(URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_delete( + URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_client_roles_of_user(self, user_id, client_id): @@ -1577,7 +1724,9 @@ class KeycloakAdmin: :param client_id: id of client (not client-id) :return: Keycloak server response (array RoleRepresentation) """ - return self._get_client_roles_of_user(URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE, user_id, client_id) + return self._get_client_roles_of_user( + URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE, user_id, client_id + ) def get_composite_client_roles_of_user(self, user_id, client_id): """ @@ -1587,7 +1736,9 @@ class KeycloakAdmin: :param client_id: id of client (not client-id) :return: Keycloak server response (array RoleRepresentation) """ - return self._get_client_roles_of_user(URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE, user_id, client_id) + return self._get_client_roles_of_user( + URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE, user_id, client_id + ) def _get_client_roles_of_user(self, client_level_role_mapping_url, user_id, client_id): params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} @@ -1605,8 +1756,9 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} - data_raw = self.raw_delete(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_delete( + URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_authentication_flows(self): @@ -1644,18 +1796,20 @@ class KeycloakAdmin: https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_authenticationflowrepresentation :param payload: AuthenticationFlowRepresentation - :param skip_exists: If true then do not raise an error if authentication flow already exists + :param skip_exists: Do not raise an error if authentication flow already exists :return: Keycloak server response (RoleRepresentation) """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post(URL_ADMIN_FLOWS.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post(URL_ADMIN_FLOWS.format(**params_path), data=json.dumps(payload)) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def copy_authentication_flow(self, payload, flow_alias): """ - Copy existing authentication flow under a new name. The new name is given as 'newName' attribute of the passed payload. + Copy existing authentication flow under a new name. The new name is given as 'newName' + attribute of the passed payload. :param payload: JSON containing 'newName' attribute :param flow_alias: the flow alias @@ -1663,8 +1817,9 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post(URL_ADMIN_FLOWS_COPY.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_FLOWS_COPY.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) def delete_authentication_flow(self, flow_id): @@ -1705,9 +1860,10 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) + data_raw = self.raw_put( + URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[202, 204]) def get_authentication_flow_execution(self, execution_id): """ @@ -1736,8 +1892,9 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) def delete_authentication_flow_execution(self, execution_id): @@ -1763,14 +1920,17 @@ class KeycloakAdmin: :param payload: AuthenticationFlowRepresentation :param flow_alias: The flow alias - :param skip_exists: If true then do not raise an error if authentication flow already exists + :param skip_exists: Do not raise an error if authentication flow already exists :return: Keycloak server response (RoleRepresentation) """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_FLOW.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post( + URL_ADMIN_FLOWS_EXECUTIONS_FLOW.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def get_authenticator_config(self, config_id): """ @@ -1795,8 +1955,9 @@ class KeycloakAdmin: :return: Response(json) """ params_path = {"realm-name": self.realm_name, "id": config_id} - data_raw = self.raw_put(URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put( + URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_authenticator_config(self, config_id): @@ -1821,12 +1982,13 @@ class KeycloakAdmin: :param action: Action can be "triggerFullSync" or "triggerChangedUsersSync" :return: """ - data = {'action': action} + data = {"action": action} params_query = {"action": action} params_path = {"realm-name": self.realm_name, "id": storage_id} - data_raw = self.raw_post(URL_ADMIN_USER_STORAGE.format(**params_path), - data=json.dumps(data), **params_query) + data_raw = self.raw_post( + URL_ADMIN_USER_STORAGE.format(**params_path), data=json.dumps(data), **params_query + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_scopes(self): @@ -1858,7 +2020,8 @@ class KeycloakAdmin: """ Create a client scope - ClientScopeRepresentation: https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_getclientscopes + ClientScopeRepresentation: + https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_getclientscopes :param payload: ClientScopeRepresentation :param skip_exists: If true then do not raise an error if client scope already exists @@ -1866,15 +2029,19 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post(URL_ADMIN_CLIENT_SCOPES.format(**params_path), - data=json.dumps(payload)) - return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) + data_raw = self.raw_post( + URL_ADMIN_CLIENT_SCOPES.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response( + data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists + ) def update_client_scope(self, client_scope_id, payload): """ Update a client scope - ClientScopeRepresentation: https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_client_scopes_resource + ClientScopeRepresentation: + https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_client_scopes_resource :param client_scope_id: The id of the client scope :param payload: ClientScopeRepresentation @@ -1882,8 +2049,9 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_put(URL_ADMIN_CLIENT_SCOPE.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put( + URL_ADMIN_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def add_mapper_to_client_scope(self, client_scope_id, payload): @@ -1899,7 +2067,8 @@ class KeycloakAdmin: params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} data_raw = self.raw_post( - URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), data=json.dumps(payload)) + URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) @@ -1913,11 +2082,13 @@ class KeycloakAdmin: :return: Keycloak server Response """ - params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id, - "protocol-mapper-id": protocol_mppaer_id} + params_path = { + "realm-name": self.realm_name, + "scope-id": client_scope_id, + "protocol-mapper-id": protocol_mppaer_id, + } - data_raw = self.raw_delete( - URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path)) + data_raw = self.raw_delete(URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) @@ -1933,11 +2104,15 @@ class KeycloakAdmin: :return: Keycloak server Response """ - params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id, - "protocol-mapper-id": protocol_mapper_id} + params_path = { + "realm-name": self.realm_name, + "scope-id": client_scope_id, + "protocol-mapper-id": protocol_mapper_id, + } data_raw = self.raw_put( - URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path), data=json.dumps(payload)) + URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) @@ -1951,7 +2126,6 @@ class KeycloakAdmin: data_raw = self.raw_get(URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) - def delete_default_default_client_scope(self, scope_id): """ Delete default default client scope @@ -1963,7 +2137,6 @@ class KeycloakAdmin: data_raw = self.raw_delete(URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) - def add_default_default_client_scope(self, scope_id): """ Add default default client scope @@ -1973,10 +2146,11 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": scope_id} payload = {"realm": self.realm_name, "clientScopeId": scope_id} - data_raw = self.raw_put(URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload)) + data_raw = self.raw_put( + URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) - def get_default_optional_client_scopes(self): """ Return list of default optional client scopes @@ -1987,7 +2161,6 @@ class KeycloakAdmin: data_raw = self.raw_get(URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) - def delete_default_optional_client_scope(self, scope_id): """ Delete default optional client scope @@ -1999,7 +2172,6 @@ class KeycloakAdmin: data_raw = self.raw_delete(URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) - def add_default_optional_client_scope(self, scope_id): """ Add default optional client scope @@ -2009,10 +2181,11 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": scope_id} payload = {"realm": self.realm_name, "clientScopeId": scope_id} - data_raw = self.raw_put(URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload)) + data_raw = self.raw_put( + URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) - def add_mapper_to_client(self, client_id, payload): """ Add a mapper to a client @@ -2026,28 +2199,30 @@ class KeycloakAdmin: params_path = {"realm-name": self.realm_name, "id": client_id} data_raw = self.raw_post( - URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path), data=json.dumps(payload)) + URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) - + def update_client_mapper(self, client_id, mapper_id, payload): """ Update client mapper :param client_id: The id of the client :param client_mapper_id: The id of the mapper to be deleted :param payload: ProtocolMapperRepresentation - :return: Keycloak server response + :return: Keycloak server response """ params_path = { "realm-name": self.realm_name, - "id": self.client_id, + "id": self.client_id, "protocol-mapper-id": mapper_id, } data_raw = self.raw_put( - URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path), data=json.dumps(payload)) - + URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def remove_client_mapper(self, client_id, client_mapper_id): @@ -2062,14 +2237,13 @@ class KeycloakAdmin: params_path = { "realm-name": self.realm_name, "id": client_id, - "protocol-mapper-id": client_mapper_id + "protocol-mapper-id": client_mapper_id, } - data_raw = self.raw_delete( - URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path)) - + data_raw = self.raw_delete(URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) - + def generate_client_secrets(self, client_id): """ @@ -2109,8 +2283,7 @@ class KeycloakAdmin: :return: components list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(URL_ADMIN_COMPONENTS.format(**params_path), - data=None, **query) + data_raw = self.raw_get(URL_ADMIN_COMPONENTS.format(**params_path), data=None, **query) return raise_error_from_response(data_raw, KeycloakGetError) def create_component(self, payload): @@ -2126,8 +2299,9 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post(URL_ADMIN_COMPONENTS.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_post( + URL_ADMIN_COMPONENTS.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) def get_component(self, component_id): @@ -2156,8 +2330,9 @@ class KeycloakAdmin: :return: Http response """ params_path = {"realm-name": self.realm_name, "component-id": component_id} - data_raw = self.raw_put(URL_ADMIN_COMPONENT.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put( + URL_ADMIN_COMPONENT.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def delete_component(self, component_id): @@ -2182,8 +2357,7 @@ class KeycloakAdmin: :return: keys list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(URL_ADMIN_KEYS.format(**params_path), - data=None) + data_raw = self.raw_get(URL_ADMIN_KEYS.format(**params_path), data=None) return raise_error_from_response(data_raw, KeycloakGetError) def get_events(self, query=None): @@ -2196,8 +2370,7 @@ class KeycloakAdmin: :return: events list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(URL_ADMIN_EVENTS.format(**params_path), - data=None, **query) + data_raw = self.raw_get(URL_ADMIN_EVENTS.format(**params_path), data=None, **query) return raise_error_from_response(data_raw, KeycloakGetError) def set_events(self, payload): @@ -2210,8 +2383,7 @@ class KeycloakAdmin: :return: Http response """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_put(URL_ADMIN_EVENTS.format(**params_path), - data=json.dumps(payload)) + data_raw = self.raw_put(URL_ADMIN_EVENTS.format(**params_path), data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def raw_get(self, *args, **kwargs): @@ -2222,7 +2394,7 @@ class KeycloakAdmin: and try *get* once more. """ r = self.connection.raw_get(*args, **kwargs) - if 'get' in self.auto_refresh_token and r.status_code == 401: + if "get" in self.auto_refresh_token and r.status_code == 401: self.refresh_token() return self.connection.raw_get(*args, **kwargs) return r @@ -2235,7 +2407,7 @@ class KeycloakAdmin: and try *post* once more. """ r = self.connection.raw_post(*args, **kwargs) - if 'post' in self.auto_refresh_token and r.status_code == 401: + if "post" in self.auto_refresh_token and r.status_code == 401: self.refresh_token() return self.connection.raw_post(*args, **kwargs) return r @@ -2248,7 +2420,7 @@ class KeycloakAdmin: and try *put* once more. """ r = self.connection.raw_put(*args, **kwargs) - if 'put' in self.auto_refresh_token and r.status_code == 401: + if "put" in self.auto_refresh_token and r.status_code == 401: self.refresh_token() return self.connection.raw_put(*args, **kwargs) return r @@ -2257,11 +2429,11 @@ class KeycloakAdmin: """ Calls connection.raw_delete. - If auto_refresh is set for *delete* and *access_token* is expired, it will refresh the token - and try *delete* once more. + If auto_refresh is set for *delete* and *access_token* is expired, + it will refresh the token and try *delete* once more. """ r = self.connection.raw_delete(*args, **kwargs) - if 'delete' in self.auto_refresh_token and r.status_code == 401: + if "delete" in self.auto_refresh_token and r.status_code == 401: self.refresh_token() return self.connection.raw_delete(*args, **kwargs) return r @@ -2273,11 +2445,15 @@ class KeycloakAdmin: token_realm_name = self.realm_name else: token_realm_name = "master" - - self.keycloak_openid = KeycloakOpenID(server_url=self.server_url, client_id=self.client_id, - realm_name=token_realm_name, verify=self.verify, - client_secret_key=self.client_secret_key, - custom_headers=self.custom_headers) + + self.keycloak_openid = KeycloakOpenID( + server_url=self.server_url, + client_id=self.client_id, + realm_name=token_realm_name, + verify=self.verify, + client_secret_key=self.client_secret_key, + custom_headers=self.custom_headers, + ) grant_type = ["password"] if self.client_secret_key: @@ -2286,11 +2462,13 @@ class KeycloakAdmin: self.realm_name = self.user_realm_name if self.username and self.password: - self._token = self.keycloak_openid.token(self.username, self.password, grant_type=grant_type) + self._token = self.keycloak_openid.token( + self.username, self.password, grant_type=grant_type + ) headers = { - 'Authorization': 'Bearer ' + self.token.get('access_token'), - 'Content-Type': 'application/json' + "Authorization": "Bearer " + self.token.get("access_token"), + "Content-Type": "application/json", } else: self._token = None @@ -2300,13 +2478,12 @@ class KeycloakAdmin: # merge custom headers to main headers headers.update(self.custom_headers) - self._connection = ConnectionManager(base_url=self.server_url, - headers=headers, - timeout=60, - verify=self.verify) + self._connection = ConnectionManager( + base_url=self.server_url, headers=headers, timeout=60, verify=self.verify + ) def refresh_token(self): - refresh_token = self.token.get('refresh_token', None) + refresh_token = self.token.get("refresh_token", None) if refresh_token is None: self.get_token() else: @@ -2314,16 +2491,18 @@ class KeycloakAdmin: self.token = self.keycloak_openid.refresh_token(refresh_token) except KeycloakGetError as e: list_errors = [ - b'Refresh token expired', - b'Token is not active', - b'Session not active' + b"Refresh token expired", + b"Token is not active", + b"Session not active", ] if e.response_code == 400 and any(err in e.response_body for err in list_errors): - self.get_token() + self.get_token() else: raise - - self.connection.add_param_headers('Authorization', 'Bearer ' + self.token.get('access_token')) + + self.connection.add_param_headers( + "Authorization", "Bearer " + self.token.get("access_token") + ) def get_client_all_sessions(self, client_id): """ @@ -2346,9 +2525,10 @@ class KeycloakAdmin: DELETE admin/realms/{realm-name}/users/{id}/role-mappings/realm """ - params_path = {"realm-name": self.realm_name, "id": str(user_id) } - data_raw = self.connection.raw_delete(URL_ADMIN_DELETE_USER_ROLE.format(**params_path), - data=json.dumps(payload)) + params_path = {"realm-name": self.realm_name, "id": str(user_id)} + data_raw = self.connection.raw_delete( + URL_ADMIN_DELETE_USER_ROLE.format(**params_path), data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) def get_client_sessions_stats(self): @@ -2360,8 +2540,5 @@ class KeycloakAdmin: :return: Dict of clients and session count """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( - self.URL_ADMIN_CLIENT_SESSION_STATS.format(**params_path) - ) + data_raw = self.raw_get(self.URL_ADMIN_CLIENT_SESSION_STATS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) - diff --git a/keycloak/keycloak_openid.py b/keycloak/keycloak_openid.py index 1d6ed28..292d9aa 100644 --- a/keycloak/keycloak_openid.py +++ b/keycloak/keycloak_openid.py @@ -27,24 +27,38 @@ from jose import jwt from .authorization import Authorization from .connection import ConnectionManager -from .exceptions import raise_error_from_response, KeycloakGetError, \ - KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError, KeycloakDeprecationError +from .exceptions import ( + KeycloakAuthorizationConfigError, + KeycloakDeprecationError, + KeycloakGetError, + KeycloakInvalidTokenError, + KeycloakRPTNotFound, + raise_error_from_response, +) from .urls_patterns import ( - URL_REALM, URL_AUTH, + URL_CERTS, + URL_ENTITLEMENT, + URL_INTROSPECT, + URL_LOGOUT, + URL_REALM, URL_TOKEN, URL_USERINFO, URL_WELL_KNOWN, - URL_LOGOUT, - URL_CERTS, - URL_ENTITLEMENT, - URL_INTROSPECT ) class KeycloakOpenID: - - def __init__(self, server_url, realm_name, client_id, client_secret_key=None, verify=True, custom_headers=None, proxies=None): + def __init__( + self, + server_url, + realm_name, + client_id, + client_secret_key=None, + verify=True, + custom_headers=None, + proxies=None, + ): """ :param server_url: Keycloak server url @@ -62,11 +76,9 @@ class KeycloakOpenID: if custom_headers is not None: # merge custom headers to main headers headers.update(custom_headers) - self._connection = ConnectionManager(base_url=server_url, - headers=headers, - timeout=60, - verify=verify, - proxies=proxies) + self._connection = ConnectionManager( + base_url=server_url, headers=headers, timeout=60, verify=verify, proxies=proxies + ) self._authorization = Authorization() @@ -138,7 +150,7 @@ class KeycloakOpenID: :param kwargs: :return: """ - if method_token_info == 'introspect': + if method_token_info == "introspect": token_info = self.introspect(token) else: token_info = self.decode_token(token, **kwargs) @@ -146,11 +158,11 @@ class KeycloakOpenID: return token_info def well_know(self): - """ The most important endpoint to understand is the well-known configuration - endpoint. It lists endpoints and other configuration options relevant to - the OpenID Connect implementation in Keycloak. + """The most important endpoint to understand is the well-known configuration + endpoint. It lists endpoints and other configuration options relevant to + the OpenID Connect implementation in Keycloak. - :return It lists endpoints and other configuration options relevant. + :return It lists endpoints and other configuration options relevant. """ params_path = {"realm-name": self.realm_name} @@ -165,12 +177,23 @@ class KeycloakOpenID: :return: """ - params_path = {"authorization-endpoint": self.well_know()['authorization_endpoint'], - "client-id": self.client_id, - "redirect-uri": redirect_uri} + params_path = { + "authorization-endpoint": self.well_know()["authorization_endpoint"], + "client-id": self.client_id, + "redirect-uri": redirect_uri, + } return URL_AUTH.format(**params_path) - def token(self, username="", password="", grant_type=["password"], code="", redirect_uri="", totp=None, **extra): + def token( + self, + username="", + password="", + grant_type=["password"], + code="", + redirect_uri="", + totp=None, + **extra + ): """ The token endpoint is used to obtain tokens. Tokens can either be obtained by exchanging an authorization code or by supplying credentials directly depending on @@ -188,9 +211,14 @@ class KeycloakOpenID: :return: """ params_path = {"realm-name": self.realm_name} - payload = {"username": username, "password": password, - "client_id": self.client_id, "grant_type": grant_type, - "code": code, "redirect_uri": redirect_uri} + payload = { + "username": username, + "password": password, + "client_id": self.client_id, + "grant_type": grant_type, + "code": code, + "redirect_uri": redirect_uri, + } if extra: payload.update(extra) @@ -198,8 +226,7 @@ class KeycloakOpenID: payload["totp"] = totp payload = self._add_secret_key(payload) - data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), - data=payload) + data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakGetError) def refresh_token(self, refresh_token, grant_type=["refresh_token"]): @@ -216,10 +243,13 @@ class KeycloakOpenID: :return: """ params_path = {"realm-name": self.realm_name} - payload = {"client_id": self.client_id, "grant_type": grant_type, "refresh_token": refresh_token} + payload = { + "client_id": self.client_id, + "grant_type": grant_type, + "refresh_token": refresh_token, + } payload = self._add_secret_key(payload) - data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), - data=payload) + data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakGetError) def userinfo(self, token): @@ -250,8 +280,7 @@ class KeycloakOpenID: payload = {"client_id": self.client_id, "refresh_token": refresh_token} payload = self._add_secret_key(payload) - data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), - data=payload) + data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) @@ -268,7 +297,7 @@ class KeycloakOpenID: params_path = {"realm-name": self.realm_name} data_raw = self.connection.raw_get(URL_CERTS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) - + def public_key(self): """ The public key is exposed by the realm page directly. @@ -277,32 +306,31 @@ class KeycloakOpenID: """ params_path = {"realm-name": self.realm_name} data_raw = self.connection.raw_get(URL_REALM.format(**params_path)) - return raise_error_from_response(data_raw, KeycloakGetError)['public_key'] - + return raise_error_from_response(data_raw, KeycloakGetError)["public_key"] def entitlement(self, token, resource_server_id): """ Client applications can use a specific endpoint to obtain a special security token called a requesting party token (RPT). This token consists of all the entitlements - (or permissions) for a user as a result of the evaluation of the permissions and authorization - policies associated with the resources being requested. With an RPT, client applications can - gain access to protected resources at the resource server. + (or permissions) for a user as a result of the evaluation of the permissions and + authorization policies associated with the resources being requested. With an RPT, + client applications can gain access to protected resources at the resource server. :return: """ self.connection.add_param_headers("Authorization", "Bearer " + token) params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id} data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path)) - - if data_raw.status_code == 404: + + if data_raw.status_code == 404: return raise_error_from_response(data_raw, KeycloakDeprecationError) return raise_error_from_response(data_raw, KeycloakGetError) def introspect(self, token, rpt=None, token_type_hint=None): """ - The introspection endpoint is used to retrieve the active state of a token. It is can only be - invoked by confidential clients. + The introspection endpoint is used to retrieve the active state of a token. + It is can only be invoked by confidential clients. https://tools.ietf.org/html/rfc7662 @@ -316,7 +344,7 @@ class KeycloakOpenID: payload = {"client_id": self.client_id, "token": token} - if token_type_hint == 'requesting_party_token': + if token_type_hint == "requesting_party_token": if rpt: payload.update({"token": rpt, "token_type_hint": token_type_hint}) self.connection.add_param_headers("Authorization", "Bearer " + token) @@ -325,12 +353,11 @@ class KeycloakOpenID: payload = self._add_secret_key(payload) - data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), - data=payload) + data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakGetError) - def decode_token(self, token, key, algorithms=['RS256'], **kwargs): + def decode_token(self, token, key, algorithms=["RS256"], **kwargs): """ A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data structure that represents a cryptographic key. This specification @@ -347,8 +374,7 @@ class KeycloakOpenID: :return: """ - return jwt.decode(token, key, algorithms=algorithms, - audience=self.client_id, **kwargs) + return jwt.decode(token, key, algorithms=algorithms, audience=self.client_id, **kwargs) def load_authorization_config(self, path): """ @@ -357,12 +383,12 @@ class KeycloakOpenID: :param path: settings file (json) :return: """ - authorization_file = open(path, 'r') + authorization_file = open(path, "r") authorization_json = json.loads(authorization_file.read()) self.authorization.load_config(authorization_json) authorization_file.close() - def get_policies(self, token, method_token_info='introspect', **kwargs): + def get_policies(self, token, method_token_info="introspect", **kwargs): """ Get policies by user token @@ -377,12 +403,10 @@ class KeycloakOpenID: token_info = self._token_info(token, method_token_info, **kwargs) - if method_token_info == 'introspect' and not token_info['active']: - raise KeycloakInvalidTokenError( - "Token expired or invalid." - ) + if method_token_info == "introspect" and not token_info["active"]: + raise KeycloakInvalidTokenError("Token expired or invalid.") - user_resources = token_info['resource_access'].get(self.client_id) + user_resources = token_info["resource_access"].get(self.client_id) if not user_resources: return None @@ -390,13 +414,13 @@ class KeycloakOpenID: policies = [] for policy_name, policy in self.authorization.policies.items(): - for role in user_resources['roles']: + for role in user_resources["roles"]: if self._build_name_role(role) in policy.roles: policies.append(policy) return list(set(policies)) - def get_permissions(self, token, method_token_info='introspect', **kwargs): + def get_permissions(self, token, method_token_info="introspect", **kwargs): """ Get permission by user token @@ -413,12 +437,10 @@ class KeycloakOpenID: token_info = self._token_info(token, method_token_info, **kwargs) - if method_token_info == 'introspect' and not token_info['active']: - raise KeycloakInvalidTokenError( - "Token expired or invalid." - ) + if method_token_info == "introspect" and not token_info["active"]: + raise KeycloakInvalidTokenError("Token expired or invalid.") - user_resources = token_info['resource_access'].get(self.client_id) + user_resources = token_info["resource_access"].get(self.client_id) if not user_resources: return None @@ -426,7 +448,7 @@ class KeycloakOpenID: permissions = [] for policy_name, policy in self.authorization.policies.items(): - for role in user_resources['roles']: + for role in user_resources["roles"]: if self._build_name_role(role) in policy.roles: permissions += policy.permissions diff --git a/keycloak/urls_patterns.py b/keycloak/urls_patterns.py index d7dd16a..cf89da9 100644 --- a/keycloak/urls_patterns.py +++ b/keycloak/urls_patterns.py @@ -30,7 +30,9 @@ URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout" URL_CERTS = "realms/{realm-name}/protocol/openid-connect/certs" URL_INTROSPECT = "realms/{realm-name}/protocol/openid-connect/token/introspect" URL_ENTITLEMENT = "realms/{realm-name}/authz/entitlement/{resource-server-id}" -URL_AUTH = "{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}" +URL_AUTH = ( + "{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}" +) # ADMIN URLS URL_ADMIN_USERS = "admin/realms/{realm-name}/users" @@ -41,14 +43,26 @@ URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-ac URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email" URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password" URL_ADMIN_GET_SESSIONS = "admin/realms/{realm-name}/users/{id}/sessions" -URL_ADMIN_USER_CLIENT_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}" +URL_ADMIN_USER_CLIENT_ROLES = ( + "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}" +) URL_ADMIN_USER_REALM_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/realm" -URL_ADMIN_USER_REALM_ROLES_AVAILABLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm/available" -URL_ADMIN_USER_REALM_ROLES_COMPOSITE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm/composite" +URL_ADMIN_USER_REALM_ROLES_AVAILABLE = ( + "admin/realms/{realm-name}/users/{id}/role-mappings/realm/available" +) +URL_ADMIN_USER_REALM_ROLES_COMPOSITE = ( + "admin/realms/{realm-name}/users/{id}/role-mappings/realm/composite" +) URL_ADMIN_GROUPS_REALM_ROLES = "admin/realms/{realm-name}/groups/{id}/role-mappings/realm" -URL_ADMIN_GROUPS_CLIENT_ROLES = "admin/realms/{realm-name}/groups/{id}/role-mappings/clients/{client-id}" -URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/available" -URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/composite" +URL_ADMIN_GROUPS_CLIENT_ROLES = ( + "admin/realms/{realm-name}/groups/{id}/role-mappings/clients/{client-id}" +) +URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE = ( + "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/available" +) +URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = ( + "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/composite" +) URL_ADMIN_USER_GROUP = "admin/realms/{realm-name}/users/{id}/groups/{group-id}" URL_ADMIN_USER_GROUPS = "admin/realms/{realm-name}/users/{id}/groups" URL_ADMIN_USER_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password" @@ -73,14 +87,19 @@ URL_ADMIN_CLIENT_ROLES = URL_ADMIN_CLIENT + "/roles" URL_ADMIN_CLIENT_ROLE = URL_ADMIN_CLIENT + "/roles/{role-name}" URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE = URL_ADMIN_CLIENT_ROLE + "/composites" URL_ADMIN_CLIENT_ROLE_MEMBERS = URL_ADMIN_CLIENT + "/roles/{role-name}/users" +URL_ADMIN_CLIENT_ROLE_GROUPS = URL_ADMIN_CLIENT + "/roles/{role-name}/groups" URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT + "/authz/resource-server/settings" URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/resource?max=-1" URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT + "/authz/resource-server/scope?max=-1" URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT + "/authz/resource-server/permission?max=-1" URL_ADMIN_CLIENT_AUTHZ_POLICIES = URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1" -URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1" -URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1" +URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = ( + URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1" +) +URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = ( + URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1" +) URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user" URL_ADMIN_CLIENT_CERTS = URL_ADMIN_CLIENT + "/certificates/{attr}" @@ -101,8 +120,13 @@ URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances" URL_ADMIN_IDP_MAPPERS = "admin/realms/{realm-name}/identity-provider/instances/{idp-alias}/mappers" URL_ADMIN_IDP = "admin/realms//{realm-name}/identity-provider/instances/{alias}" URL_ADMIN_REALM_ROLES_ROLE_BY_NAME = "admin/realms/{realm-name}/roles/{role-name}" -URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = "admin/realms/{realm-name}/roles/{role-name}/composites" -URL_ADMIN_REALM_EXPORT = "admin/realms/{realm-name}/partial-export?exportClients={export-clients}&exportGroupsAndRoles={export-groups-and-roles}" +URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = ( + "admin/realms/{realm-name}/roles/{role-name}/composites" +) +URL_ADMIN_REALM_EXPORT = ( + "admin/realms/{realm-name}/partial-export?exportClients={export-clients" + + "}&exportGroupsAndRoles={export-groups-and-roles}" +) URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-default-client-scopes" URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE = URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES + "/{id}" @@ -113,10 +137,16 @@ URL_ADMIN_FLOWS = "admin/realms/{realm-name}/authentication/flows" URL_ADMIN_FLOW = URL_ADMIN_FLOWS + "/{id}" URL_ADMIN_FLOWS_ALIAS = "admin/realms/{realm-name}/authentication/flows/{flow-id}" URL_ADMIN_FLOWS_COPY = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/copy" -URL_ADMIN_FLOWS_EXECUTIONS = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions" +URL_ADMIN_FLOWS_EXECUTIONS = ( + "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions" +) URL_ADMIN_FLOWS_EXECUTION = "admin/realms/{realm-name}/authentication/executions/{id}" -URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/execution" -URL_ADMIN_FLOWS_EXECUTIONS_FLOW = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/flow" +URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION = ( + "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/execution" +) +URL_ADMIN_FLOWS_EXECUTIONS_FLOW = ( + "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/flow" +) URL_ADMIN_AUTHENTICATOR_CONFIG = "admin/realms/{realm-name}/authentication/config/{id}" URL_ADMIN_COMPONENTS = "admin/realms/{realm-name}/components" @@ -124,10 +154,11 @@ URL_ADMIN_COMPONENT = "admin/realms/{realm-name}/components/{component-id}" URL_ADMIN_KEYS = "admin/realms/{realm-name}/keys" URL_ADMIN_USER_FEDERATED_IDENTITIES = "admin/realms/{realm-name}/users/{id}/federated-identity" -URL_ADMIN_USER_FEDERATED_IDENTITY = "admin/realms/{realm-name}/users/{id}/federated-identity/{provider}" +URL_ADMIN_USER_FEDERATED_IDENTITY = ( + "admin/realms/{realm-name}/users/{id}/federated-identity/{provider}" +) -URL_ADMIN_EVENTS = 'admin/realms/{realm-name}/events' +URL_ADMIN_EVENTS = "admin/realms/{realm-name}/events" URL_ADMIN_DELETE_USER_ROLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm" URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats" - diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3134574 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,9 @@ +[tool.black] +line-length = 99 + +[tool.isort] +line_length = 99 +profile = "black" + +[tool.flake8] +max-line-length = 99 diff --git a/setup.cfg b/setup.cfg index 224a779..d10244e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,12 @@ [metadata] -description-file = README.md \ No newline at end of file +description-file = README.md + +[tool.black] +line-length = 99 + +[tool.isort] +line_length = 99 +profile = "black" + +[tool.flake8] +max-line-length = 99 \ No newline at end of file diff --git a/setup.py b/setup.py index 556882f..c228051 100644 --- a/setup.py +++ b/setup.py @@ -6,26 +6,26 @@ with open("README.md", "r") as fh: long_description = fh.read() setup( - name='python-keycloak', - version='0.27.0', - url='https://github.com/marcospereirampj/python-keycloak', - license='The MIT License', - author='Marcos Pereira', - author_email='marcospereira.mpj@gmail.com', - keywords='keycloak openid', - description='python-keycloak is a Python package providing access to the Keycloak API.', + name="python-keycloak", + version="0.27.0", + url="https://github.com/marcospereirampj/python-keycloak", + license="The MIT License", + author="Marcos Pereira", + author_email="marcospereira.mpj@gmail.com", + keywords="keycloak openid", + description="python-keycloak is a Python package providing access to the Keycloak API.", long_description=long_description, long_description_content_type="text/markdown", - packages=['keycloak', 'keycloak.authorization', 'keycloak.tests'], - install_requires=['requests>=2.20.0', 'python-jose>=1.4.0'], - tests_require=['httmock>=1.2.5'], + packages=["keycloak", "keycloak.authorization", "keycloak.tests"], + install_requires=["requests>=2.20.0", "python-jose>=1.4.0", "urllib>=1.26.0"], + tests_require=["httmock>=1.2.5"], classifiers=[ - 'Programming Language :: Python :: 3', - 'License :: OSI Approved :: MIT License', - 'Development Status :: 3 - Alpha', - 'Operating System :: MacOS', - 'Operating System :: Unix', - 'Operating System :: Microsoft :: Windows', - 'Topic :: Utilities' - ] + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Operating System :: MacOS", + "Operating System :: Unix", + "Operating System :: Microsoft :: Windows", + "Topic :: Utilities", + ], ) diff --git a/keycloak/tests/__init__.py b/tests/__init__.py similarity index 100% rename from keycloak/tests/__init__.py rename to tests/__init__.py diff --git a/keycloak/tests/test_connection.py b/tests/test_connection.py similarity index 59% rename from keycloak/tests/test_connection.py rename to tests/test_connection.py index cb98feb..eec86a3 100644 --- a/keycloak/tests/test_connection.py +++ b/tests/test_connection.py @@ -19,7 +19,7 @@ from unittest import mock from httmock import urlmatch, response, HTTMock, all_requests from keycloak import KeycloakAdmin, KeycloakOpenID -from ..connection import ConnectionManager +from keycloak.connection import ConnectionManager try: import unittest @@ -28,55 +28,49 @@ except ImportError: class TestConnection(unittest.TestCase): - def setUp(self): - self._conn = ConnectionManager( - base_url="http://localhost/", - headers={}, - timeout=60) + self._conn = ConnectionManager(base_url="http://localhost/", headers={}, timeout=60) @all_requests def response_content_success(self, url, request): - headers = {'content-type': 'application/json'} - content = b'response_ok' + headers = {"content-type": "application/json"} + content = b"response_ok" return response(200, content, headers, None, 5, request) def test_raw_get(self): with HTTMock(self.response_content_success): resp = self._conn.raw_get("/known_path") - self.assertEqual(resp.content, b'response_ok') + self.assertEqual(resp.content, b"response_ok") self.assertEqual(resp.status_code, 200) def test_raw_post(self): @urlmatch(path="/known_path", method="post") def response_post_success(url, request): - headers = {'content-type': 'application/json'} - content = 'response'.encode("utf-8") + headers = {"content-type": "application/json"} + content = "response".encode("utf-8") return response(201, content, headers, None, 5, request) with HTTMock(response_post_success): - resp = self._conn.raw_post("/known_path", - {'field': 'value'}) - self.assertEqual(resp.content, b'response') + resp = self._conn.raw_post("/known_path", {"field": "value"}) + self.assertEqual(resp.content, b"response") self.assertEqual(resp.status_code, 201) def test_raw_put(self): @urlmatch(netloc="localhost", path="/known_path", method="put") def response_put_success(url, request): - headers = {'content-type': 'application/json'} - content = 'response'.encode("utf-8") + headers = {"content-type": "application/json"} + content = "response".encode("utf-8") return response(200, content, headers, None, 5, request) with HTTMock(response_put_success): - resp = self._conn.raw_put("/known_path", - {'field': 'value'}) - self.assertEqual(resp.content, b'response') + resp = self._conn.raw_put("/known_path", {"field": "value"}) + self.assertEqual(resp.content, b"response") self.assertEqual(resp.status_code, 200) def test_raw_get_fail(self): @urlmatch(netloc="localhost", path="/known_path", method="get") def response_get_fail(url, request): - headers = {'content-type': 'application/json'} + headers = {"content-type": "application/json"} content = "404 page not found".encode("utf-8") return response(404, content, headers, None, 5, request) @@ -89,33 +83,30 @@ class TestConnection(unittest.TestCase): def test_raw_post_fail(self): @urlmatch(netloc="localhost", path="/known_path", method="post") def response_post_fail(url, request): - headers = {'content-type': 'application/json'} + headers = {"content-type": "application/json"} content = str(["Start can't be blank"]).encode("utf-8") return response(404, content, headers, None, 5, request) with HTTMock(response_post_fail): - resp = self._conn.raw_post("/known_path", - {'field': 'value'}) + resp = self._conn.raw_post("/known_path", {"field": "value"}) self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8")) self.assertEqual(resp.status_code, 404) def test_raw_put_fail(self): @urlmatch(netloc="localhost", path="/known_path", method="put") def response_put_fail(url, request): - headers = {'content-type': 'application/json'} + headers = {"content-type": "application/json"} content = str(["Start can't be blank"]).encode("utf-8") return response(404, content, headers, None, 5, request) with HTTMock(response_put_fail): - resp = self._conn.raw_put("/known_path", - {'field': 'value'}) + resp = self._conn.raw_put("/known_path", {"field": "value"}) self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8")) self.assertEqual(resp.status_code, 404) def test_add_param_headers(self): self._conn.add_param_headers("test", "value") - self.assertEqual(self._conn.headers, - {"test": "value"}) + self.assertEqual(self._conn.headers, {"test": "value"}) def test_del_param_headers(self): self._conn.add_param_headers("test", "value") @@ -124,8 +115,7 @@ class TestConnection(unittest.TestCase): def test_clean_param_headers(self): self._conn.add_param_headers("test", "value") - self.assertEqual(self._conn.headers, - {"test": "value"}) + self.assertEqual(self._conn.headers, {"test": "value"}) self._conn.clean_headers() self.assertEqual(self._conn.headers, {}) @@ -141,11 +131,9 @@ class TestConnection(unittest.TestCase): def test_get_headers(self): self._conn.add_param_headers("test", "value") - self.assertEqual(self._conn.headers, - {"test": "value"}) + self.assertEqual(self._conn.headers, {"test": "value"}) def test_KeycloakAdmin_custom_header(self): - class FakeToken: @staticmethod def get(string_val): @@ -153,39 +141,49 @@ class TestConnection(unittest.TestCase): fake_token = FakeToken() - with mock.patch.object(KeycloakOpenID, "__init__", return_value=None) as mock_keycloak_open_id: - with mock.patch("keycloak.keycloak_openid.KeycloakOpenID.token", return_value=fake_token): - with mock.patch("keycloak.connection.ConnectionManager.__init__", return_value=None) as mock_connection_manager: - with mock.patch("keycloak.connection.ConnectionManager.__del__", return_value=None) as mock_connection_manager_delete: + with mock.patch.object( + KeycloakOpenID, "__init__", return_value=None + ) as mock_keycloak_open_id: + with mock.patch( + "keycloak.keycloak_openid.KeycloakOpenID.token", return_value=fake_token + ): + with mock.patch( + "keycloak.connection.ConnectionManager.__init__", return_value=None + ) as mock_connection_manager: + with mock.patch( + "keycloak.connection.ConnectionManager.__del__", return_value=None + ) as mock_connection_manager_delete: server_url = "https://localhost/auth/" username = "admin" password = "secret" realm_name = "master" - headers = { - 'Custom': 'test-custom-header' + headers = {"Custom": "test-custom-header"} + KeycloakAdmin( + server_url=server_url, + username=username, + password=password, + realm_name=realm_name, + verify=False, + custom_headers=headers, + ) + + mock_keycloak_open_id.assert_called_with( + server_url=server_url, + realm_name=realm_name, + client_id="admin-cli", + client_secret_key=None, + verify=False, + custom_headers=headers, + ) + + expected_header = { + "Authorization": "Bearer faketoken", + "Content-Type": "application/json", + "Custom": "test-custom-header", } - KeycloakAdmin(server_url=server_url, - username=username, - password=password, - realm_name=realm_name, - verify=False, - custom_headers=headers) - - mock_keycloak_open_id.assert_called_with(server_url=server_url, - realm_name=realm_name, - client_id='admin-cli', - client_secret_key=None, - verify=False, - custom_headers=headers) - - expected_header = {'Authorization': 'Bearer faketoken', - 'Content-Type': 'application/json', - 'Custom': 'test-custom-header' - } - - mock_connection_manager.assert_called_with(base_url=server_url, - headers=expected_header, - timeout=60, - verify=False) + + mock_connection_manager.assert_called_with( + base_url=server_url, headers=expected_header, timeout=60, verify=False + ) mock_connection_manager_delete.assert_called_once_with()