|
|
@ -25,6 +25,8 @@ |
|
|
|
# internal Keycloak server ID, usually a uuid string |
|
|
|
|
|
|
|
import json |
|
|
|
from builtins import isinstance |
|
|
|
from typing import List, Iterable |
|
|
|
|
|
|
|
from .connection import ConnectionManager |
|
|
|
from .exceptions import raise_error_from_response, KeycloakGetError |
|
|
@ -44,9 +46,22 @@ from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURC |
|
|
|
class KeycloakAdmin: |
|
|
|
|
|
|
|
PAGE_SIZE = 100 |
|
|
|
|
|
|
|
_server_url = None |
|
|
|
_username = None |
|
|
|
_password = None |
|
|
|
_realm_name = None |
|
|
|
_client_id = None |
|
|
|
_verify = None |
|
|
|
_client_secret_key = None |
|
|
|
_auto_refresh_token = None |
|
|
|
_connection = None |
|
|
|
_token = None |
|
|
|
_custom_headers = None |
|
|
|
_user_realm_name = None |
|
|
|
|
|
|
|
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli', verify=True, |
|
|
|
client_secret_key=None, custom_headers=None, user_realm_name=None): |
|
|
|
client_secret_key=None, custom_headers=None, user_realm_name=None, auto_refresh_token=None): |
|
|
|
""" |
|
|
|
|
|
|
|
:param server_url: Keycloak server url |
|
|
@ -57,33 +72,29 @@ class KeycloakAdmin: |
|
|
|
:param verify: True if want check connection SSL |
|
|
|
:param client_secret_key: client secret key |
|
|
|
:param custom_headers: dict of custom header to pass to each HTML request |
|
|
|
""" |
|
|
|
self._username = username |
|
|
|
self._password = password |
|
|
|
self._client_id = client_id |
|
|
|
self._realm_name = realm_name |
|
|
|
:param auto_refresh_token: list of methods that allows automatic token refresh. ex: ['get', 'put', 'post', 'delete'] |
|
|
|
""" |
|
|
|
self.server_url = server_url |
|
|
|
self.username = username |
|
|
|
self.password = password |
|
|
|
self.realm_name = realm_name |
|
|
|
self.client_id = client_id |
|
|
|
self.verify = verify |
|
|
|
self.client_secret_key = client_secret_key |
|
|
|
self.auto_refresh_token = auto_refresh_token or [] |
|
|
|
self.user_realm_name = user_realm_name |
|
|
|
self.custom_headers = custom_headers |
|
|
|
|
|
|
|
# Get token Admin |
|
|
|
keycloak_openid = KeycloakOpenID(server_url=server_url, client_id=client_id, realm_name=user_realm_name or realm_name, |
|
|
|
verify=verify, client_secret_key=client_secret_key, |
|
|
|
custom_headers=custom_headers) |
|
|
|
|
|
|
|
grant_type = ["password"] |
|
|
|
if client_secret_key: |
|
|
|
grant_type = ["client_credentials"] |
|
|
|
self._token = keycloak_openid.token(username, password, grant_type=grant_type) |
|
|
|
headers = { |
|
|
|
'Authorization': 'Bearer ' + self.token.get('access_token'), |
|
|
|
'Content-Type': 'application/json' |
|
|
|
} |
|
|
|
if custom_headers is not None: |
|
|
|
# merge custom headers to main headers |
|
|
|
headers.update(custom_headers) |
|
|
|
self.get_token() |
|
|
|
|
|
|
|
self._connection = ConnectionManager(base_url=server_url, |
|
|
|
headers=headers, |
|
|
|
timeout=60, |
|
|
|
verify=verify) |
|
|
|
@property |
|
|
|
def server_url(self): |
|
|
|
return self._server_url |
|
|
|
|
|
|
|
@server_url.setter |
|
|
|
def server_url(self, value): |
|
|
|
self._server_url = value |
|
|
|
|
|
|
|
@property |
|
|
|
def realm_name(self): |
|
|
@ -109,6 +120,22 @@ class KeycloakAdmin: |
|
|
|
def client_id(self, value): |
|
|
|
self._client_id = value |
|
|
|
|
|
|
|
@property |
|
|
|
def client_secret_key(self): |
|
|
|
return self._client_secret_key |
|
|
|
|
|
|
|
@client_secret_key.setter |
|
|
|
def client_secret_key(self, value): |
|
|
|
self._client_secret_key = value |
|
|
|
|
|
|
|
@property |
|
|
|
def verify(self): |
|
|
|
return self._verify |
|
|
|
|
|
|
|
@verify.setter |
|
|
|
def verify(self, value): |
|
|
|
self._verify = value |
|
|
|
|
|
|
|
@property |
|
|
|
def username(self): |
|
|
|
return self._username |
|
|
@ -133,6 +160,36 @@ class KeycloakAdmin: |
|
|
|
def token(self, value): |
|
|
|
self._token = value |
|
|
|
|
|
|
|
@property |
|
|
|
def auto_refresh_token(self): |
|
|
|
return self._auto_refresh_token |
|
|
|
|
|
|
|
@property |
|
|
|
def user_realm_name(self): |
|
|
|
return self._user_realm_name |
|
|
|
|
|
|
|
@user_realm_name.setter |
|
|
|
def user_realm_name(self, value): |
|
|
|
self._user_realm_name = value |
|
|
|
|
|
|
|
@property |
|
|
|
def custom_headers(self): |
|
|
|
return self._custom_headers |
|
|
|
|
|
|
|
@custom_headers.setter |
|
|
|
def custom_headers(self, value): |
|
|
|
self._custom_headers = value |
|
|
|
|
|
|
|
@auto_refresh_token.setter |
|
|
|
def auto_refresh_token(self, value): |
|
|
|
allowed_methods = {'get', 'post', 'put', 'delete'} |
|
|
|
if not isinstance(value, Iterable): |
|
|
|
raise TypeError('Expected a list of strings among {allowed}'.format(allowed=allowed_methods)) |
|
|
|
if not all(method in allowed_methods for method in value): |
|
|
|
raise TypeError('Unexpected method in auto_refresh_token, accepted methods are {allowed}'.format(allowed=allowed_methods)) |
|
|
|
|
|
|
|
self._auto_refresh_token = value |
|
|
|
|
|
|
|
|
|
|
|
def __fetch_all(self, url, query=None): |
|
|
|
'''Wrapper function to paginate GET requests |
|
|
@ -154,7 +211,7 @@ class KeycloakAdmin: |
|
|
|
while True: |
|
|
|
query['first'] = page*self.PAGE_SIZE |
|
|
|
partial_results = raise_error_from_response( |
|
|
|
self.connection.raw_get(url, **query), |
|
|
|
self.raw_get(url, **query), |
|
|
|
KeycloakGetError) |
|
|
|
if not partial_results: |
|
|
|
break |
|
|
@ -174,8 +231,8 @@ class KeycloakAdmin: |
|
|
|
:return: RealmRepresentation |
|
|
|
""" |
|
|
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_REALMS, |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_REALMS, |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
def get_realms(self): |
|
|
@ -184,7 +241,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
:return: realms list |
|
|
|
""" |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_REALMS) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_REALMS) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def create_realm(self, payload, skip_exists=False): |
|
|
@ -198,8 +255,8 @@ class KeycloakAdmin: |
|
|
|
:return: Keycloak server response (RealmRepresentation) |
|
|
|
""" |
|
|
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_REALMS, |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_REALMS, |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists) |
|
|
|
|
|
|
|
|
|
|
@ -222,7 +279,7 @@ class KeycloakAdmin: |
|
|
|
:return: array IdentityProviderRepresentation |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_IDPS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_IDPS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def create_user(self, payload): |
|
|
@ -243,8 +300,8 @@ class KeycloakAdmin: |
|
|
|
if exists is not None: |
|
|
|
return str(exists) |
|
|
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
def users_count(self): |
|
|
@ -254,7 +311,7 @@ class KeycloakAdmin: |
|
|
|
:return: counter |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_user_id(self, username): |
|
|
@ -284,7 +341,7 @@ class KeycloakAdmin: |
|
|
|
:return: UserRepresentation |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USER.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_USER.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_user_groups(self, user_id): |
|
|
@ -296,7 +353,7 @@ class KeycloakAdmin: |
|
|
|
:return: user groups list |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USER_GROUPS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_USER_GROUPS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def update_user(self, user_id, payload): |
|
|
@ -309,8 +366,8 @@ class KeycloakAdmin: |
|
|
|
:return: Http response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_put(URL_ADMIN_USER.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def delete_user(self, user_id): |
|
|
@ -322,7 +379,7 @@ class KeycloakAdmin: |
|
|
|
:return: Http response |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_USER.format(**params_path)) |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_USER.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def set_user_password(self, user_id, password, temporary=True): |
|
|
@ -341,8 +398,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
payload = {"type": "password", "temporary": temporary, "value": password} |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_RESET_PASSWORD.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_put(URL_ADMIN_RESET_PASSWORD.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def consents_user(self, user_id): |
|
|
@ -354,7 +411,7 @@ class KeycloakAdmin: |
|
|
|
:return: consents |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USER_CONSENTS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_USER_CONSENTS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def send_update_account(self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None): |
|
|
@ -372,8 +429,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), |
|
|
|
data=payload, **params_query) |
|
|
|
data_raw = self.raw_put(URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path), |
|
|
|
data=payload, **params_query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def send_verify_email(self, user_id, client_id=None, redirect_uri=None): |
|
|
@ -389,8 +446,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
params_query = {"client_id": client_id, "redirect_uri": redirect_uri} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path), |
|
|
|
data={}, **params_query) |
|
|
|
data_raw = self.raw_put(URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path), |
|
|
|
data={}, **params_query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_sessions(self, user_id): |
|
|
@ -405,7 +462,7 @@ class KeycloakAdmin: |
|
|
|
:return: UserSessionRepresentation |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_GET_SESSIONS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_GET_SESSIONS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_server_info(self): |
|
|
@ -417,7 +474,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
:return: ServerInfoRepresentation |
|
|
|
""" |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_SERVER_INFO) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_SERVER_INFO) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_groups(self): |
|
|
@ -442,7 +499,7 @@ class KeycloakAdmin: |
|
|
|
:return: Keycloak server response (GroupRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": group_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_GROUP.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_GROUP.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_subgroups(self, group, path): |
|
|
@ -525,12 +582,12 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
if parent is None: |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_GROUPS.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_GROUPS.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
else: |
|
|
|
params_path = {"realm-name": self.realm_name, "id": parent, } |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists) |
|
|
|
|
|
|
@ -548,8 +605,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": group_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_GROUP.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_put(URL_ADMIN_GROUP.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def group_set_permissions(self, group_id, enabled=True): |
|
|
@ -562,8 +619,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": group_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_GROUP_PERMISSIONS.format(**params_path), |
|
|
|
data=json.dumps({"enabled": enabled})) |
|
|
|
data_raw = self.raw_put(URL_ADMIN_GROUP_PERMISSIONS.format(**params_path), |
|
|
|
data=json.dumps({"enabled": enabled})) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def group_user_add(self, user_id, group_id): |
|
|
@ -577,7 +634,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "group-id": group_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER_GROUP.format(**params_path), data=None) |
|
|
|
data_raw = self.raw_put(URL_ADMIN_USER_GROUP.format(**params_path), data=None) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def group_user_remove(self, user_id, group_id): |
|
|
@ -591,7 +648,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "group-id": group_id} |
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_USER_GROUP.format(**params_path)) |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_USER_GROUP.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def delete_group(self, group_id): |
|
|
@ -603,7 +660,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": group_id} |
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_GROUP.format(**params_path)) |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_GROUP.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def get_clients(self): |
|
|
@ -617,7 +674,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENTS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_client(self, client_id): |
|
|
@ -632,7 +689,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_client_id(self, client_name): |
|
|
@ -663,7 +720,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_AUTHZ_SETTINGS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_AUTHZ_SETTINGS.format(**params_path)) |
|
|
|
return data_raw |
|
|
|
|
|
|
|
def get_client_authz_resources(self, client_id): |
|
|
@ -676,7 +733,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path)) |
|
|
|
return data_raw |
|
|
|
|
|
|
|
def create_client(self, payload, skip_exists=False): |
|
|
@ -691,8 +748,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_CLIENTS.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_CLIENTS.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists) |
|
|
|
|
|
|
|
def update_client(self, client_id, payload): |
|
|
@ -721,7 +778,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def get_realm_roles(self): |
|
|
@ -735,7 +792,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_client_roles(self, client_id): |
|
|
@ -751,7 +808,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_client_role(self, client_id, role_name): |
|
|
@ -768,7 +825,7 @@ class KeycloakAdmin: |
|
|
|
:return: role_id |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLE.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_ROLE.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_client_role_id(self, client_id, role_name): |
|
|
@ -802,8 +859,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_role_id} |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_CLIENT_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_CLIENT_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists) |
|
|
|
|
|
|
|
def delete_client_role(self, client_role_id, role_name): |
|
|
@ -817,7 +874,7 @@ class KeycloakAdmin: |
|
|
|
:param role_name: role’s name (not id!) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name} |
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_CLIENT_ROLE.format(**params_path)) |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_CLIENT_ROLE.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def assign_client_role(self, user_id, client_id, roles): |
|
|
@ -833,8 +890,8 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
payload = roles if isinstance(roles, list) else [roles] |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def create_realm_role(self, payload, skip_exists=False): |
|
|
@ -865,8 +922,8 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
payload = roles if isinstance(roles, list) else [roles] |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USER_REALM_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_USER_REALM_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def get_client_roles_of_user(self, user_id, client_id): |
|
|
@ -901,7 +958,7 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
def _get_client_roles_of_user(self, client_level_role_mapping_url, user_id, client_id): |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} |
|
|
|
data_raw = self.connection.raw_get(client_level_role_mapping_url.format(**params_path)) |
|
|
|
data_raw = self.raw_get(client_level_role_mapping_url.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def delete_client_roles_of_user(self, user_id, client_id, roles): |
|
|
@ -916,8 +973,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
payload = roles if isinstance(roles, list) else [roles] |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} |
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_delete(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def get_authentication_flows(self): |
|
|
@ -930,7 +987,7 @@ class KeycloakAdmin: |
|
|
|
:return: Keycloak server response (AuthenticationFlowRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_FLOWS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_FLOWS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def create_authentication_flow(self, payload, skip_exists=False): |
|
|
@ -945,8 +1002,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_FLOWS.format(**params_path), |
|
|
|
data=payload) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_FLOWS.format(**params_path), |
|
|
|
data=payload) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists) |
|
|
|
|
|
|
|
def get_authentication_flow_executions(self, flow_alias): |
|
|
@ -956,7 +1013,7 @@ class KeycloakAdmin: |
|
|
|
:return: Response(json) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def update_authentication_flow_executions(self, payload, flow_alias): |
|
|
@ -971,8 +1028,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), |
|
|
|
data=payload) |
|
|
|
data_raw = self.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path), |
|
|
|
data=payload) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def sync_users(self, storage_id, action): |
|
|
@ -987,8 +1044,8 @@ class KeycloakAdmin: |
|
|
|
params_query = {"action": action} |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": storage_id} |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USER_STORAGE.format(**params_path), |
|
|
|
data=json.dumps(data), **params_query) |
|
|
|
data_raw = self.raw_post(URL_ADMIN_USER_STORAGE.format(**params_path), |
|
|
|
data=json.dumps(data), **params_query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_client_scopes(self): |
|
|
@ -1000,7 +1057,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_SCOPES.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_SCOPES.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def get_client_scope(self, client_scope_id): |
|
|
@ -1012,7 +1069,7 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_SCOPE.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_SCOPE.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
@ -1027,7 +1084,8 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id} |
|
|
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), data=json.dumps(payload)) |
|
|
|
data_raw = self.raw_post( |
|
|
|
URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), data=json.dumps(payload)) |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
@ -1042,5 +1100,95 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_SECRETS.format(**params_path)) |
|
|
|
data_raw = self.raw_get(URL_ADMIN_CLIENT_SECRETS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def raw_get(self, *args, **kwargs): |
|
|
|
""" |
|
|
|
Calls connection.raw_get. |
|
|
|
|
|
|
|
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token |
|
|
|
and try *get* once more. |
|
|
|
""" |
|
|
|
r = self.connection.raw_get(*args, **kwargs) |
|
|
|
if 'get' in self.auto_refresh_token and r.status_code == 401: |
|
|
|
self.refresh_token() |
|
|
|
return self.connection.raw_get(*args, **kwargs) |
|
|
|
return r |
|
|
|
|
|
|
|
def raw_post(self, *args, **kwargs): |
|
|
|
""" |
|
|
|
Calls connection.raw_post. |
|
|
|
|
|
|
|
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token |
|
|
|
and try *post* once more. |
|
|
|
""" |
|
|
|
r = self.connection.raw_post(*args, **kwargs) |
|
|
|
if 'post' in self.auto_refresh_token and r.status_code == 401: |
|
|
|
self.refresh_token() |
|
|
|
return self.connection.raw_post(*args, **kwargs) |
|
|
|
return r |
|
|
|
|
|
|
|
def raw_put(self, *args, **kwargs): |
|
|
|
""" |
|
|
|
Calls connection.raw_put. |
|
|
|
|
|
|
|
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token |
|
|
|
and try *put* once more. |
|
|
|
""" |
|
|
|
r = self.connection.raw_put(*args, **kwargs) |
|
|
|
if 'put' in self.auto_refresh_token and r.status_code == 401: |
|
|
|
self.refresh_token() |
|
|
|
return self.connection.raw_put(*args, **kwargs) |
|
|
|
return r |
|
|
|
|
|
|
|
def raw_delete(self, *args, **kwargs): |
|
|
|
""" |
|
|
|
Calls connection.raw_delete. |
|
|
|
|
|
|
|
If auto_refresh is set for *delete* and *access_token* is expired, it will refresh the token |
|
|
|
and try *delete* once more. |
|
|
|
""" |
|
|
|
r = self.connection.raw_delete(*args, **kwargs) |
|
|
|
if 'delete' in self.auto_refresh_token and r.status_code == 401: |
|
|
|
self.refresh_token() |
|
|
|
return self.connection.raw_delete(*args, **kwargs) |
|
|
|
return r |
|
|
|
|
|
|
|
def get_token(self): |
|
|
|
self.keycloak_openid = KeycloakOpenID(server_url=self.server_url, client_id=self.client_id, |
|
|
|
realm_name=self.user_realm_name or self.realm_name, verify=self.verify, |
|
|
|
client_secret_key=self.client_secret_key |
|
|
|
custom_headers=self.custom_headers) |
|
|
|
|
|
|
|
grant_type = ["password"] |
|
|
|
if self.client_secret_key: |
|
|
|
grant_type = ["client_credentials"] |
|
|
|
|
|
|
|
self._token = self.keycloak_openid.token(self.username, self.password, grant_type=grant_type) |
|
|
|
|
|
|
|
headers = { |
|
|
|
'Authorization': 'Bearer ' + self.token.get('access_token'), |
|
|
|
'Content-Type': 'application/json' |
|
|
|
} |
|
|
|
|
|
|
|
if self.custom_headers is not None: |
|
|
|
# merge custom headers to main headers |
|
|
|
headers.update(self.custom_headers) |
|
|
|
|
|
|
|
self._connection = ConnectionManager(base_url=self.server_url, |
|
|
|
headers=headers, |
|
|
|
timeout=60, |
|
|
|
verify=self.verify) |
|
|
|
|
|
|
|
def refresh_token(self): |
|
|
|
refresh_token = self.token.get('refresh_token') |
|
|
|
try: |
|
|
|
self.token = self.keycloak_openid.refresh_token(refresh_token) |
|
|
|
except KeycloakGetError as e: |
|
|
|
if e.response_code == 400 and b'Refresh token expired' in e.response_body: |
|
|
|
self.get_token() |
|
|
|
else: |
|
|
|
raise |
|
|
|
self.connection.add_param_headers('Authorization', 'Bearer ' + self.token.get('access_token')) |