From 7bbf4e15b7ccb32014217b5c5353aecea3d96aa9 Mon Sep 17 00:00:00 2001 From: Nuwan Goonasekera <2070605+nuwang@users.noreply.github.com> Date: Tue, 7 Mar 2023 03:52:24 +0530 Subject: [PATCH] fix: Refactor auto refresh (#415) * refactor: Factor our OpenIdConnectionManager class and deprecate old methods * refactor: Refactor keycloak uma client to use openid connection manager * fix: Perform token renewal at 90% of lifetime * refactor: Add optional openid connection constructor param to keycloak admin * refactor: Remove auto_refresh_token in favour of automatic refresh on expiry * refactor: move KeycloakOpenIDConnectionManager to a separate file * docs: uma additions and fixes * refactor: rename token_renewal_fraction->token_lifetime_fraction * refactor: shorten KeycloakOpenIDConnectionManager->KeycloakOpenIDConnection * docs: incorporate review comments --- README.md | 53 +- poetry.lock | 47 +- pyproject.toml | 2 + src/keycloak/__init__.py | 2 + src/keycloak/keycloak_admin.py | 804 ++++++++++++++++++------------ src/keycloak/keycloak_uma.py | 77 +-- src/keycloak/openid_connection.py | 408 +++++++++++++++ tests/conftest.py | 54 +- tests/test_keycloak_admin.py | 141 ++---- tests/test_keycloak_openid.py | 3 +- tests/test_keycloak_uma.py | 52 +- 11 files changed, 1105 insertions(+), 538 deletions(-) create mode 100644 src/keycloak/openid_connection.py diff --git a/README.md b/README.md index 6af1617..d3e4cd0 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/auth/", realm_name="example_realm", client_secret_key="secret") -# Get WellKnow +# Get WellKnown config_well_known = keycloak_openid.well_known() # Get Code With Oauth Authorization Request @@ -142,14 +142,19 @@ auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Sc # KEYCLOAK ADMIN from keycloak import KeycloakAdmin +from keycloak import KeycloakOpenIDConnection -keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/", - username='example-admin', - password='secret', - realm_name="master", - user_realm_name="only_if_other_realm_than_master", - client_secret_key="client-secret", - verify=True) +keycloak_connection = KeycloakOpenIDConnection( + server_url="http://localhost:8080/", + username='example-admin', + password='secret', + realm_name="master", + user_realm_name="only_if_other_realm_than_master", + client_id="my_client", + client_secret_key="client-secret", + verify=True) + +keycloak_admin = KeycloakAdmin(connection=keycloak_connection) # Add user new_user = keycloak_admin.create_user({"email": "example@example.com", @@ -344,4 +349,36 @@ keycloak_admin.get_users() # Get user in main realm keycloak_admin.realm_name = "demo" # Change realm to 'demo' keycloak_admin.get_users() # Get users in realm 'demo' keycloak_admin.create_user(...) # Creates a new user in 'demo' + +# KEYCLOAK UMA + +from keycloak import KeycloakOpenIDConnection +from keycloak import KeycloakUMA + +keycloak_connection = KeycloakOpenIDConnection( + server_url="http://localhost:8080/", + realm_name="master", + client_id="my_client", + client_secret_key="client-secret") + +keycloak_uma = KeycloakUMA(connection=keycloak_connection) + +# Create a resource set +resource_set = keycloak_uma.resource_set_create({ + "name": "example_resource", + "scopes": ["example:read", "example:write"], + "type": "urn:example"}) + +# List resource sets +resource_sets = uma.resource_set_list() + +# get resource set +latest_resource = uma.resource_set_read(resource_set["_id"]) + +# update resource set +latest_resource["name"] = "New Resource Name" +uma.resource_set_update(resource_set["_id"], latest_resource) + +# delete resource set +uma.resource_set_delete(resource_id=resource_set["_id"]) ``` diff --git a/poetry.lock b/poetry.lock index 83555c9..738b108 100644 --- a/poetry.lock +++ b/poetry.lock @@ -475,6 +475,21 @@ files = [ {file = "decli-0.5.2.tar.gz", hash = "sha256:f2cde55034a75c819c630c7655a844c612f2598c42c21299160465df6ad463ad"}, ] +[[package]] +name = "deprecation" +version = "2.1.0" +description = "A library to handle automated deprecations" +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "deprecation-2.1.0-py2.py3-none-any.whl", hash = "sha256:a10811591210e1fb0e768a8c25517cabeabcba6f0bf96564f8ff45189f90b14a"}, + {file = "deprecation-2.1.0.tar.gz", hash = "sha256:72b3bde64e5d778694b0cf68178aed03d15e15477116add3fb773e581f9518ff"}, +] + +[package.dependencies] +packaging = "*" + [[package]] name = "distlib" version = "0.3.6" @@ -583,6 +598,21 @@ files = [ flake8 = ">=3" pydocstyle = ">=2.1" +[[package]] +name = "freezegun" +version = "1.2.2" +description = "Let your Python tests travel through time" +category = "dev" +optional = false +python-versions = ">=3.6" +files = [ + {file = "freezegun-1.2.2-py3-none-any.whl", hash = "sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f"}, + {file = "freezegun-1.2.2.tar.gz", hash = "sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446"}, +] + +[package.dependencies] +python-dateutil = ">=2.7" + [[package]] name = "identify" version = "2.5.18" @@ -1260,6 +1290,21 @@ pytest = ">=4.6" [package.extras] testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtualenv"] +[[package]] +name = "python-dateutil" +version = "2.8.2" +description = "Extensions to the standard Python datetime module" +category = "dev" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +files = [ + {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, + {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, +] + +[package.dependencies] +six = ">=1.5" + [[package]] name = "python-jose" version = "3.3.0" @@ -2094,4 +2139,4 @@ docs = ["Sphinx", "alabaster", "commonmark", "m2r2", "mock", "readthedocs-sphinx [metadata] lock-version = "2.0" python-versions = "^3.7" -content-hash = "8d76b155adddd2eacd0304397b33465d5c67f09165d8de641c71f5ce7b979be2" +content-hash = "70bb30bae9ff3d8b6c54553f755b2f31725701f379ae9aeb4a2a5658d2f6d51a" diff --git a/pyproject.toml b/pyproject.toml index d855cbb..b3e10af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ readthedocs-sphinx-ext = {version = "^2.1.9", optional = true} m2r2 = {version = "^0.3.2", optional = true} sphinx-autoapi = {version = "^2.0.0", optional = true} requests-toolbelt = "^0.10.1" +deprecation = "^2.1.0" [tool.poetry.extras] docs = [ @@ -72,6 +73,7 @@ cryptography = "^37.0.4" codespell = "^2.1.0" darglint = "^1.8.1" twine = "^4.0.2" +freezegun = "^1.2.2" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/src/keycloak/__init__.py b/src/keycloak/__init__.py index 7e49c8f..9a6961d 100644 --- a/src/keycloak/__init__.py +++ b/src/keycloak/__init__.py @@ -43,6 +43,7 @@ from .exceptions import ( from .keycloak_admin import KeycloakAdmin from .keycloak_openid import KeycloakOpenID from .keycloak_uma import KeycloakUMA +from .openid_connection import KeycloakOpenIDConnection __all__ = [ "__version__", @@ -62,5 +63,6 @@ __all__ = [ "KeycloakSecretNotFound", "KeycloakAdmin", "KeycloakOpenID", + "KeycloakOpenIDConnection", "KeycloakUMA", ] diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 7f43879..50ed7fd 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -29,12 +29,12 @@ import copy import json from builtins import isinstance -from typing import Iterable +import deprecation from requests_toolbelt import MultipartEncoder from . import urls_patterns -from .connection import ConnectionManager +from ._version import __version__ from .exceptions import ( KeycloakDeleteError, KeycloakGetError, @@ -42,7 +42,7 @@ from .exceptions import ( KeycloakPutError, raise_error_from_response, ) -from .keycloak_openid import KeycloakOpenID +from .openid_connection import KeycloakOpenIDConnection class KeycloakAdmin: @@ -76,22 +76,14 @@ class KeycloakAdmin: :type auto_refresh_token: list :param timeout: connection timeout in seconds :type timeout: int + :param connection: A KeycloakOpenIDConnection as an alternative to individual params. + :type connection: KeycloakOpenIDConnection """ PAGE_SIZE = 100 - _server_url = None - _username = None - _password = None - _totp = None - _realm_name = None - _client_id = None - _verify = None - _client_secret_key = None _auto_refresh_token = None _connection = None - _custom_headers = None - _user_realm_name = None def __init__( self, @@ -108,6 +100,7 @@ class KeycloakAdmin: user_realm_name=None, auto_refresh_token=None, timeout=60, + connection: KeycloakOpenIDConnection = None, ): """Init method. @@ -139,73 +132,81 @@ class KeycloakAdmin: :type auto_refresh_token: list :param timeout: connection timeout in seconds :type timeout: int - """ - self.server_url = server_url - self.username = username - self.password = password - self.token = token - self.totp = totp - self.realm_name = realm_name - self.client_id = client_id - self.verify = verify - self.client_secret_key = client_secret_key - self.auto_refresh_token = auto_refresh_token or [] - self.user_realm_name = user_realm_name - self.custom_headers = custom_headers - self.timeout = timeout - - if self.token is None: - self.get_token() - - headers = ( - { - "Authorization": "Bearer " + self.token.get("access_token"), - "Content-Type": "application/json", - } - if self.token is not None - else {} - ) - - if self.custom_headers is not None: - # merge custom headers to main headers - headers.update(self.custom_headers) - - self.connection = ConnectionManager( - base_url=self.server_url, headers=headers, timeout=60, verify=self.verify - ) + :param connection: An OpenID Connection as an alternative to individual params. + :type connection: KeycloakOpenIDConnection + """ + self.connection = connection or KeycloakOpenIDConnection( + server_url=server_url, + username=username, + password=password, + token=token, + totp=totp, + realm_name=realm_name, + client_id=client_id, + verify=verify, + client_secret_key=client_secret_key, + user_realm_name=user_realm_name, + custom_headers=custom_headers, + timeout=timeout, + ) + self.auto_refresh_token = auto_refresh_token @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.server_url property instead", + ) def server_url(self): """Get server url. :returns: Keycloak server url :rtype: str """ - return self._server_url + return self.connection.server_url @server_url.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.server_url property instead", + ) def server_url(self, value): - self._server_url = value + self.connection.server_url = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.realm_name property instead", + ) def realm_name(self): """Get realm name. :returns: Realm name :rtype: str """ - return self._realm_name + return self.connection.realm_name @realm_name.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.realm_name property instead", + ) def realm_name(self, value): - self._realm_name = value + self.connection.realm_name = value @property def connection(self): """Get connection. :returns: Connection manager - :rtype: ConnectionManager + :rtype: KeycloakOpenIDConnection """ return self._connection @@ -214,146 +215,254 @@ class KeycloakAdmin: self._connection = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.client_id property instead", + ) def client_id(self): """Get client id. :returns: Client id :rtype: str """ - return self._client_id + return self.connection.client_id @client_id.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.client_id property instead", + ) def client_id(self, value): - self._client_id = value + self.connection.client_id = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.client_secret_key property instead", + ) def client_secret_key(self): """Get client secret key. :returns: Client secret key :rtype: str """ - return self._client_secret_key + return self.connection.client_secret_key @client_secret_key.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.client_secret_key property instead", + ) def client_secret_key(self, value): - self._client_secret_key = value + self.connection.client_secret_key = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.verify property instead", + ) def verify(self): """Get verify. :returns: Verify indicator :rtype: bool """ - return self._verify + return self.connection.verify @verify.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.verify property instead", + ) def verify(self, value): - self._verify = value + self.connection.verify = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.username property instead", + ) def username(self): """Get username. :returns: Admin username :rtype: str """ - return self._username + return self.connection.username @username.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.username property instead", + ) def username(self, value): - self._username = value + self.connection.username = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.password property instead", + ) def password(self): """Get password. :returns: Admin password :rtype: str """ - return self._password + return self.connection.password @password.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.password property instead", + ) def password(self, value): - self._password = value + self.connection.password = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.totp property instead", + ) def totp(self): """Get totp. :returns: TOTP :rtype: str """ - return self._totp + return self.connection.totp @totp.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.totp property instead", + ) def totp(self, value): - self._totp = value + self.connection.totp = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.token property instead", + ) def token(self): """Get token. :returns: Access and refresh token :rtype: dict """ - return self._token + return self.connection.token @token.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.token property instead", + ) def token(self, value): - self._token = value - - @property - def auto_refresh_token(self): - """Get auto refresh token. - - :returns: List of methods for automatic token refresh - :rtype: list - """ - return self._auto_refresh_token + self.connection.token = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.user_realm_name property instead", + ) def user_realm_name(self): """Get user realm name. :returns: User realm name :rtype: str """ - return self._user_realm_name + return self.connection.user_realm_name @user_realm_name.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.user_realm_name property instead", + ) def user_realm_name(self, value): - self._user_realm_name = value + self.connection.user_realm_name = value @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.custom_headers property instead", + ) def custom_headers(self): """Get custom headers. :returns: Custom headers :rtype: dict """ - return self._custom_headers + return self.connection.custom_headers @custom_headers.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.custom_headers property instead", + ) def custom_headers(self, value): - self._custom_headers = value + self.connection.custom_headers = value + + @property + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Auto-refresh will be implicitly set for all requests", + ) + def auto_refresh_token(self): + """Get auto refresh token. + + :returns: List of methods for automatic token refresh + :rtype: list + """ + return self._auto_refresh_token @auto_refresh_token.setter + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Auto-refresh will be implicitly set for all requests", + ) def auto_refresh_token(self, value): - allowed_methods = {"get", "post", "put", "delete"} - if not isinstance(value, Iterable): - raise TypeError( - "Expected a list of strings among {allowed}".format(allowed=allowed_methods) - ) - if not all(method in allowed_methods for method in value): - raise TypeError( - "Unexpected method in auto_refresh_token, accepted methods are {allowed}".format( - allowed=allowed_methods - ) - ) - - self._auto_refresh_token = value + self._auto_refresh_token = value or [] def __fetch_all(self, url, query=None): """Paginate over get requests. @@ -380,7 +489,7 @@ class KeycloakAdmin: while True: query["first"] = page * self.PAGE_SIZE partial_results = raise_error_from_response( - self.raw_get(url, **query), KeycloakGetError + self.connection.raw_get(url, **query), KeycloakGetError ) if not partial_results: break @@ -401,7 +510,7 @@ class KeycloakAdmin: :rtype: dict """ query = query or {} - return raise_error_from_response(self.raw_get(url, **query), KeycloakGetError) + return raise_error_from_response(self.connection.raw_get(url, **query), KeycloakGetError) def import_realm(self, payload): """Import a new realm from a RealmRepresentation. @@ -416,7 +525,9 @@ class KeycloakAdmin: :return: RealmRepresentation :rtype: dict """ - data_raw = self.raw_post(urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload)) + data_raw = self.connection.raw_post( + urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload) + ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) def export_realm(self, export_clients=False, export_groups_and_role=False): @@ -438,7 +549,7 @@ class KeycloakAdmin: "export-clients": export_clients, "export-groups-and-roles": export_groups_and_role, } - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_REALM_EXPORT.format(**params_path), data="" ) return raise_error_from_response(data_raw, KeycloakPostError) @@ -449,7 +560,7 @@ class KeycloakAdmin: :return: realms list :rtype: list """ - data_raw = self.raw_get(urls_patterns.URL_ADMIN_REALMS) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_REALMS) return raise_error_from_response(data_raw, KeycloakGetError) def get_realm(self, realm_name): @@ -464,7 +575,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) def create_realm(self, payload, skip_exists=False): @@ -480,7 +591,9 @@ class KeycloakAdmin: :return: Keycloak server response (RealmRepresentation) :rtype: dict """ - data_raw = self.raw_post(urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload)) + data_raw = self.connection.raw_post( + urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload) + ) return raise_error_from_response( data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) @@ -502,7 +615,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": realm_name} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_REALM.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -516,7 +629,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": realm_name} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_users(self, query=None): @@ -553,7 +666,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_IDPS.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -572,7 +685,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "alias": idp_alias} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_IDP.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -591,7 +704,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "idp-alias": idp_alias} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_IDP_MAPPERS.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -617,7 +730,7 @@ class KeycloakAdmin: "mapper-id": mapper_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_IDP_MAPPER_UPDATE.format(**params_path), data=json.dumps(payload), ) @@ -638,7 +751,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "idp-alias": idp_alias} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_IDP_MAPPERS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_IDP_MAPPERS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_idps(self): @@ -653,7 +768,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def delete_idp(self, idp_alias): @@ -665,7 +780,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "alias": idp_alias} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_IDP.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_IDP.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def create_user(self, payload, exist_ok=False): @@ -693,7 +808,7 @@ class KeycloakAdmin: if exists is not None: return str(exists) - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USERS.format(**params_path), data=json.dumps(payload) ) raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -713,7 +828,9 @@ class KeycloakAdmin: """ query = query or dict() params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USERS_COUNT.format(**params_path), **query) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_USERS_COUNT.format(**params_path), **query + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_user_id(self, username): @@ -745,7 +862,7 @@ class KeycloakAdmin: :return: UserRepresentation """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USER.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_USER.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_user_groups(self, user_id, brief_representation=True): @@ -762,7 +879,7 @@ class KeycloakAdmin: """ params = {"briefRepresentation": brief_representation} params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_USER_GROUPS.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -779,7 +896,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_USER.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -829,7 +946,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_USER.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_USER.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def set_user_password(self, user_id, password, temporary=True): @@ -852,7 +969,7 @@ class KeycloakAdmin: """ payload = {"type": "password", "temporary": temporary, "value": password} params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_RESET_PASSWORD.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -871,7 +988,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USER_CREDENTIALS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_USER_CREDENTIALS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def delete_credential(self, user_id, credential_id): @@ -892,7 +1011,9 @@ class KeycloakAdmin: "id": user_id, "credential_id": credential_id, } - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_USER_CREDENTIAL.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_USER_CREDENTIAL.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError) def user_logout(self, user_id): @@ -906,7 +1027,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_LOGOUT.format(**params_path), data="" ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) @@ -923,7 +1044,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USER_CONSENTS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_USER_CONSENTS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_user_social_logins(self, user_id): @@ -937,7 +1060,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITIES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -962,7 +1085,7 @@ class KeycloakAdmin: "userName": provider_username, } params_path = {"realm-name": self.realm_name, "id": user_id, "provider": provider_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path), data=json.dumps(payload), ) @@ -979,7 +1102,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id, "provider": provider_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -1007,7 +1130,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": user_id} params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), data=json.dumps(payload), **params_query, @@ -1031,7 +1154,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": user_id} params_query = {"client_id": client_id, "redirect_uri": redirect_uri} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path), data={}, **params_query, @@ -1050,7 +1173,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_GET_SESSIONS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_GET_SESSIONS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_server_info(self): @@ -1062,7 +1187,7 @@ class KeycloakAdmin: :return: ServerInfoRepresentation :rtype: dict """ - data_raw = self.raw_get(urls_patterns.URL_ADMIN_SERVER_INFO) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_SERVER_INFO) return raise_error_from_response(data_raw, KeycloakGetError) def get_groups(self, query=None): @@ -1101,7 +1226,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_subgroups(self, group, path): @@ -1190,12 +1315,12 @@ class KeycloakAdmin: """ if parent is None: params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_GROUPS.format(**params_path), data=json.dumps(payload) ) else: params_path = {"realm-name": self.realm_name, "id": parent} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_GROUP_CHILD.format(**params_path), data=json.dumps(payload) ) @@ -1223,7 +1348,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_GROUP.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -1241,7 +1366,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_GROUP_PERMISSIONS.format(**params_path), data=json.dumps({"enabled": enabled}), ) @@ -1258,7 +1383,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id, "group-id": group_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_USER_GROUP.format(**params_path), data=None ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -1274,7 +1399,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": user_id, "group-id": group_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_USER_GROUP.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_USER_GROUP.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def delete_group(self, group_id): @@ -1286,7 +1413,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_clients(self): @@ -1301,7 +1428,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENTS.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_CLIENTS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_client(self, client_id): @@ -1316,7 +1443,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_id(self, client_id): @@ -1348,7 +1475,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SETTINGS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1370,7 +1497,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path), data=json.dumps(payload), ) @@ -1388,7 +1515,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1423,7 +1550,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY.format(**params_path), data=json.dumps(payload), ) @@ -1462,7 +1589,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION.format(**params_path), data=json.dumps(payload), ) @@ -1480,7 +1607,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPES.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPES.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def create_client_authz_scopes(self, client_id, payload): @@ -1496,7 +1625,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPES.format(**params_path), data=json.dumps(payload), ) @@ -1512,7 +1641,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1527,7 +1656,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICIES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1542,7 +1671,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1557,7 +1686,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1588,7 +1717,7 @@ class KeycloakAdmin: "id": client_id, "client_scope_id": client_scope_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload), ) @@ -1610,7 +1739,7 @@ class KeycloakAdmin: "id": client_id, "client_scope_id": client_scope_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError) @@ -1625,7 +1754,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1656,7 +1785,7 @@ class KeycloakAdmin: "id": client_id, "client_scope_id": client_scope_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload), ) @@ -1678,7 +1807,7 @@ class KeycloakAdmin: "id": client_id, "client_scope_id": client_scope_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError) @@ -1703,7 +1832,7 @@ class KeycloakAdmin: return client_id params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENTS.format(**params_path), data=json.dumps(payload) ) raise_error_from_response( @@ -1724,7 +1853,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -1741,7 +1870,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_client_installation_provider(self, client_id, provider_id): @@ -1761,7 +1890,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id, "provider-id": provider_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_INSTALLATION_PROVIDER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) @@ -1779,7 +1908,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1822,7 +1951,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "role-id": self.get_default_realm_role_id()} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES_REALM.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1836,7 +1965,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "role-id": self.get_default_realm_role_id()} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path), data=json.dumps(payload), ) @@ -1851,7 +1980,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "role-id": self.get_default_realm_role_id()} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path), data=json.dumps(payload), ) @@ -1872,7 +2001,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_ROLES.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -1893,7 +2022,9 @@ class KeycloakAdmin: :rtype: str """ params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_role_id(self, client_id, role_name): @@ -1937,7 +2068,7 @@ class KeycloakAdmin: pass params_path = {"realm-name": self.realm_name, "id": client_role_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) ) raise_error_from_response( @@ -1960,7 +2091,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE.format(**params_path), data=json.dumps(payload), ) @@ -1982,7 +2113,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -2001,7 +2132,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def assign_client_role(self, user_id, client_id, roles): @@ -2018,7 +2151,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2078,7 +2211,7 @@ class KeycloakAdmin: pass params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), data=json.dumps(payload) ) raise_error_from_response( @@ -2099,7 +2232,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2115,7 +2248,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path), data=json.dumps(payload), ) @@ -2130,7 +2263,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -2147,7 +2280,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path), data=json.dumps(payload), ) @@ -2165,7 +2298,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path), data=json.dumps(payload), ) @@ -2180,7 +2313,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "role-name": role_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2197,7 +2330,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2215,7 +2348,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2230,7 +2363,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2253,7 +2386,7 @@ class KeycloakAdmin: "id": client_id, "client": client_roles_owner_id, } - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2277,7 +2410,7 @@ class KeycloakAdmin: "id": client_id, "client": client_roles_owner_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2298,7 +2431,7 @@ class KeycloakAdmin: "id": client_id, "client": client_roles_owner_id, } - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2315,7 +2448,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2333,7 +2466,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2348,7 +2481,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_available_realm_roles_of_user(self, user_id): @@ -2360,7 +2495,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_USER_REALM_ROLES_AVAILABLE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2377,7 +2512,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": user_id} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_USER_REALM_ROLES_COMPOSITE.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2394,7 +2529,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2412,7 +2547,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2430,7 +2565,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": group_id} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2449,7 +2584,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2466,7 +2601,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def delete_group_client_roles(self, group_id, client_id, roles): @@ -2483,7 +2620,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2551,7 +2688,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} - data_raw = self.raw_get(client_level_role_mapping_url.format(**params_path), **params) + data_raw = self.connection.raw_get( + client_level_role_mapping_url.format(**params_path), **params + ) return raise_error_from_response(data_raw, KeycloakGetError) def delete_client_roles_of_user(self, user_id, client_id, roles): @@ -2568,7 +2707,7 @@ class KeycloakAdmin: """ payload = roles if isinstance(roles, list) else [roles] params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), data=json.dumps(payload), ) @@ -2586,7 +2725,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_FLOWS.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_FLOWS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def get_authentication_flow_for_id(self, flow_id): @@ -2603,7 +2742,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "flow-id": flow_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_FLOWS_ALIAS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_FLOWS_ALIAS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def create_authentication_flow(self, payload, skip_exists=False): @@ -2620,7 +2761,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_FLOWS.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response( @@ -2640,7 +2781,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_FLOWS_COPY.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -2657,7 +2798,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": flow_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_FLOW.format(**params_path)) + data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_FLOW.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_authentication_flow_executions(self, flow_alias): @@ -2671,7 +2812,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def update_authentication_flow_executions(self, payload, flow_alias): @@ -2688,7 +2831,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), data=json.dumps(payload), ) @@ -2706,7 +2849,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": execution_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def create_authentication_flow_execution(self, payload, flow_alias): @@ -2723,7 +2868,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION.format(**params_path), data=json.dumps(payload), ) @@ -2741,7 +2886,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": execution_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False): @@ -2760,7 +2907,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS_FLOW.format(**params_path), data=json.dumps(payload), ) @@ -2775,7 +2922,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_AUTHENTICATOR_PROVIDERS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2792,7 +2939,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "provider-id": provider_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG_DESCRIPTION.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -2808,7 +2955,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": config_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def update_authenticator_config(self, payload, config_id): @@ -2825,7 +2974,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": config_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path), data=json.dumps(payload), ) @@ -2842,7 +2991,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": config_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -2861,7 +3010,7 @@ class KeycloakAdmin: params_query = {"action": action} params_path = {"realm-name": self.realm_name, "id": storage_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_USER_STORAGE.format(**params_path), data=json.dumps(data), **params_query, @@ -2878,7 +3027,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SCOPES.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPES.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_scope(self, client_scope_id): @@ -2893,7 +3044,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_scope_by_name(self, client_scope_name): @@ -2934,7 +3087,7 @@ class KeycloakAdmin: return exists["id"] params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SCOPES.format(**params_path), data=json.dumps(payload) ) raise_error_from_response( @@ -2957,7 +3110,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -2974,7 +3127,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_mappers_from_client_scope(self, client_scope_id): @@ -2987,7 +3142,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) @@ -3006,7 +3161,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), data=json.dumps(payload), ) @@ -3031,7 +3186,7 @@ class KeycloakAdmin: "protocol-mapper-id": protocol_mapper_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -3057,7 +3212,7 @@ class KeycloakAdmin: "protocol-mapper-id": protocol_mapper_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path), data=json.dumps(payload), ) @@ -3073,7 +3228,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3087,7 +3242,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": scope_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -3102,7 +3257,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": scope_id} payload = {"realm": self.realm_name, "clientScopeId": scope_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload), ) @@ -3117,7 +3272,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3131,7 +3286,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": scope_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -3146,7 +3301,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": scope_id} payload = {"realm": self.realm_name, "clientScopeId": scope_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload), ) @@ -3164,7 +3319,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path) ) @@ -3184,7 +3339,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path), data=json.dumps(payload), ) @@ -3209,7 +3364,7 @@ class KeycloakAdmin: "protocol-mapper-id": mapper_id, } - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path), data=json.dumps(payload), ) @@ -3234,7 +3389,7 @@ class KeycloakAdmin: "protocol-mapper-id": client_mapper_id, } - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -3250,7 +3405,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_SECRETS.format(**params_path), data=None ) return raise_error_from_response(data_raw, KeycloakPostError) @@ -3266,7 +3421,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SECRETS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_SECRETS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_components(self, query=None): @@ -3284,7 +3441,7 @@ class KeycloakAdmin: """ query = query or dict() params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_COMPONENTS.format(**params_path), data=None, **query ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3301,7 +3458,7 @@ class KeycloakAdmin: :rtype: str """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_COMPONENTS.format(**params_path), data=json.dumps(payload) ) raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) @@ -3322,7 +3479,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "component-id": component_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)) + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) def update_component(self, component_id, payload): @@ -3337,7 +3494,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "component-id": component_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_COMPONENT.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) @@ -3351,7 +3508,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "component-id": component_id} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_COMPONENT.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def get_keys(self): @@ -3366,7 +3525,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_KEYS.format(**params_path), data=None) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_KEYS.format(**params_path), data=None + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_events(self, query=None): @@ -3384,7 +3545,7 @@ class KeycloakAdmin: """ query = query or dict() params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_EVENTS.format(**params_path), data=None, **query ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3401,11 +3562,17 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_EVENTS_CONFIG.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.raw_get function instead", + ) def raw_get(self, *args, **kwargs): """Call connection.raw_get. @@ -3419,12 +3586,14 @@ class KeycloakAdmin: :returns: Response :rtype: Response """ - r = self.connection.raw_get(*args, **kwargs) - if "get" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return self.connection.raw_get(*args, **kwargs) - return r + return self.connection.raw_get(*args, **kwargs) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.raw_post function instead", + ) def raw_post(self, *args, **kwargs): """Call connection.raw_post. @@ -3438,12 +3607,14 @@ class KeycloakAdmin: :returns: Response :rtype: Response """ - r = self.connection.raw_post(*args, **kwargs) - if "post" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return self.connection.raw_post(*args, **kwargs) - return r + return self.connection.raw_post(*args, **kwargs) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.raw_put function instead", + ) def raw_put(self, *args, **kwargs): """Call connection.raw_put. @@ -3457,12 +3628,14 @@ class KeycloakAdmin: :returns: Response :rtype: Response """ - r = self.connection.raw_put(*args, **kwargs) - if "put" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return self.connection.raw_put(*args, **kwargs) - return r + return self.connection.raw_put(*args, **kwargs) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.raw_delete function instead", + ) def raw_delete(self, *args, **kwargs): """Call connection.raw_delete. @@ -3476,74 +3649,37 @@ class KeycloakAdmin: :returns: Response :rtype: Response """ - r = self.connection.raw_delete(*args, **kwargs) - if "delete" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return self.connection.raw_delete(*args, **kwargs) - return r + return self.connection.raw_delete(*args, **kwargs) + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.get_token function instead", + ) def get_token(self): """Get admin token. The admin token is then set in the `token` attribute. + + :returns: token + :rtype: dict """ - if self.user_realm_name: - token_realm_name = self.user_realm_name - elif self.realm_name: - token_realm_name = self.realm_name - else: - token_realm_name = "master" - - self.keycloak_openid = KeycloakOpenID( - server_url=self.server_url, - client_id=self.client_id, - realm_name=token_realm_name, - verify=self.verify, - client_secret_key=self.client_secret_key, - custom_headers=self.custom_headers, - timeout=self.timeout, - ) - - grant_type = [] - if self.client_secret_key: - if self.user_realm_name: - self.realm_name = self.user_realm_name - grant_type.append("client_credentials") - elif self.username and self.password: - grant_type.append("password") - - if grant_type: - self.token = self.keycloak_openid.token( - self.username, self.password, grant_type=grant_type, totp=self.totp - ) - else: - self.token = None + return self.connection.get_token() + @deprecation.deprecated( + deprecated_in="2.13.0", + removed_in="3.0.0", + current_version=__version__, + details="Use the connection.refresh_token function instead", + ) def refresh_token(self): """Refresh the token. - :raises KeycloakPostError: In case the refresh token request failed. + :returns: token + :rtype: dict """ - refresh_token = self.token.get("refresh_token", None) - if refresh_token is None: - self.get_token() - else: - try: - self.token = self.keycloak_openid.refresh_token(refresh_token) - except KeycloakPostError as e: - list_errors = [ - b"Refresh token expired", - b"Token is not active", - b"Session not active", - ] - if e.response_code == 400 and any(err in e.response_body for err in list_errors): - self.get_token() - else: - raise - - self.connection.add_param_headers( - "Authorization", "Bearer " + self.token.get("access_token") - ) + return self.connection.refresh_token() def get_client_all_sessions(self, client_id): """Get sessions associated with the client. @@ -3557,7 +3693,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_sessions_stats(self): @@ -3569,7 +3707,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SESSION_STATS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_SESSION_STATS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def get_client_management_permissions(self, client_id): @@ -3582,7 +3722,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3608,7 +3748,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS.format(**params_path), data=json.dumps(payload), ) @@ -3626,7 +3766,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id, "policy-id": policy_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3643,7 +3783,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id, "policy-id": policy_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3660,7 +3800,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id, "scope-id": scope_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3692,7 +3832,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id, "scope-id": scope_id} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path), data=json.dumps(payload), ) @@ -3708,7 +3848,7 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) @@ -3735,7 +3875,7 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY.format(**params_path), data=json.dumps(payload), ) @@ -3755,7 +3895,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": group_id, "client-id": client_id} params = {"briefRepresentation": brief_representation} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES_COMPOSITE.format(**params_path), **params ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3771,7 +3911,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name, "role-id": role_id, "client-id": client_id} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_ROLE_CHILDREN.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_CLIENT_ROLE_CHILDREN.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def upload_certificate(self, client_id, certcont): @@ -3790,7 +3932,7 @@ class KeycloakAdmin: new_headers = copy.deepcopy(self.connection.headers) new_headers["Content-Type"] = m.content_type self.connection.headers = new_headers - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_CERT_UPLOAD.format(**params_path), data=m, headers=new_headers, @@ -3818,7 +3960,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_REQUIRED_ACTIONS.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_REQUIRED_ACTIONS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def update_required_action(self, action_alias, payload): @@ -3834,7 +3978,7 @@ class KeycloakAdmin: if not isinstance(payload, str): payload = json.dumps(payload) params_path = {"realm-name": self.realm_name, "action-alias": action_alias} - data_raw = self.raw_put( + data_raw = self.connection.raw_put( urls_patterns.URL_ADMIN_REQUIRED_ACTIONS_ALIAS.format(**params_path), data=payload ) return raise_error_from_response(data_raw, KeycloakPutError) @@ -3848,7 +3992,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_get( + data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakGetError) @@ -3862,7 +4006,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name, "id": user_id} - data_raw = self.raw_delete( + data_raw = self.connection.raw_delete( urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError) @@ -3874,7 +4018,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_delete(urls_patterns.URL_ADMIN_ATTACK_DETECTION.format(**params_path)) + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_ATTACK_DETECTION.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError) def clear_keys_cache(self): diff --git a/src/keycloak/keycloak_uma.py b/src/keycloak/keycloak_uma.py index a066567..d143bfd 100644 --- a/src/keycloak/keycloak_uma.py +++ b/src/keycloak/keycloak_uma.py @@ -29,7 +29,6 @@ https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html import json from urllib.parse import quote_plus -from .connection import ConnectionManager from .exceptions import ( KeycloakDeleteError, KeycloakGetError, @@ -37,50 +36,30 @@ from .exceptions import ( KeycloakPutError, raise_error_from_response, ) +from .openid_connection import KeycloakOpenIDConnection from .urls_patterns import URL_UMA_WELL_KNOWN class KeycloakUMA: """Keycloak UMA client. - :param server_url: Keycloak server url - :param client_id: client id - :param realm_name: realm name - :param client_secret_key: client secret key - :param verify: True if want check connection SSL - :param custom_headers: dict of custom header to pass to each HTML request - :param proxies: dict of proxies to sent the request by. - :param timeout: connection timeout in seconds + :param connection: OpenID connection manager """ - def __init__( - self, server_url, realm_name, verify=True, custom_headers=None, proxies=None, timeout=60 - ): + def __init__(self, connection: KeycloakOpenIDConnection): """Init method. - :param server_url: Keycloak server url - :type server_url: str - :param realm_name: realm name - :type realm_name: str - :param verify: True if want check connection SSL - :type verify: bool - :param custom_headers: dict of custom header to pass to each HTML request - :type custom_headers: dict - :param proxies: dict of proxies to sent the request by. - :type proxies: dict - :param timeout: connection timeout in seconds - :type timeout: int + :param connection: OpenID connection manager + :type connection: KeycloakOpenIDConnection """ - self.realm_name = realm_name - headers = custom_headers if custom_headers is not None else dict() - headers.update({"Content-Type": "application/json"}) - self.connection = ConnectionManager( - base_url=server_url, headers=headers, timeout=timeout, verify=verify, proxies=proxies - ) + self.connection = connection + custom_headers = self.connection.custom_headers or {} + custom_headers.update({"Content-Type": "application/json"}) + self.connection.custom_headers = custom_headers self._well_known = None def _fetch_well_known(self): - params_path = {"realm-name": self.realm_name} + params_path = {"realm-name": self.connection.realm_name} data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) @@ -102,9 +81,6 @@ class KeycloakUMA: """ return url.format(**{k: quote_plus(v) for k, v in kwargs.items()}) - def _add_bearer_token_header(self, token): - self.connection.add_param_headers("Authorization", "Bearer " + token) - @property def uma_well_known(self): """Get the well_known UMA2 config. @@ -117,7 +93,7 @@ class KeycloakUMA: self._well_known = self._fetch_well_known() return self._well_known - def resource_set_create(self, token, payload): + def resource_set_create(self, payload): """Create a resource set. Spec @@ -126,20 +102,17 @@ class KeycloakUMA: ResourceRepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation - :param token: client access token - :type token: str :param payload: ResourceRepresentation :type payload: dict :return: ResourceRepresentation with the _id property assigned :rtype: dict """ - self._add_bearer_token_header(token) data_raw = self.connection.raw_post( self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) - def resource_set_update(self, token, resource_id, payload): + def resource_set_update(self, resource_id, payload): """Update a resource set. Spec @@ -148,8 +121,6 @@ class KeycloakUMA: ResourceRepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation - :param token: client access token - :type token: str :param resource_id: id of the resource :type resource_id: str :param payload: ResourceRepresentation @@ -157,14 +128,13 @@ class KeycloakUMA: :return: Response dict (empty) :rtype: dict """ - self._add_bearer_token_header(token) url = self.format_url( self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id ) data_raw = self.connection.raw_put(url, data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) - def resource_set_read(self, token, resource_id): + def resource_set_read(self, resource_id): """Read a resource set. Spec @@ -173,56 +143,47 @@ class KeycloakUMA: ResourceRepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation - :param token: client access token - :type token: str :param resource_id: id of the resource :type resource_id: str :return: ResourceRepresentation :rtype: dict """ - self._add_bearer_token_header(token) url = self.format_url( self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id ) data_raw = self.connection.raw_get(url) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) - def resource_set_delete(self, token, resource_id): + def resource_set_delete(self, resource_id): """Delete a resource set. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set - :param token: client access token - :type token: str :param resource_id: id of the resource :type resource_id: str :return: Response dict (empty) :rtype: dict """ - self._add_bearer_token_header(token) url = self.format_url( self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id ) data_raw = self.connection.raw_delete(url) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def resource_set_list_ids(self, token): + def resource_set_list_ids(self): """List all resource set ids. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets - :param token: client access token - :type token: str :return: List of ids :rtype: List[str] """ - self._add_bearer_token_header(token) data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"]) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) - def resource_set_list(self, token): + def resource_set_list(self): """List all resource sets. Spec @@ -231,11 +192,9 @@ class KeycloakUMA: ResourceRepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation - :param token: client access token - :type token: str :yields: Iterator over a list of ResourceRepresentations :rtype: Iterator[dict] """ - for resource_id in self.resource_set_list_ids(token): - resource = self.resource_set_read(token, resource_id) + for resource_id in self.resource_set_list_ids(): + resource = self.resource_set_read(resource_id) yield resource diff --git a/src/keycloak/openid_connection.py b/src/keycloak/openid_connection.py new file mode 100644 index 0000000..3a865a1 --- /dev/null +++ b/src/keycloak/openid_connection.py @@ -0,0 +1,408 @@ +# -*- coding: utf-8 -*- +# +# The MIT License (MIT) +# +# Copyright (C) 2017 Marcos Pereira +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +"""Keycloak OpenID Connection Manager module. + +The module contains mainly the implementation of KeycloakOpenIDConnection class. +This is an extension of the ConnectionManager class, and handles the automatic refresh +of openid tokens when required. +""" + +from datetime import datetime, timedelta + +from .connection import ConnectionManager +from .exceptions import KeycloakPostError +from .keycloak_openid import KeycloakOpenID + + +class KeycloakOpenIDConnection(ConnectionManager): + """A class to help with OpenID connections which can auto refresh tokens. + + :param object: _description_ + :type object: _type_ + """ + + _server_url = None + _username = None + _password = None + _totp = None + _realm_name = None + _client_id = None + _verify = None + _client_secret_key = None + _connection = None + _custom_headers = None + _user_realm_name = None + _expires_at = None + + def __init__( + self, + server_url, + username=None, + password=None, + token=None, + totp=None, + realm_name="master", + client_id="admin-cli", + verify=True, + client_secret_key=None, + custom_headers=None, + user_realm_name=None, + timeout=60, + ): + """Init method. + + :param server_url: Keycloak server url + :type server_url: str + :param username: admin username + :type username: str + :param password: admin password + :type password: str + :param token: access and refresh tokens + :type token: dict + :param totp: Time based OTP + :type totp: str + :param realm_name: realm name + :type realm_name: str + :param client_id: client id + :type client_id: str + :param verify: True if want check connection SSL + :type verify: bool + :param client_secret_key: client secret key + (optional, required only for access type confidential) + :type client_secret_key: str + :param custom_headers: dict of custom header to pass to each HTML request + :type custom_headers: dict + :param user_realm_name: The realm name of the user, if different from realm_name + :type user_realm_name: str + :param timeout: connection timeout in seconds + :type timeout: int + """ + # token is renewed when it hits 90% of its lifetime. This is to account for any possible + # clock skew. + self.token_lifetime_fraction = 0.9 + self.server_url = server_url + self.username = username + self.password = password + self.token = token + self.totp = totp + self.realm_name = realm_name + self.client_id = client_id + self.verify = verify + self.client_secret_key = client_secret_key + self.user_realm_name = user_realm_name + self.timeout = timeout + + if self.token is None: + self.get_token() + + self.headers = ( + { + "Authorization": "Bearer " + self.token.get("access_token"), + "Content-Type": "application/json", + } + if self.token is not None + else {} + ) + self.custom_headers = custom_headers + + super().__init__( + base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify + ) + + @property + def server_url(self): + """Get server url. + + :returns: Keycloak server url + :rtype: str + """ + return self.base_url + + @server_url.setter + def server_url(self, value): + self.base_url = value + + @property + def realm_name(self): + """Get realm name. + + :returns: Realm name + :rtype: str + """ + return self._realm_name + + @realm_name.setter + def realm_name(self, value): + self._realm_name = value + + @property + def client_id(self): + """Get client id. + + :returns: Client id + :rtype: str + """ + return self._client_id + + @client_id.setter + def client_id(self, value): + self._client_id = value + + @property + def client_secret_key(self): + """Get client secret key. + + :returns: Client secret key + :rtype: str + """ + return self._client_secret_key + + @client_secret_key.setter + def client_secret_key(self, value): + self._client_secret_key = value + + @property + def username(self): + """Get username. + + :returns: Admin username + :rtype: str + """ + return self._username + + @username.setter + def username(self, value): + self._username = value + + @property + def password(self): + """Get password. + + :returns: Admin password + :rtype: str + """ + return self._password + + @password.setter + def password(self, value): + self._password = value + + @property + def totp(self): + """Get totp. + + :returns: TOTP + :rtype: str + """ + return self._totp + + @totp.setter + def totp(self, value): + self._totp = value + + @property + def token(self): + """Get token. + + :returns: Access and refresh token + :rtype: dict + """ + return self._token + + @token.setter + def token(self, value): + self._token = value + self._expires_at = datetime.now() + timedelta( + seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0) + ) + + @property + def expires_at(self): + """Get token expiry time. + + :returns: Datetime at which the current token will expire + :rtype: datetime + """ + return self._expires_at + + @property + def user_realm_name(self): + """Get user realm name. + + :returns: User realm name + :rtype: str + """ + return self._user_realm_name + + @user_realm_name.setter + def user_realm_name(self, value): + self._user_realm_name = value + + @property + def custom_headers(self): + """Get custom headers. + + :returns: Custom headers + :rtype: dict + """ + return self._custom_headers + + @custom_headers.setter + def custom_headers(self, value): + self._custom_headers = value + if self.custom_headers is not None: + # merge custom headers to main headers + self.headers.update(self.custom_headers) + + def get_token(self): + """Get admin token. + + The admin token is then set in the `token` attribute. + """ + if self.user_realm_name: + token_realm_name = self.user_realm_name + elif self.realm_name: + token_realm_name = self.realm_name + else: + token_realm_name = "master" + + self.keycloak_openid = KeycloakOpenID( + server_url=self.server_url, + client_id=self.client_id, + realm_name=token_realm_name, + verify=self.verify, + client_secret_key=self.client_secret_key, + timeout=self.timeout, + ) + + grant_type = [] + if self.client_secret_key: + if self.user_realm_name: + self.realm_name = self.user_realm_name + grant_type.append("client_credentials") + elif self.username and self.password: + grant_type.append("password") + + if grant_type: + self.token = self.keycloak_openid.token( + self.username, self.password, grant_type=grant_type, totp=self.totp + ) + else: + self.token = None + + def refresh_token(self): + """Refresh the token. + + :raises KeycloakPostError: In case the refresh token request failed. + """ + refresh_token = self.token.get("refresh_token", None) if self.token else None + if refresh_token is None: + self.get_token() + else: + try: + self.token = self.keycloak_openid.refresh_token(refresh_token) + except KeycloakPostError as e: + list_errors = [ + b"Refresh token expired", + b"Token is not active", + b"Session not active", + ] + if e.response_code == 400 and any(err in e.response_body for err in list_errors): + self.get_token() + else: + raise + + self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token")) + + def _refresh_if_required(self): + if datetime.now() >= self.expires_at: + self.refresh_token() + + def raw_get(self, *args, **kwargs): + """Call connection.raw_get. + + If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token + and try *get* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = super().raw_get(*args, **kwargs) + return r + + def raw_post(self, *args, **kwargs): + """Call connection.raw_post. + + If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token + and try *post* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = super().raw_post(*args, **kwargs) + return r + + def raw_put(self, *args, **kwargs): + """Call connection.raw_put. + + If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token + and try *put* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = super().raw_put(*args, **kwargs) + return r + + def raw_delete(self, *args, **kwargs): + """Call connection.raw_delete. + + If auto_refresh is set for *delete* and *access_token* is expired, + it will refresh the token and try *delete* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = super().raw_delete(*args, **kwargs) + return r diff --git a/tests/conftest.py b/tests/conftest.py index 18cb6a3..fcfdb4f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,9 @@ import ipaddress import os import uuid from datetime import datetime, timedelta +from typing import Tuple +import freezegun import pytest from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -12,7 +14,7 @@ from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID -from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakUMA +from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakOpenIDConnection, KeycloakUMA class KeycloakTestEnv(object): @@ -150,6 +152,23 @@ def admin(env: KeycloakTestEnv): ) +@pytest.fixture +@freezegun.freeze_time("2023-02-25 10:00:00") +def admin_frozen(env: KeycloakTestEnv): + """Fixture for initialized KeycloakAdmin class, with time frozen. + + :param env: Keycloak test environment + :type env: KeycloakTestEnv + :returns: Keycloak admin + :rtype: KeycloakAdmin + """ + return KeycloakAdmin( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", + username=env.KEYCLOAK_ADMIN, + password=env.KEYCLOAK_ADMIN_PASSWORD, + ) + + @pytest.fixture def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin): """Fixture for initialized KeycloakOpenID class. @@ -478,17 +497,34 @@ def selfsigned_cert(): @pytest.fixture -def uma(env: KeycloakTestEnv, realm: str): +def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): """Fixture for initialized KeycloakUMA class. - :param env: Keycloak test environment - :type env: KeycloakTestEnv - :param realm: Keycloak realm - :type realm: str + :param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + :yields: Keycloak OpenID connection manager + :rtype: KeycloakOpenIDConnection + """ + oid, _, _ = oid_with_credentials_authz + connection = KeycloakOpenIDConnection( + server_url=oid.connection.base_url, + realm_name=oid.realm_name, + client_id=oid.client_id, + client_secret_key=oid.client_secret_key, + timeout=60, + ) + yield connection + + +@pytest.fixture +def uma(oid_connection_with_authz: KeycloakOpenIDConnection): + """Fixture for initialized KeycloakUMA class. + + :param oid_connection_with_authz: Keycloak open id connection with pre-configured authz client + :type oid_connection_with_authz: KeycloakOpenIDConnection :yields: Keycloak OpenID client :rtype: KeycloakOpenID """ + connection = oid_connection_with_authz # Return UMA - yield KeycloakUMA( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name=realm - ) + yield KeycloakUMA(connection=connection) diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 0bb7dda..4b8e824 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -3,7 +3,9 @@ import copy from typing import Tuple +import freezegun import pytest +from dateutil import parser as datetime_parser import keycloak from keycloak import KeycloakAdmin, KeycloakOpenID @@ -22,31 +24,6 @@ def test_keycloak_version(): assert keycloak.__version__, keycloak.__version__ -def test_keycloak_admin_bad_init(env): - """Test keycloak admin bad init. - - :param env: Environment fixture - :type env: KeycloakTestEnv - """ - with pytest.raises(TypeError) as err: - KeycloakAdmin( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", - username=env.KEYCLOAK_ADMIN, - password=env.KEYCLOAK_ADMIN_PASSWORD, - auto_refresh_token=1, - ) - assert err.match("Expected a list of strings") - - with pytest.raises(TypeError) as err: - KeycloakAdmin( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", - username=env.KEYCLOAK_ADMIN, - password=env.KEYCLOAK_ADMIN_PASSWORD, - auto_refresh_token=["patch"], - ) - assert err.match("Unexpected method in auto_refresh_token") - - def test_keycloak_admin_init(env): """Test keycloak admin init. @@ -2187,94 +2164,66 @@ def test_events(admin: KeycloakAdmin, realm: str): assert events == list() -def test_auto_refresh(admin: KeycloakAdmin, realm: str): +@freezegun.freeze_time("2023-02-25 10:00:00") +def test_auto_refresh(admin_frozen: KeycloakAdmin, realm: str): """Test auto refresh token. - :param admin: Keycloak Admin client - :type admin: KeycloakAdmin + :param admin_frozen: Keycloak Admin client with time frozen in place + :type admin_frozen: KeycloakAdmin :param realm: Keycloak realm :type realm: str """ + admin = admin_frozen # Test get refresh - admin.auto_refresh_token = list() - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) + admin.connection.custom_headers = { + "Authorization": "Bearer bad", + "Content-Type": "application/json", + } with pytest.raises(KeycloakAuthenticationError) as err: admin.get_realm(realm_name=realm) assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') - admin.auto_refresh_token = ["get"] - del admin.token["refresh_token"] - assert admin.get_realm(realm_name=realm) - - # Test bad refresh token - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) - admin.token["refresh_token"] = "bad" - with pytest.raises(KeycloakPostError) as err: - admin.get_realm(realm_name="test-refresh") - assert err.match( - '400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\'' - ) - admin.realm_name = "master" - admin.get_token() - admin.realm_name = realm + # Freeze time to simulate the access token expiring + with freezegun.freeze_time("2023-02-25 10:05:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:05:00") + assert admin.get_realm(realm_name=realm) + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:05:00") + + # Test bad refresh token, but first make sure access token has expired again + with freezegun.freeze_time("2023-02-25 10:10:00"): + admin.connection.custom_headers = {"Content-Type": "application/json"} + admin.connection.token["refresh_token"] = "bad" + with pytest.raises(KeycloakPostError) as err: + admin.get_realm(realm_name="test-refresh") + assert err.match( + '400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\'' + ) + admin.connection.get_token() # Test post refresh - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) - with pytest.raises(KeycloakAuthenticationError) as err: - admin.create_realm(payload={"realm": "test-refresh"}) - assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') - - admin.auto_refresh_token = ["get", "post"] - admin.realm_name = "master" - admin.user_logout(user_id=admin.get_user_id(username=admin.username)) - assert admin.create_realm(payload={"realm": "test-refresh"}) == b"" - admin.realm_name = realm + with freezegun.freeze_time("2023-02-25 10:15:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:15:00") + admin.connection.token = None + assert admin.create_realm(payload={"realm": "test-refresh"}) == b"" + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:15:00") # Test update refresh - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) - with pytest.raises(KeycloakAuthenticationError) as err: - admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) - assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') - - admin.auto_refresh_token = ["get", "post", "put"] - assert ( - admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict() - ) + with freezegun.freeze_time("2023-02-25 10:25:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00") + admin.connection.token = None + assert ( + admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) + == dict() + ) + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00") # Test delete refresh - admin.connection = ConnectionManager( - base_url=admin.server_url, - headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, - timeout=60, - verify=admin.verify, - ) - with pytest.raises(KeycloakAuthenticationError) as err: - admin.delete_realm(realm_name="test-refresh") - assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') - - admin.auto_refresh_token = ["get", "post", "put", "delete"] - assert admin.delete_realm(realm_name="test-refresh") == dict() + with freezegun.freeze_time("2023-02-25 10:35:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:35:00") + admin.connection.token = None + assert admin.delete_realm(realm_name="test-refresh") == dict() + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00") def test_get_required_actions(admin: KeycloakAdmin, realm: str): diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index e1a1421..8f3825a 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -4,6 +4,7 @@ from unittest import mock import pytest +from keycloak import KeycloakAdmin, KeycloakOpenID from keycloak.authorization import Authorization from keycloak.authorization.permission import Permission from keycloak.authorization.policy import Policy @@ -17,8 +18,6 @@ from keycloak.exceptions import ( KeycloakPostError, KeycloakRPTNotFound, ) -from keycloak.keycloak_admin import KeycloakAdmin -from keycloak.keycloak_openid import KeycloakOpenID def test_keycloak_openid_init(env): diff --git a/tests/test_keycloak_uma.py b/tests/test_keycloak_uma.py index 2a1dde7..9808804 100644 --- a/tests/test_keycloak_uma.py +++ b/tests/test_keycloak_uma.py @@ -1,32 +1,27 @@ """Test module for KeycloakUMA.""" import re -from typing import Tuple import pytest -from keycloak import KeycloakOpenID -from keycloak.connection import ConnectionManager +from keycloak import KeycloakOpenIDConnection, KeycloakUMA from keycloak.exceptions import ( KeycloakDeleteError, KeycloakGetError, KeycloakPostError, KeycloakPutError, ) -from keycloak.keycloak_uma import KeycloakUMA -def test_keycloak_uma_init(env): +def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection): """Test KeycloakUMA's init method. - :param env: Environment fixture - :type env: KeycloakTestEnv + :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz + :type oid_connection_with_authz: KeycloakOpenIDConnection """ - uma = KeycloakUMA( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name="master" - ) + connection = oid_connection_with_authz + uma = KeycloakUMA(connection=connection) - assert uma.realm_name == "master" - assert isinstance(uma.connection, ConnectionManager) + assert isinstance(uma.connection, KeycloakOpenIDConnection) # should initially be empty assert uma._well_known is None assert uma.uma_well_known @@ -47,23 +42,14 @@ def test_uma_well_known(uma: KeycloakUMA): assert key in res -def test_uma_resource_sets( - uma: KeycloakUMA, oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] -): +def test_uma_resource_sets(uma: KeycloakUMA): """Test resource sets. :param uma: Keycloak UMA client :type uma: KeycloakUMA - :param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials - :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] """ - oid, _, _ = oid_with_credentials_authz - - token = oid.token(grant_type="client_credentials") - access_token = token["access_token"] - # Check that only the default resource is present - resource_sets = uma.resource_set_list(access_token) + resource_sets = uma.resource_set_list() resource_set_list = list(resource_sets) assert len(resource_set_list) == 1, resource_set_list assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] @@ -74,14 +60,14 @@ def test_uma_resource_sets( "scopes": ["test:read", "test:write"], "type": "urn:test", } - created_resource = uma.resource_set_create(access_token, resource_to_create) + created_resource = uma.resource_set_create(resource_to_create) assert created_resource assert created_resource["_id"], created_resource assert set(resource_to_create).issubset(set(created_resource)), created_resource # Test create the same resource set with pytest.raises(KeycloakPostError) as err: - uma.resource_set_create(access_token, resource_to_create) + uma.resource_set_create(resource_to_create) assert err.match( re.escape( '409: b\'{"error":"invalid_request","error_description":' @@ -90,31 +76,29 @@ def test_uma_resource_sets( ) # Test get resource set - latest_resource = uma.resource_set_read(access_token, created_resource["_id"]) + latest_resource = uma.resource_set_read(created_resource["_id"]) assert latest_resource["name"] == created_resource["name"] # Test update resource set latest_resource["name"] = "New Resource Name" - res = uma.resource_set_update(access_token, created_resource["_id"], latest_resource) + res = uma.resource_set_update(created_resource["_id"], latest_resource) assert res == dict(), res - updated_resource = uma.resource_set_read(access_token, created_resource["_id"]) + updated_resource = uma.resource_set_read(created_resource["_id"]) assert updated_resource["name"] == "New Resource Name" # Test update resource set fail with pytest.raises(KeycloakPutError) as err: - uma.resource_set_update( - token=access_token, resource_id=created_resource["_id"], payload={"wrong": "payload"} - ) + uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"}) assert err.match('400: b\'{"error":"Unrecognized field') # Test delete resource set - res = uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"]) + res = uma.resource_set_delete(resource_id=created_resource["_id"]) assert res == dict(), res with pytest.raises(KeycloakGetError) as err: - uma.resource_set_read(access_token, created_resource["_id"]) + uma.resource_set_read(created_resource["_id"]) err.match("404: b''") # Test delete fail with pytest.raises(KeycloakDeleteError) as err: - uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"]) + uma.resource_set_delete(resource_id=created_resource["_id"]) assert err.match("404: b''")