From f865fcc6de97f17b28d94fd0ecc7b210dd338b16 Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Sun, 26 Feb 2023 23:33:39 +0530 Subject: [PATCH] refactor: Factor our OpenIdConnectionManager class and deprecate old methods --- poetry.lock | 17 +- pyproject.toml | 1 + src/keycloak/keycloak_admin.py | 800 +++++++++++++++++++------------- src/keycloak/keycloak_openid.py | 406 ++++++++++++++++ tests/test_keycloak_admin.py | 50 +- 5 files changed, 914 insertions(+), 360 deletions(-) diff --git a/poetry.lock b/poetry.lock index 83555c9..da7e72b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -475,6 +475,21 @@ files = [ {file = "decli-0.5.2.tar.gz", hash = "sha256:f2cde55034a75c819c630c7655a844c612f2598c42c21299160465df6ad463ad"}, ] +[[package]] +name = "deprecation" +version = "2.1.0" +description = "A library to handle automated deprecations" +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "deprecation-2.1.0-py2.py3-none-any.whl", hash = "sha256:a10811591210e1fb0e768a8c25517cabeabcba6f0bf96564f8ff45189f90b14a"}, + {file = "deprecation-2.1.0.tar.gz", hash = "sha256:72b3bde64e5d778694b0cf68178aed03d15e15477116add3fb773e581f9518ff"}, +] + +[package.dependencies] +packaging = "*" + [[package]] name = "distlib" version = "0.3.6" @@ -2094,4 +2109,4 @@ docs = ["Sphinx", "alabaster", "commonmark", "m2r2", "mock", "readthedocs-sphinx [metadata] lock-version = "2.0" python-versions = "^3.7" -content-hash = "8d76b155adddd2eacd0304397b33465d5c67f09165d8de641c71f5ce7b979be2" +content-hash = "45f461f05bdc8da0d12a858c57782cc3a24fb4fb91175e6e0a290cd0304c10e9" diff --git a/pyproject.toml b/pyproject.toml index d855cbb..d4bf17b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ readthedocs-sphinx-ext = {version = "^2.1.9", optional = true} m2r2 = {version = "^0.3.2", optional = true} sphinx-autoapi = {version = "^2.0.0", optional = true} requests-toolbelt = "^0.10.1" +deprecation = "^2.1.0" [tool.poetry.extras] docs = [ diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 7f43879..a6ca09c 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -29,12 +29,11 @@ import copy import json from builtins import isinstance -from typing import Iterable +import deprecation from requests_toolbelt import MultipartEncoder -from . import urls_patterns -from .connection import ConnectionManager +from . import __version__, urls_patterns from .exceptions import ( KeycloakDeleteError, KeycloakGetError, @@ -42,7 +41,7 @@ from .exceptions import ( KeycloakPutError, raise_error_from_response, ) -from .keycloak_openid import KeycloakOpenID +from .keycloak_openid import KeycloakOpenIDConnectionManager class KeycloakAdmin: @@ -80,18 +79,7 @@ class KeycloakAdmin: PAGE_SIZE = 100 - _server_url = None - _username = None - _password = None - _totp = None - _realm_name = None - _client_id = None - _verify = None - _client_secret_key = None - _auto_refresh_token = None _connection = None - _custom_headers = None - _user_realm_name = None def __init__( self, @@ -140,72 +128,78 @@ class KeycloakAdmin: :param timeout: connection timeout in seconds :type timeout: int """ - self.server_url = server_url - self.username = username - self.password = password - self.token = token - self.totp = totp - self.realm_name = realm_name - self.client_id = client_id - self.verify = verify - self.client_secret_key = client_secret_key - self.auto_refresh_token = auto_refresh_token or [] - self.user_realm_name = user_realm_name - self.custom_headers = custom_headers - self.timeout = timeout - - if self.token is None: - self.get_token() - - headers = ( - { - "Authorization": "Bearer " + self.token.get("access_token"), - "Content-Type": "application/json", - } - if self.token is not None - else {} - ) - - if self.custom_headers is not None: - # merge custom headers to main headers - headers.update(self.custom_headers) - - self.connection = ConnectionManager( - base_url=self.server_url, headers=headers, timeout=60, verify=self.verify + self.connection = KeycloakOpenIDConnectionManager( + server_url=server_url, + username=username, + password=password, + token=token, + totp=totp, + realm_name=realm_name, + client_id=client_id, + verify=verify, + client_secret_key=client_secret_key, + user_realm_name=user_realm_name, + custom_headers=custom_headers, + timeout=timeout, + auto_refresh_token=auto_refresh_token or [], ) @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.server_url property instead", + ) def server_url(self): """Get server url. :returns: Keycloak server url :rtype: str """ - return self._server_url + return self.connection.server_url @server_url.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.server_url property instead", + ) def server_url(self, value): - self._server_url = value + self.connection.server_url = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.realm_name property instead", + ) def realm_name(self): """Get realm name. :returns: Realm name :rtype: str """ - return self._realm_name + return self.connection.realm_name @realm_name.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.realm_name property instead", + ) def realm_name(self, value): - self._realm_name = value + self.connection.realm_name = value @property def connection(self): """Get connection. :returns: Connection manager - :rtype: ConnectionManager + :rtype: KeycloakOpenIDConnectionManager """ return self._connection @@ -214,146 +208,254 @@ class KeycloakAdmin: self._connection = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.client_id property instead", + ) def client_id(self): """Get client id. :returns: Client id :rtype: str """ - return self._client_id + return self.connection.client_id @client_id.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.client_id property instead", + ) def client_id(self, value): - self._client_id = value + self.connection.client_id = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.client_secret_key property instead", + ) def client_secret_key(self): """Get client secret key. :returns: Client secret key :rtype: str """ - return self._client_secret_key + return self.connection.client_secret_key @client_secret_key.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.client_secret_key property instead", + ) def client_secret_key(self, value): - self._client_secret_key = value + self.connection.client_secret_key = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.verify property instead", + ) def verify(self): """Get verify. :returns: Verify indicator :rtype: bool """ - return self._verify + return self.connection.verify @verify.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.verify property instead", + ) def verify(self, value): - self._verify = value + self.connection.verify = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.username property instead", + ) def username(self): """Get username. :returns: Admin username :rtype: str """ - return self._username + return self.connection.username @username.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.username property instead", + ) def username(self, value): - self._username = value + self.connection.username = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.password property instead", + ) def password(self): """Get password. :returns: Admin password :rtype: str """ - return self._password + return self.connection.password @password.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.password property instead", + ) def password(self, value): - self._password = value + self.connection.password = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.totp property instead", + ) def totp(self): """Get totp. :returns: TOTP :rtype: str """ - return self._totp + return self.connection.totp @totp.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.totp property instead", + ) def totp(self, value): - self._totp = value + self.connection.totp = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.token property instead", + ) def token(self): """Get token. :returns: Access and refresh token :rtype: dict """ - return self._token + return self.connection.token @token.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.token property instead", + ) def token(self, value): - self._token = value - - @property - def auto_refresh_token(self): - """Get auto refresh token. - - :returns: List of methods for automatic token refresh - :rtype: list - """ - return self._auto_refresh_token + self.connection.token = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.user_realm_name property instead", + ) def user_realm_name(self): """Get user realm name. :returns: User realm name :rtype: str """ - return self._user_realm_name + return self.connection.user_realm_name @user_realm_name.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.user_realm_name property instead", + ) def user_realm_name(self, value): - self._user_realm_name = value + self.connection.user_realm_name = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.custom_headers property instead", + ) def custom_headers(self): """Get custom headers. :returns: Custom headers :rtype: dict """ - return self._custom_headers + return self.connection.custom_headers @custom_headers.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.custom_headers property instead", + ) def custom_headers(self, value): - self._custom_headers = value + self.connection.custom_headers = value + + @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.auto_refresh_token property instead", + ) + def auto_refresh_token(self): + """Get auto refresh token. + + :returns: List of methods for automatic token refresh + :rtype: list + """ + return self.connection.auto_refresh_token @auto_refresh_token.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.custom_headers property instead", + ) def auto_refresh_token(self, value): - allowed_methods = {"get", "post", "put", "delete"} - if not isinstance(value, Iterable): - raise TypeError( - "Expected a list of strings among {allowed}".format(allowed=allowed_methods) - ) - if not all(method in allowed_methods for method in value): - raise TypeError( - "Unexpected method in auto_refresh_token, accepted methods are {allowed}".format( - allowed=allowed_methods - ) - ) - - self._auto_refresh_token = value + self.connection.auto_refresh_token = value def __fetch_all(self, url, query=None): """Paginate over get requests. @@ -380,7 +482,7 @@ class KeycloakAdmin: while True: query["first"] = page * self.PAGE_SIZE partial_results = raise_error_from_response( - self.raw_get(url, **query), KeycloakGetError + self.connection.raw_get(url, **query), KeycloakGetError ) if not partial_results: break @@ -401,7 +503,7 @@ class KeycloakAdmin: :rtype: dict """ query = query or {} - return raise_error_from_response(self.raw_get(url, **query), KeycloakGetError) + return raise_error_from_response(self.connection.raw_get(url, **query), KeycloakGetError) def import_realm(self, payload): """Import a new realm from a RealmRepresentation. @@ -416,7 +518,9 @@ class KeycloakAdmin: :return: RealmRepresentation :rtype: dict """ - data_raw = self.raw_post(urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload)) + data_raw = self.connection.raw_post( + urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) def export_realm(self, export_clients=False, export_groups_and_role=False): @@ -438,7 +542,7 @@ class KeycloakAdmin: "export-clients": export_clients, "export-groups-and-roles": export_groups_and_role, } - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_REALM_EXPORT.format(**params_path), data="" ) return raise_error_from_response(data_raw, KeycloakPostError) @@ -449,7 +553,7 @@ class KeycloakAdmin: :return: realms list :rtype: list """ - data_raw = self.raw_get(urls_patterns.URL_ADMIN_REALMS) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_REALMS) return raise_error_from_response(data_raw, KeycloakGetError) def get_realm(self, realm_name): @@ -463,8 +567,11 @@ class KeycloakAdmin: :return: RealmRepresentation :rtype: dict """ + import debugpy + + debugpy.breakpoint() params_path = {"realm-name": realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) def create_realm(self, payload, skip_exists=False): @@ -480,7 +587,9 @@ class KeycloakAdmin: :return: Keycloak server response (RealmRepresentation) :rtype: dict """ - data_raw = self.raw_post(urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload)) + data_raw = self.connection.raw_post( + urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload) + ) return raise_error_from_response( data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) @@ -502,7 +611,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": realm_name} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_REALM.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -516,7 +625,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": realm_name} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_users(self, query=None): @@ -553,7 +662,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_IDPS.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -572,7 +681,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "alias": idp_alias} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_IDP.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -591,7 +700,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "idp-alias": idp_alias} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_IDP_MAPPERS.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -617,7 +726,7 @@ class KeycloakAdmin: "mapper-id": mapper_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_IDP_MAPPER_UPDATE.format(**params_path), data=json.dumps(payload), ) @@ -638,7 +747,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "idp-alias": idp_alias} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_IDP_MAPPERS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_IDP_MAPPERS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_idps(self): @@ -653,7 +764,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def delete_idp(self, idp_alias): @@ -665,7 +776,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "alias": idp_alias} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_IDP.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_IDP.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def create_user(self, payload, exist_ok=False): @@ -693,7 +804,7 @@ class KeycloakAdmin: if exists is not None: return str(exists) - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USERS.format(**params_path), data=json.dumps(payload) ) raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -713,7 +824,9 @@ class KeycloakAdmin: """ query = query or dict() params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USERS_COUNT.format(**params_path), **query) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_USERS_COUNT.format(**params_path), **query + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_user_id(self, username): @@ -745,7 +858,7 @@ class KeycloakAdmin: :return: UserRepresentation """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USER.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_USER.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_user_groups(self, user_id, brief_representation=True): @@ -762,7 +875,7 @@ class KeycloakAdmin: """ params = {"briefRepresentation": brief_representation} params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_USER_GROUPS.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -779,7 +892,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_USER.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -829,7 +942,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_USER.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_USER.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def set_user_password(self, user_id, password, temporary=True): @@ -852,7 +965,7 @@ class KeycloakAdmin: """ payload = {"type": "password", "temporary": temporary, "value": password} params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_RESET_PASSWORD.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -871,7 +984,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USER_CREDENTIALS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_USER_CREDENTIALS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def delete_credential(self, user_id, credential_id): @@ -892,7 +1007,9 @@ class KeycloakAdmin: "id": user_id, "credential_id": credential_id, } - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_USER_CREDENTIAL.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_USER_CREDENTIAL.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError) def user_logout(self, user_id): @@ -906,7 +1023,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_LOGOUT.format(**params_path), data="" ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) @@ -923,7 +1040,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USER_CONSENTS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_USER_CONSENTS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_user_social_logins(self, user_id): @@ -937,7 +1056,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITIES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -962,7 +1081,7 @@ class KeycloakAdmin: "userName": provider_username, } params_path = {"realm-name": self.realm_name, "id": user_id, "provider": provider_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path), data=json.dumps(payload), ) @@ -979,7 +1098,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id, "provider": provider_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -1007,7 +1126,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": user_id} params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), data=json.dumps(payload), **params_query, @@ -1031,7 +1150,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": user_id} params_query = {"client_id": client_id, "redirect_uri": redirect_uri} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path), data={}, **params_query, @@ -1050,7 +1169,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_GET_SESSIONS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_GET_SESSIONS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_server_info(self): @@ -1062,7 +1183,7 @@ class KeycloakAdmin: :return: ServerInfoRepresentation :rtype: dict """ - data_raw = self.raw_get(urls_patterns.URL_ADMIN_SERVER_INFO) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_SERVER_INFO) return raise_error_from_response(data_raw, KeycloakGetError) def get_groups(self, query=None): @@ -1101,7 +1222,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_subgroups(self, group, path): @@ -1190,12 +1311,12 @@ class KeycloakAdmin: """ if parent is None: params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_GROUPS.format(**params_path), data=json.dumps(payload) ) else: params_path = {"realm-name": self.realm_name, "id": parent} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_GROUP_CHILD.format(**params_path), data=json.dumps(payload) ) @@ -1223,7 +1344,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_GROUP.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -1241,7 +1362,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_GROUP_PERMISSIONS.format(**params_path), data=json.dumps({"enabled": enabled}), ) @@ -1258,7 +1379,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id, "group-id": group_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_USER_GROUP.format(**params_path), data=None ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -1274,7 +1395,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id, "group-id": group_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_USER_GROUP.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_USER_GROUP.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def delete_group(self, group_id): @@ -1286,7 +1409,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_clients(self): @@ -1301,7 +1424,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENTS.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_CLIENTS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_client(self, client_id): @@ -1316,7 +1439,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_id(self, client_id): @@ -1348,7 +1471,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SETTINGS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1370,7 +1493,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path), data=json.dumps(payload), ) @@ -1388,7 +1511,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1423,7 +1546,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY.format(**params_path), data=json.dumps(payload), ) @@ -1462,7 +1585,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION.format(**params_path), data=json.dumps(payload), ) @@ -1480,7 +1603,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPES.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPES.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def create_client_authz_scopes(self, client_id, payload): @@ -1496,7 +1621,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPES.format(**params_path), data=json.dumps(payload), ) @@ -1512,7 +1637,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1527,7 +1652,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICIES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1542,7 +1667,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1557,7 +1682,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1588,7 +1713,7 @@ class KeycloakAdmin: "id": client_id, "client_scope_id": client_scope_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload), ) @@ -1610,7 +1735,7 @@ class KeycloakAdmin: "id": client_id, "client_scope_id": client_scope_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError) @@ -1625,7 +1750,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1656,7 +1781,7 @@ class KeycloakAdmin: "id": client_id, "client_scope_id": client_scope_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload), ) @@ -1678,7 +1803,7 @@ class KeycloakAdmin: "id": client_id, "client_scope_id": client_scope_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError) @@ -1703,7 +1828,7 @@ class KeycloakAdmin: return client_id params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENTS.format(**params_path), data=json.dumps(payload) ) raise_error_from_response( @@ -1724,7 +1849,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -1741,7 +1866,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_client_installation_provider(self, client_id, provider_id): @@ -1761,7 +1886,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id, "provider-id": provider_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_INSTALLATION_PROVIDER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) @@ -1779,7 +1904,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1822,7 +1947,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "role-id": self.get_default_realm_role_id()} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES_REALM.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1836,7 +1961,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "role-id": self.get_default_realm_role_id()} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path), data=json.dumps(payload), ) @@ -1851,7 +1976,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "role-id": self.get_default_realm_role_id()} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path), data=json.dumps(payload), ) @@ -1872,7 +1997,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_ROLES.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1893,7 +2018,9 @@ class KeycloakAdmin: :rtype: str """ params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_role_id(self, client_id, role_name): @@ -1937,7 +2064,7 @@ class KeycloakAdmin: pass params_path = {"realm-name": self.realm_name, "id": client_role_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) ) raise_error_from_response( @@ -1960,7 +2087,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE.format(**params_path), data=json.dumps(payload), ) @@ -1982,7 +2109,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -2001,7 +2128,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def assign_client_role(self, user_id, client_id, roles): @@ -2018,7 +2147,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2078,7 +2207,7 @@ class KeycloakAdmin: pass params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), data=json.dumps(payload) ) raise_error_from_response( @@ -2099,7 +2228,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2115,7 +2244,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path), data=json.dumps(payload), ) @@ -2130,7 +2259,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -2147,7 +2276,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path), data=json.dumps(payload), ) @@ -2165,7 +2294,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path), data=json.dumps(payload), ) @@ -2180,7 +2309,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2197,7 +2326,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2215,7 +2344,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2230,7 +2359,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2253,7 +2382,7 @@ class KeycloakAdmin: "id": client_id, "client": client_roles_owner_id, } - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2277,7 +2406,7 @@ class KeycloakAdmin: "id": client_id, "client": client_roles_owner_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2298,7 +2427,7 @@ class KeycloakAdmin: "id": client_id, "client": client_roles_owner_id, } - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2315,7 +2444,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2333,7 +2462,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2348,7 +2477,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_available_realm_roles_of_user(self, user_id): @@ -2360,7 +2491,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_USER_REALM_ROLES_AVAILABLE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2377,7 +2508,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": user_id} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_USER_REALM_ROLES_COMPOSITE.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2394,7 +2525,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2412,7 +2543,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2430,7 +2561,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": group_id} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2449,7 +2580,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2466,7 +2597,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def delete_group_client_roles(self, group_id, client_id, roles): @@ -2483,7 +2616,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2551,7 +2684,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} - data_raw = self.raw_get(client_level_role_mapping_url.format(**params_path), **params) + data_raw = self.connection.raw_get( + client_level_role_mapping_url.format(**params_path), **params + ) return raise_error_from_response(data_raw, KeycloakGetError) def delete_client_roles_of_user(self, user_id, client_id, roles): @@ -2568,7 +2703,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2586,7 +2721,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_FLOWS.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_FLOWS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_authentication_flow_for_id(self, flow_id): @@ -2603,7 +2738,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "flow-id": flow_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_FLOWS_ALIAS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_FLOWS_ALIAS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def create_authentication_flow(self, payload, skip_exists=False): @@ -2620,7 +2757,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_FLOWS.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response( @@ -2640,7 +2777,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_FLOWS_COPY.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -2657,7 +2794,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": flow_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_FLOW.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_FLOW.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_authentication_flow_executions(self, flow_alias): @@ -2671,7 +2808,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def update_authentication_flow_executions(self, payload, flow_alias): @@ -2688,7 +2827,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), data=json.dumps(payload), ) @@ -2706,7 +2845,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": execution_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def create_authentication_flow_execution(self, payload, flow_alias): @@ -2723,7 +2864,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION.format(**params_path), data=json.dumps(payload), ) @@ -2741,7 +2882,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": execution_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False): @@ -2760,7 +2903,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS_FLOW.format(**params_path), data=json.dumps(payload), ) @@ -2775,7 +2918,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_AUTHENTICATOR_PROVIDERS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2792,7 +2935,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "provider-id": provider_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG_DESCRIPTION.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2808,7 +2951,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": config_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def update_authenticator_config(self, payload, config_id): @@ -2825,7 +2970,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": config_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path), data=json.dumps(payload), ) @@ -2842,7 +2987,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": config_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -2861,7 +3006,7 @@ class KeycloakAdmin: params_query = {"action": action} params_path = {"realm-name": self.realm_name, "id": storage_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_STORAGE.format(**params_path), data=json.dumps(data), **params_query, @@ -2878,7 +3023,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SCOPES.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPES.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_scope(self, client_scope_id): @@ -2893,7 +3040,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_scope_by_name(self, client_scope_name): @@ -2934,7 +3083,7 @@ class KeycloakAdmin: return exists["id"] params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SCOPES.format(**params_path), data=json.dumps(payload) ) raise_error_from_response( @@ -2957,7 +3106,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -2974,7 +3123,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_mappers_from_client_scope(self, client_scope_id): @@ -2987,7 +3138,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) @@ -3006,7 +3157,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), data=json.dumps(payload), ) @@ -3031,7 +3182,7 @@ class KeycloakAdmin: "protocol-mapper-id": protocol_mapper_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -3057,7 +3208,7 @@ class KeycloakAdmin: "protocol-mapper-id": protocol_mapper_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path), data=json.dumps(payload), ) @@ -3073,7 +3224,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3087,7 +3238,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": scope_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -3102,7 +3253,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": scope_id} payload = {"realm": self.realm_name, "clientScopeId": scope_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload), ) @@ -3117,7 +3268,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3131,7 +3282,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": scope_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -3146,7 +3297,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": scope_id} payload = {"realm": self.realm_name, "clientScopeId": scope_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload), ) @@ -3164,7 +3315,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path) ) @@ -3184,7 +3335,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path), data=json.dumps(payload), ) @@ -3209,7 +3360,7 @@ class KeycloakAdmin: "protocol-mapper-id": mapper_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path), data=json.dumps(payload), ) @@ -3234,7 +3385,7 @@ class KeycloakAdmin: "protocol-mapper-id": client_mapper_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -3250,7 +3401,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SECRETS.format(**params_path), data=None ) return raise_error_from_response(data_raw, KeycloakPostError) @@ -3266,7 +3417,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SECRETS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_SECRETS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_components(self, query=None): @@ -3284,7 +3437,7 @@ class KeycloakAdmin: """ query = query or dict() params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_COMPONENTS.format(**params_path), data=None, **query ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3301,7 +3454,7 @@ class KeycloakAdmin: :rtype: str """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_COMPONENTS.format(**params_path), data=json.dumps(payload) ) raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -3322,7 +3475,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "component-id": component_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def update_component(self, component_id, payload): @@ -3337,7 +3490,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "component-id": component_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_COMPONENT.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -3351,7 +3504,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "component-id": component_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_COMPONENT.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_keys(self): @@ -3366,7 +3521,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_KEYS.format(**params_path), data=None) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_KEYS.format(**params_path), data=None + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_events(self, query=None): @@ -3384,7 +3541,7 @@ class KeycloakAdmin: """ query = query or dict() params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_EVENTS.format(**params_path), data=None, **query ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3401,11 +3558,17 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_EVENTS_CONFIG.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.raw_get function instead", + ) def raw_get(self, *args, **kwargs): """Call connection.raw_get. @@ -3419,12 +3582,14 @@ class KeycloakAdmin: :returns: Response :rtype: Response """ - r = self.connection.raw_get(*args, **kwargs) - if "get" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return self.connection.raw_get(*args, **kwargs) - return r + return self.connection.raw_get(*args, **kwargs) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.raw_post function instead", + ) def raw_post(self, *args, **kwargs): """Call connection.raw_post. @@ -3438,12 +3603,14 @@ class KeycloakAdmin: :returns: Response :rtype: Response """ - r = self.connection.raw_post(*args, **kwargs) - if "post" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return self.connection.raw_post(*args, **kwargs) - return r + return self.connection.raw_post(*args, **kwargs) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.raw_put function instead", + ) def raw_put(self, *args, **kwargs): """Call connection.raw_put. @@ -3457,12 +3624,14 @@ class KeycloakAdmin: :returns: Response :rtype: Response """ - r = self.connection.raw_put(*args, **kwargs) - if "put" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return self.connection.raw_put(*args, **kwargs) - return r + return self.connection.raw_put(*args, **kwargs) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.raw_delete function instead", + ) def raw_delete(self, *args, **kwargs): """Call connection.raw_delete. @@ -3476,74 +3645,37 @@ class KeycloakAdmin: :returns: Response :rtype: Response """ - r = self.connection.raw_delete(*args, **kwargs) - if "delete" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return self.connection.raw_delete(*args, **kwargs) - return r + return self.connection.raw_delete(*args, **kwargs) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.get_token function instead", + ) def get_token(self): """Get admin token. The admin token is then set in the `token` attribute. + + :returns: token + :rtype: dict """ - if self.user_realm_name: - token_realm_name = self.user_realm_name - elif self.realm_name: - token_realm_name = self.realm_name - else: - token_realm_name = "master" - - self.keycloak_openid = KeycloakOpenID( - server_url=self.server_url, - client_id=self.client_id, - realm_name=token_realm_name, - verify=self.verify, - client_secret_key=self.client_secret_key, - custom_headers=self.custom_headers, - timeout=self.timeout, - ) - - grant_type = [] - if self.client_secret_key: - if self.user_realm_name: - self.realm_name = self.user_realm_name - grant_type.append("client_credentials") - elif self.username and self.password: - grant_type.append("password") - - if grant_type: - self.token = self.keycloak_openid.token( - self.username, self.password, grant_type=grant_type, totp=self.totp - ) - else: - self.token = None + return self.connection.get_token() + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the self.connection.refresh_token function instead", + ) def refresh_token(self): """Refresh the token. - :raises KeycloakPostError: In case the refresh token request failed. + :returns: token + :rtype: dict """ - refresh_token = self.token.get("refresh_token", None) - if refresh_token is None: - self.get_token() - else: - try: - self.token = self.keycloak_openid.refresh_token(refresh_token) - except KeycloakPostError as e: - list_errors = [ - b"Refresh token expired", - b"Token is not active", - b"Session not active", - ] - if e.response_code == 400 and any(err in e.response_body for err in list_errors): - self.get_token() - else: - raise - - self.connection.add_param_headers( - "Authorization", "Bearer " + self.token.get("access_token") - ) + return self.connection.refresh_token() def get_client_all_sessions(self, client_id): """Get sessions associated with the client. @@ -3557,7 +3689,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_sessions_stats(self): @@ -3569,7 +3703,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SESSION_STATS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_SESSION_STATS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_management_permissions(self, client_id): @@ -3582,7 +3718,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3608,7 +3744,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS.format(**params_path), data=json.dumps(payload), ) @@ -3626,7 +3762,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id, "policy-id": policy_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3643,7 +3779,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id, "policy-id": policy_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3660,7 +3796,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id, "scope-id": scope_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3692,7 +3828,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id, "scope-id": scope_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path), data=json.dumps(payload), ) @@ -3708,7 +3844,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) @@ -3735,7 +3871,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY.format(**params_path), data=json.dumps(payload), ) @@ -3755,7 +3891,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES_COMPOSITE.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3771,7 +3907,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "role-id": role_id, "client-id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_ROLE_CHILDREN.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_ROLE_CHILDREN.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def upload_certificate(self, client_id, certcont): @@ -3790,7 +3928,7 @@ class KeycloakAdmin: new_headers = copy.deepcopy(self.connection.headers) new_headers["Content-Type"] = m.content_type self.connection.headers = new_headers - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_CERT_UPLOAD.format(**params_path), data=m, headers=new_headers, @@ -3818,7 +3956,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_REQUIRED_ACTIONS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_REQUIRED_ACTIONS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def update_required_action(self, action_alias, payload): @@ -3834,7 +3974,7 @@ class KeycloakAdmin: if not isinstance(payload, str): payload = json.dumps(payload) params_path = {"realm-name": self.realm_name, "action-alias": action_alias} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_REQUIRED_ACTIONS_ALIAS.format(**params_path), data=payload ) return raise_error_from_response(data_raw, KeycloakPutError) @@ -3848,7 +3988,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3862,7 +4002,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError) @@ -3874,7 +4014,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_ATTACK_DETECTION.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_ATTACK_DETECTION.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError) def clear_keys_cache(self): diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 56e0315..0b4752e 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -28,6 +28,8 @@ class to handle authentication and token manipulation. """ import json +from datetime import datetime, timedelta +from typing import Iterable from jose import jwt @@ -679,3 +681,407 @@ class KeycloakOpenID: return AuthStatus( is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed ) + + +class KeycloakOpenIDConnectionManager(ConnectionManager): + """A class to help with OpenID connections which can auto refresh tokens. + + :param object: _description_ + :type object: _type_ + """ + + _server_url = None + _username = None + _password = None + _totp = None + _realm_name = None + _client_id = None + _verify = None + _client_secret_key = None + _auto_refresh_token = None + _connection = None + _custom_headers = None + _user_realm_name = None + + def __init__( + self, + server_url, + username=None, + password=None, + token=None, + totp=None, + realm_name="master", + client_id="admin-cli", + verify=True, + client_secret_key=None, + custom_headers=None, + user_realm_name=None, + auto_refresh_token=None, + timeout=60, + ): + """Init method. + + :param server_url: Keycloak server url + :type server_url: str + :param username: admin username + :type username: str + :param password: admin password + :type password: str + :param token: access and refresh tokens + :type token: dict + :param totp: Time based OTP + :type totp: str + :param realm_name: realm name + :type realm_name: str + :param client_id: client id + :type client_id: str + :param verify: True if want check connection SSL + :type verify: bool + :param client_secret_key: client secret key + (optional, required only for access type confidential) + :type client_secret_key: str + :param custom_headers: dict of custom header to pass to each HTML request + :type custom_headers: dict + :param user_realm_name: The realm name of the user, if different from realm_name + :type user_realm_name: str + :param auto_refresh_token: list of methods that allows automatic token refresh. + Ex: ['get', 'put', 'post', 'delete'] + :type auto_refresh_token: list + :param timeout: connection timeout in seconds + :type timeout: int + """ + self.server_url = server_url + self.username = username + self.password = password + self.token = token + self.totp = totp + self.realm_name = realm_name + self.client_id = client_id + self.verify = verify + self.client_secret_key = client_secret_key + self.auto_refresh_token = auto_refresh_token or [] + self.user_realm_name = user_realm_name + self.timeout = timeout + + if self.token is None: + self.get_token() + + self.expires_at = datetime.now() + timedelta( + seconds=self.token["expires_in"] if self.token else 0 + ) + + self.headers = ( + { + "Authorization": "Bearer " + self.token.get("access_token"), + "Content-Type": "application/json", + } + if self.token is not None + else {} + ) + self.custom_headers = custom_headers + + super().__init__( + base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify + ) + + @property + def server_url(self): + """Get server url. + + :returns: Keycloak server url + :rtype: str + """ + return self.base_url + + @server_url.setter + def server_url(self, value): + self.base_url = value + + @property + def realm_name(self): + """Get realm name. + + :returns: Realm name + :rtype: str + """ + return self._realm_name + + @realm_name.setter + def realm_name(self, value): + self._realm_name = value + + @property + def client_id(self): + """Get client id. + + :returns: Client id + :rtype: str + """ + return self._client_id + + @client_id.setter + def client_id(self, value): + self._client_id = value + + @property + def client_secret_key(self): + """Get client secret key. + + :returns: Client secret key + :rtype: str + """ + return self._client_secret_key + + @client_secret_key.setter + def client_secret_key(self, value): + self._client_secret_key = value + + @property + def username(self): + """Get username. + + :returns: Admin username + :rtype: str + """ + return self._username + + @username.setter + def username(self, value): + self._username = value + + @property + def password(self): + """Get password. + + :returns: Admin password + :rtype: str + """ + return self._password + + @password.setter + def password(self, value): + self._password = value + + @property + def totp(self): + """Get totp. + + :returns: TOTP + :rtype: str + """ + return self._totp + + @totp.setter + def totp(self, value): + self._totp = value + + @property + def token(self): + """Get token. + + :returns: Access and refresh token + :rtype: dict + """ + return self._token + + @token.setter + def token(self, value): + self._token = value + + @property + def user_realm_name(self): + """Get user realm name. + + :returns: User realm name + :rtype: str + """ + return self._user_realm_name + + @user_realm_name.setter + def user_realm_name(self, value): + self._user_realm_name = value + + @property + def custom_headers(self): + """Get custom headers. + + :returns: Custom headers + :rtype: dict + """ + return self._custom_headers + + @custom_headers.setter + def custom_headers(self, value): + self._custom_headers = value + if self.custom_headers is not None: + # merge custom headers to main headers + self.headers.update(self.custom_headers) + + @property + def auto_refresh_token(self): + """Get auto refresh token. + + :returns: List of methods for automatic token refresh + :rtype: list + """ + return self._auto_refresh_token + + @auto_refresh_token.setter + def auto_refresh_token(self, value): + allowed_methods = {"get", "post", "put", "delete"} + if not isinstance(value, Iterable): + raise TypeError( + "Expected a list of strings among {allowed}".format(allowed=allowed_methods) + ) + if not all(method in allowed_methods for method in value): + raise TypeError( + "Unexpected method in auto_refresh_token, accepted methods are {allowed}".format( + allowed=allowed_methods + ) + ) + + self._auto_refresh_token = value + + def get_token(self): + """Get admin token. + + The admin token is then set in the `token` attribute. + """ + if self.user_realm_name: + token_realm_name = self.user_realm_name + elif self.realm_name: + token_realm_name = self.realm_name + else: + token_realm_name = "master" + + self.keycloak_openid = KeycloakOpenID( + server_url=self.server_url, + client_id=self.client_id, + realm_name=token_realm_name, + verify=self.verify, + client_secret_key=self.client_secret_key, + timeout=self.timeout, + ) + + grant_type = [] + if self.client_secret_key: + if self.user_realm_name: + self.realm_name = self.user_realm_name + grant_type.append("client_credentials") + elif self.username and self.password: + grant_type.append("password") + + if grant_type: + self.token = self.keycloak_openid.token( + self.username, self.password, grant_type=grant_type, totp=self.totp + ) + else: + self.token = None + + def refresh_token(self): + """Refresh the token. + + :raises KeycloakPostError: In case the refresh token request failed. + """ + refresh_token = self.token.get("refresh_token", None) + if refresh_token is None: + self.get_token() + else: + try: + self.token = self.keycloak_openid.refresh_token(refresh_token) + except KeycloakPostError as e: + list_errors = [ + b"Refresh token expired", + b"Token is not active", + b"Session not active", + ] + if e.response_code == 400 and any(err in e.response_body for err in list_errors): + self.get_token() + else: + raise + + self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token")) + + def _refresh_if_required(self): + if datetime.now() >= self.expires_at: + self.refresh_token() + + def raw_get(self, *args, **kwargs): + """Call connection.raw_get. + + If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token + and try *get* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = super().raw_get(*args, **kwargs) + if "get" in self.auto_refresh_token and r.status_code == 401: + self.refresh_token() + return super().raw_get(*args, **kwargs) + return r + + def raw_post(self, *args, **kwargs): + """Call connection.raw_post. + + If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token + and try *post* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = super().raw_post(*args, **kwargs) + if "post" in self.auto_refresh_token and r.status_code == 401: + self.refresh_token() + return super().raw_post(*args, **kwargs) + return r + + def raw_put(self, *args, **kwargs): + """Call connection.raw_put. + + If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token + and try *put* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = super().raw_put(*args, **kwargs) + if "put" in self.auto_refresh_token and r.status_code == 401: + self.refresh_token() + return super().raw_put(*args, **kwargs) + return r + + def raw_delete(self, *args, **kwargs): + """Call connection.raw_delete. + + If auto_refresh is set for *delete* and *access_token* is expired, + it will refresh the token and try *delete* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = super().raw_delete(*args, **kwargs) + if "delete" in self.auto_refresh_token and r.status_code == 401: + self.refresh_token() + return super().raw_delete(*args, **kwargs) + return r diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 0bb7dda..5b97140 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -2197,12 +2197,10 @@ def test_auto_refresh(admin: KeycloakAdmin, realm: str): """ # Test get refresh admin.auto_refresh_token = list() - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) + admin.connection.custom_headers = { + "Authorization": "Bearer bad", + "Content-Type": "application/json", + } with pytest.raises(KeycloakAuthenticationError) as err: admin.get_realm(realm_name=realm) @@ -2213,12 +2211,10 @@ def test_auto_refresh(admin: KeycloakAdmin, realm: str): assert admin.get_realm(realm_name=realm) # Test bad refresh token - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) + admin.connection.custom_headers = { + "Authorization": "Bearer bad", + "Content-Type": "application/json", + } admin.token["refresh_token"] = "bad" with pytest.raises(KeycloakPostError) as err: admin.get_realm(realm_name="test-refresh") @@ -2230,12 +2226,10 @@ def test_auto_refresh(admin: KeycloakAdmin, realm: str): admin.realm_name = realm # Test post refresh - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) + admin.connection.custom_headers = { + "Authorization": "Bearer bad", + "Content-Type": "application/json", + } with pytest.raises(KeycloakAuthenticationError) as err: admin.create_realm(payload={"realm": "test-refresh"}) assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') @@ -2247,12 +2241,10 @@ def test_auto_refresh(admin: KeycloakAdmin, realm: str): admin.realm_name = realm # Test update refresh - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) + admin.connection.custom_headers = { + "Authorization": "Bearer bad", + "Content-Type": "application/json", + } with pytest.raises(KeycloakAuthenticationError) as err: admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') @@ -2263,12 +2255,10 @@ def test_auto_refresh(admin: KeycloakAdmin, realm: str): ) # Test delete refresh - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) + admin.connection.custom_headers = { + "Authorization": "Bearer bad", + "Content-Type": "application/json", + } with pytest.raises(KeycloakAuthenticationError) as err: admin.delete_realm(realm_name="test-refresh") assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')