@ -46,7 +46,7 @@ from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURC
class KeycloakAdmin :
class KeycloakAdmin :
PAGE_SIZE = 100
PAGE_SIZE = 100
_server_url = None
_server_url = None
_username = None
_username = None
_password = None
_password = None
@ -57,9 +57,11 @@ class KeycloakAdmin:
_auto_refresh_token = None
_auto_refresh_token = None
_connection = None
_connection = None
_token = None
_token = None
_custom_headers = None
_user_realm_name = None
def __init__ ( self , server_url , username , password , realm_name = ' master ' , client_id = ' admin-cli ' , verify = True , client_secret_key = None ,
auto_refresh_token = None ) :
def __init__ ( self , server_url , username , password , realm_name = ' master ' , client_id = ' admin-cli ' , verify = True ,
client_secret_key = None , custom_headers = None , user_realm_name = None , auto_refresh_token = None ) :
"""
"""
: param server_url : Keycloak server url
: param server_url : Keycloak server url
@ -69,6 +71,7 @@ class KeycloakAdmin:
: param client_id : client id
: param client_id : client id
: param verify : True if want check connection SSL
: param verify : True if want check connection SSL
: param client_secret_key : client secret key
: param client_secret_key : client secret key
: param custom_headers : dict of custom header to pass to each HTML request
: param auto_refresh_token : list of methods that allows automatic token refresh . ex : [ ' get ' , ' put ' , ' post ' , ' delete ' ]
: param auto_refresh_token : list of methods that allows automatic token refresh . ex : [ ' get ' , ' put ' , ' post ' , ' delete ' ]
"""
"""
self . server_url = server_url
self . server_url = server_url
@ -79,11 +82,12 @@ class KeycloakAdmin:
self . verify = verify
self . verify = verify
self . client_secret_key = client_secret_key
self . client_secret_key = client_secret_key
self . auto_refresh_token = auto_refresh_token or [ ]
self . auto_refresh_token = auto_refresh_token or [ ]
self . user_realm_name = user_realm_name
self . custom_headers = custom_headers
# Get token Admin
# Get token Admin
self . get_token ( )
self . get_token ( )
@property
@property
def server_url ( self ) :
def server_url ( self ) :
return self . _server_url
return self . _server_url
@ -160,6 +164,22 @@ class KeycloakAdmin:
def auto_refresh_token ( self ) :
def auto_refresh_token ( self ) :
return self . _auto_refresh_token
return self . _auto_refresh_token
@property
def user_realm_name ( self ) :
return self . _user_realm_name
@user_realm_name.setter
def user_realm_name ( self , value ) :
self . _user_realm_name = value
@property
def custom_headers ( self ) :
return self . _custom_headers
@custom_headers.setter
def custom_headers ( self , value ) :
self . _custom_headers = value
@auto_refresh_token.setter
@auto_refresh_token.setter
def auto_refresh_token ( self , value ) :
def auto_refresh_token ( self , value ) :
allowed_methods = { ' get ' , ' post ' , ' put ' , ' delete ' }
allowed_methods = { ' get ' , ' post ' , ' put ' , ' delete ' }
@ -226,13 +246,13 @@ class KeycloakAdmin:
def create_realm ( self , payload , skip_exists = False ) :
def create_realm ( self , payload , skip_exists = False ) :
"""
"""
Create a client
Create a realm
ClientRepresentation : http : / / www . keycloak . org / docs - api / 3.3 / rest - api / index . html #_realmrepresentation
ClientRepresentation : http : / / www . keycloak . org / docs - api / 3.3 / rest - api / index . html #_realmrepresentation
: param skip_exists : Skip if Realm already exist .
: param skip_exists : Skip if Realm already exist .
: param payload : RealmRepresentation
: param payload : RealmRepresentation
: return : Keycloak server response ( User Representation)
: return : Keycloak server response ( Realm Representation)
"""
"""
data_raw = self . raw_post ( URL_ADMIN_REALMS ,
data_raw = self . raw_post ( URL_ADMIN_REALMS ,
@ -732,6 +752,20 @@ class KeycloakAdmin:
data = json . dumps ( payload ) )
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
def update_client ( self , client_id , payload ) :
"""
Update a client
: param client_id : Client id
: param payload : ClientRepresentation
: return : Http response
"""
params_path = { " realm-name " : self . realm_name , " id " : client_id }
data_raw = self . connection . raw_put ( URL_ADMIN_CLIENT . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def delete_client ( self , client_id ) :
def delete_client ( self , client_id ) :
"""
"""
Get representation of the client
Get representation of the client
@ -860,6 +894,21 @@ class KeycloakAdmin:
data = json . dumps ( payload ) )
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 204 )
def create_realm_role ( self , payload , skip_exists = False ) :
"""
Create a new role for the realm or client
: param realm : realm name ( not id )
: param rep : RoleRepresentation https : / / www . keycloak . org / docs - api / 5.0 / rest - api / index . html #_rolerepresentation
: return Keycloak server response
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_post ( URL_ADMIN_REALM_ROLES . format ( * * params_path ) ,
data = json . dumps ( payload ) )
return raise_error_from_response ( data_raw , KeycloakGetError , expected_code = 201 , skip_exists = skip_exists )
def assign_realm_roles ( self , user_id , client_id , roles ) :
def assign_realm_roles ( self , user_id , client_id , roles ) :
"""
"""
Assign realm roles to a user
Assign realm roles to a user
@ -1109,16 +1158,27 @@ class KeycloakAdmin:
def get_token ( self ) :
def get_token ( self ) :
self . keycloak_openid = KeycloakOpenID ( server_url = self . server_url , client_id = self . client_id ,
self . keycloak_openid = KeycloakOpenID ( server_url = self . server_url , client_id = self . client_id ,
realm_name = self . realm_name , verify = self . verify ,
client_secret_key = self . client_secret_key )
realm_name = self . user_realm_name or self . realm_name , verify = self . verify ,
client_secret_key = self . client_secret_key
custom_headers = self . custom_headers )
grant_type = [ " password " ]
grant_type = [ " password " ]
if self . client_secret_key :
if self . client_secret_key :
grant_type = [ " client_credentials " ]
grant_type = [ " client_credentials " ]
self . _token = self . keycloak_openid . token ( self . username , self . password , grant_type = grant_type )
self . _token = self . keycloak_openid . token ( self . username , self . password , grant_type = grant_type )
headers = {
' Authorization ' : ' Bearer ' + self . token . get ( ' access_token ' ) ,
' Content-Type ' : ' application/json '
}
if self . custom_headers is not None :
# merge custom headers to main headers
headers . update ( self . custom_headers )
self . _connection = ConnectionManager ( base_url = self . server_url ,
self . _connection = ConnectionManager ( base_url = self . server_url ,
headers = { ' Authorization ' : ' Bearer ' + self . token . get ( ' access_token ' ) ,
' Content-Type ' : ' application/json ' } ,
headers = headers ,
timeout = 60 ,
timeout = 60 ,
verify = self . verify )
verify = self . verify )