|
@ -47,7 +47,7 @@ from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURC |
|
|
URL_ADMIN_REALM_ROLES_MEMBERS, URL_ADMIN_CLIENT_PROTOCOL_MAPPER, URL_ADMIN_CLIENT_SCOPES_MAPPERS, \ |
|
|
URL_ADMIN_REALM_ROLES_MEMBERS, URL_ADMIN_CLIENT_PROTOCOL_MAPPER, URL_ADMIN_CLIENT_SCOPES_MAPPERS, \ |
|
|
URL_ADMIN_FLOWS_EXECUTIONS_EXEUCUTION, URL_ADMIN_FLOWS_EXECUTIONS_FLOW, URL_ADMIN_FLOWS_COPY, \ |
|
|
URL_ADMIN_FLOWS_EXECUTIONS_EXEUCUTION, URL_ADMIN_FLOWS_EXECUTIONS_FLOW, URL_ADMIN_FLOWS_COPY, \ |
|
|
URL_ADMIN_FLOWS_ALIAS, URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER, URL_ADMIN_AUTHENTICATOR_CONFIG, \ |
|
|
URL_ADMIN_FLOWS_ALIAS, URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER, URL_ADMIN_AUTHENTICATOR_CONFIG, \ |
|
|
URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE |
|
|
|
|
|
|
|
|
URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE, URL_ADMIN_CLIENT_ALL_SESSIONS |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class KeycloakAdmin: |
|
|
class KeycloakAdmin: |
|
@ -362,7 +362,7 @@ class KeycloakAdmin: |
|
|
data_raw = self.raw_delete(URL_ADMIN_IDP.format(**params_path)) |
|
|
data_raw = self.raw_delete(URL_ADMIN_IDP.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
def create_user(self, payload): |
|
|
|
|
|
|
|
|
def create_user(self, payload, exist_ok=True): |
|
|
""" |
|
|
""" |
|
|
Create a new user. Username must be unique |
|
|
Create a new user. Username must be unique |
|
|
|
|
|
|
|
@ -370,15 +370,17 @@ class KeycloakAdmin: |
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_userrepresentation |
|
|
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_userrepresentation |
|
|
|
|
|
|
|
|
:param payload: UserRepresentation |
|
|
:param payload: UserRepresentation |
|
|
|
|
|
:param exist_ok: If False, raise KeycloakGetError if username already exists. Otherwise, return existing user ID. |
|
|
|
|
|
|
|
|
:return: UserRepresentation |
|
|
:return: UserRepresentation |
|
|
""" |
|
|
""" |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
|
|
|
|
|
exists = self.get_user_id(username=payload['username']) |
|
|
|
|
|
|
|
|
if exist_ok: |
|
|
|
|
|
exists = self.get_user_id(username=payload['username']) |
|
|
|
|
|
|
|
|
if exists is not None: |
|
|
|
|
|
return str(exists) |
|
|
|
|
|
|
|
|
if exists is not None: |
|
|
|
|
|
return str(exists) |
|
|
|
|
|
|
|
|
data_raw = self.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
data_raw = self.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
data=json.dumps(payload)) |
|
|
data=json.dumps(payload)) |
|
@ -537,7 +539,7 @@ class KeycloakAdmin: |
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri} |
|
|
params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri} |
|
|
data_raw = self.raw_put(URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), |
|
|
data_raw = self.raw_put(URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), |
|
|
data=payload, **params_query) |
|
|
|
|
|
|
|
|
data=json.dumps(payload), **params_query) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def send_verify_email(self, user_id, client_id=None, redirect_uri=None): |
|
|
def send_verify_email(self, user_id, client_id=None, redirect_uri=None): |
|
@ -1376,7 +1378,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS.format(**params_path), |
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS.format(**params_path), |
|
|
data=payload) |
|
|
|
|
|
|
|
|
data=json.dumps(payload)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
|
def copy_authentication_flow(self, payload, flow_alias): |
|
|
def copy_authentication_flow(self, payload, flow_alias): |
|
@ -1390,7 +1392,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_COPY.format(**params_path), |
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_COPY.format(**params_path), |
|
|
data=payload) |
|
|
|
|
|
|
|
|
data=json.dumps(payload)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
|
|
|
|
|
|
def get_authentication_flow_executions(self, flow_alias): |
|
|
def get_authentication_flow_executions(self, flow_alias): |
|
@ -1418,7 +1420,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
data_raw = self.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), |
|
|
data_raw = self.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), |
|
|
data=payload) |
|
|
|
|
|
|
|
|
data=json.dumps(payload)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
def create_authentication_flow_execution(self, payload, flow_alias): |
|
|
def create_authentication_flow_execution(self, payload, flow_alias): |
|
@ -1435,7 +1437,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_EXEUCUTION.format(**params_path), |
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_EXEUCUTION.format(**params_path), |
|
|
data=payload) |
|
|
|
|
|
|
|
|
data=json.dumps(payload)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201]) |
|
|
|
|
|
|
|
|
def create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False): |
|
|
def create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False): |
|
@ -1453,7 +1455,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_FLOW.format(**params_path), |
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS_EXECUTIONS_FLOW.format(**params_path), |
|
|
data=payload) |
|
|
|
|
|
|
|
|
data=json.dumps(payload)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[201], skip_exists=skip_exists) |
|
|
|
|
|
|
|
|
def get_authenticator_config(self, config_id): |
|
|
def get_authenticator_config(self, config_id): |
|
@ -1827,3 +1829,18 @@ class KeycloakAdmin: |
|
|
else: |
|
|
else: |
|
|
raise |
|
|
raise |
|
|
self.connection.add_param_headers('Authorization', 'Bearer ' + self.token.get('access_token')) |
|
|
self.connection.add_param_headers('Authorization', 'Bearer ' + self.token.get('access_token')) |
|
|
|
|
|
|
|
|
|
|
|
def get_client_all_sessions(self, client_id): |
|
|
|
|
|
""" |
|
|
|
|
|
Get sessions associated with the client |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client |
|
|
|
|
|
|
|
|
|
|
|
UserSessionRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_usersessionrepresentation |
|
|
|
|
|
|
|
|
|
|
|
:return: UserSessionRepresentation |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "client-id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |