|
|
@ -32,6 +32,7 @@ from .connection import ConnectionManager |
|
|
|
from .exceptions import raise_error_from_response, KeycloakGetError |
|
|
|
from .keycloak_openid import KeycloakOpenID |
|
|
|
from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURCES, URL_ADMIN_CLIENT_ROLES, \ |
|
|
|
URL_ADMIN_CLIENT_AUTHZ_POLICIES, URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY, URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION, \ |
|
|
|
URL_ADMIN_GET_SESSIONS, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_GROUPS_REALM_ROLES, \ |
|
|
|
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE, URL_ADMIN_CLIENT_INSTALLATION_PROVIDER, \ |
|
|
|
URL_ADMIN_REALM_ROLES_ROLE_BY_NAME, URL_ADMIN_GROUPS_CLIENT_ROLES, \ |
|
|
@ -45,10 +46,13 @@ from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURC |
|
|
|
URL_ADMIN_USER_REALM_ROLES, URL_ADMIN_REALM, URL_ADMIN_COMPONENTS, URL_ADMIN_COMPONENT, URL_ADMIN_KEYS, \ |
|
|
|
URL_ADMIN_USER_FEDERATED_IDENTITY, URL_ADMIN_USER_FEDERATED_IDENTITIES, URL_ADMIN_CLIENT_ROLE_MEMBERS, \ |
|
|
|
URL_ADMIN_REALM_ROLES_MEMBERS, URL_ADMIN_CLIENT_PROTOCOL_MAPPER, URL_ADMIN_CLIENT_SCOPES_MAPPERS, \ |
|
|
|
URL_ADMIN_FLOWS_EXECUTIONS_EXEUCUTION, URL_ADMIN_FLOWS_EXECUTIONS_FLOW, URL_ADMIN_FLOWS_COPY, \ |
|
|
|
URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION, URL_ADMIN_FLOWS_EXECUTIONS_FLOW, URL_ADMIN_FLOWS_COPY, \ |
|
|
|
URL_ADMIN_FLOWS_ALIAS, URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER, URL_ADMIN_AUTHENTICATOR_CONFIG, \ |
|
|
|
URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE, URL_ADMIN_CLIENT_ALL_SESSIONS, URL_ADMIN_EVENTS, \ |
|
|
|
URL_ADMIN_REALM_EXPORT, URL_ADMIN_DELETE_USER_ROLE, URL_ADMIN_USER_LOGOUT |
|
|
|
URL_ADMIN_REALM_EXPORT, URL_ADMIN_DELETE_USER_ROLE, URL_ADMIN_USER_LOGOUT, URL_ADMIN_FLOWS_EXECUTION, \ |
|
|
|
URL_ADMIN_FLOW, URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES, URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE, \ |
|
|
|
URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES, URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE, \ |
|
|
|
URL_ADMIN_USER_CREDENTIALS, URL_ADMIN_USER_CREDENTIAL |
|
|
|
|
|
|
|
|
|
|
|
class KeycloakAdmin: |
|
|
@ -229,6 +233,13 @@ class KeycloakAdmin: |
|
|
|
page += 1 |
|
|
|
return results |
|
|
|
|
|
|
|
def __fetch_paginated(self, url, query=None): |
|
|
|
query = query or {} |
|
|
|
|
|
|
|
return raise_error_from_response( |
|
|
|
self.raw_get(url, **query), |
|
|
|
KeycloakGetError) |
|
|
|
|
|
|
|
def import_realm(self, payload): |
|
|
|
""" |
|
|
|
Import a new realm from a RealmRepresentation. Realm name must be unique. |
|
|
@ -254,7 +265,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
:param export-clients: Skip if not want to export realm clients |
|
|
|
:param export-groups-and-roles: Skip if not want to export realm groups and roles |
|
|
|
|
|
|
|
|
|
|
|
:return: realm configurations JSON |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "export-clients": export_clients, "export-groups-and-roles": export_groups_and_role } |
|
|
@ -326,8 +337,14 @@ class KeycloakAdmin: |
|
|
|
:param query: Query parameters (optional) |
|
|
|
:return: users list |
|
|
|
""" |
|
|
|
query = query or {} |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
return self.__fetch_all(URL_ADMIN_USERS.format(**params_path), query) |
|
|
|
url = URL_ADMIN_USERS.format(**params_path) |
|
|
|
|
|
|
|
if "first" in query or "max" in query: |
|
|
|
return self.__fetch_paginated(url, query) |
|
|
|
|
|
|
|
return self.__fetch_all(url, query) |
|
|
|
|
|
|
|
def create_idp(self, payload): |
|
|
|
""" |
|
|
@ -506,6 +523,50 @@ class KeycloakAdmin: |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def get_credentials(self, user_id): |
|
|
|
""" |
|
|
|
Returns a list of credential belonging to the user. |
|
|
|
|
|
|
|
CredentialRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_credentialrepresentation |
|
|
|
|
|
|
|
:param: user_id: user id |
|
|
|
:return: Keycloak server response (CredentialRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_USER_CREDENTIALS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_credential(self, user_id, credential_id): |
|
|
|
""" |
|
|
|
Get credential of the user. |
|
|
|
|
|
|
|
CredentialRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_credentialrepresentation |
|
|
|
|
|
|
|
:param: user_id: user id |
|
|
|
:param: credential_id: credential id |
|
|
|
:return: Keycloak server response (ClientRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "credential_id": credential_id} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_USER_CREDENTIAL.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def delete_credential(self, user_id, credential_id): |
|
|
|
""" |
|
|
|
Delete credential of the user. |
|
|
|
|
|
|
|
CredentialRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_credentialrepresentation |
|
|
|
|
|
|
|
:param: user_id: user id |
|
|
|
:param: credential_id: credential id |
|
|
|
:return: Keycloak server response (ClientRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "credential_id": credential_id} |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_USER_CREDENTIAL.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def logout(self, user_id): |
|
|
|
""" |
|
|
|
Logs out user. |
|
|
@ -555,6 +616,18 @@ class KeycloakAdmin: |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "provider": provider_id} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path), data=json.dumps(payload)) |
|
|
|
|
|
|
|
def delete_user_social_login(self, user_id, provider_id): |
|
|
|
|
|
|
|
""" |
|
|
|
Delete a federated identity / social login provider from the user |
|
|
|
:param user_id: User id |
|
|
|
:param provider_id: Social login provider id |
|
|
|
:return: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "provider": provider_id} |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_USER_FEDERATED_IDENTITY.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def send_update_account(self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None): |
|
|
|
""" |
|
|
|
Send an update account email to the user. An email contains a |
|
|
@ -618,7 +691,7 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.raw_get(URL_ADMIN_SERVER_INFO) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_groups(self): |
|
|
|
def get_groups(self, query=None): |
|
|
|
""" |
|
|
|
Returns a list of groups belonging to the realm |
|
|
|
|
|
|
@ -627,8 +700,14 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
:return: array GroupRepresentation |
|
|
|
""" |
|
|
|
query = query or {} |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
return self.__fetch_all(URL_ADMIN_GROUPS.format(**params_path)) |
|
|
|
url = URL_ADMIN_USERS.format(**params_path) |
|
|
|
|
|
|
|
if "first" in query or "max" in query: |
|
|
|
return self.__fetch_paginated(url, query) |
|
|
|
|
|
|
|
return self.__fetch_all(url, query) |
|
|
|
|
|
|
|
def get_group(self, group_id): |
|
|
|
""" |
|
|
@ -680,7 +759,12 @@ class KeycloakAdmin: |
|
|
|
:return: Keycloak server response (UserRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": group_id} |
|
|
|
return self.__fetch_all(URL_ADMIN_GROUP_MEMBERS.format(**params_path), query) |
|
|
|
url = URL_ADMIN_USERS.format(**params_path) |
|
|
|
|
|
|
|
if "first" in query or "max" in query: |
|
|
|
return self.__fetch_paginated(url, query) |
|
|
|
|
|
|
|
return self.__fetch_all(url, query) |
|
|
|
|
|
|
|
def get_group_by_path(self, path, search_in_subgroups=False): |
|
|
|
""" |
|
|
@ -866,6 +950,25 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_AUTHZ_SETTINGS.format(**params_path)) |
|
|
|
return data_raw |
|
|
|
|
|
|
|
def create_client_authz_resource(self, client_id, payload, skip_exists=False): |
|
|
|
""" |
|
|
|
Create resources of client. |
|
|
|
|
|
|
|
:param client_id: id in ClientRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_clientrepresentation |
|
|
|
:param payload: ResourceRepresentation |
|
|
|
https://www.keycloak.org/docs-api/12.0/rest-api/index.html#_resourcerepresentation |
|
|
|
|
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, |
|
|
|
"id": client_id} |
|
|
|
|
|
|
|
data_raw = self.raw_post(URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
def get_client_authz_resources(self, client_id): |
|
|
|
""" |
|
|
|
Get resources from client. |
|
|
@ -877,7 +980,85 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path)) |
|
|
|
return data_raw |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def create_client_authz_role_based_policy(self, client_id, payload, skip_exists=False): |
|
|
|
""" |
|
|
|
Create role-based policy of client. |
|
|
|
|
|
|
|
:param client_id: id in ClientRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_clientrepresentation |
|
|
|
:param payload: No Document |
|
|
|
payload example: |
|
|
|
payload={ |
|
|
|
"type": "role", |
|
|
|
"logic": "POSITIVE", |
|
|
|
"decisionStrategy": "UNANIMOUS", |
|
|
|
"name": "Policy-1", |
|
|
|
"roles": [ |
|
|
|
{ |
|
|
|
"id": id |
|
|
|
} |
|
|
|
] |
|
|
|
} |
|
|
|
|
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, |
|
|
|
"id": client_id} |
|
|
|
|
|
|
|
data_raw = self.raw_post(URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
def create_client_authz_resource_based_permission(self, client_id, payload, skip_exists=False): |
|
|
|
""" |
|
|
|
Create resource-based permission of client. |
|
|
|
|
|
|
|
:param client_id: id in ClientRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_clientrepresentation |
|
|
|
:param payload: PolicyRepresentation |
|
|
|
https://www.keycloak.org/docs-api/12.0/rest-api/index.html#_policyrepresentation |
|
|
|
payload example: |
|
|
|
payload={ |
|
|
|
"type": "resource", |
|
|
|
"logic": "POSITIVE", |
|
|
|
"decisionStrategy": "UNANIMOUS", |
|
|
|
"name": "Permission-Name", |
|
|
|
"resources": [ |
|
|
|
resource_id |
|
|
|
], |
|
|
|
"policies": [ |
|
|
|
policy_id |
|
|
|
] |
|
|
|
|
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, |
|
|
|
"id": client_id} |
|
|
|
|
|
|
|
data_raw = self.raw_post(URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
def get_client_authz_policies(self, client_id): |
|
|
|
""" |
|
|
|
Get policies from client. |
|
|
|
|
|
|
|
:param client_id: id in ClientRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_clientrepresentation |
|
|
|
:param payload: PolicyRepresentation |
|
|
|
https://www.keycloak.org/docs-api/12.0/rest-api/index.html#_policyrepresentation |
|
|
|
|
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
params_query = {"first": 0, "max": 20, "permission": False} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_AUTHZ_POLICIES.format(**params_path), **params_query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_client_service_account_user(self, client_id): |
|
|
|
""" |
|
|
@ -1210,7 +1391,6 @@ class KeycloakAdmin: |
|
|
|
Assign realm roles to a user |
|
|
|
|
|
|
|
:param user_id: id of user |
|
|
|
:param client_id: id of client containing role (not client-id) |
|
|
|
:param roles: roles list or role (use RoleRepresentation) |
|
|
|
:return Keycloak server response |
|
|
|
""" |
|
|
@ -1221,6 +1401,21 @@ class KeycloakAdmin: |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def delete_realm_roles_of_user(self, user_id, roles): |
|
|
|
""" |
|
|
|
Deletes realm roles of a user |
|
|
|
|
|
|
|
:param user_id: id of user |
|
|
|
:param roles: roles list or role (use RoleRepresentation) |
|
|
|
:return Keycloak server response |
|
|
|
""" |
|
|
|
|
|
|
|
payload = roles if isinstance(roles, list) else [roles] |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_USER_REALM_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def get_realm_roles_of_user(self, user_id): |
|
|
|
""" |
|
|
|
Get all realm roles for a user. |
|
|
@ -1427,6 +1622,20 @@ class KeycloakAdmin: |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
|
|
|
|
|
def delete_authentication_flow(self, flow_id): |
|
|
|
""" |
|
|
|
Delete authentication flow |
|
|
|
|
|
|
|
AuthenticationInfoRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_authenticationinforepresentation |
|
|
|
|
|
|
|
:param flow_id: authentication flow id |
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": flow_id} |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_FLOW.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def get_authentication_flow_executions(self, flow_alias): |
|
|
|
""" |
|
|
|
Get authentication flow executions. Returns all execution steps |
|
|
@ -1455,6 +1664,20 @@ class KeycloakAdmin: |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def get_authentication_flow_execution(self, execution_id): |
|
|
|
""" |
|
|
|
Get authentication flow execution. |
|
|
|
|
|
|
|
AuthenticationExecutionInfoRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_authenticationexecutioninforepresentation |
|
|
|
|
|
|
|
:param execution_id: the execution ID |
|
|
|
:return: Response(json) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": execution_id} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_FLOWS_EXECUTION.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def create_authentication_flow_execution(self, payload, flow_alias): |
|
|
|
""" |
|
|
|
Create an authentication flow execution |
|
|
@ -1468,10 +1691,24 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_EXEUCUTION.format(**params_path), |
|
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
|
|
|
|
|
def delete_authentication_flow_execution(self, execution_id): |
|
|
|
""" |
|
|
|
Delete authentication flow execution |
|
|
|
|
|
|
|
AuthenticationExecutionInfoRepresentation |
|
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_authenticationexecutioninforepresentation |
|
|
|
|
|
|
|
:param execution_id: keycloak client id (not oauth client-id) |
|
|
|
:return: Keycloak server response (json) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": execution_id} |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_FLOWS_EXECUTION.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False): |
|
|
|
""" |
|
|
|
Create a new sub authentication flow for a given authentication flow |
|
|
@ -1659,6 +1896,78 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def get_default_default_client_scopes(self): |
|
|
|
""" |
|
|
|
Return list of default default client scopes |
|
|
|
|
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def delete_default_default_client_scope(self, scope_id): |
|
|
|
""" |
|
|
|
Delete default default client scope |
|
|
|
|
|
|
|
:param scope_id: default default client scope id |
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": scope_id} |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
|
|
|
def add_default_default_client_scope(self, scope_id): |
|
|
|
""" |
|
|
|
Add default default client scope |
|
|
|
|
|
|
|
:param scope_id: default default client scope id |
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": scope_id} |
|
|
|
payload = {"realm": self.realm_name, "clientScopeId": scope_id} |
|
|
|
data_raw = self.raw_put(URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
|
|
|
def get_default_optional_client_scopes(self): |
|
|
|
""" |
|
|
|
Return list of default optional client scopes |
|
|
|
|
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.raw_get(URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def delete_default_optional_client_scope(self, scope_id): |
|
|
|
""" |
|
|
|
Delete default optional client scope |
|
|
|
|
|
|
|
:param scope_id: default optional client scope id |
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": scope_id} |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
|
|
|
def add_default_optional_client_scope(self, scope_id): |
|
|
|
""" |
|
|
|
Add default optional client scope |
|
|
|
|
|
|
|
:param scope_id: default optional client scope id |
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": scope_id} |
|
|
|
payload = {"realm": self.realm_name, "clientScopeId": scope_id} |
|
|
|
data_raw = self.raw_put(URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
|
|
|
def add_mapper_to_client(self, client_id, payload): |
|
|
|
""" |
|
|
|
Add a mapper to a client |
|
|
@ -1672,10 +1981,50 @@ class KeycloakAdmin: |
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
|
|
|
|
data_raw = self.raw_post( |
|
|
|
URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path), data=json.dumps(payload)) |
|
|
|
URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path), data=json.dumps(payload)) |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
|
|
|
|
|
def update_client_mapper(self, client_id, mapper_id, payload): |
|
|
|
""" |
|
|
|
Update client mapper |
|
|
|
:param client_id: The id of the client |
|
|
|
:param client_mapper_id: The id of the mapper to be deleted |
|
|
|
:param payload: ProtocolMapperRepresentation |
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = { |
|
|
|
"realm-name": self.realm_name, |
|
|
|
"id": self.client_id, |
|
|
|
"protocol-mapper-id": mapper_id, |
|
|
|
} |
|
|
|
|
|
|
|
data_raw = self.raw_put( |
|
|
|
URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path), data=json.dumps(payload)) |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def remove_client_mapper(self, client_id, client_mapper_id): |
|
|
|
""" |
|
|
|
Removes a mapper from the client |
|
|
|
https://www.keycloak.org/docs-api/15.0/rest-api/index.html#_protocol_mappers_resource |
|
|
|
:param client_id: The id of the client |
|
|
|
:param client_mapper_id: The id of the mapper to be deleted |
|
|
|
:return: Keycloak server response |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = { |
|
|
|
"realm-name": self.realm_name, |
|
|
|
"id": client_id, |
|
|
|
"protocol-mapper-id": mapper_id |
|
|
|
} |
|
|
|
|
|
|
|
data_raw = self.raw_delete( |
|
|
|
URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path)) |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
def generate_client_secrets(self, client_id): |
|
|
|
""" |
|
|
|
|
|
|
@ -1873,7 +2222,13 @@ class KeycloakAdmin: |
|
|
|
return r |
|
|
|
|
|
|
|
def get_token(self): |
|
|
|
token_realm_name = 'master' if self.client_secret_key else self.user_realm_name or self.realm_name |
|
|
|
if self.user_realm_name: |
|
|
|
token_realm_name = self.user_realm_name |
|
|
|
elif self.realm_name: |
|
|
|
token_realm_name = self.realm_name |
|
|
|
else: |
|
|
|
token_realm_name = "master" |
|
|
|
|
|
|
|
self.keycloak_openid = KeycloakOpenID(server_url=self.server_url, client_id=self.client_id, |
|
|
|
realm_name=token_realm_name, verify=self.verify, |
|
|
|
client_secret_key=self.client_secret_key, |
|
|
@ -1885,12 +2240,16 @@ class KeycloakAdmin: |
|
|
|
if self.user_realm_name: |
|
|
|
self.realm_name = self.user_realm_name |
|
|
|
|
|
|
|
self._token = self.keycloak_openid.token(self.username, self.password, grant_type=grant_type) |
|
|
|
if self.username and self.password: |
|
|
|
self._token = self.keycloak_openid.token(self.username, self.password, grant_type=grant_type) |
|
|
|
|
|
|
|
headers = { |
|
|
|
'Authorization': 'Bearer ' + self.token.get('access_token'), |
|
|
|
'Content-Type': 'application/json' |
|
|
|
} |
|
|
|
headers = { |
|
|
|
'Authorization': 'Bearer ' + self.token.get('access_token'), |
|
|
|
'Content-Type': 'application/json' |
|
|
|
} |
|
|
|
else: |
|
|
|
self._token = None |
|
|
|
headers = {} |
|
|
|
|
|
|
|
if self.custom_headers is not None: |
|
|
|
# merge custom headers to main headers |
|
|
@ -1902,15 +2261,23 @@ class KeycloakAdmin: |
|
|
|
verify=self.verify) |
|
|
|
|
|
|
|
def refresh_token(self): |
|
|
|
refresh_token = self.token.get('refresh_token') |
|
|
|
try: |
|
|
|
self.token = self.keycloak_openid.refresh_token(refresh_token) |
|
|
|
except KeycloakGetError as e: |
|
|
|
if e.response_code == 400 and (b'Refresh token expired' in e.response_body or |
|
|
|
b'Token is not active' in e.response_body): |
|
|
|
self.get_token() |
|
|
|
else: |
|
|
|
raise |
|
|
|
refresh_token = self.token.get('refresh_token', None) |
|
|
|
if refresh_token is None: |
|
|
|
self.get_token() |
|
|
|
else: |
|
|
|
try: |
|
|
|
self.token = self.keycloak_openid.refresh_token(refresh_token) |
|
|
|
except KeycloakGetError as e: |
|
|
|
list_errors = [ |
|
|
|
b'Refresh token expired', |
|
|
|
b'Token is not active', |
|
|
|
b'Session not active' |
|
|
|
] |
|
|
|
if e.response_code == 400 and any(err in e.response_body for err in list_errors): |
|
|
|
self.get_token() |
|
|
|
else: |
|
|
|
raise |
|
|
|
|
|
|
|
self.connection.add_param_headers('Authorization', 'Bearer ' + self.token.get('access_token')) |
|
|
|
|
|
|
|
def get_client_all_sessions(self, client_id): |
|
|
|