@ -25,6 +25,8 @@
# internal Keycloak server ID, usually a uuid string
import json
from builtins import isinstance
from typing import List , Iterable
from .connection import ConnectionManager
from .exceptions import raise_error_from_response , KeycloakGetError
@ -44,8 +46,22 @@ from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURC
class KeycloakAdmin :
PAGE_SIZE = 100
def __init__ ( self , server_url , username , password , realm_name = ' master ' , client_id = ' admin-cli ' , verify = True , client_secret_key = None ) :
_server_url = None
_username = None
_password = None
_realm_name = None
_client_id = None
_verify = None
_client_secret_key = None
_auto_refresh_token = None
_connection = None
_token = None
_custom_headers = None
_user_realm_name = None
def __init__ ( self , server_url , username , password , realm_name = ' master ' , client_id = ' admin-cli ' , verify = True ,
client_secret_key = None , custom_headers = None , user_realm_name = None , auto_refresh_token = None ) :
"""
: param server_url : Keycloak server url
@ -55,25 +71,30 @@ class KeycloakAdmin:
: param client_id : client id
: param verify : True if want check connection SSL
: param client_secret_key : client secret key
"""
self . _username = username
self . _password = password
self . _client_id = client_id
self . _realm_name = realm_name
: param custom_headers : dict of custom header to pass to each HTML request
: param auto_refresh_token : list of methods that allows automatic token refresh . ex : [ ' get ' , ' put ' , ' post ' , ' delete ' ]
"""
self . server_url = server_url
self . username = username
self . password = password
self . realm_name = realm_name
self . client_id = client_id
self . verify = verify
self . client_secret_key = client_secret_key
self . auto_refresh_token = auto_refresh_token or [ ]
self . user_realm_name = user_realm_name
self . custom_headers = custom_headers
# Get token Admin
keycloak_openid = KeycloakOpenID ( server_url = server_url , client_id = client_id , realm_name = realm_name ,
verify = verify , client_secret_key = client_secret_key )
self . get_token ( )
grant_type = [ " password " ]
if client_secret_key :
grant_type = [ " client_credentials " ]
self . _token = keycloak_openid . token ( username , password , grant_type = grant_type )
self . _connection = ConnectionManager ( base_url = server_url ,
headers = { ' Authorization ' : ' Bearer ' + self . token . get ( ' access_token ' ) ,
' Content-Type ' : ' application/json ' } ,
timeout = 60 ,
verify = verify )
@property
def server_url ( self ) :
return self . _server_url
@server_url.setter
def server_url ( self , value ) :
self . _server_url = value
@property
def realm_name ( self ) :
@ -99,6 +120,22 @@ class KeycloakAdmin:
def client_id ( self , value ) :
self . _client_id = value
@property
def client_secret_key ( self ) :
return self . _client_secret_key
@client_secret_key.setter
def client_secret_key ( self , value ) :
self . _client_secret_key = value
@property
def verify ( self ) :
return self . _verify
@verify.setter
def verify ( self , value ) :
self . _verify = value
@property
def username ( self ) :
return self . _username
@ -123,6 +160,36 @@ class KeycloakAdmin:
def token ( self , value ) :
self . _token = value
@property
def auto_refresh_token ( self ) :
return self . _auto_refresh_token
@property
def user_realm_name ( self ) :
return self . _user_realm_name
@user_realm_name.setter
def user_realm_name ( self , value ) :
self . _user_realm_name = value
@property
def custom_headers ( self ) :
return self . _custom_headers
@custom_headers.setter
def custom_headers ( self , value ) :
self . _custom_headers = value
@auto_refresh_token.setter
def auto_refresh_token ( self , value ) :
allowed_methods = { ' get ' , ' post ' , ' put ' , ' delete ' }
if not isinstance ( value , Iterable ) :
raise TypeError ( ' Expected a list of strings among {allowed} ' . format ( allowed = allowed_methods ) )
if not all ( method in allowed_methods for method in value ) :
raise TypeError ( ' Unexpected method in auto_refresh_token, accepted methods are {allowed} ' . format ( allowed = allowed_methods ) )
self . _auto_refresh_token = value
def __fetch_all ( self , url , query = None ) :
''' Wrapper function to paginate GET requests
@ -144,7 +211,7 @@ class KeycloakAdmin:
while True :
query [ ' first ' ] = page * self . PAGE_SIZE
partial_results = raise_error_from_response (
self . connection . raw_get ( url , * * query ) ,
self . raw_get ( url , * * query ) ,
KeycloakGetError )
if not partial_results :
break
@ -164,8 +231,8 @@ class KeycloakAdmin:
: return : RealmRepresentation
"""
data_raw = self . connection . raw_post ( URL_ADMIN_REALMS ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_REALMS ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 )
def get_realms ( self ) :
@ -174,22 +241,22 @@ class KeycloakAdmin:
: return : realms list
"""
data_raw = self . connection . raw_get ( URL_ADMIN_REALMS )
data_raw = self . raw_get ( URL_ADMIN_REALMS )
return raise_error_from_response ( data_raw , KeycloakGetError )
def create_realm ( self , payload , skip_exists = False ) :
"""
Create a client
Create a realm
ClientRepresentation : http : / / www . keycloak . org / docs - api / 3.3 / rest - api / index . html #_realmrepresentation
: param skip_exists : Skip if Realm already exist .
: param payload : RealmRepresentation
: return : Keycloak server response ( User Representation)
: return : Keycloak server response ( Realm Representation)
"""
data_raw = self . connection . raw_post ( URL_ADMIN_REALMS ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_REALMS ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
@ -212,7 +279,7 @@ class KeycloakAdmin:
: return : array IdentityProviderRepresentation
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_ADMIN_IDPS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_IDPS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def create_user ( self , payload ) :
@ -233,8 +300,8 @@ class KeycloakAdmin:
if exists is not None :
return str ( exists )
data_raw = self . connection . raw_post ( URL_ADMIN_USERS . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_USERS . format ( * * params_path ) ,
data = json . dumps ( payload ) )
raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 )
_last_slash_idx = data_raw . headers [ ' Location ' ] . rindex ( ' / ' )
return data_raw . headers [ ' Location ' ] [ _last_slash_idx + 1 : ]
@ -246,7 +313,7 @@ class KeycloakAdmin:
: return : counter
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_ADMIN_USERS_COUNT . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_USERS_COUNT . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_user_id ( self , username ) :
@ -276,7 +343,7 @@ class KeycloakAdmin:
: return : UserRepresentation
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id }
data_raw = self . connection . raw_get ( URL_ADMIN_USER . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_USER . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_user_groups ( self , user_id ) :
@ -288,7 +355,7 @@ class KeycloakAdmin:
: return : user groups list
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id }
data_raw = self . connection . raw_get ( URL_ADMIN_USER_GROUPS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_USER_GROUPS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def update_user ( self , user_id , payload ) :
@ -301,8 +368,8 @@ class KeycloakAdmin:
: return : Http response
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id }
data_raw = self . connection . raw_put ( URL_ADMIN_USER . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_put ( URL_ADMIN_USER . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def delete_user ( self , user_id ) :
@ -314,7 +381,7 @@ class KeycloakAdmin:
: return : Http response
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id }
data_raw = self . connection . raw_delete ( URL_ADMIN_USER . format ( * * params_path ) )
data_raw = self . raw_delete ( URL_ADMIN_USER . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def set_user_password ( self , user_id , password , temporary = True ) :
@ -333,8 +400,8 @@ class KeycloakAdmin:
"""
payload = { " type " : " password " , " temporary " : temporary , " value " : password }
params_path = { " realm-name " : self . realm_name , " id " : user_id }
data_raw = self . connection . raw_put ( URL_ADMIN_RESET_PASSWORD . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_put ( URL_ADMIN_RESET_PASSWORD . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def consents_user ( self , user_id ) :
@ -346,7 +413,7 @@ class KeycloakAdmin:
: return : consents
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id }
data_raw = self . connection . raw_get ( URL_ADMIN_USER_CONSENTS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_USER_CONSENTS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def send_update_account ( self , user_id , payload , client_id = None , lifespan = None , redirect_uri = None ) :
@ -364,8 +431,8 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id }
params_query = { " client_id " : client_id , " lifespan " : lifespan , " redirect_uri " : redirect_uri }
data_raw = self . connection . raw_put ( URL_ADMIN_SEND_UPDATE_ACCOUNT . format ( * * params_path ) ,
data = payload , * * params_query )
data_raw = self . raw_put ( URL_ADMIN_SEND_UPDATE_ACCOUNT . format ( * * params_path ) ,
data = payload , * * params_query )
return raise_error_from_response ( data_raw , KeycloakGetError )
def send_verify_email ( self , user_id , client_id = None , redirect_uri = None ) :
@ -381,8 +448,8 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id }
params_query = { " client_id " : client_id , " redirect_uri " : redirect_uri }
data_raw = self . connection . raw_put ( URL_ADMIN_SEND_VERIFY_EMAIL . format ( * * params_path ) ,
data = { } , * * params_query )
data_raw = self . raw_put ( URL_ADMIN_SEND_VERIFY_EMAIL . format ( * * params_path ) ,
data = { } , * * params_query )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_sessions ( self , user_id ) :
@ -397,7 +464,7 @@ class KeycloakAdmin:
: return : UserSessionRepresentation
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id }
data_raw = self . connection . raw_get ( URL_ADMIN_GET_SESSIONS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_GET_SESSIONS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_server_info ( self ) :
@ -409,7 +476,7 @@ class KeycloakAdmin:
: return : ServerInfoRepresentation
"""
data_raw = self . connection . raw_get ( URL_ADMIN_SERVER_INFO )
data_raw = self . raw_get ( URL_ADMIN_SERVER_INFO )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_groups ( self ) :
@ -434,7 +501,7 @@ class KeycloakAdmin:
: return : Keycloak server response ( GroupRepresentation )
"""
params_path = { " realm-name " : self . realm_name , " id " : group_id }
data_raw = self . connection . raw_get ( URL_ADMIN_GROUP . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_GROUP . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_subgroups ( self , group , path ) :
@ -517,12 +584,12 @@ class KeycloakAdmin:
if parent is None :
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_post ( URL_ADMIN_GROUPS . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_GROUPS . format ( * * params_path ) ,
data = json . dumps ( payload ) )
else :
params_path = { " realm-name " : self . realm_name , " id " : parent , }
data_raw = self . connection . raw_post ( URL_ADMIN_GROUP_CHILD . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_GROUP_CHILD . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
@ -540,8 +607,8 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : group_id }
data_raw = self . connection . raw_put ( URL_ADMIN_GROUP . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_put ( URL_ADMIN_GROUP . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def group_set_permissions ( self , group_id , enabled = True ) :
@ -554,8 +621,8 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : group_id }
data_raw = self . connection . raw_put ( URL_ADMIN_GROUP_PERMISSIONS . format ( * * params_path ) ,
data = json . dumps ( { " enabled " : enabled } ) )
data_raw = self . raw_put ( URL_ADMIN_GROUP_PERMISSIONS . format ( * * params_path ) ,
data = json . dumps ( { " enabled " : enabled } ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def group_user_add ( self , user_id , group_id ) :
@ -569,7 +636,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id , " group-id " : group_id }
data_raw = self . connection . raw_put ( URL_ADMIN_USER_GROUP . format ( * * params_path ) , data = None )
data_raw = self . raw_put ( URL_ADMIN_USER_GROUP . format ( * * params_path ) , data = None )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def group_user_remove ( self , user_id , group_id ) :
@ -583,7 +650,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : user_id , " group-id " : group_id }
data_raw = self . connection . raw_delete ( URL_ADMIN_USER_GROUP . format ( * * params_path ) )
data_raw = self . raw_delete ( URL_ADMIN_USER_GROUP . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def delete_group ( self , group_id ) :
@ -595,7 +662,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : group_id }
data_raw = self . connection . raw_delete ( URL_ADMIN_GROUP . format ( * * params_path ) )
data_raw = self . raw_delete ( URL_ADMIN_GROUP . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def get_clients ( self ) :
@ -609,7 +676,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENTS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENTS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_client ( self , client_id ) :
@ -624,7 +691,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENT . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENT . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_client_id ( self , client_name ) :
@ -655,7 +722,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENT_AUTHZ_SETTINGS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENT_AUTHZ_SETTINGS . format ( * * params_path ) )
return data_raw
def get_client_authz_resources ( self , client_id ) :
@ -668,7 +735,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENT_AUTHZ_RESOURCES . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENT_AUTHZ_RESOURCES . format ( * * params_path ) )
return data_raw
def create_client ( self , payload , skip_exists = False ) :
@ -683,10 +750,24 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_post ( URL_ADMIN_CLIENTS . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_CLIENTS . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
def update_client ( self , client_id , payload ) :
"""
Update a client
: param client_id : Client id
: param payload : ClientRepresentation
: return : Http response
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id }
data_raw = self . connection . raw_put ( URL_ADMIN_CLIENT . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def delete_client ( self , client_id ) :
"""
Get representation of the client
@ -699,7 +780,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id }
data_raw = self . connection . raw_delete ( URL_ADMIN_CLIENT . format ( * * params_path ) )
data_raw = self . raw_delete ( URL_ADMIN_CLIENT . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def get_realm_roles ( self ) :
@ -713,7 +794,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_ADMIN_REALM_ROLES . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_REALM_ROLES . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_client_roles ( self , client_id ) :
@ -729,7 +810,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENT_ROLES . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENT_ROLES . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_client_role ( self , client_id , role_name ) :
@ -746,7 +827,7 @@ class KeycloakAdmin:
: return : role_id
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id , " role-name " : role_name }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENT_ROLE . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENT_ROLE . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_client_role_id ( self , client_id , role_name ) :
@ -780,8 +861,8 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : client_role_id }
data_raw = self . connection . raw_post ( URL_ADMIN_CLIENT_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_CLIENT_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
def delete_client_role ( self , client_role_id , role_name ) :
@ -795,7 +876,7 @@ class KeycloakAdmin:
: param role_name : role ’ s name ( not id ! )
"""
params_path = { " realm-name " : self . realm_name , " id " : client_role_id , " role-name " : role_name }
data_raw = self . connection . raw_delete ( URL_ADMIN_CLIENT_ROLE . format ( * * params_path ) )
data_raw = self . raw_delete ( URL_ADMIN_CLIENT_ROLE . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def assign_client_role ( self , user_id , client_id , roles ) :
@ -811,10 +892,25 @@ class KeycloakAdmin:
payload = roles if isinstance ( roles , list ) else [ roles ]
params_path = { " realm-name " : self . realm_name , " id " : user_id , " client-id " : client_id }
data_raw = self . connection . raw_post ( URL_ADMIN_USER_CLIENT_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_USER_CLIENT_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def create_realm_role ( self , payload , skip_exists = False ) :
"""
Create a new role for the realm or client
: param realm : realm name ( not id )
: param rep : RoleRepresentation https : / / www . keycloak . org / docs - api / 5.0 / rest - api / index . html #_rolerepresentation
: return Keycloak server response
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_post ( URL_ADMIN_REALM_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
def assign_realm_roles ( self , user_id , client_id , roles ) :
"""
Assign realm roles to a user
@ -828,8 +924,8 @@ class KeycloakAdmin:
payload = roles if isinstance ( roles , list ) else [ roles ]
params_path = { " realm-name " : self . realm_name , " id " : user_id }
data_raw = self . connection . raw_post ( URL_ADMIN_USER_REALM_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_post ( URL_ADMIN_USER_REALM_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def get_client_roles_of_user ( self , user_id , client_id ) :
@ -864,7 +960,7 @@ class KeycloakAdmin:
def _get_client_roles_of_user ( self , client_level_role_mapping_url , user_id , client_id ) :
params_path = { " realm-name " : self . realm_name , " id " : user_id , " client-id " : client_id }
data_raw = self . connection . raw_get ( client_level_role_mapping_url . format ( * * params_path ) )
data_raw = self . raw_get ( client_level_role_mapping_url . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def delete_client_roles_of_user ( self , user_id , client_id , roles ) :
@ -879,8 +975,8 @@ class KeycloakAdmin:
"""
payload = roles if isinstance ( roles , list ) else [ roles ]
params_path = { " realm-name " : self . realm_name , " id " : user_id , " client-id " : client_id }
data_raw = self . connection . raw_delete ( URL_ADMIN_USER_CLIENT_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
data_raw = self . raw_delete ( URL_ADMIN_USER_CLIENT_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def get_authentication_flows ( self ) :
@ -893,7 +989,7 @@ class KeycloakAdmin:
: return : Keycloak server response ( AuthenticationFlowRepresentation )
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_ADMIN_FLOWS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_FLOWS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def create_authentication_flow ( self , payload , skip_exists = False ) :
@ -908,8 +1004,8 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_post ( URL_ADMIN_FLOWS . format ( * * params_path ) ,
data = payload )
data_raw = self . raw_post ( URL_ADMIN_FLOWS . format ( * * params_path ) ,
data = payload )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
def get_authentication_flow_executions ( self , flow_alias ) :
@ -919,7 +1015,7 @@ class KeycloakAdmin:
: return : Response ( json )
"""
params_path = { " realm-name " : self . realm_name , " flow-alias " : flow_alias }
data_raw = self . connection . raw_get ( URL_ADMIN_FLOWS_EXECUTIONS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_FLOWS_EXECUTIONS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def update_authentication_flow_executions ( self , payload , flow_alias ) :
@ -934,8 +1030,8 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " flow-alias " : flow_alias }
data_raw = self . connection . raw_put ( URL_ADMIN_FLOWS_EXECUTIONS . format ( * * params_path ) ,
data = payload )
data_raw = self . raw_put ( URL_ADMIN_FLOWS_EXECUTIONS . format ( * * params_path ) ,
data = payload )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def sync_users ( self , storage_id , action ) :
@ -950,8 +1046,8 @@ class KeycloakAdmin:
params_query = { " action " : action }
params_path = { " realm-name " : self . realm_name , " id " : storage_id }
data_raw = self . connection . raw_post ( URL_ADMIN_USER_STORAGE . format ( * * params_path ) ,
data = json . dumps ( data ) , * * params_query )
data_raw = self . raw_post ( URL_ADMIN_USER_STORAGE . format ( * * params_path ) ,
data = json . dumps ( data ) , * * params_query )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_client_scopes ( self ) :
@ -963,7 +1059,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENT_SCOPES . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENT_SCOPES . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def get_client_scope ( self , client_scope_id ) :
@ -975,7 +1071,7 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " scope-id " : client_scope_id }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENT_SCOPE . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENT_SCOPE . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
@ -990,7 +1086,8 @@ class KeycloakAdmin:
params_path = { " realm-name " : self . realm_name , " scope-id " : client_scope_id }
data_raw = self . connection . raw_post ( URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER . format ( * * params_path ) , data = json . dumps ( payload ) )
data_raw = self . raw_post (
URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER . format ( * * params_path ) , data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 )
@ -1005,5 +1102,95 @@ class KeycloakAdmin:
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id }
data_raw = self . connection . raw_get ( URL_ADMIN_CLIENT_SECRETS . format ( * * params_path ) )
data_raw = self . raw_get ( URL_ADMIN_CLIENT_SECRETS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
def raw_get ( self , * args , * * kwargs ) :
"""
Calls connection . raw_get .
If auto_refresh is set for * get * and * access_token * is expired , it will refresh the token
and try * get * once more .
"""
r = self . connection . raw_get ( * args , * * kwargs )
if ' get ' in self . auto_refresh_token and r . status_code == 401 :
self . refresh_token ( )
return self . connection . raw_get ( * args , * * kwargs )
return r
def raw_post ( self , * args , * * kwargs ) :
"""
Calls connection . raw_post .
If auto_refresh is set for * post * and * access_token * is expired , it will refresh the token
and try * post * once more .
"""
r = self . connection . raw_post ( * args , * * kwargs )
if ' post ' in self . auto_refresh_token and r . status_code == 401 :
self . refresh_token ( )
return self . connection . raw_post ( * args , * * kwargs )
return r
def raw_put ( self , * args , * * kwargs ) :
"""
Calls connection . raw_put .
If auto_refresh is set for * put * and * access_token * is expired , it will refresh the token
and try * put * once more .
"""
r = self . connection . raw_put ( * args , * * kwargs )
if ' put ' in self . auto_refresh_token and r . status_code == 401 :
self . refresh_token ( )
return self . connection . raw_put ( * args , * * kwargs )
return r
def raw_delete ( self , * args , * * kwargs ) :
"""
Calls connection . raw_delete .
If auto_refresh is set for * delete * and * access_token * is expired , it will refresh the token
and try * delete * once more .
"""
r = self . connection . raw_delete ( * args , * * kwargs )
if ' delete ' in self . auto_refresh_token and r . status_code == 401 :
self . refresh_token ( )
return self . connection . raw_delete ( * args , * * kwargs )
return r
def get_token ( self ) :
self . keycloak_openid = KeycloakOpenID ( server_url = self . server_url , client_id = self . client_id ,
realm_name = self . user_realm_name or self . realm_name , verify = self . verify ,
client_secret_key = self . client_secret_key ,
custom_headers = self . custom_headers )
grant_type = [ " password " ]
if self . client_secret_key :
grant_type = [ " client_credentials " ]
self . _token = self . keycloak_openid . token ( self . username , self . password , grant_type = grant_type )
headers = {
' Authorization ' : ' Bearer ' + self . token . get ( ' access_token ' ) ,
' Content-Type ' : ' application/json '
}
if self . custom_headers is not None :
# merge custom headers to main headers
headers . update ( self . custom_headers )
self . _connection = ConnectionManager ( base_url = self . server_url ,
headers = headers ,
timeout = 60 ,
verify = self . verify )
def refresh_token ( self ) :
refresh_token = self . token . get ( ' refresh_token ' )
try :
self . token = self . keycloak_openid . refresh_token ( refresh_token )
except KeycloakGetError as e :
if e . response_code == 400 and b ' Refresh token expired ' in e . response_body :
self . get_token ( )
else :
raise
self . connection . add_param_headers ( ' Authorization ' , ' Bearer ' + self . token . get ( ' access_token ' ) )