Browse Source

Update keycloak_admin.py

automatically refresh stale token
pull/32/head
Guillaume Troupel 5 years ago
parent
commit
d72340d677
  1. 236
      keycloak/keycloak_admin.py

236
keycloak/keycloak_admin.py

@ -25,6 +25,8 @@
# internal Keycloak server ID, usually a uuid string
import json
from builtins import isinstance
from typing import List, Iterable
from .connection import ConnectionManager
from .exceptions import raise_error_from_response, KeycloakGetError
@ -45,7 +47,8 @@ class KeycloakAdmin:
PAGE_SIZE = 100
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli', verify=True, client_secret_key=None):
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli', verify=True, client_secret_key=None,
auto_refresh_token=None):
"""
:param server_url: Keycloak server url
@ -55,26 +58,41 @@ class KeycloakAdmin:
:param client_id: client id
:param verify: True if want check connection SSL
:param client_secret_key: client secret key
:param auto_refresh_token: list of methods that allows automatic token refresh. ex: ['get', 'put', 'post', 'delete']
"""
self._server_url = server_url
self._username = username
self._password = password
self._client_id = client_id
self._realm_name = realm_name
self._client_id = client_id
self._verify = verify
self._client_secret_key = client_secret_key
self._auto_refresh_token = auto_refresh_token or []
# Get token Admin
keycloak_openid = KeycloakOpenID(server_url=server_url, client_id=client_id, realm_name=realm_name,
verify=verify, client_secret_key=client_secret_key)
self.get_token()
self.keycloak_openid = KeycloakOpenID(server_url=self.server_url, client_id=self.client_id,
realm_name=self.realm_name, verify=self.verify,
client_secret_key=self.client_secret_key)
grant_type = ["password"]
if client_secret_key:
grant_type = ["client_credentials"]
self._token = keycloak_openid.token(username, password, grant_type=grant_type)
self._token = self.keycloak_openid.token(username, password, grant_type=grant_type)
self._connection = ConnectionManager(base_url=server_url,
headers={'Authorization': 'Bearer ' + self.token.get('access_token'),
'Content-Type': 'application/json'},
timeout=60,
verify=verify)
@property
def server_url(self):
return self._server_url
@server_url.setter
def server_url(self, value):
self._server_url = value
@property
def realm_name(self):
return self._realm_name
@ -99,6 +117,22 @@ class KeycloakAdmin:
def client_id(self, value):
self._client_id = value
@property
def client_secret_key(self):
return self._client_secret_key
@client_secret_key.setter
def client_secret_key(self, value):
self._client_secret_key = value
@property
def verify(self):
return self._verify
@verify.setter
def verify(self, value):
self._verify = value
@property
def username(self):
return self._username
@ -123,6 +157,20 @@ class KeycloakAdmin:
def token(self, value):
self._token = value
@property
def auto_refresh_token(self):
return self._auto_refresh_token
@auto_refresh_token.setter
def auto_refresh_token(self, value):
allowed_methods = {'get', 'post', 'put', 'delete'}
if not isinstance(value, Iterable):
raise TypeError('Expected a list of strings among {allowed}'.format(allowed=allowed_methods))
if not any(method not in allowed_methods for method in value):
raise TypeError('Unexpected method, accepted methods are {allowed}'.format(allowed=allowed_methods))
self._auto_refresh_token = value
def __fetch_all(self, url, query=None):
'''Wrapper function to paginate GET requests
@ -144,7 +192,7 @@ class KeycloakAdmin:
while True:
query['first'] = page*self.PAGE_SIZE
partial_results = raise_error_from_response(
self.connection.raw_get(url, **query),
self.raw_get(url, **query),
KeycloakGetError)
if not partial_results:
break
@ -164,7 +212,7 @@ class KeycloakAdmin:
:return: RealmRepresentation
"""
data_raw = self.connection.raw_post(URL_ADMIN_REALMS,
data_raw = self.raw_post(URL_ADMIN_REALMS,
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201)
@ -174,7 +222,7 @@ class KeycloakAdmin:
:return: realms list
"""
data_raw = self.connection.raw_get(URL_ADMIN_REALMS)
data_raw = self.raw_get(URL_ADMIN_REALMS)
return raise_error_from_response(data_raw, KeycloakGetError)
def create_realm(self, payload, skip_exists=False):
@ -188,7 +236,7 @@ class KeycloakAdmin:
:return: Keycloak server response (UserRepresentation)
"""
data_raw = self.connection.raw_post(URL_ADMIN_REALMS,
data_raw = self.raw_post(URL_ADMIN_REALMS,
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists)
@ -212,7 +260,7 @@ class KeycloakAdmin:
:return: array IdentityProviderRepresentation
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_ADMIN_IDPS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_IDPS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def create_user(self, payload):
@ -233,7 +281,7 @@ class KeycloakAdmin:
if exists is not None:
return str(exists)
data_raw = self.connection.raw_post(URL_ADMIN_USERS.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_USERS.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201)
@ -244,7 +292,7 @@ class KeycloakAdmin:
:return: counter
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_user_id(self, username):
@ -274,7 +322,7 @@ class KeycloakAdmin:
:return: UserRepresentation
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_get(URL_ADMIN_USER.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_USER.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_user_groups(self, user_id):
@ -286,7 +334,7 @@ class KeycloakAdmin:
:return: user groups list
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_get(URL_ADMIN_USER_GROUPS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_USER_GROUPS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def update_user(self, user_id, payload):
@ -299,7 +347,7 @@ class KeycloakAdmin:
:return: Http response
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_put(URL_ADMIN_USER.format(**params_path),
data_raw = self.raw_put(URL_ADMIN_USER.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
@ -312,7 +360,7 @@ class KeycloakAdmin:
:return: Http response
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_delete(URL_ADMIN_USER.format(**params_path))
data_raw = self.raw_delete(URL_ADMIN_USER.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def set_user_password(self, user_id, password, temporary=True):
@ -331,7 +379,7 @@ class KeycloakAdmin:
"""
payload = {"type": "password", "temporary": temporary, "value": password}
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_put(URL_ADMIN_RESET_PASSWORD.format(**params_path),
data_raw = self.raw_put(URL_ADMIN_RESET_PASSWORD.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
@ -344,7 +392,7 @@ class KeycloakAdmin:
:return: consents
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_get(URL_ADMIN_USER_CONSENTS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_USER_CONSENTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def send_update_account(self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None):
@ -362,7 +410,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
params_query = {"client_id": client_id, "lifespan": lifespan, "redirect_uri": redirect_uri}
data_raw = self.connection.raw_put(URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path),
data_raw = self.raw_put(URL_ADMIN_SEND_UPDATE_ACCOUNT.format(**params_path),
data=payload, **params_query)
return raise_error_from_response(data_raw, KeycloakGetError)
@ -379,7 +427,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
params_query = {"client_id": client_id, "redirect_uri": redirect_uri}
data_raw = self.connection.raw_put(URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path),
data_raw = self.raw_put(URL_ADMIN_SEND_VERIFY_EMAIL.format(**params_path),
data={}, **params_query)
return raise_error_from_response(data_raw, KeycloakGetError)
@ -395,7 +443,7 @@ class KeycloakAdmin:
:return: UserSessionRepresentation
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_get(URL_ADMIN_GET_SESSIONS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_GET_SESSIONS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_server_info(self):
@ -407,7 +455,7 @@ class KeycloakAdmin:
:return: ServerInfoRepresentation
"""
data_raw = self.connection.raw_get(URL_ADMIN_SERVER_INFO)
data_raw = self.raw_get(URL_ADMIN_SERVER_INFO)
return raise_error_from_response(data_raw, KeycloakGetError)
def get_groups(self):
@ -432,7 +480,7 @@ class KeycloakAdmin:
:return: Keycloak server response (GroupRepresentation)
"""
params_path = {"realm-name": self.realm_name, "id": group_id}
data_raw = self.connection.raw_get(URL_ADMIN_GROUP.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_GROUP.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_subgroups(self, group, path):
@ -515,11 +563,11 @@ class KeycloakAdmin:
if parent is None:
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_GROUPS.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_GROUPS.format(**params_path),
data=json.dumps(payload))
else:
params_path = {"realm-name": self.realm_name, "id": parent, }
data_raw = self.connection.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists)
@ -538,7 +586,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": group_id}
data_raw = self.connection.raw_put(URL_ADMIN_GROUP.format(**params_path),
data_raw = self.raw_put(URL_ADMIN_GROUP.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
@ -552,7 +600,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": group_id}
data_raw = self.connection.raw_put(URL_ADMIN_GROUP_PERMISSIONS.format(**params_path),
data_raw = self.raw_put(URL_ADMIN_GROUP_PERMISSIONS.format(**params_path),
data=json.dumps({"enabled": enabled}))
return raise_error_from_response(data_raw, KeycloakGetError)
@ -567,7 +615,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": user_id, "group-id": group_id}
data_raw = self.connection.raw_put(URL_ADMIN_USER_GROUP.format(**params_path), data=None)
data_raw = self.raw_put(URL_ADMIN_USER_GROUP.format(**params_path), data=None)
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def group_user_remove(self, user_id, group_id):
@ -581,7 +629,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": user_id, "group-id": group_id}
data_raw = self.connection.raw_delete(URL_ADMIN_USER_GROUP.format(**params_path))
data_raw = self.raw_delete(URL_ADMIN_USER_GROUP.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def delete_group(self, group_id):
@ -593,7 +641,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": group_id}
data_raw = self.connection.raw_delete(URL_ADMIN_GROUP.format(**params_path))
data_raw = self.raw_delete(URL_ADMIN_GROUP.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def get_clients(self):
@ -607,7 +655,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client(self, client_id):
@ -622,7 +670,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENT.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_id(self, client_name):
@ -653,7 +701,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_AUTHZ_SETTINGS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENT_AUTHZ_SETTINGS.format(**params_path))
return data_raw
def get_client_authz_resources(self, client_id):
@ -666,7 +714,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENT_AUTHZ_RESOURCES.format(**params_path))
return data_raw
def create_client(self, payload, skip_exists=False):
@ -681,7 +729,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_CLIENTS.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_CLIENTS.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists)
@ -697,7 +745,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_delete(URL_ADMIN_CLIENT.format(**params_path))
data_raw = self.raw_delete(URL_ADMIN_CLIENT.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def get_realm_roles(self):
@ -711,7 +759,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_roles(self, client_id):
@ -727,7 +775,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_role(self, client_id, role_name):
@ -744,7 +792,7 @@ class KeycloakAdmin:
:return: role_id
"""
params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLE.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENT_ROLE.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_role_id(self, client_id, role_name):
@ -778,7 +826,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_role_id}
data_raw = self.connection.raw_post(URL_ADMIN_CLIENT_ROLES.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_CLIENT_ROLES.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists)
@ -793,7 +841,7 @@ class KeycloakAdmin:
:param role_name: roles name (not id!)
"""
params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name}
data_raw = self.connection.raw_delete(URL_ADMIN_CLIENT_ROLE.format(**params_path))
data_raw = self.raw_delete(URL_ADMIN_CLIENT_ROLE.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def assign_client_role(self, user_id, client_id, roles):
@ -809,7 +857,7 @@ class KeycloakAdmin:
payload = roles if isinstance(roles, list) else [roles]
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id}
data_raw = self.connection.raw_post(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
@ -826,7 +874,7 @@ class KeycloakAdmin:
payload = roles if isinstance(roles, list) else [roles]
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_post(URL_ADMIN_USER_REALM_ROLES.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_USER_REALM_ROLES.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
@ -862,7 +910,7 @@ class KeycloakAdmin:
def _get_client_roles_of_user(self, client_level_role_mapping_url, user_id, client_id):
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id}
data_raw = self.connection.raw_get(client_level_role_mapping_url.format(**params_path))
data_raw = self.raw_get(client_level_role_mapping_url.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def delete_client_roles_of_user(self, user_id, client_id, roles):
@ -877,7 +925,7 @@ class KeycloakAdmin:
"""
payload = roles if isinstance(roles, list) else [roles]
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id}
data_raw = self.connection.raw_delete(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path),
data_raw = self.raw_delete(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
@ -891,7 +939,7 @@ class KeycloakAdmin:
:return: Keycloak server response (AuthenticationFlowRepresentation)
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_ADMIN_FLOWS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_FLOWS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def create_authentication_flow(self, payload, skip_exists=False):
@ -906,7 +954,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_FLOWS.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_FLOWS.format(**params_path),
data=payload)
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists)
@ -917,7 +965,7 @@ class KeycloakAdmin:
:return: Response(json)
"""
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias}
data_raw = self.connection.raw_get(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def update_authentication_flow_executions(self, payload, flow_alias):
@ -932,7 +980,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias}
data_raw = self.connection.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path),
data_raw = self.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path),
data=payload)
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
@ -948,7 +996,7 @@ class KeycloakAdmin:
params_query = {"action": action}
params_path = {"realm-name": self.realm_name, "id": storage_id}
data_raw = self.connection.raw_post(URL_ADMIN_USER_STORAGE.format(**params_path),
data_raw = self.raw_post(URL_ADMIN_USER_STORAGE.format(**params_path),
data=json.dumps(data), **params_query)
return raise_error_from_response(data_raw, KeycloakGetError)
@ -961,7 +1009,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_SCOPES.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENT_SCOPES.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_scope(self, client_scope_id):
@ -973,7 +1021,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_SCOPE.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENT_SCOPE.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@ -988,7 +1036,8 @@ class KeycloakAdmin:
params_path = {"realm-name": self.realm_name, "scope-id": client_scope_id}
data_raw = self.connection.raw_post(URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), data=json.dumps(payload))
data_raw = self.raw_post(
URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201)
@ -1003,5 +1052,84 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_SECRETS.format(**params_path))
data_raw = self.raw_get(URL_ADMIN_CLIENT_SECRETS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def raw_get(self, *args, **kwargs):
"""
Calls connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more.
"""
r = self.connection.raw_get(*args, **kwargs)
if 'get' in self.auto_refresh_token and r.status_code == 401:
self.refresh_token()
return self.connection.raw_get(*args, **kwargs)
return r
def raw_post(self, *args, **kwargs):
"""
Calls connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more.
"""
r = self.connection.raw_post(*args, **kwargs)
if 'post' in self.auto_refresh_token and r.status_code == 401:
self.refresh_token()
return self.connection.raw_post(*args, **kwargs)
return r
def raw_put(self, *args, **kwargs):
"""
Calls connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more.
"""
r = self.connection.raw_put(*args, **kwargs)
if 'put' in self.auto_refresh_token and r.status_code == 401:
self.refresh_token()
return self.connection.raw_put(*args, **kwargs)
return r
def raw_delete(self, *args, **kwargs):
"""
Calls connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired, it will refresh the token
and try *delete* once more.
"""
r = self.connection.raw_delete(*args, **kwargs)
if 'delete' in self.auto_refresh_token and r.status_code == 401:
self.refresh_token()
return self.connection.raw_delete(*args, **kwargs)
return r
def get_token(self):
self.keycloak_openid = KeycloakOpenID(server_url=self.server_url, client_id=self.client_id,
realm_name=self.realm_name, verify=self.verify,
client_secret_key=self.client_secret_key)
grant_type = ["password"]
if self.client_secret_key:
grant_type = ["client_credentials"]
self._token = self.keycloak_openid.token(self.username, self.password, grant_type=grant_type)
self._connection = ConnectionManager(base_url=self.server_url,
headers={'Authorization': 'Bearer ' + self.token.get('access_token'),
'Content-Type': 'application/json'},
timeout=60,
verify=self.verify)
def refresh_token(self):
refresh_token = self.token.get('refresh_token')
try:
self.token = self.keycloak_openid.refresh_token(refresh_token)
except KeycloakGetError as e:
if e.response_code == 400 and b'Refresh token expired' in e.response_body:
self.get_token()
else:
raise
self.connection.add_param_headers('Authorization', 'Bearer ' + self.token.get('access_token'))
Loading…
Cancel
Save