|
|
@ -33,9 +33,9 @@ from .connection import ConnectionManager |
|
|
|
from .exceptions import raise_error_from_response, KeycloakGetError |
|
|
|
from .keycloak_openid import KeycloakOpenID |
|
|
|
from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURCES, URL_ADMIN_CLIENT_ROLES, \ |
|
|
|
URL_ADMIN_GET_SESSIONS, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_GROUPS_REALM_ROLES,\ |
|
|
|
URL_ADMIN_GET_SESSIONS, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_GROUPS_REALM_ROLES, \ |
|
|
|
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE, URL_ADMIN_CLIENT_INSTALLATION_PROVIDER, \ |
|
|
|
URL_ADMIN_REALM_ROLES_ROLE_BY_NAME, URL_ADMIN_GET_GROUPS_REALM_ROLES, URL_ADMIN_GROUPS_CLIENT_ROLES, \ |
|
|
|
URL_ADMIN_REALM_ROLES_ROLE_BY_NAME, URL_ADMIN_GROUPS_CLIENT_ROLES, \ |
|
|
|
URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE, URL_ADMIN_USER_GROUP, URL_ADMIN_REALM_ROLES, URL_ADMIN_GROUP_CHILD, \ |
|
|
|
URL_ADMIN_USER_CONSENTS, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_CLIENT, URL_ADMIN_USER, URL_ADMIN_CLIENT_ROLE, \ |
|
|
|
URL_ADMIN_USER_GROUPS, URL_ADMIN_CLIENTS, URL_ADMIN_FLOWS_EXECUTIONS, URL_ADMIN_GROUPS, URL_ADMIN_USER_CLIENT_ROLES, \ |
|
|
@ -47,13 +47,15 @@ from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURC |
|
|
|
URL_ADMIN_USER_FEDERATED_IDENTITY, URL_ADMIN_USER_FEDERATED_IDENTITIES, URL_ADMIN_CLIENT_ROLE_MEMBERS, \ |
|
|
|
URL_ADMIN_REALM_ROLES_MEMBERS, URL_ADMIN_CLIENT_PROTOCOL_MAPPER, URL_ADMIN_CLIENT_SCOPES_MAPPERS, \ |
|
|
|
URL_ADMIN_FLOWS_EXECUTIONS_EXEUCUTION, URL_ADMIN_FLOWS_EXECUTIONS_FLOW, URL_ADMIN_FLOWS_COPY, \ |
|
|
|
URL_ADMIN_FLOWS_ALIAS, URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER |
|
|
|
URL_ADMIN_FLOWS_ALIAS, URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER, URL_ADMIN_AUTHENTICATOR_CONFIG, \ |
|
|
|
URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE, URL_ADMIN_CLIENT_ALL_SESSIONS, URL_ADMIN_EVENTS, \ |
|
|
|
URL_ADMIN_REALM_EXPORT, URL_ADMIN_DELETE_USER_ROLE, URL_ADMIN_USER_LOGOUT |
|
|
|
|
|
|
|
|
|
|
|
class KeycloakAdmin: |
|
|
|
|
|
|
|
PAGE_SIZE = 100 |
|
|
|
|
|
|
|
|
|
|
|
_server_url = None |
|
|
|
_username = None |
|
|
|
_password = None |
|
|
@ -77,7 +79,7 @@ class KeycloakAdmin: |
|
|
|
:param realm_name: realm name |
|
|
|
:param client_id: client id |
|
|
|
:param verify: True if want check connection SSL |
|
|
|
:param client_secret_key: client secret key |
|
|
|
:param client_secret_key: client secret key (optional, required only for access type confidential) |
|
|
|
:param custom_headers: dict of custom header to pass to each HTML request |
|
|
|
:param user_realm_name: The realm name of the user, if different from realm_name |
|
|
|
:param auto_refresh_token: list of methods that allows automatic token refresh. ex: ['get', 'put', 'post', 'delete'] |
|
|
@ -223,9 +225,18 @@ class KeycloakAdmin: |
|
|
|
if not partial_results: |
|
|
|
break |
|
|
|
results.extend(partial_results) |
|
|
|
if len(partial_results) < query['max']: |
|
|
|
break |
|
|
|
page += 1 |
|
|
|
return results |
|
|
|
|
|
|
|
def __fetch_paginated(self, url, query=None): |
|
|
|
query = query or {} |
|
|
|
|
|
|
|
return raise_error_from_response( |
|
|
|
self.raw_get(url, **query), |
|
|
|
KeycloakGetError) |
|
|
|
|
|
|
|
def import_realm(self, payload): |
|
|
|
""" |
|
|
|
Import a new realm from a RealmRepresentation. Realm name must be unique. |
|
|
@ -242,6 +253,22 @@ class KeycloakAdmin: |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
|
|
|
|
|
def export_realm(self, export_clients=False, export_groups_and_role=False): |
|
|
|
""" |
|
|
|
Export the realm configurations in the json format |
|
|
|
|
|
|
|
RealmRepresentation |
|
|
|
https://www.keycloak.org/docs-api/5.0/rest-api/index.html#_partialexport |
|
|
|
|
|
|
|
:param export-clients: Skip if not want to export realm clients |
|
|
|
:param export-groups-and-roles: Skip if not want to export realm groups and roles |
|
|
|
|
|
|
|
:return: realm configurations JSON |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "export-clients": export_clients, "export-groups-and-roles": export_groups_and_role } |
|
|
|
data_raw = self.raw_post(URL_ADMIN_REALM_EXPORT.format(**params_path), data="") |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_realms(self): |
|
|
|
""" |
|
|
|
Lists all realms in Keycloak deployment |
|
|
@ -307,8 +334,14 @@ class KeycloakAdmin: |
|
|
|
:param query: Query parameters (optional) |
|
|
|
:return: users list |
|
|
|
""" |
|
|
|
query = query or {} |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
return self.__fetch_all(URL_ADMIN_USERS.format(**params_path), query) |
|
|
|
url = URL_ADMIN_USERS.format(**params_path) |
|
|
|
|
|
|
|
if "first" in query or "max" in query: |
|
|
|
return self.__fetch_paginated(url, query) |
|
|
|
|
|
|
|
return self.__fetch_all(url, query) |
|
|
|
|
|
|
|
def create_idp(self, payload): |
|
|
|
""" |
|
|
@ -362,7 +395,7 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_IDP.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def create_user(self, payload): |
|
|
|
def create_user(self, payload, exist_ok=True): |
|
|
|
""" |
|
|
|
Create a new user. Username must be unique |
|
|
|
|
|
|
@ -370,15 +403,17 @@ class KeycloakAdmin: |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_userrepresentation |
|
|
|
|
|
|
|
:param payload: UserRepresentation |
|
|
|
:param exist_ok: If False, raise KeycloakGetError if username already exists. Otherwise, return existing user ID. |
|
|
|
|
|
|
|
:return: UserRepresentation |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
|
|
|
|
exists = self.get_user_id(username=payload['username']) |
|
|
|
if exist_ok: |
|
|
|
exists = self.get_user_id(username=payload['username']) |
|
|
|
|
|
|
|
if exists is not None: |
|
|
|
return str(exists) |
|
|
|
if exists is not None: |
|
|
|
return str(exists) |
|
|
|
|
|
|
|
data_raw = self.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
@ -408,9 +443,9 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
:return: user_id |
|
|
|
""" |
|
|
|
|
|
|
|
users = self.get_users(query={"search": username}) |
|
|
|
return next((user["id"] for user in users if user["username"] == username), None) |
|
|
|
lower_user_name = username.lower() |
|
|
|
users = self.get_users(query={"search": lower_user_name}) |
|
|
|
return next((user["id"] for user in users if user["username"] == lower_user_name), None) |
|
|
|
|
|
|
|
def get_user(self, user_id): |
|
|
|
""" |
|
|
@ -485,6 +520,19 @@ class KeycloakAdmin: |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def logout(self, user_id): |
|
|
|
""" |
|
|
|
Logs out user. |
|
|
|
|
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_logout |
|
|
|
|
|
|
|
:param user_id: User id |
|
|
|
:return: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_USER_LOGOUT.format(**params_path), data="") |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def consents_user(self, user_id): |
|
|
|
""" |
|
|
|
Get consents granted by the user |
|
|
@ -520,7 +568,7 @@ class KeycloakAdmin: |
|
|
|
payload = {"identityProvider": provider_id, "userId": provider_userid, "userName": provider_username} |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "provider": provider_id} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path), data=json.dumps(payload)) |
|
|
|
|
|
|
|
|
|
|
|
def send_update_account(self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None): |
|
|
|
""" |
|
|
|
Send an update account email to the user. An email contains a |
|
|
@ -537,7 +585,7 @@ class KeycloakAdmin: |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri} |
|
|
|
data_raw = self.raw_put(URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), |
|
|
|
data=payload, **params_query) |
|
|
|
data=json.dumps(payload), **params_query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def send_verify_email(self, user_id, client_id=None, redirect_uri=None): |
|
|
@ -584,7 +632,7 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.raw_get(URL_ADMIN_SERVER_INFO) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_groups(self): |
|
|
|
def get_groups(self, query=None): |
|
|
|
""" |
|
|
|
Returns a list of groups belonging to the realm |
|
|
|
|
|
|
@ -593,8 +641,14 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
:return: array GroupRepresentation |
|
|
|
""" |
|
|
|
query = query or {} |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
return self.__fetch_all(URL_ADMIN_GROUPS.format(**params_path)) |
|
|
|
url = URL_ADMIN_USERS.format(**params_path) |
|
|
|
|
|
|
|
if "first" in query or "max" in query: |
|
|
|
return self.__fetch_paginated(url, query) |
|
|
|
|
|
|
|
return self.__fetch_all(url, query) |
|
|
|
|
|
|
|
def get_group(self, group_id): |
|
|
|
""" |
|
|
@ -646,7 +700,12 @@ class KeycloakAdmin: |
|
|
|
:return: Keycloak server response (UserRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": group_id} |
|
|
|
return self.__fetch_all(URL_ADMIN_GROUP_MEMBERS.format(**params_path), query) |
|
|
|
url = URL_ADMIN_USERS.format(**params_path) |
|
|
|
|
|
|
|
if "first" in query or "max" in query: |
|
|
|
return self.__fetch_paginated(url, query) |
|
|
|
|
|
|
|
return self.__fetch_all(url, query) |
|
|
|
|
|
|
|
def get_group_by_path(self, path, search_in_subgroups=False): |
|
|
|
""" |
|
|
@ -1014,6 +1073,22 @@ class KeycloakAdmin: |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
def add_composite_client_roles_to_role(self, client_role_id, role_name, roles): |
|
|
|
""" |
|
|
|
Add composite roles to client role |
|
|
|
|
|
|
|
:param client_role_id: id of client (not client-id) |
|
|
|
:param role_name: The name of the role |
|
|
|
:param roles: roles list or role (use RoleRepresentation) to be updated |
|
|
|
:return Keycloak server response |
|
|
|
""" |
|
|
|
|
|
|
|
payload = roles if isinstance(roles, list) else [roles] |
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def delete_client_role(self, client_role_id, role_name): |
|
|
|
""" |
|
|
|
Delete a client role |
|
|
@ -1220,7 +1295,7 @@ class KeycloakAdmin: |
|
|
|
:return: Keycloak server response (array RoleRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": group_id} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_GET_GROUPS_REALM_ROLES.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_GROUPS_REALM_ROLES.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def assign_group_client_roles(self, group_id, client_id, roles): |
|
|
@ -1330,14 +1405,14 @@ class KeycloakAdmin: |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_FLOWS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def get_authentication_flow_for_id(self, flow_id): |
|
|
|
""" |
|
|
|
Get one authentication flow by it's id/alias. Returns all flow details |
|
|
|
|
|
|
|
AuthenticationFlowRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_authenticationflowrepresentation |
|
|
|
|
|
|
|
|
|
|
|
:param flow_id: the id of a flow NOT it's alias |
|
|
|
:return: Keycloak server response (AuthenticationFlowRepresentation) |
|
|
|
""" |
|
|
@ -1359,7 +1434,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS.format(**params_path), |
|
|
|
data=payload) |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
def copy_authentication_flow(self, payload, flow_alias): |
|
|
@ -1373,7 +1448,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_COPY.format(**params_path), |
|
|
|
data=payload) |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
|
|
|
|
|
def get_authentication_flow_executions(self, flow_alias): |
|
|
@ -1401,9 +1476,9 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
|
data_raw = self.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), |
|
|
|
data=payload) |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
|
|
|
def create_authentication_flow_execution(self, payload, flow_alias): |
|
|
|
""" |
|
|
|
Create an authentication flow execution |
|
|
@ -1418,7 +1493,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_EXEUCUTION.format(**params_path), |
|
|
|
data=payload) |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
|
|
|
|
|
def create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False): |
|
|
@ -1436,9 +1511,50 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_FLOW.format(**params_path), |
|
|
|
data=payload) |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
def get_authenticator_config(self, config_id): |
|
|
|
""" |
|
|
|
Get authenticator configuration. Returns all configuration details. |
|
|
|
|
|
|
|
:param config_id: Authenticator config id |
|
|
|
:return: Response(json) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": config_id} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def update_authenticator_config(self, payload, config_id): |
|
|
|
""" |
|
|
|
Update an authenticator configuration. |
|
|
|
|
|
|
|
AuthenticatorConfigRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_authenticatorconfigrepresentation |
|
|
|
|
|
|
|
:param payload: AuthenticatorConfigRepresentation |
|
|
|
:param config_id: Authenticator config id |
|
|
|
:return: Response(json) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": config_id} |
|
|
|
data_raw = self.raw_put(URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def delete_authenticator_config(self, config_id): |
|
|
|
""" |
|
|
|
Delete a authenticator configuration. |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_authentication_management_resource |
|
|
|
|
|
|
|
:param config_id: Authenticator config id |
|
|
|
:return: Keycloak server Response |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": config_id} |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path)) |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def sync_users(self, storage_id, action): |
|
|
|
""" |
|
|
|
Function to trigger user sync from provider |
|
|
@ -1496,6 +1612,22 @@ class KeycloakAdmin: |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
def update_client_scope(self, client_scope_id, payload): |
|
|
|
""" |
|
|
|
Update a client scope |
|
|
|
|
|
|
|
ClientScopeRepresentation: https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_client_scopes_resource |
|
|
|
|
|
|
|
:param client_scope_id: The id of the client scope |
|
|
|
:param payload: ClientScopeRepresentation |
|
|
|
:return: Keycloak server response (ClientScopeRepresentation) |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} |
|
|
|
data_raw = self.raw_put(URL_ADMIN_CLIENT_SCOPE.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def add_mapper_to_client_scope(self, client_scope_id, payload): |
|
|
|
""" |
|
|
|
Add a mapper to a client scope |
|
|
@ -1531,6 +1663,26 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def update_mapper_in_client_scope(self, client_scope_id, protocol_mapper_id, payload): |
|
|
|
""" |
|
|
|
Update an existing protocol mapper in a client scope |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_protocol_mappers_resource |
|
|
|
|
|
|
|
:param client_scope_id: The id of the client scope |
|
|
|
:param protocol_mapper_id: The id of the protocol mapper which exists in the client scope |
|
|
|
and should to be updated |
|
|
|
:param payload: ProtocolMapperRepresentation |
|
|
|
:return: Keycloak server Response |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id, |
|
|
|
"protocol-mapper-id": protocol_mapper_id} |
|
|
|
|
|
|
|
data_raw = self.raw_put( |
|
|
|
URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path), data=json.dumps(payload)) |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def add_mapper_to_client(self, client_id, payload): |
|
|
|
""" |
|
|
|
Add a mapper to a client |
|
|
@ -1664,6 +1816,34 @@ class KeycloakAdmin: |
|
|
|
data=None) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_events(self, query=None): |
|
|
|
""" |
|
|
|
Return a list of events, filtered according to query parameters |
|
|
|
|
|
|
|
EventRepresentation array |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_eventrepresentation |
|
|
|
|
|
|
|
:return: events list |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_EVENTS.format(**params_path), |
|
|
|
data=None, **query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def set_events(self, payload): |
|
|
|
""" |
|
|
|
Set realm events configuration |
|
|
|
|
|
|
|
RealmEventsConfigRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_realmeventsconfigrepresentation |
|
|
|
|
|
|
|
:return: Http response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.raw_put(URL_ADMIN_EVENTS.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def raw_get(self, *args, **kwargs): |
|
|
|
""" |
|
|
|
Calls connection.raw_get. |
|
|
@ -1717,26 +1897,29 @@ class KeycloakAdmin: |
|
|
|
return r |
|
|
|
|
|
|
|
def get_token(self): |
|
|
|
token_realm_name = 'master' if self.client_secret_key else self.user_realm_name or self.realm_name |
|
|
|
self.keycloak_openid = KeycloakOpenID(server_url=self.server_url, client_id=self.client_id, |
|
|
|
realm_name=self.user_realm_name or self.realm_name, verify=self.verify, |
|
|
|
realm_name=token_realm_name, verify=self.verify, |
|
|
|
client_secret_key=self.client_secret_key, |
|
|
|
custom_headers=self.custom_headers) |
|
|
|
|
|
|
|
grant_type = ["password"] |
|
|
|
if self.client_secret_key: |
|
|
|
grant_type = ["client_credentials"] |
|
|
|
|
|
|
|
if self.user_realm_name: |
|
|
|
self.realm_name = self.user_realm_name |
|
|
|
|
|
|
|
self._token = self.keycloak_openid.token(self.username, self.password, grant_type=grant_type) |
|
|
|
|
|
|
|
headers = { |
|
|
|
'Authorization': 'Bearer ' + self.token.get('access_token'), |
|
|
|
'Content-Type': 'application/json' |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if self.custom_headers is not None: |
|
|
|
# merge custom headers to main headers |
|
|
|
headers.update(self.custom_headers) |
|
|
|
|
|
|
|
|
|
|
|
self._connection = deepcopy(self.keycloak_openid.connection()) |
|
|
|
self._connection._headers.update(headers) |
|
|
|
|
|
|
@ -1751,3 +1934,30 @@ class KeycloakAdmin: |
|
|
|
else: |
|
|
|
raise |
|
|
|
self.connection.add_param_headers('Authorization', 'Bearer ' + self.token.get('access_token')) |
|
|
|
|
|
|
|
def get_client_all_sessions(self, client_id): |
|
|
|
""" |
|
|
|
Get sessions associated with the client |
|
|
|
|
|
|
|
:param client_id: id of client |
|
|
|
|
|
|
|
UserSessionRepresentation |
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_usersessionrepresentation |
|
|
|
|
|
|
|
:return: UserSessionRepresentation |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def delete_user_realm_role(self, user_id, payload): |
|
|
|
""" |
|
|
|
Delete realm-level role mappings |
|
|
|
DELETE admin/realms/{realm-name}/users/{id}/role-mappings/realm |
|
|
|
|
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": str(user_id) } |
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_DELETE_USER_ROLE.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |