From 43c9ec2f8e5bc44185e93aaa57f675f6769f263f Mon Sep 17 00:00:00 2001 From: David Date: Fri, 10 May 2024 18:03:01 -0500 Subject: [PATCH] remove threading and create async function --- src/keycloak/connection.py | 102 + src/keycloak/keycloak_admin.py | 3262 ++++++++++++++++++++++++----- src/keycloak/openid_connection.py | 68 + 3 files changed, 2941 insertions(+), 491 deletions(-) diff --git a/src/keycloak/connection.py b/src/keycloak/connection.py index 0dfeaff..9a116c3 100644 --- a/src/keycloak/connection.py +++ b/src/keycloak/connection.py @@ -29,6 +29,7 @@ except ImportError: # pragma: no cover from urlparse import urljoin import requests +import httpx from requests.adapters import HTTPAdapter from .exceptions import KeycloakConnectionError @@ -86,6 +87,14 @@ class ConnectionManager(object): if proxies: self._s.proxies.update(proxies) + self.async_s = httpx.AsyncClient(verify=verify, proxies=proxies) + self.async_s.auth = None # don't let requests add auth headers + self.async_s.transport = httpx.AsyncHTTPTransport(retries=1) + + async def aclose(self): + if hasattr(self, "_s"): + await self.async_s.aclose() + def __del__(self): """Del method.""" if hasattr(self, "_s"): @@ -281,3 +290,96 @@ class ConnectionManager(object): ) except Exception as e: raise KeycloakConnectionError("Can't connect to server (%s)" % e) + + async def a_raw_get(self, path, **kwargs): + """Submit get request to the path. + + :param path: Path for request. + :type path: str + :param kwargs: Additional arguments + :type kwargs: dict + :returns: Response the request. + :rtype: Response + :raises KeycloakConnectionError: HttpError Can't connect to server. + """ + try: + return await self.async_s.get( + urljoin(self.base_url, path), + params=kwargs, + headers=self.headers, + timeout=self.timeout + ) + except Exception as e: + raise KeycloakConnectionError("Can't connect to server (%s)" % e) + + async def a_raw_post(self, path, data, **kwargs): + """Submit post request to the path. + + :param path: Path for request. + :type path: str + :param data: Payload for request. + :type data: dict + :param kwargs: Additional arguments + :type kwargs: dict + :returns: Response the request. + :rtype: Response + :raises KeycloakConnectionError: HttpError Can't connect to server. + """ + try: + return await self.async_s.post( + urljoin(self.base_url, path), + params=kwargs, + data=data, + headers=self.headers, + timeout=self.timeout + ) + except Exception as e: + raise KeycloakConnectionError("Can't connect to server (%s)" % e) + + async def a_raw_put(self, path, data, **kwargs): + """Submit put request to the path. + + :param path: Path for request. + :type path: str + :param data: Payload for request. + :type data: dict + :param kwargs: Additional arguments + :type kwargs: dict + :returns: Response the request. + :rtype: Response + :raises KeycloakConnectionError: HttpError Can't connect to server. + """ + try: + return await self.async_s.put( + urljoin(self.base_url, path), + params=kwargs, + data=data, + headers=self.headers, + timeout=self.timeout, + ) + except Exception as e: + raise KeycloakConnectionError("Can't connect to server (%s)" % e) + + async def a_raw_delete(self, path, data=None, **kwargs): + """Submit delete request to the path. + + :param path: Path for request. + :type path: str + :param data: Payload for request. + :type data: dict | None + :param kwargs: Additional arguments + :type kwargs: dict + :returns: Response the request. + :rtype: Response + :raises KeycloakConnectionError: HttpError Can't connect to server. + """ + try: + return await self.async_s.delete( + urljoin(self.base_url, path), + params=kwargs, + data=data or dict(), + headers=self.headers, + timeout=self.timeout, + ) + except Exception as e: + raise KeycloakConnectionError("Can't connect to server (%s)" % e) diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index dc54b9e..d8dc62f 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -204,18 +204,6 @@ class KeycloakAdmin: query = query or {} return raise_error_from_response(self.connection.raw_get(url, **query), KeycloakGetError) - def async_call(self, func_name, args): - func = getattr(self, func_name) - - if not callable(func): - return None - - #create a thread using threading and run func in that thread - thread = threading.Thread(target=func, args=args, daemon=True) - thread.start() - return thread - - def get_current_realm(self) -> str: """Return the currently configured realm. @@ -375,7 +363,7 @@ class KeycloakAdmin: data_raw = self.connection.raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path)) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def get_users(self, query=None, results=None): + def get_users(self, query=None): """Get all users. Return a list of users, filtered according to query parameters @@ -392,17 +380,10 @@ class KeycloakAdmin: params_path = {"realm-name": self.connection.realm_name} url = urls_patterns.URL_ADMIN_USERS.format(**params_path) - res = None - if "first" in query or "max" in query: - res = self.__fetch_paginated(url, query) - else: - res = self.__fetch_all(url, query) - - if results is not None: - results.append(res) + return self.__fetch_paginated(url, query) - return res + return self.__fetch_all(url, query) def create_idp(self, payload): """Create an ID Provider. @@ -4269,665 +4250,2964 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) - def a_get_users(self, query=None, results=None): - return self.async_call('get_users',(query,results)) - - def a_create_idp(self, payload): - self.async_call('create_idp',(payload,)) + #async functions start + async def a___fetch_all(self, url, query=None): + """Paginate over get requests. - def a_update_idp(self, idp_alias, payload): - self.async_call('update_idp',(idp_alias, payload,)) + Wrapper function to paginate GET requests. - def a_add_mapper_to_idp(self, idp_alias, payload): - self.async_call('add_mapper_to_idp',(idp_alias, payload,)) + :param url: The url on which the query is executed + :type url: str + :param query: Existing query parameters (optional) + :type query: dict - def a_update_mapper_in_idp(self, idp_alias, mapper_id, payload): - self.async_call('update_mapper_in_idp',(idp_alias,mapper_id, payload,)) + :return: Combined results of paginated queries + :rtype: list + """ + results = [] - def a_get_idp_mappers(self, idp_alias): - self.async_call('get_idp_mappers',(idp_alias,)) + # initialize query if it was called with None + if not query: + query = {} + page = 0 + query["max"] = self.PAGE_SIZE - def a_get_idps(self): - self.async_call('get_idps',()) + # fetch until we can + while True: + query["first"] = page * self.PAGE_SIZE + partial_results = raise_error_from_response( + await self.connection.a_raw_get(url, **query), KeycloakGetError + ) + if not partial_results: + break + results.extend(partial_results) + if len(partial_results) < query["max"]: + break + page += 1 + return results - def a_get_idp(self, idp_alias): - self.async_call('get_idp',(idp_alias,)) + async def a___fetch_paginated(self, url, query=None): + """Make a specific paginated request. - def a_delete_idp(self, idp_alias): - self.async_call('delete_idp',(idp_alias,)) + :param url: The url on which the query is executed + :type url: str + :param query: Pagination settings + :type query: dict + :returns: Response + :rtype: dict + """ + query = query or {} + return raise_error_from_response(await self.connection.a_raw_get(url, **query), KeycloakGetError) - def a_create_user(self, payload, exist_ok=False): - self.async_call('create_user',(payload, exist_ok,)) + def a_get_current_realm(self) -> str: + """Return the currently configured realm. - def a_users_count(self, query=None): - self.async_call('users_count',(query,)) + :returns: Currently configured realm name + :rtype: str + """ + return self.connection.realm_name - def a_get_user_id(self, username): - self.async_call('get_user_id',(username,)) + def a_change_current_realm(self, realm_name: str) -> None: + """Change the current realm. - def a_get_user(self, user_id): - self.async_call('get_user',(user_id,)) + :param realm_name: The name of the realm to be configured as current + :type realm_name: str + """ + self.connection.realm_name = realm_name - def a_get_user_groups(self, user_id, query=None, brief_representation=True): - self.async_call('get_user_groups',(user_id, query, brief_representation)) + async def a_import_realm(self, payload): + """Import a new realm from a RealmRepresentation. - def a_update_user(self, user_id, payload): - self.async_call('update_user',(user_id, payload,)) + Realm name must be unique. - def a_disable_user(self, user_id): - self.async_call('disable_user',(user_id,)) + RealmRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_realmrepresentation - def a_enable_user(self, user_id): - self.async_call('enable_user',(user_id,)) + :param payload: RealmRepresentation + :type payload: dict + :return: RealmRepresentation + :rtype: dict + """ + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) - def a_disable_all_users(self): - self.async_call('disable_all_users',()) + async def a_partial_import_realm(self, realm_name, payload): + """Partial import realm configuration from PartialImportRepresentation. - def a_enable_all_users(self): - self.async_call('enable_all_users',()) + Realm partialImport is used for modifying configuration of existing realm. - def a_delete_user(self, user_id): - self.async_call('delete_user',(user_id,)) + PartialImportRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_partialimportrepresentation - def a_set_user_password(self, user_id, password, temporary=True): - self.async_call('set_user_password',(user_id,password,temporary, )) + :param realm_name: Realm name (not the realm id) + :type realm_name: str + :param payload: PartialImportRepresentation + :type payload: dict - def a_get_credentials(self, user_id): - self.async_call('get_credentials',(user_id,)) + :return: PartialImportResponse + :rtype: dict + """ + params_path = {"realm-name": realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_REALM_PARTIAL_IMPORT.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[200]) - def a_delete_credential(self, user_id, credential_id): - self.async_call('get_credentials',(user_id, credential_id,)) + async def a_export_realm(self, export_clients=False, export_groups_and_role=False): + """Export the realm configurations in the json format. - def a_user_logout(self, user_id): - self.async_call('user_logout',(user_id,)) + RealmRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_partialexport - def a_user_consents(self, user_id): - self.async_call('user_consents',(user_id,)) + :param export_clients: Skip if not want to export realm clients + :type export_clients: bool + :param export_groups_and_role: Skip if not want to export realm groups and roles + :type export_groups_and_role: bool - def a_get_user_social_logins(self, user_id): - self.async_call('get_user_social_logins',(user_id,)) + :return: realm configurations JSON + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "export-clients": export_clients, + "export-groups-and-roles": export_groups_and_role, + } + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_REALM_EXPORT.format(**params_path), data="" + ) + return raise_error_from_response(data_raw, KeycloakPostError) - def a_add_user_social_login(self, user_id, provider_id, provider_userid, provider_username): - self.async_call('add_user_social_login',(user_id, provider_id, provider_userid, provider_username)) + async def a_get_realms(self): + """List all realms in Keycloak deployment. - def a_delete_user_social_login(self, user_id, provider_id): - self.async_call('delete_user_social_login',(user_id,provider_id,)) + :return: realms list + :rtype: list + """ + data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_REALMS) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_send_update_account(self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None): - self.async_call('send_update_account',(user_id, payload, client_id, lifespan, redirect_uri,)) + async def a_get_realm(self, realm_name): + """Get a specific realm. - def a_send_verify_email(self, user_id, client_id=None, redirect_uri=None): - self.async_call('send_verify_email',(user_id, client_id, redirect_uri,)) + RealmRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_realmrepresentation - def a_get_sessions(self, user_id): - self.async_call('get_sessions',(user_id,)) + :param realm_name: Realm name (not the realm id) + :type realm_name: str + :return: RealmRepresentation + :rtype: dict + """ + params_path = {"realm-name": realm_name} + data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) - def a_get_server_info(self): - self.async_call('get_server_info',()) + async def a_create_realm(self, payload, skip_exists=False): + """Create a realm. - def a_get_groups(self, query=None, full_hierarchy=False): - self.async_call('get_groups',(query, full_hierarchy,)) + RealmRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_realmrepresentation - def a_get_group(self, group_id, full_hierarchy=False): - self.async_call('get_group',(group_id, full_hierarchy,)) + :param payload: RealmRepresentation + :type payload: dict + :param skip_exists: Skip if Realm already exist. + :type skip_exists: bool + :return: Keycloak server response (RealmRepresentation) + :rtype: dict + """ + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_REALMS, data=json.dumps(payload) + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) - def a_get_subgroups(self, group, path): - self.async_call('get_subgroups',(group, path,)) + async def a_update_realm(self, realm_name, payload): + """Update a realm. - def a_get_group_children(self, group_id, query=None, full_hierarchy=False): - self.async_call('get_group_children',( group_id, query, full_hierarchy,)) + This will only update top level attributes and will ignore any user, + role, or client information in the payload. - def a_get_group_members(self, group_id, query=None): - self.async_call('get_group_members',(group_id, query,)) + RealmRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_realmrepresentation - def a_get_group_by_path(self, path): - self.async_call('get_group_by_path',(path,)) + :param realm_name: Realm name (not the realm id) + :type realm_name: str + :param payload: RealmRepresentation + :type payload: dict + :return: Http response + :rtype: dict + """ + params_path = {"realm-name": realm_name} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_REALM.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) - def a_create_group(self, payload, parent=None, skip_exists=False): - self.async_call('get_group_by_path',(path,)) + async def a_delete_realm(self, realm_name): + """Delete a realm. - def a_update_group(self, group_id, payload): - self.async_call('update_group',(group_id, payload,)) + :param realm_name: Realm name (not the realm id) + :type realm_name: str + :return: Http response + :rtype: dict + """ + params_path = {"realm-name": realm_name} + data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def a_groups_count(self, query=None): - self.async_call('groups_count',(query,)) + async def a_get_users(self, query=None): + """Get all users. - def a_group_set_permissions(self, group_id, enabled=True): - self.async_call('group_set_permissions',( group_id, enabled ,)) + Return a list of users, filtered according to query parameters - def a_group_user_add(self, user_id, group_id): - self.async_call('group_user_add',( user_id, group_id,)) + UserRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_userrepresentation - def a_group_user_remove(self, user_id, group_id): - self.async_call('group_user_remove',(user_id, group_id,)) + :param query: Query parameters (optional) + :type query: dict + :return: users list + :rtype: list + """ + query = query or {} + params_path = {"realm-name": self.connection.realm_name} + url = urls_patterns.URL_ADMIN_USERS.format(**params_path) - def a_delete_group(self, group_id): - self.async_call('delete_group',(group_id,)) + if "first" in query or "max" in query: + return await self.a___fetch_paginated(url, query) + + return await self.a___fetch_all(url, query) - def a_get_clients(self): - self.async_call('get_clients',()) + async def a_create_idp(self, payload): + """Create an ID Provider. - def a_get_client(self, client_id): - self.async_call('get_client',(client_id,)) + IdentityProviderRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_identityproviderrepresentation - def a_get_client_id(self, client_id): - self.async_call('get_client_id',(client_id,)) + :param: payload: IdentityProviderRepresentation + :type payload: dict + :returns: Keycloak server response + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_IDPS.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) - def a_get_client_authz_settings(self, client_id): - self.async_call('get_client_authz_settings',(client_id,)) + async def a_update_idp(self, idp_alias, payload): + """Update an ID Provider. - def a_create_client_authz_resource(self, client_id, payload, skip_exists=False): - self.async_call('create_client_authz_resource',(client_id, payload, skip_exists,)) + IdentityProviderRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_identity_providers_resource - def a_update_client_authz_resource(self, client_id, resource_id, payload): - self.async_call('update_client_authz_resource',(client_id, resource_id, payload,)) + :param: idp_alias: alias for IdP to update + :type idp_alias: str + :param: payload: The IdentityProviderRepresentation + :type payload: dict + :returns: Keycloak server response + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_IDP.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) - def a_delete_client_authz_resource(self, client_id: str, resource_id: str): - self.async_call('delete_client_authz_resource',(client_id, resource_id,)) + async def a_add_mapper_to_idp(self, idp_alias, payload): + """Create an ID Provider. - def a_get_client_authz_resource(self, client_id: str, resource_id: str): - self.async_call('get_client_authz_resource',(client_id, resource_id,)) + IdentityProviderRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_identityprovidermapperrepresentation - def a_create_client_authz_role_based_policy(self, client_id, payload, skip_exists=False): - self.async_call('create_client_authz_role_based_policy',( client_id, payload, skip_exists,)) + :param: idp_alias: alias for Idp to add mapper in + :type idp_alias: str + :param: payload: IdentityProviderMapperRepresentation + :type payload: dict + :returns: Keycloak server response + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "idp-alias": idp_alias} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_IDP_MAPPERS.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) - def a_create_client_authz_policy(self, client_id, payload, skip_exists=False): - self.async_call('create_client_authz_policy',( client_id, payload, skip_exists,)) + async def a_update_mapper_in_idp(self, idp_alias, mapper_id, payload): + """Update an IdP mapper. - def a_create_client_authz_resource_based_permission(self, client_id, payload, skip_exists=False): - self.async_call('create_client_authz_resource_based_permission',( client_id, payload, skip_exists,)) + IdentityProviderMapperRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_update - def a_get_client_authz_scopes(self, client_id): - self.async_call('get_client_authz_scopes',( client_id,)) + :param: idp_alias: alias for Idp to fetch mappers + :type idp_alias: str + :param: mapper_id: Mapper Id to update + :type mapper_id: str + :param: payload: IdentityProviderMapperRepresentation + :type payload: dict + :return: Http response + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "idp-alias": idp_alias, + "mapper-id": mapper_id, + } - def a_create_client_authz_scopes(self, client_id, payload): - self.async_call('create_client_authz_scopes',( client_id,payload)) + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_IDP_MAPPER_UPDATE.format(**params_path), + data=json.dumps(payload), + ) - def a_get_client_authz_permissions(self, client_id): - self.async_call('get_client_authz_permissions',( client_id,)) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) - def a_get_client_authz_policies(self, client_id): - self.async_call('get_client_authz_policies',( client_id,)) + async def a_get_idp_mappers(self, idp_alias): + """Get IDP mappers. - def a_delete_client_authz_policy(self, client_id, policy_id): - self.async_call('delete_client_authz_policy',( client_id,policy_id,)) + Returns a list of ID Providers mappers - def a_get_client_authz_policy(self, client_id, policy_id): - self.async_call('get_client_authz_policy',( client_id,policy_id,)) + IdentityProviderMapperRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getmappers - def a_get_client_service_account_user(self, client_id): - self.async_call('get_client_service_account_user',( client_id,)) + :param: idp_alias: alias for Idp to fetch mappers + :type idp_alias: str + :return: array IdentityProviderMapperRepresentation + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "idp-alias": idp_alias} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_IDP_MAPPERS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + async def a_get_idps(self): + """Get IDPs. - def a_get_client_default_client_scopes(self, client_id): - self.async_call('get_client_default_client_scopes',( client_id,)) + Returns a list of ID Providers, - def a_add_client_default_client_scope(self, client_id, client_scope_id, payload): - self.async_call('add_client_default_client_scope',( client_id,client_scope_id, payload)) + IdentityProviderRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_identityproviderrepresentation - def a_delete_client_default_client_scope(self, client_id, client_scope_id): - self.async_call('delete_client_default_client_scope',( client_id,client_scope_id,)) + :return: array IdentityProviderRepresentation + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_get_client_optional_client_scopes(self, client_id): - self.async_call('get_client_optional_client_scopes',( client_id,)) + async def a_get_idp(self, idp_alias): + """Get IDP provider. - def a_add_client_optional_client_scope(self, client_id, client_scope_id, payload): - self.async_call('add_client_optional_client_scope',( client_id,client_scope_id, payload)) + Get the representation of a specific IDP Provider. - def delete_client_optional_client_scope(self, client_id, client_scope_id,): - self.async_call('delete_client_optional_client_scope',( client_id,client_scope_id,)) + IdentityProviderRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_identityproviderrepresentation - def a_create_initial_access_token(self, count: int = 1, expiration: int = 1): - self.async_call('create_initial_access_token',( count,expiration,)) + :param: idp_alias: alias for IdP to get + :type idp_alias: str + :return: IdentityProviderRepresentation + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias} + data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_IDP.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_create_client(self, payload, skip_exists=False): - self.async_call('create_client',( payload,skip_exists,)) + async def a_delete_idp(self, idp_alias): + """Delete an ID Provider. - def a_update_client(self, client_id, payload): - self.async_call('update_client',( client_id,payload,)) + :param: idp_alias: idp alias name + :type idp_alias: str + :returns: Keycloak server response + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias} + data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_IDP.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def a_delete_client(self, client_id): - self.async_call('delete_client',( client_id,)) + async def a_create_user(self, payload, exist_ok=False): + """Create a new user. - def a_get_client_installation_provider(self, client_id, provider_id): - self.async_call('get_client_installation_provider',( client_id,provider_id,)) + Username must be unique - def a_get_realm_roles(self, brief_representation=True, search_text=""): - self.async_call('get_realm_roles',(brief_representation, search_text,)) + UserRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_userrepresentation - def a_get_realm_role_groups(self, role_name, query=None, brief_representation=True): - self.async_call('get_realm_role_groups',(role_name, query, brief_representation,)) + :param payload: UserRepresentation + :type payload: dict + :param exist_ok: If False, raise KeycloakGetError if username already exists. + Otherwise, return existing user ID. + :type exist_ok: bool - def a_get_realm_role_members(self, role_name, query=None): - self.async_call('get_realm_role_members',(role_name, query,)) + :return: user_id + :rtype: str + """ + params_path = {"realm-name": self.connection.realm_name} - def a_get_default_realm_role_id(self): - self.async_call('get_default_realm_role_id',()) + if exist_ok: + exists = self.get_user_id(username=payload["username"]) - def a_get_realm_default_roles(self): - self.async_call('get_realm_default_roles',()) + if exists is not None: + return str(exists) - def a_remove_realm_default_roles(self, payload): - self.async_call('remove_realm_default_roles',(payload,)) + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_USERS.format(**params_path), data=json.dumps(payload) + ) + raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 - def a_add_realm_default_roles(self, payload): - self.async_call('add_realm_default_roles',(payload,)) + async def a_users_count(self, query=None): + """Count users. - def a_get_client_roles(self, client_id, brief_representation=True): - self.async_call('get_client_roles',(client_id, brief_representation,)) + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_users_resource - def a_get_client_role(self, client_id, role_name): - self.async_call('get_client_role',( client_id, role_name,)) + :param query: (dict) Query parameters for users count + :type query: dict - def a_get_client_role_id(self, client_id, role_name): - self.async_call('get_client_role_id',( client_id, role_name,)) + :return: counter + :rtype: int + """ + query = query or dict() + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USERS_COUNT.format(**params_path), **query + ) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_create_client_role(self, client_role_id, payload, skip_exists=False): - self.async_call('create_client_role',(client_role_id, payload, skip_exists,)) + async def a_get_user_id(self, username): + """Get internal keycloak user id from username. - def a_add_composite_client_roles_to_role(self, client_role_id, role_name, roles): - self.async_call('add_composite_client_roles_to_role',(client_role_id, role_name, roles,)) + This is required for further actions against this user. - def a_update_client_role(self, client_id, role_name, payload): - self.async_call('update_client_role',(client_id, role_name, payload,)) + UserRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_userrepresentation - def a_delete_client_role(self, client_role_id, role_name): - self.async_call('delete_client_role',( client_role_id, role_name,)) + :param username: id in UserRepresentation + :type username: str - def a_assign_client_role(self, user_id, client_id, roles): - self.async_call('assign_client_role',(user_id, client_id, roles,)) + :return: user_id + :rtype: str + """ + lower_user_name = username.lower() + users = await self.a_get_users(query={"username": lower_user_name, "max": 1, "exact": True}) + return users[0]["id"] if len(users) == 1 else None - def a_get_client_role_members(self, client_id, role_name, **query): - self.async_call('get_client_role_members',(client_id, role_name, query,)) + async def a_get_user(self, user_id): + """Get representation of the user. - def a_get_client_role_groups(self, client_id, role_name, **query): - self.async_call('get_client_role_groups',(client_id, role_name, query,)) + UserRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_userrepresentation - def a_get_role_by_id(self, role_id): - self.async_call('get_role_by_id',(role_id,)) + :param user_id: User id + :type user_id: str + :return: UserRepresentation + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_USER.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_update_role_by_id(self, role_id, payload): - self.async_call('update_role_by_id',(role_id, payload,)) + async def a_get_user_groups(self, user_id, query=None, brief_representation=True): + """Get user groups. - def a_delete_role_by_id(self, role_id): - self.async_call('delete_role_by_id',(role_id,)) + Returns a list of groups of which the user is a member - def a_create_realm_role(self, payload, skip_exists=False): - self.async_call('create_realm_role',(payload, skip_exists,)) + :param user_id: User id + :type user_id: str + :param query: Additional query options + :type query: dict + :param brief_representation: whether to omit attributes in the response + :type brief_representation: bool + :return: user groups list + :rtype: list + """ + query = query or {} - def a_get_realm_role(self, role_name): - self.async_call('get_realm_role',(role_name,)) + params = {"briefRepresentation": brief_representation} - def a_get_realm_role_by_id(self, role_id: str): - self.async_call('get_realm_role_by_id',(role_id,)) + query.update(params) - def update_realm_role(self, role_name, payload): - self.async_call('update_realm_role',(role_name, payload,)) + params_path = {"realm-name": self.connection.realm_name, "id": user_id} - def a_delete_realm_role(self, role_name): - self.async_call('delete_realm_role',(role_name,)) + url = urls_patterns.URL_ADMIN_USER_GROUPS.format(**params_path) - def a_add_composite_realm_roles_to_role(self, role_name, roles): - self.async_call('add_composite_realm_roles_to_role',(role_name,roles)) + if "first" in query or "max" in query: + return await self.a___fetch_paginated(url, query) - def a_remove_composite_realm_roles_to_role(self, role_name, roles): - self.async_call('remove_composite_realm_roles_to_role',(role_name,roles)) + return await self.a___fetch_all(url, query) - def a_get_composite_realm_roles_of_role(self, role_name): - self.async_call('get_composite_realm_roles_of_role',(role_name,)) + async def a_update_user(self, user_id, payload): + """Update the user. - def a_assign_realm_roles_to_client_scope(self, client_id, roles): - self.async_call('assign_realm_roles_to_client_scope',( client_id, roles,)) + :param user_id: User id + :type user_id: str + :param payload: UserRepresentation + :type payload: dict - def a_delete_realm_roles_of_client_scope(self, client_id, roles): - self.async_call('delete_realm_roles_of_client_scope',( client_id, roles,)) + :return: Http response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_USER.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) - def a_get_realm_roles_of_client_scope(self, client_id): - self.async_call('get_realm_roles_of_client_scope',( client_id,)) + async def a_disable_user(self, user_id): + """Disable the user from the realm. Disabled users can not log in. - def a_assign_client_roles_to_client_scope(self, client_id, client_roles_owner_id, roles): - self.async_call('assign_client_roles_to_client_scope',(client_id, client_roles_owner_id, roles,)) + :param user_id: User id + :type user_id: str - def a_delete_client_roles_of_client_scope(self, client_id, client_roles_owner_id, roles): - self.async_call('delete_client_roles_of_client_scope',(client_id, client_roles_owner_id, roles,)) + :return: Http response + :rtype: bytes + """ + return await self.a_update_user(user_id=user_id, payload={"enabled": False}) - def a_get_client_roles_of_client_scope(self, client_id, client_roles_owner_id): - self.async_call('get_client_roles_of_client_scope',(client_id, client_roles_owner_id,)) + async def a_enable_user(self, user_id): + """Enable the user from the realm. - def a_assign_realm_roles(self, user_id, roles): - self.async_call('assign_realm_roles',(client_id,roles,)) + :param user_id: User id + :type user_id: str - def a_delete_realm_roles_of_user(self, user_id, roles): - self.async_call('delete_realm_roles_of_user',(user_id, roles,)) + :return: Http response + :rtype: bytes + """ + return await self.a_update_user(user_id=user_id, payload={"enabled": True}) - def a_get_realm_roles_of_user(self, user_id): - self.async_call('get_realm_roles_of_user',(user_id,)) + async def a_disable_all_users(self): + """Disable all existing users.""" + users = await self.a_get_users() + for user in users: + user_id = user["id"] + await self.a_disable_user(user_id=user_id) - def a_get_available_realm_roles_of_user(self, user_id): - self.async_call('get_available_realm_roles_of_user',(user_id,)) + async def a_enable_all_users(self): + """Disable all existing users.""" + users = await self.a_get_users() + for user in users: + user_id = user["id"] + await self.a_enable_user(user_id=user_id) - def a_get_composite_realm_roles_of_user(self, user_id, brief_representation=True): - self.async_call('get_composite_realm_roles_of_user',(user_id, brief_representation,)) + async def a_delete_user(self, user_id): + """Delete the user. - def a_assign_group_realm_roles(self, group_id, roles): - self.async_call('assign_group_realm_roles',(group_id, roles,)) + :param user_id: User id + :type user_id: str + :return: Http response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_USER.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def a_delete_group_realm_roles(self, group_id, roles): - self.async_call('delete_group_realm_roles',(group_id, roles,)) + async def a_set_user_password(self, user_id, password, temporary=True): + """Set up a password for the user. - def a_get_group_realm_roles(self, group_id, brief_representation=True): - self.async_call('get_group_realm_roles',(group_id, brief_representation,)) + If temporary is True, the user will have to reset + the temporary password next time they log in. - def a_assign_group_client_roles(self, group_id, client_id, roles): - - pass + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_users_resource + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_credentialrepresentation - def a_get_group_client_roles(self, group_id, client_id): - - pass + :param user_id: User id + :type user_id: str + :param password: New password + :type password: str + :param temporary: True if password is temporary + :type temporary: bool + :returns: Response + :rtype: dict + """ + payload = {"type": "password", "temporary": temporary, "value": password} + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_RESET_PASSWORD.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) - def a_delete_group_client_roles(self, group_id, client_id, roles): - - pass + async def a_get_credentials(self, user_id): + """Get user credentials. - def a_get_all_roles_of_user(self, user_id): - - pass + Returns a list of credential belonging to the user. - def a_get_client_roles_of_user(self, user_id, client_id): - - pass + CredentialRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_credentialrepresentation - def a_get_available_client_roles_of_user(self, user_id, client_id): - - pass + :param: user_id: user id + :type user_id: str + :returns: Keycloak server response (CredentialRepresentation) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USER_CREDENTIALS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_get_composite_client_roles_of_user(self, user_id, client_id, brief_representation=False): - - pass + async def a_delete_credential(self, user_id, credential_id): + """Delete credential of the user. - def a__get_client_roles_of_user( - self, client_level_role_mapping_url, user_id, client_id, **params - ): - pass + CredentialRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_credentialrepresentation - def a_delete_client_roles_of_user(self, user_id, client_id, roles): - - pass + :param: user_id: user id + :type user_id: str + :param: credential_id: credential id + :type credential_id: str + :return: Keycloak server response (ClientRepresentation) + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "credential_id": credential_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_USER_CREDENTIAL.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError) - def a_get_authentication_flows(self): + async def a_user_logout(self, user_id): + """Log out the user. - pass + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_logout - def a_get_authentication_flow_for_id(self, flow_id): - - pass + :param user_id: User id + :type user_id: str + :returns: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_USER_LOGOUT.format(**params_path), data="" + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) - def a_create_authentication_flow(self, payload, skip_exists=False): + async def a_user_consents(self, user_id): + """Get consents granted by the user. - pass + UserConsentRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_userconsentrepresentation - def a_copy_authentication_flow(self, payload, flow_alias): - pass + :param user_id: User id + :type user_id: str + :returns: List of UserConsentRepresentations + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USER_CONSENTS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_delete_authentication_flow(self, flow_id): - - pass + async def a_get_user_social_logins(self, user_id): + """Get user social logins. - def a_get_authentication_flow_executions(self, flow_alias): - - pass + Returns a list of federated identities/social logins of which the user has been associated + with + :param user_id: User id + :type user_id: str + :returns: Federated identities list + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITIES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_update_authentication_flow_executions(self, payload, flow_alias): - - pass + async def a_add_user_social_login(self, user_id, provider_id, provider_userid, provider_username): + """Add a federated identity / social login provider to the user. - def a_get_authentication_flow_execution(self, execution_id): - - pass + :param user_id: User id + :type user_id: str + :param provider_id: Social login provider id + :type provider_id: str + :param provider_userid: userid specified by the provider + :type provider_userid: str + :param provider_username: username specified by the provider + :type provider_username: str + :returns: Keycloak server response + :rtype: bytes + """ + payload = { + "identityProvider": provider_id, + "userId": provider_userid, + "userName": provider_username, + } + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "provider": provider_id, + } + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201, 204]) - def a_create_authentication_flow_execution(self, payload, flow_alias): - - pass + async def a_delete_user_social_login(self, user_id, provider_id): + """Delete a federated identity / social login provider from the user. - def a_delete_authentication_flow_execution(self, execution_id): - - pass - - def a_create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False): - - pass - - def a_get_authenticator_providers(self): - - pass + :param user_id: User id + :type user_id: str + :param provider_id: Social login provider id + :type provider_id: str + :returns: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "provider": provider_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def a_get_authenticator_provider_config_description(self, provider_id): - - pass + async def a_send_update_account( + self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None + ): + """Send an update account email to the user. - def a_get_authenticator_config(self, config_id): - - pass + An email contains a link the user can click to perform a set of required actions. - def a_update_authenticator_config(self, payload, config_id): - - pass + :param user_id: User id + :type user_id: str + :param payload: A list of actions for the user to complete + :type payload: list + :param client_id: Client id (optional) + :type client_id: str + :param lifespan: Number of seconds after which the generated token expires (optional) + :type lifespan: int + :param redirect_uri: The redirect uri (optional) + :type redirect_uri: str - def a_delete_authenticator_config(self, config_id): - - pass + :returns: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), + data=json.dumps(payload), + **params_query, + ) + return raise_error_from_response(data_raw, KeycloakPutError) - def a_sync_users(self, storage_id, action): - - pass + async def a_send_verify_email(self, user_id, client_id=None, redirect_uri=None): + """Send a update account email to the user. - def a_get_client_scopes(self): - - pass + An email contains a link the user can click to perform a set of required actions. - def a_get_client_scope(self, client_scope_id): - - pass + :param user_id: User id + :type user_id: str + :param client_id: Client id (optional) + :type client_id: str + :param redirect_uri: Redirect uri (optional) + :type redirect_uri: str - def a_get_client_scope_by_name(self, client_scope_name): - - pass + :returns: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + params_query = {"client_id": client_id, "redirect_uri": redirect_uri} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path), + data={}, + **params_query, + ) + return raise_error_from_response(data_raw, KeycloakPutError) - def a_create_client_scope(self, payload, skip_exists=False): - - pass + async def a_get_sessions(self, user_id): + """Get sessions associated with the user. - def a_update_client_scope(self, client_scope_id, payload): - - pass + UserSessionRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_usersessionrepresentation - def a_delete_client_scope(self, client_scope_id): - - pass + :param user_id: Id of user + :type user_id: str + :return: UserSessionRepresentation + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_GET_SESSIONS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_get_mappers_from_client_scope(self, client_scope_id): - - pass + async def a_get_server_info(self): + """Get themes, social providers, etc. on this server. - def a_add_mapper_to_client_scope(self, client_scope_id, payload): - - pass + ServerInfoRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_serverinforepresentation - def a_delete_mapper_from_client_scope(self, client_scope_id, protocol_mapper_id): - - pass + :return: ServerInfoRepresentation + :rtype: dict + """ + data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_SERVER_INFO) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_update_mapper_in_client_scope(self, client_scope_id, protocol_mapper_id, payload): - - pass + async def a_get_groups(self, query=None, full_hierarchy=False): + """Get groups. - def a_get_default_default_client_scopes(self): - - pass + Returns a list of groups belonging to the realm - def a_delete_default_default_client_scope(self, scope_id): - - pass + GroupRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_grouprepresentation - def a_add_default_default_client_scope(self, scope_id): - - pass + Notice that when using full_hierarchy=True, the response will be a nested structure + containing all the children groups. If used with query parameters, the full_hierarchy + will be applied to the received groups only. - def a_delete_default_optional_client_scope(self, scope_id): - - pass + :param query: Additional query options + :type query: dict + :param full_hierarchy: If True, return all of the nested children groups, otherwise only + the first level children are returned + :type full_hierarchy: bool + :return: array GroupRepresentation + :rtype: list + """ + query = query or {} + params_path = {"realm-name": self.connection.realm_name} + url = urls_patterns.URL_ADMIN_GROUPS.format(**params_path) - def a_add_default_optional_client_scope(self, scope_id): - - pass + if "first" in query or "max" in query: + groups = await self.a___fetch_paginated(url, query) + else: + groups = await self.a___fetch_all(url, query) - def a_get_mappers_from_client(self, client_id): - - pass + # For version +23.0.0 + for group in groups: + if group.get("subGroupCount"): + group["subGroups"] = await self.a_get_group_children( + group_id=group.get("id"), full_hierarchy=full_hierarchy + ) - def a_add_mapper_to_client(self, client_id, payload): - - pass + return groups - def a_update_client_mapper(self, client_id, mapper_id, payload): - - pass + async def a_get_group(self, group_id, full_hierarchy=False): + """Get group by id. - def a_remove_client_mapper(self, client_id, client_mapper_id): - - pass + Returns full group details - def a_generate_client_secrets(self, client_id): - - pass + GroupRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_grouprepresentation - def a_get_client_secrets(self, client_id): - - pass + :param group_id: The group id + :type group_id: str + :param full_hierarchy: If True, return all of the nested children groups, otherwise only + the first level children are returned + :type full_hierarchy: bool + :return: Keycloak server response (GroupRepresentation) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + response = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) - def a_get_components(self, query=None): - - pass + if response.status_code >= 400: + return raise_error_from_response(response, KeycloakGetError) - def a_create_component(self, payload): - pass + # For version +23.0.0 + group = response.json() + if group.get("subGroupCount"): + group["subGroups"] = await self.a_get_group_children( + group.get("id"), full_hierarchy=full_hierarchy + ) - def a_get_component(self, component_id): - - pass + return group - def a_update_component(self, component_id, payload): - - pass + async def a_get_subgroups(self, group, path): + """Get subgroups. - def a_delete_component(self, component_id): - - pass + Utility function to iterate through nested group structures - def a_get_keys(self): - - pass + GroupRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_grouprepresentation - def a_get_admin_events(self, query=None): - - pass - def a_get_events(self, query=None): - - pass + :param group: group (GroupRepresentation) + :type group: dict + :param path: group path (string) + :type path: str + :return: Keycloak server response (GroupRepresentation) + :rtype: dict + """ + for subgroup in group["subGroups"]: + if subgroup["path"] == path: + return subgroup + elif subgroup["subGroups"]: + for subgroup in group["subGroups"]: + result = await self.a_get_subgroups(subgroup, path) + if result: + return result + # went through the tree without hits + return None - def a_set_events(self, payload): - - pass + async def a_get_group_children(self, group_id, query=None, full_hierarchy=False): + """Get group children by parent id. - def a_get_client_all_sessions(self, client_id): - - pass + Returns full group children details - def a_get_client_sessions_stats(self): - - pass + :param group_id: The parent group id + :type group_id: str + :param query: Additional query options + :type query: dict + :param full_hierarchy: If True, return all of the nested children groups + :type full_hierarchy: bool + :return: Keycloak server response (GroupRepresentation) + :rtype: dict + :raises ValueError: If both query and full_hierarchy parameters are used + """ + query = query or {} + if query and full_hierarchy: + raise ValueError("Cannot use both query and full_hierarchy parameters") - def a_get_client_management_permissions(self, client_id): - - pass + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + url = urls_patterns.URL_ADMIN_GROUP_CHILD.format(**params_path) + if "first" in query or "max" in query: + return await self.a___fetch_paginated(url, query) + res = await self.a___fetch_all(url, query) - def a_update_client_management_permissions(self, payload, client_id): - - pass + if not full_hierarchy: + return res - def a_get_client_authz_policy_scopes(self, client_id, policy_id): - - pass + for group in res: + if group.get("subGroupCount"): + group["subGroups"] = await self.a_get_group_children( + group_id=group.get("id"), full_hierarchy=full_hierarchy + ) - def a_get_client_authz_policy_resources(self, client_id, policy_id): - - pass + return res - def a_get_client_authz_scope_permission(self, client_id, scope_id): - - pass + async def a_get_group_members(self, group_id, query=None): + """Get members by group id. - def a_create_client_authz_scope_permission(self, payload, client_id): - - pass + Returns group members - def a_update_client_authz_scope_permission(self, payload, client_id, scope_id): - - pass + GroupRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_userrepresentation - def a_get_client_authz_client_policies(self, client_id): - - pass + :param group_id: The group id + :type group_id: str + :param query: Additional query parameters + (see https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getmembers) + :type query: dict + :return: Keycloak server response (UserRepresentation) + :rtype: list + """ + query = query or {} + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + url = urls_patterns.URL_ADMIN_GROUP_MEMBERS.format(**params_path) - def a_create_client_authz_client_policy(self, payload, client_id): - - pass + if "first" in query or "max" in query: + return await self.a___fetch_paginated(url, query) - def a_get_composite_client_roles_of_group(self, client_id, group_id, brief_representation=True): - - pass + return await self.a___fetch_all(url, query) - def a_get_role_client_level_children(self, client_id, role_id): - - pass + async def a_get_group_by_path(self, path): + """Get group id based on name or path. - def a_upload_certificate(self, client_id, certcont): - - pass + Returns full group details for a group defined by path - def a_get_required_action_by_alias(self, action_alias): - - pass + GroupRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_grouprepresentation - def a_get_required_actions(self): - - pass + :param path: group path + :type path: str + :return: Keycloak server response (GroupRepresentation) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "path": path} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_update_required_action(self, action_alias, payload): - - pass + async def a_create_group(self, payload, parent=None, skip_exists=False): + """Create a group in the Realm. - def a_get_bruteforce_detection_status(self, user_id): - - pass + GroupRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_grouprepresentation - def a_clear_bruteforce_attempts_for_user(self, user_id): - - pass + :param payload: GroupRepresentation + :type payload: dict + :param parent: parent group's id. Required to create a sub-group. + :type parent: str + :param skip_exists: If true then do not raise an error if it already exists + :type skip_exists: bool + + :return: Group id for newly created group or None for an existing group + :rtype: str + """ + if parent is None: + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_GROUPS.format(**params_path), data=json.dumps(payload) + ) + else: + params_path = {"realm-name": self.connection.realm_name, "id": parent} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_GROUP_CHILD.format(**params_path), data=json.dumps(payload) + ) + + raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + try: + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + except KeyError: + return + + async def a_update_group(self, group_id, payload): + """Update group, ignores subgroups. + + GroupRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/#_grouprepresentation + + :param group_id: id of group + :type group_id: str + :param payload: GroupRepresentation with updated information. + :type payload: dict + + :return: Http response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_GROUP.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_groups_count(self, query=None): + """Count groups. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_groups + + :param query: (dict) Query parameters for groups count + :type query: dict + + :return: Keycloak Server Response + :rtype: dict + """ + query = query or dict() + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_GROUPS_COUNT.format(**params_path), **query + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_group_set_permissions(self, group_id, enabled=True): + """Enable/Disable permissions for a group. + + Cannot delete group if disabled + + :param group_id: id of group + :type group_id: str + :param enabled: Enabled flag + :type enabled: bool + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_GROUP_PERMISSIONS.format(**params_path), + data=json.dumps({"enabled": enabled}), + ) + return raise_error_from_response(data_raw, KeycloakPutError) + + async def a_group_user_add(self, user_id, group_id): + """Add user to group (user_id and group_id). + + :param user_id: id of user + :type user_id: str + :param group_id: id of group to add to + :type group_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "group-id": group_id, + } + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_USER_GROUP.format(**params_path), data=None + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_group_user_remove(self, user_id, group_id): + """Remove user from group (user_id and group_id). + + :param user_id: id of user + :type user_id: str + :param group_id: id of group to remove from + :type group_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "group-id": group_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_USER_GROUP.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_delete_group(self, group_id): + """Delete a group in the Realm. + + :param group_id: id of group to delete + :type group_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_clients(self): + """Get clients. + + Returns a list of clients belonging to the realm + + ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + + :return: Keycloak server response (ClientRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw =await self.connection.a_raw_get(urls_patterns.URL_ADMIN_CLIENTS.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client(self, client_id): + """Get representation of the client. + + ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + + :param client_id: id of client (not client-id) + :type client_id: str + :return: Keycloak server response (ClientRepresentation) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw =await self.connection.a_raw_get(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_id(self, client_id): + """Get internal keycloak client id from client-id. + + This is required for further actions against this client. + + :param client_id: clientId in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: client_id (uuid as string) + :rtype: str + """ + params_path = {"realm-name": self.connection.realm_name, "client-id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENTS_CLIENT_ID.format(**params_path) + ) + data_response = raise_error_from_response(data_raw, KeycloakGetError) + + for client in data_response: + if client_id == client.get("clientId"): + return client["id"] + + return None + + async def a_get_client_authz_settings(self, client_id): + """Get authorization json from client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SETTINGS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_create_client_authz_resource(self, client_id, payload, skip_exists=False): + """Create resources of client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param payload: ResourceRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation + :type payload: dict + :param skip_exists: Skip the creation in case the resource exists + :type skip_exists: bool + + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + + async def a_update_client_authz_resource(self, client_id, resource_id, payload): + """Update resource of client. + + Any parameter missing from the ResourceRepresentation in the payload WILL be set + to default by the Keycloak server. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param payload: ResourceRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation + :type payload: dict + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param resource_id: id in ResourceRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation + :type resource_id: str + + :return: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "resource-id": resource_id, + } + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCE.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_delete_client_authz_resource(self, client_id: str, resource_id: str): + """Delete a client resource. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param resource_id: id in ResourceRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation + :type resource_id: str + + :return: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "resource-id": resource_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def get_client_authz_resources(self, client_id): + """Get resources from client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response (ResourceRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_authz_resource(self, client_id: str, resource_id: str): + """Get a client resource. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param resource_id: id in ResourceRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation + :type resource_id: str + + :return: Keycloak server response (ResourceRepresentation) + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "resource-id": resource_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) + + async def a_create_client_authz_role_based_policy(self, client_id, payload, skip_exists=False): + """Create role-based policy of client. + + Payload example:: + + payload={ + "type": "role", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "name": "Policy-1", + "roles": [ + { + "id": id + } + ] + } + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param payload: No Document + :type payload: dict + :param skip_exists: Skip creation in case the object exists + :type skip_exists: bool + :return: Keycloak server response + :rtype: bytes + + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + + async def a_create_client_authz_policy(self, client_id, payload, skip_exists=False): + """Create an authz policy of client. + + Payload example:: + + payload={ + "name": "Policy-time-based", + "type": "time", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "hourEnd": "18", + "hour": "9" + } + } + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param payload: No Document + :type payload: dict + :param skip_exists: Skip creation in case the object exists + :type skip_exists: bool + :return: Keycloak server response + :rtype: bytes + + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICIES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + + async def a_create_client_authz_resource_based_permission(self, client_id, payload, skip_exists=False): + """Create resource-based permission of client. + + Payload example:: + + payload={ + "type": "resource", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "name": "Permission-Name", + "resources": [ + resource_id + ], + "policies": [ + policy_id + ] + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param payload: PolicyRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation + :type payload: dict + :param skip_exists: Skip creation in case the object already exists + :type skip_exists: bool + :return: Keycloak server response + :rtype: bytes + + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + + async def a_get_client_authz_scopes(self, client_id): + """Get scopes from client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_create_client_authz_scopes(self, client_id, payload): + """Create scopes for client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :param payload: ScopeRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_ScopeRepresentation + :type payload: dict + :type client_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + + async def a_get_client_authz_permissions(self, client_id): + """Get permissions from client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_authz_policies(self, client_id): + """Get policies from client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICIES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_delete_client_authz_policy(self, client_id, policy_id): + """Delete a policy from client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param policy_id: id in PolicyRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation + :type policy_id: str + :return: Keycloak server response + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "policy-id": policy_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_client_authz_policy(self, client_id, policy_id): + """Get a policy from client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param policy_id: id in PolicyRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation + :type policy_id: str + :return: Keycloak server response + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "policy-id": policy_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_service_account_user(self, client_id): + """Get service account user from client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: UserRepresentation + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_default_client_scopes(self, client_id): + """Get all default client scopes from client. + + :param client_id: id of the client in which the new default client scope should be added + :type client_id: str + + :return: list of client scopes with id and name + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_add_client_default_client_scope(self, client_id, client_scope_id, payload): + """Add a client scope to the default client scopes from client. + + Payload example:: + + payload={ + "realm":"testrealm", + "client":"aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "clientScopeId":"bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" + } + + :param client_id: id of the client in which the new default client scope should be added + :type client_id: str + :param client_scope_id: id of the new client scope that should be added + :type client_scope_id: str + :param payload: dictionary with realm, client and clientScopeId + :type payload: dict + + :return: Http response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "client_scope_id": client_scope_id, + } + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError) + + async def a_delete_client_default_client_scope(self, client_id, client_scope_id): + """Delete a client scope from the default client scopes of the client. + + :param client_id: id of the client in which the default client scope should be deleted + :type client_id: str + :param client_scope_id: id of the client scope that should be deleted + :type client_scope_id: str + + :return: list of client scopes with id and name + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "client_scope_id": client_scope_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError) + + async def a_get_client_optional_client_scopes(self, client_id): + """Get all optional client scopes from client. + + :param client_id: id of the client in which the new optional client scope should be added + :type client_id: str + + :return: list of client scopes with id and name + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_add_client_optional_client_scope(self, client_id, client_scope_id, payload): + """Add a client scope to the optional client scopes from client. + + Payload example:: + + payload={ + "realm":"testrealm", + "client":"aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "clientScopeId":"bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" + } + + :param client_id: id of the client in which the new optional client scope should be added + :type client_id: str + :param client_scope_id: id of the new client scope that should be added + :type client_scope_id: str + :param payload: dictionary with realm, client and clientScopeId + :type payload: dict + + :return: Http response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "client_scope_id": client_scope_id, + } + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPE.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError) + + async def a_delete_client_optional_client_scope(self, client_id, client_scope_id): + """Delete a client scope from the optional client scopes of the client. + + :param client_id: id of the client in which the optional client scope should be deleted + :type client_id: str + :param client_scope_id: id of the client scope that should be deleted + :type client_scope_id: str + + :return: list of client scopes with id and name + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "client_scope_id": client_scope_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError) + + async def a_create_initial_access_token(self, count: int = 1, expiration: int = 1): + """Create an initial access token. + + :param count: Number of clients that can be registered + :type count: int + :param expiration: Days until expireation + :type expiration: int + :return: initial access token + :rtype: str + """ + payload = {"count": count, "expiration": expiration} + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_INITIAL_ACCESS.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[200]) + + async def a_create_client(self, payload, skip_exists=False): + """Create a client. + + ClientRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + + :param skip_exists: If true then do not raise an error if client already exists + :type skip_exists: bool + :param payload: ClientRepresentation + :type payload: dict + :return: Client ID + :rtype: str + """ + if skip_exists: + client_id = self.get_client_id(client_id=payload["clientId"]) + + if client_id is not None: + return client_id + + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENTS.format(**params_path), data=json.dumps(payload) + ) + raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + + async def a_update_client(self, client_id, payload): + """Update a client. + + :param client_id: Client id + :type client_id: str + :param payload: ClientRepresentation + :type payload: dict + + :return: Http response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_delete_client(self, client_id): + """Get representation of the client. + + ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + + :param client_id: keycloak client id (not oauth client-id) + :type client_id: str + :return: Keycloak server response (ClientRepresentation) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_client_installation_provider(self, client_id, provider_id): + """Get content for given installation provider. + + Related documentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clients_resource + + Possible provider_id list available in the ServerInfoRepresentation#clientInstallations + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_serverinforepresentation + + :param client_id: Client id + :type client_id: str + :param provider_id: provider id to specify response format + :type provider_id: str + :returns: Installation providers + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "provider-id": provider_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_INSTALLATION_PROVIDER.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) + + async def a_get_realm_roles(self, brief_representation=True, search_text=""): + """Get all roles for the realm or client. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param brief_representation: whether to omit role attributes in the response + :type brief_representation: bool + :param search_text: optional search text to limit the returned result. + :type search_text: str + :return: Keycloak server response (RoleRepresentation) + :rtype: list + """ + url = urls_patterns.URL_ADMIN_REALM_ROLES + params_path = {"realm-name": self.connection.realm_name} + params = {"briefRepresentation": brief_representation} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), **params + ) + + # set the search_text path param, if it is a valid string + if search_text is not None and search_text.strip() != "": + params_path["search-text"] = search_text + url = urls_patterns.URL_ADMIN_REALM_ROLES_SEARCH + + data_raw = await self.connection.a_raw_get(url.format(**params_path), **params) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_realm_role_groups(self, role_name, query=None, brief_representation=True): + """Get role groups of realm by role name. + + :param role_name: Name of the role. + :type role_name: str + :param query: Additional Query parameters + (see https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_parameters_226) + :type query: dict + :param brief_representation: whether to omit role attributes in the response + :type brief_representation: bool + :return: Keycloak Server Response (GroupRepresentation) + :rtype: list + """ + query = query or {} + + params = {"briefRepresentation": brief_representation} + + query.update(params) + + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + + url = urls_patterns.URL_ADMIN_REALM_ROLES_GROUPS.format(**params_path) + + if "first" in query or "max" in query: + return await self.a___fetch_paginated(url, query) + + return await self.a___fetch_all(url, query) + + async def a_get_realm_role_members(self, role_name, query=None): + """Get role members of realm by role name. + + :param role_name: Name of the role. + :type role_name: str + :param query: Additional Query parameters + (see https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_roles_resource) + :type query: dict + :return: Keycloak Server Response (UserRepresentation) + :rtype: list + """ + query = query or dict() + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + return await self.a___fetch_all( + urls_patterns.URL_ADMIN_REALM_ROLES_MEMBERS.format(**params_path), query + ) + + async def a_get_default_realm_role_id(self): + """Get the ID of the default realm role. + + :return: Realm role ID + :rtype: str + """ + all_realm_roles = await self.a_get_realm_roles() + default_realm_roles = [ + realm_role + for realm_role in all_realm_roles + if realm_role["name"] == f"default-roles-{self.connection.realm_name}".lower() + ] + return default_realm_roles[0]["id"] + + async def a_get_realm_default_roles(self): + """Get all the default realm roles. + + :return: Keycloak Server Response (UserRepresentation) + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "role-id": self.get_default_realm_role_id(), + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES_REALM.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_remove_realm_default_roles(self, payload): + """Remove a set of default realm roles. + + :param payload: List of RoleRepresentations + :type payload: list + :return: Keycloak Server Response + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "role-id": self.get_default_realm_role_id(), + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakDeleteError) + + async def a_add_realm_default_roles(self, payload): + """Add a set of default realm roles. + + :param payload: List of RoleRepresentations + :type payload: list + :return: Keycloak Server Response + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "role-id": self.get_default_realm_role_id(), + } + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_REALM_ROLE_COMPOSITES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_get_client_roles(self, client_id, brief_representation=True): + """Get all roles for the client. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param client_id: id of client (not client-id) + :type client_id: str + :param brief_representation: whether to omit role attributes in the response + :type brief_representation: bool + :return: Keycloak server response (RoleRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + params = {"briefRepresentation": brief_representation} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_ROLES.format(**params_path), **params + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_role(self, client_id, role_name): + """Get client role id by name. + + This is required for further actions with this role. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param client_id: id of client (not client-id) + :type client_id: str + :param role_name: role's name (not id!) + :type role_name: str + :return: role_id + :rtype: str + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "role-name": role_name, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_role_id(self, client_id, role_name): + """Get client role id by name. + + This is required for further actions with this role. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param client_id: id of client (not client-id) + :type client_id: str + :param role_name: role's name (not id!) + :type role_name: str + :return: role_id + :rtype: str + """ + role = await self.a_get_client_role(client_id, role_name) + return role.get("id") + + async def a_create_client_role(self, client_role_id, payload, skip_exists=False): + """Create a client role. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param client_role_id: id of client (not client-id) + :type client_role_id: str + :param payload: RoleRepresentation + :type payload: dict + :param skip_exists: If true then do not raise an error if client role already exists + :type skip_exists: bool + :return: Client role name + :rtype: str + """ + if skip_exists: + try: + res = await self.a_get_client_role(client_id=client_role_id, role_name=payload["name"]) + return res["name"] + except KeycloakGetError: + pass + + params_path = {"realm-name": self.connection.realm_name, "id": client_role_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) + ) + raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + + async def a_add_composite_client_roles_to_role(self, client_role_id, role_name, roles): + """Add composite roles to client role. + + :param client_role_id: id of client (not client-id) + :type client_role_id: str + :param role_name: The name of the role + :type role_name: str + :param roles: roles list or role (use RoleRepresentation) to be updated + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = { + "realm-name": self.connection.realm_name, + "id": client_role_id, + "role-name": role_name, + } + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_update_client_role(self, client_id, role_name, payload): + """Update a client role. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param client_id: id of client (not client-id) + :type client_id: str + :param role_name: role's name (not id!) + :type role_name: str + :param payload: RoleRepresentation + :type payload: dict + :returns: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "role-name": role_name, + } + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_delete_client_role(self, client_role_id, role_name): + """Delete a client role. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param client_role_id: id of client (not client-id) + :type client_role_id: str + :param role_name: role's name (not id!) + :type role_name: str + :returns: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_role_id, + "role-name": role_name, + } + data_raw =await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_assign_client_role(self, user_id, client_id, roles): + """Assign a client role to a user. + + :param user_id: id of user + :type user_id: str + :param client_id: id of client (not client-id) + :type client_id: str + :param roles: roles list or role (use RoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "client-id": client_id, + } + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_get_client_role_members(self, client_id, role_name, **query): + """Get members by client role. + + :param client_id: The client id + :type client_id: str + :param role_name: the name of role to be queried. + :type role_name: str + :param query: Additional query parameters + (see https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clients_resource) + :type query: dict + :return: Keycloak server response (UserRepresentation) + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "role-name": role_name, + } + return await self.a___fetch_all( + urls_patterns.URL_ADMIN_CLIENT_ROLE_MEMBERS.format(**params_path), query + ) + + async def a_get_client_role_groups(self, client_id, role_name, **query): + """Get group members by client role. + + :param client_id: The client id + :type client_id: str + :param role_name: the name of role to be queried. + :type role_name: str + :param query: Additional query parameters + (see https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clients_resource) + :type query: dict + :return: Keycloak server response + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "role-name": role_name, + } + return await self.a___fetch_all( + urls_patterns.URL_ADMIN_CLIENT_ROLE_GROUPS.format(**params_path), query + ) + + async def a_get_role_by_id(self, role_id): + """Get a specific role’s representation. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param role_id: id of role + :type role_id: str + :return: Keycloak server response (RoleRepresentation) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "role-id": role_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_ID.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) + + async def a_update_role_by_id(self, role_id, payload): + """Update the role. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param payload: RoleRepresentation + :type payload: dict + :param role_id: id of role + :type role_id: str + :returns: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "role-id": role_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_ID.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_delete_role_by_id(self, role_id): + """Delete a role by its id. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param role_id: id of role + :type role_id: str + :returns: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "role-id": role_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_ID.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_create_realm_role(self, payload, skip_exists=False): + """Create a new role for the realm or client. + + :param payload: The role (use RoleRepresentation) + :type payload: dict + :param skip_exists: If true then do not raise an error if realm role already exists + :type skip_exists: bool + :return: Realm role name + :rtype: str + """ + if skip_exists: + try: + role = await self.a_get_realm_role(role_name=payload["name"]) + return role["name"] + except KeycloakGetError: + pass + + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), data=json.dumps(payload) + ) + raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + + async def a_get_realm_role(self, role_name): + """Get realm role by role name. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param role_name: role's name, not id! + :type role_name: str + :return: role + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_realm_role_by_id(self, role_id: str): + """Get realm role by role id. + + RoleRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation + + :param role_id: role's id, not name! + :type role_id: str + :return: role + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "role-id": role_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_ID.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_update_realm_role(self, role_name, payload): + """Update a role for the realm by name. + + :param role_name: The name of the role to be updated + :type role_name: str + :param payload: The role (use RoleRepresentation) + :type payload: dict + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_delete_realm_role(self, role_name): + """Delete a role for the realm by name. + + :param role_name: The role name + :type role_name: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_NAME.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_add_composite_realm_roles_to_role(self, role_name, roles): + """Add composite roles to the role. + + :param role_name: The name of the role + :type role_name: str + :param roles: roles list or role (use RoleRepresentation) to be updated + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_remove_composite_realm_roles_to_role(self, role_name, roles): + """Remove composite roles from the role. + + :param role_name: The name of the role + :type role_name: str + :param roles: roles list or role (use RoleRepresentation) to be removed + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_composite_realm_roles_of_role(self, role_name): + """Get composite roles of the role. + + :param role_name: The name of the role + :type role_name: str + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_assign_realm_roles_to_client_scope(self, client_id, roles): + """Assign realm roles to a client's scope. + + :param client_id: id of client (not client-id) + :type client_id: str + :param roles: roles list or role (use RoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: dict + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_delete_realm_roles_of_client_scope(self, client_id, roles): + """Delete realm roles of a client's scope. + + :param client_id: id of client (not client-id) + :type client_id: str + :param roles: roles list or role (use RoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: dict + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_realm_roles_of_client_scope(self, client_id): + """Get all realm roles for a client's scope. + + :param client_id: id of client (not client-id) + :type client_id: str + :return: Keycloak server response (array RoleRepresentation) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_assign_client_roles_to_client_scope(self, client_id, client_roles_owner_id, roles): + """Assign client roles to a client's scope. + + :param client_id: id of client (not client-id) who is assigned the roles + :type client_id: str + :param client_roles_owner_id: id of client (not client-id) who has the roles + :type client_roles_owner_id: str + :param roles: roles list or role (use RoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: dict + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "client": client_roles_owner_id, + } + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_delete_client_roles_of_client_scope(self, client_id, client_roles_owner_id, roles): + """Delete client roles of a client's scope. + + :param client_id: id of client (not client-id) who is assigned the roles + :type client_id: str + :param client_roles_owner_id: id of client (not client-id) who has the roles + :type client_roles_owner_id: str + :param roles: roles list or role (use RoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: dict + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "client": client_roles_owner_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_client_roles_of_client_scope(self, client_id, client_roles_owner_id): + """Get all client roles for a client's scope. + + :param client_id: id of client (not client-id) + :type client_id: str + :param client_roles_owner_id: id of client (not client-id) who has the roles + :type client_roles_owner_id: str + :return: Keycloak server response (array RoleRepresentation) + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "client": client_roles_owner_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_assign_realm_roles(self, user_id, roles): + """Assign realm roles to a user. + + :param user_id: id of user + :type user_id: str + :param roles: roles list or role (use RoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_delete_realm_roles_of_user(self, user_id, roles): + """Delete realm roles of a user. + + :param user_id: id of user + :type user_id: str + :param roles: roles list or role (use RoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_realm_roles_of_user(self, user_id): + """Get all realm roles for a user. + + :param user_id: id of user + :type user_id: str + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USER_REALM_ROLES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_available_realm_roles_of_user(self, user_id): + """Get all available (i.e. unassigned) realm roles for a user. + + :param user_id: id of user + :type user_id: str + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USER_REALM_ROLES_AVAILABLE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_composite_realm_roles_of_user(self, user_id, brief_representation=True): + """Get all composite (i.e. implicit) realm roles for a user. + + :param user_id: id of user + :type user_id: str + :param brief_representation: whether to omit role attributes in the response + :type brief_representation: bool + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + params = {"briefRepresentation": brief_representation} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USER_REALM_ROLES_COMPOSITE.format(**params_path), **params + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_assign_group_realm_roles(self, group_id, roles): + """Assign realm roles to a group. + + :param group_id: id of group + :type group_id: str + :param roles: roles list or role (use GroupRoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_delete_group_realm_roles(self, group_id, roles): + """Delete realm roles of a group. + + :param group_id: id of group + :type group_id: str + :param roles: roles list or role (use GroupRoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def get_group_realm_roles(self, group_id, brief_representation=True): + """Get all realm roles for a group. + + :param group_id: id of the group + :type group_id: str + :param brief_representation: whether to omit role attributes in the response + :type brief_representation: bool + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": group_id} + params = {"briefRepresentation": brief_representation} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path), **params + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_assign_group_client_roles(self, group_id, client_id, roles): + """Assign client roles to a group. + + :param group_id: id of group + :type group_id: str + :param client_id: id of client (not client-id) + :type client_id: str + :param roles: roles list or role (use GroupRoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = { + "realm-name": self.connection.realm_name, + "id": group_id, + "client-id": client_id, + } + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_get_group_client_roles(self, group_id, client_id): + """Get client roles of a group. + + :param group_id: id of group + :type group_id: str + :param client_id: id of client (not client-id) + :type client_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": group_id, + "client-id": client_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_delete_group_client_roles(self, group_id, client_id, roles): + """Delete client roles of a group. + + :param group_id: id of group + :type group_id: str + :param client_id: id of client (not client-id) + :type client_id: str + :param roles: roles list or role (use GroupRoleRepresentation) + :type roles: list + :return: Keycloak server response (array RoleRepresentation) + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = { + "realm-name": self.connection.realm_name, + "id": group_id, + "client-id": client_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def a_clear_all_bruteforce_attempts(self): - - pass + async def a_get_all_roles_of_user(self, user_id): + """Get all level roles for a user. - def a_clear_keys_cache(self): - - pass + :param user_id: id of user + :type user_id: str + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USER_ALL_ROLES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) - def a_clear_realm_cache(self): - - pass + async def a_get_client_roles_of_user(self, user_id, client_id): + """Get all client roles for a user. - def a_clear_user_cache(self): - - pass + :param user_id: id of user + :type user_id: str + :param client_id: id of client (not client-id) + :type client_id: str + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + return await self.a__get_client_roles_of_user( + urls_patterns.URL_ADMIN_USER_CLIENT_ROLES, user_id, client_id + ) + + async def a_get_available_client_roles_of_user(self, user_id, client_id): + """Get available client role-mappings for a user. + + :param user_id: id of user + :type user_id: str + :param client_id: id of client (not client-id) + :type client_id: str + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + return await self.a__get_client_roles_of_user( + urls_patterns.URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE, user_id, client_id + ) + + async def a_get_composite_client_roles_of_user(self, user_id, client_id, brief_representation=False): + """Get composite client role-mappings for a user. + + :param user_id: id of user + :type user_id: str + :param client_id: id of client (not client-id) + :type client_id: str + :param brief_representation: whether to omit attributes in the response + :type brief_representation: bool + :return: Keycloak server response (array RoleRepresentation) + :rtype: list + """ + params = {"briefRepresentation": brief_representation} + return await self.a__get_client_roles_of_user( + urls_patterns.URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE, user_id, client_id, **params + ) + + async def a__get_client_roles_of_user( + self, client_level_role_mapping_url, user_id, client_id, **params + ): + """Get client roles of a single user helper. + + :param client_level_role_mapping_url: Url for the client role mapping + :type client_level_role_mapping_url: str + :param user_id: User id + :type user_id: str + :param client_id: Client id + :type client_id: str + :param params: Additional parameters + :type params: dict + :returns: Client roles of a user + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "client-id": client_id, + } + data_raw = await self.connection.a_raw_get( + client_level_role_mapping_url.format(**params_path), **params + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_delete_client_roles_of_user(self, user_id, client_id, roles): + """Delete client roles from a user. + + :param user_id: id of user + :type user_id: str + :param client_id: id of client containing role (not client-id) + :type client_id: str + :param roles: roles list or role to delete (use RoleRepresentation) + :type roles: list + :return: Keycloak server response + :rtype: bytes + """ + payload = roles if isinstance(roles, list) else [roles] + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "client-id": client_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_authentication_flows(self): + """Get authentication flows. + + Returns all flow details + + AuthenticationFlowRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationflowrepresentation + + :return: Keycloak server response (AuthenticationFlowRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_FLOWS.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_authentication_flow_for_id(self, flow_id): + """Get one authentication flow by it's id. + + Returns all flow details + + AuthenticationFlowRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationflowrepresentation + + :param flow_id: the id of a flow NOT it's alias + :type flow_id: str + :return: Keycloak server response (AuthenticationFlowRepresentation) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "flow-id": flow_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_FLOWS_ALIAS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_create_authentication_flow(self, payload, skip_exists=False): + """Create a new authentication flow. + + AuthenticationFlowRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationflowrepresentation + + :param payload: AuthenticationFlowRepresentation + :type payload: dict + :param skip_exists: Do not raise an error if authentication flow already exists + :type skip_exists: bool + :return: Keycloak server response (RoleRepresentation) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_FLOWS.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + + async def a_copy_authentication_flow(self, payload, flow_alias): + """Copy existing authentication flow under a new name. + + The new name is given as 'newName' attribute of the passed payload. + + :param payload: JSON containing 'newName' attribute + :type payload: dict + :param flow_alias: the flow alias + :type flow_alias: str + :return: Keycloak server response (RoleRepresentation) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "flow-alias": flow_alias} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_FLOWS_COPY.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + + async def a_delete_authentication_flow(self, flow_id): + """Delete authentication flow. + + AuthenticationInfoRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationinforepresentation + + :param flow_id: authentication flow id + :type flow_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": flow_id} + data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_FLOW.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_authentication_flow_executions(self, flow_alias): + """Get authentication flow executions. + + Returns all execution steps + + :param flow_alias: the flow alias + :type flow_alias: str + :return: Response(json) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "flow-alias": flow_alias} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_update_authentication_flow_executions(self, payload, flow_alias): + """Update an authentication flow execution. + + AuthenticationExecutionInfoRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationexecutioninforepresentation + + :param payload: AuthenticationExecutionInfoRepresentation + :type payload: dict + :param flow_alias: The flow alias + :type flow_alias: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "flow-alias": flow_alias} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[202, 204]) + + async def a_get_authentication_flow_execution(self, execution_id): + """Get authentication flow execution. + + AuthenticationExecutionInfoRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationexecutioninforepresentation + + :param execution_id: the execution ID + :type execution_id: str + :return: Response(json) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": execution_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_create_authentication_flow_execution(self, payload, flow_alias): + """Create an authentication flow execution. + + AuthenticationExecutionInfoRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationexecutioninforepresentation + + :param payload: AuthenticationExecutionInfoRepresentation + :type payload: dict + :param flow_alias: The flow alias + :type flow_alias: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "flow-alias": flow_alias} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + + async def a_delete_authentication_flow_execution(self, execution_id): + """Delete authentication flow execution. + + AuthenticationExecutionInfoRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationexecutioninforepresentation + + :param execution_id: keycloak client id (not oauth client-id) + :type execution_id: str + :return: Keycloak server response (json) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": execution_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) diff --git a/src/keycloak/openid_connection.py b/src/keycloak/openid_connection.py index 99b31af..a54d9d3 100644 --- a/src/keycloak/openid_connection.py +++ b/src/keycloak/openid_connection.py @@ -419,3 +419,71 @@ class KeycloakOpenIDConnection(ConnectionManager): self._refresh_if_required() r = super().raw_delete(*args, **kwargs) return r + + async def a_raw_get(self, *args, **kwargs): + """Call connection.raw_get. + + If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token + and try *get* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = await super().a_raw_get(*args, **kwargs) + return r + + async def a_raw_post(self, *args, **kwargs): + """Call connection.raw_post. + + If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token + and try *post* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = await super().a_raw_post(*args, **kwargs) + return r + + async def a_raw_put(self, *args, **kwargs): + """Call connection.raw_put. + + If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token + and try *put* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = await super().a_raw_put(*args, **kwargs) + return r + + async def raw_delete(self, *args, **kwargs): + """Call connection.raw_delete. + + If auto_refresh is set for *delete* and *access_token* is expired, + it will refresh the token and try *delete* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = await super().a_raw_delete(*args, **kwargs) + return r