|
|
@ -42,6 +42,8 @@ from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURC |
|
|
|
|
|
|
|
class KeycloakAdmin: |
|
|
|
|
|
|
|
PAGE_SIZE = 100 |
|
|
|
|
|
|
|
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli', verify=True, client_secret_key=None): |
|
|
|
""" |
|
|
|
|
|
|
@ -120,6 +122,35 @@ class KeycloakAdmin: |
|
|
|
def token(self, value): |
|
|
|
self._token = value |
|
|
|
|
|
|
|
|
|
|
|
def __fetch_all(self, url, query=None): |
|
|
|
'''Wrapper function to paginate GET requests |
|
|
|
|
|
|
|
:param url: The url on which the query is executed |
|
|
|
:param query: Existing query parameters (optional) |
|
|
|
|
|
|
|
:return: Combined results of paginated queries |
|
|
|
''' |
|
|
|
results = [] |
|
|
|
|
|
|
|
# initalize query if it was called with None |
|
|
|
if not query: |
|
|
|
query = {} |
|
|
|
page = 0 |
|
|
|
query['max'] = self.PAGE_SIZE |
|
|
|
|
|
|
|
# fetch until we can |
|
|
|
while True: |
|
|
|
query['first'] = page*self.PAGE_SIZE |
|
|
|
partial_results = raise_error_from_response( |
|
|
|
self.connection.raw_get(url, **query), |
|
|
|
KeycloakGetError) |
|
|
|
if not partial_results: |
|
|
|
break |
|
|
|
results.extend(partial_results) |
|
|
|
page += 1 |
|
|
|
return results |
|
|
|
|
|
|
|
def import_realm(self, payload): |
|
|
|
""" |
|
|
|
Import a new realm from a RealmRepresentation. Realm name must be unique. |
|
|
@ -168,8 +199,7 @@ class KeycloakAdmin: |
|
|
|
:return: users list |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path), **query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
return self.__fetch_all(URL_ADMIN_USERS.format(**params_path), query) |
|
|
|
|
|
|
|
def get_idps(self): |
|
|
|
""" |
|
|
@ -389,8 +419,7 @@ class KeycloakAdmin: |
|
|
|
:return: array GroupRepresentation |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_GROUPS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
return self.__fetch_all(URL_ADMIN_GROUPS.format(**params_path)) |
|
|
|
|
|
|
|
def get_group(self, group_id): |
|
|
|
""" |
|
|
@ -439,8 +468,7 @@ class KeycloakAdmin: |
|
|
|
:return: Keycloak server response (UserRepresentation) |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": group_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_GROUP_MEMBERS.format(**params_path), **query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
return self.__fetch_all(URL_ADMIN_GROUP_MEMBERS.format(**params_path), query) |
|
|
|
|
|
|
|
def get_group_by_path(self, path, search_in_subgroups=False): |
|
|
|
""" |
|
|
|