@ -31,6 +31,7 @@ from __future__ import annotations
import json
import pathlib
from typing import Any
import aiofiles
from jwcrypto import jwk , jwt
@ -100,7 +101,7 @@ class KeycloakOpenID:
verify : bool | str = True ,
custom_headers : dict | None = None ,
proxies : dict | None = None ,
timeout : int = 60 ,
timeout : int | None = 60 ,
cert : str | tuple | None = None ,
max_retries : int = 1 ,
pool_maxsize : int | None = None ,
@ -166,7 +167,7 @@ class KeycloakOpenID:
self . _client_id = value
@property
def client_secret_key ( self ) - > str :
def client_secret_key ( self ) - > str | None :
"""
Get the client secret key .
@ -176,7 +177,7 @@ class KeycloakOpenID:
return self . _client_secret_key
@client_secret_key.setter
def client_secret_key ( self , value : str ) - > None :
def client_secret_key ( self , value : str | None ) - > None :
self . _client_secret_key = value
@property
@ -246,7 +247,7 @@ class KeycloakOpenID:
"""
return self . client_id + " / " + role
def _token_info ( self , token : str , method_token_info : str , * * kwargs : dict ) - > dict :
def _token_info ( self , token : str , method_token_info : str , * * kwargs : Any ) - > dict : # noqa: ANN401
"""
Getter for the token data .
@ -279,7 +280,15 @@ class KeycloakOpenID:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_WELL_KNOWN . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
f " Unexpected response type on well_known. Expected ' dict ' , received ' {type(res)} ' "
f " , value: {res} "
)
raise TypeError ( msg )
return res
def auth_url (
self ,
@ -325,15 +334,15 @@ class KeycloakOpenID:
def token (
self ,
username : str = " " ,
password : str = " " ,
username : str | None = " " ,
password : str | None = " " ,
grant_type : str = " password " ,
code : str = " " ,
redirect_uri : str = " " ,
totp : int | None = None ,
scope : str = " openid " ,
code_verifier : str | None = None ,
* * extra : dict ,
* * extra : Any , # noqa: ANN401
) - > dict :
"""
Retrieve user token .
@ -385,7 +394,7 @@ class KeycloakOpenID:
payload [ " totp " ] = totp
payload = self . _add_secret_key ( payload )
content_type = self . connection . headers . get ( " Content-Type " )
content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/x-www-form-urlencoded " )
data_raw = self . connection . raw_post ( URL_TOKEN . format ( * * params_path ) , data = payload )
(
@ -393,7 +402,15 @@ class KeycloakOpenID:
if content_type
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
f " Unexpected response type from ' token ' . Expected ' dict ' , received ' {type(res)} ' "
f " , value {res}. "
)
raise TypeError ( msg )
return res
def refresh_token ( self , refresh_token : str , grant_type : str = " refresh_token " ) - > dict :
"""
@ -420,7 +437,7 @@ class KeycloakOpenID:
" refresh_token " : refresh_token ,
}
payload = self . _add_secret_key ( payload )
content_type = self . connection . headers . get ( " Content-Type " )
content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/x-www-form-urlencoded " )
data_raw = self . connection . raw_post ( URL_TOKEN . format ( * * params_path ) , data = payload )
(
@ -428,7 +445,16 @@ class KeycloakOpenID:
if content_type
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type from refresh_token. "
f " Expected ' dict ' , received ' {type(res)} ' "
f " , value: {res}. "
)
raise TypeError ( msg )
return res
def exchange_token (
self ,
@ -480,7 +506,7 @@ class KeycloakOpenID:
" scope " : scope ,
}
payload = self . _add_secret_key ( payload )
content_type = self . connection . headers . get ( " Content-Type " )
content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/x-www-form-urlencoded " )
data_raw = self . connection . raw_post ( URL_TOKEN . format ( * * params_path ) , data = payload )
(
@ -488,7 +514,15 @@ class KeycloakOpenID:
if content_type
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type from exchange_token. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' "
)
raise TypeError ( msg )
return res
def userinfo ( self , token : str ) - > dict :
"""
@ -504,7 +538,7 @@ class KeycloakOpenID:
: returns : Userinfo object
: rtype : dict
"""
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_USERINFO . format ( * * params_path ) )
@ -513,26 +547,42 @@ class KeycloakOpenID:
if orig_bearer is not None
else self . connection . del_param_headers ( " Authorization " )
)
return raise_error_from_response ( data_raw , KeycloakGetError )
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type from userinfo. Expected ' dict ' , "
f " received ' {type(res)} ' , value: ' {res} ' . "
)
raise TypeError ( msg )
def logout ( self , refresh_token : str ) - > bytes :
return res
def logout ( self , refresh_token : str ) - > dict :
"""
Log out the authenticated user .
: param refresh_token : Refresh token from Keycloak
: type refresh_token : str
: returns : Keycloak server response
: rtype : bytes
: rtype : dict
"""
params_path = { " realm-name " : self . realm_name }
payload = { " client_id " : self . client_id , " refresh_token " : refresh_token }
payload = self . _add_secret_key ( payload )
data_raw = self . connection . raw_post ( URL_LOGOUT . format ( * * params_path ) , data = payload )
return raise_error_from_response (
res = raise_error_from_response (
data_raw ,
KeycloakPostError ,
expected_codes = [ HTTP_NO_CONTENT ] ,
)
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type from logout. Expected ' dict ' , "
f " received ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
def certs ( self ) - > dict :
"""
@ -549,7 +599,15 @@ class KeycloakOpenID:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_CERTS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type from certs. Expected ' dict ' , "
f " received ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
def public_key ( self ) - > str :
"""
@ -562,7 +620,15 @@ class KeycloakOpenID:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = self . connection . raw_get ( URL_REALM . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError ) [ " public_key " ]
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type from public_key. Expected ' dict ' , "
f " received ' {type(res)} ' , value ' {res} ' "
)
raise TypeError ( msg )
return res [ " public_key " ]
def entitlement ( self , token : str , resource_server_id : str ) - > dict :
"""
@ -581,7 +647,7 @@ class KeycloakOpenID:
: returns : Entitlements
: rtype : dict
"""
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
params_path = { " realm-name " : self . realm_name , " resource-server-id " : resource_server_id }
data_raw = self . connection . raw_get ( URL_ENTITLEMENT . format ( * * params_path ) )
@ -592,9 +658,24 @@ class KeycloakOpenID:
)
if data_raw . status_code in { HTTP_NOT_FOUND , HTTP_NOT_ALLOWED } :
return raise_error_from_response ( data_raw , KeycloakDeprecationError )
res = raise_error_from_response ( data_raw , KeycloakDeprecationError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , "
f " received ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , "
f " received ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return raise_error_from_response ( data_raw , KeycloakGetError ) # pragma: no cover
return res
def introspect (
self ,
@ -629,7 +710,7 @@ class KeycloakOpenID:
if token_type_hint == " requesting_party_token " : # noqa: S105
if rpt :
payload . update ( { " token " : rpt , " token_type_hint " : token_type_hint } )
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
bearer_changed = True
else :
@ -645,10 +726,19 @@ class KeycloakOpenID:
if orig_bearer is not None
else self . connection . del_param_headers ( " Authorization " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
@staticmethod
def _verify_token ( token : str , key : jwk . JWK | jwk . JWKSet | None , * * kwargs : dict ) - > dict :
def _verify_token ( token : str , key : jwk . JWK | jwk . JWKSet | None , * * kwargs : Any ) - > dict : # noqa: ANN401
"""
Decode and optionally validate a token .
@ -669,12 +759,13 @@ class KeycloakOpenID:
full_jwt = jwt . JWT ( jwt = token , * * kwargs )
full_jwt . leeway = leeway
full_jwt . validate ( key )
return jwt . json_decode ( full_jwt . claims )
return jwt . json_decode ( full_jwt . claims ) # pyright: ignore[reportAttributeAccessIssue]
full_jwt = jwt . JWT ( jwt = token , * * kwargs )
full_jwt . token . objects [ " valid " ] = True
return json . loads ( full_jwt . token . payload . decode ( " utf-8 " ) )
def decode_token ( self , token : str , validate : bool = True , * * kwargs : dict ) - > dict :
def decode_token ( self , token : str , validate : bool = True , * * kwargs : Any ) - > dict : # noqa: ANN401
"""
Decode user token .
@ -727,8 +818,8 @@ class KeycloakOpenID:
self ,
token : str ,
method_token_info : str = " introspect " , # noqa: S107
* * kwargs : dict ,
) - > list :
* * kwargs : Any , # noqa: ANN401
) - > list | None :
"""
Get policies by user token .
@ -771,8 +862,8 @@ class KeycloakOpenID:
self ,
token : str ,
method_token_info : str = " introspect " , # noqa: S107
* * kwargs : dict ,
) - > list :
* * kwargs : Any , # noqa: ANN401
) - > list | None :
"""
Get permission by user token .
@ -811,7 +902,7 @@ class KeycloakOpenID:
return list ( set ( permissions ) )
def uma_permissions ( self , token : str , permissions : str = " " , * * extra_payload : dict ) - > list :
def uma_permissions ( self , token : str , permissions : str = " " , * * extra_payload : Any ) - > list : # noqa: ANN401
"""
Get UMA permissions by user token with requested permissions .
@ -840,9 +931,9 @@ class KeycloakOpenID:
* * extra_payload ,
}
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
content_type = self . connection . headers . get ( " Content-Type " )
content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/x-www-form-urlencoded " )
data_raw = self . connection . raw_post ( URL_TOKEN . format ( * * params_path ) , data = payload )
(
@ -855,9 +946,17 @@ class KeycloakOpenID:
if orig_bearer is not None
else self . connection . del_param_headers ( " Authorization " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , list ) :
msg = (
" Unexpected response type. Expected ' list ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
def has_uma_access ( self , token : str , permissions : list ) - > AuthStatus :
return res
def has_uma_access ( self , token : str , permissions : str ) - > AuthStatus :
"""
Determine whether user has uma permissions with specified user token .
@ -918,9 +1017,9 @@ class KeycloakOpenID:
: rtype : dict
"""
params_path = { " realm-name " : self . realm_name }
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
orig_content_type = self . connection . headers . get ( " Content-Type " )
orig_content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/json " )
data_raw = self . connection . raw_post (
URL_CLIENT_REGISTRATION . format ( * * params_path ) ,
@ -936,7 +1035,15 @@ class KeycloakOpenID:
if orig_content_type is not None
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
def device ( self , scope : str = " " ) - > dict :
"""
@ -964,9 +1071,17 @@ class KeycloakOpenID:
payload = self . _add_secret_key ( payload )
data_raw = self . connection . raw_post ( URL_DEVICE . format ( * * params_path ) , data = payload )
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
def update_client ( self , token : str , client_id : str , payload : dict ) - > bytes :
def update_client ( self , token : str , client_id : str , payload : dict ) - > dict :
"""
Update a client .
@ -983,9 +1098,9 @@ class KeycloakOpenID:
: rtype : bytes
"""
params_path = { " realm-name " : self . realm_name , " client-id " : client_id }
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
orig_content_type = self . connection . headers . get ( " Content-Type " )
orig_content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/json " )
# Keycloak complains if the clientId is not set in the payload
@ -1006,9 +1121,17 @@ class KeycloakOpenID:
if orig_content_type is not None
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPutError )
res = raise_error_from_response ( data_raw , KeycloakPutError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def _a_token_info ( self , token : str , method_token_info : str , * * kwargs : dict ) - > dict :
async def _a_token_info ( self , token : str , method_token_info : str , * * kwargs : Any ) - > dict : # noqa: ANN401
"""
Asynchronous getter for the token data .
@ -1041,7 +1164,15 @@ class KeycloakOpenID:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = await self . connection . a_raw_get ( URL_WELL_KNOWN . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_auth_url (
self ,
@ -1087,15 +1218,15 @@ class KeycloakOpenID:
async def a_token (
self ,
username : str = " " ,
password : str = " " ,
username : str | None = " " ,
password : str | None = " " ,
grant_type : str = " password " ,
code : str = " " ,
redirect_uri : str = " " ,
totp : int | None = None ,
scope : str = " openid " ,
code_verifier : str | None = None ,
* * extra : dict ,
* * extra : Any , # noqa: ANN401
) - > dict :
"""
Retrieve user token asynchronously .
@ -1147,7 +1278,7 @@ class KeycloakOpenID:
payload [ " totp " ] = totp
payload = self . _add_secret_key ( payload )
content_type = self . connection . headers . get ( " Content-Type " )
content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/x-www-form-urlencoded " )
data_raw = await self . connection . a_raw_post ( URL_TOKEN . format ( * * params_path ) , data = payload )
(
@ -1155,7 +1286,15 @@ class KeycloakOpenID:
if content_type
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_refresh_token ( self , refresh_token : str , grant_type : str = " refresh_token " ) - > dict :
"""
@ -1182,7 +1321,7 @@ class KeycloakOpenID:
" refresh_token " : refresh_token ,
}
payload = self . _add_secret_key ( payload )
content_type = self . connection . headers . get ( " Content-Type " )
content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/x-www-form-urlencoded " )
data_raw = await self . connection . a_raw_post ( URL_TOKEN . format ( * * params_path ) , data = payload )
(
@ -1190,7 +1329,15 @@ class KeycloakOpenID:
if content_type
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_exchange_token (
self ,
@ -1242,7 +1389,7 @@ class KeycloakOpenID:
" scope " : scope ,
}
payload = self . _add_secret_key ( payload )
content_type = self . connection . headers . get ( " Content-Type " )
content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/x-www-form-urlencoded " )
data_raw = await self . connection . a_raw_post ( URL_TOKEN . format ( * * params_path ) , data = payload )
(
@ -1250,7 +1397,15 @@ class KeycloakOpenID:
if content_type
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_userinfo ( self , token : str ) - > dict :
"""
@ -1266,7 +1421,7 @@ class KeycloakOpenID:
: returns : Userinfo object
: rtype : dict
"""
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
params_path = { " realm-name " : self . realm_name }
data_raw = await self . connection . a_raw_get ( URL_USERINFO . format ( * * params_path ) )
@ -1275,26 +1430,42 @@ class KeycloakOpenID:
if orig_bearer is not None
else self . connection . del_param_headers ( " Authorization " )
)
return raise_error_from_response ( data_raw , KeycloakGetError )
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
async def a_logout ( self , refresh_token : str ) - > bytes :
return res
async def a_logout ( self , refresh_token : str ) - > dict :
"""
Log out the authenticated user asynchronously .
: param refresh_token : Refresh token from Keycloak
: type refresh_token : str
: returns : Keycloak server response
: rtype : bytes
: rtype : dict
"""
params_path = { " realm-name " : self . realm_name }
payload = { " client_id " : self . client_id , " refresh_token " : refresh_token }
payload = self . _add_secret_key ( payload )
data_raw = await self . connection . a_raw_post ( URL_LOGOUT . format ( * * params_path ) , data = payload )
return raise_error_from_response (
res = raise_error_from_response (
data_raw ,
KeycloakPostError ,
expected_codes = [ HTTP_NO_CONTENT ] ,
)
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_certs ( self ) - > dict :
"""
@ -1311,7 +1482,15 @@ class KeycloakOpenID:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = await self . connection . a_raw_get ( URL_CERTS . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError )
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_public_key ( self ) - > str :
"""
@ -1324,7 +1503,15 @@ class KeycloakOpenID:
"""
params_path = { " realm-name " : self . realm_name }
data_raw = await self . connection . a_raw_get ( URL_REALM . format ( * * params_path ) )
return raise_error_from_response ( data_raw , KeycloakGetError ) [ " public_key " ]
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res [ " public_key " ]
async def a_entitlement ( self , token : str , resource_server_id : str ) - > dict :
"""
@ -1343,7 +1530,7 @@ class KeycloakOpenID:
: returns : Entitlements
: rtype : dict
"""
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
params_path = { " realm-name " : self . realm_name , " resource-server-id " : resource_server_id }
data_raw = await self . connection . a_raw_get ( URL_ENTITLEMENT . format ( * * params_path ) )
@ -1354,9 +1541,25 @@ class KeycloakOpenID:
)
if data_raw . status_code in [ HTTP_NOT_FOUND , HTTP_NOT_ALLOWED ] :
return raise_error_from_response ( data_raw , KeycloakDeprecationError )
res = raise_error_from_response ( data_raw , KeycloakDeprecationError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
return raise_error_from_response ( data_raw , KeycloakGetError ) # pragma: no cover
res = raise_error_from_response ( data_raw , KeycloakGetError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_introspect (
self ,
@ -1391,7 +1594,7 @@ class KeycloakOpenID:
if token_type_hint == " requesting_party_token " : # noqa: S105
if rpt :
payload . update ( { " token " : rpt , " token_type_hint " : token_type_hint } )
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
bearer_changed = True
else :
@ -1410,9 +1613,17 @@ class KeycloakOpenID:
if orig_bearer is not None
else self . connection . del_param_headers ( " Authorization " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
async def a_decode_token ( self , token : str , validate : bool = True , * * kwargs : dict ) - > dict :
return res
async def a_decode_token ( self , token : str , validate : bool = True , * * kwargs : Any ) - > dict : # noqa: ANN401
"""
Decode user token asynchronously .
@ -1465,8 +1676,8 @@ class KeycloakOpenID:
self ,
token : str ,
method_token_info : str = " introspect " , # noqa: S107
* * kwargs : dict ,
) - > list :
* * kwargs : Any , # noqa: ANN401
) - > list | None :
"""
Get policies by user token asynchronously .
@ -1477,7 +1688,7 @@ class KeycloakOpenID:
: param kwargs : Additional keyword arguments
: type kwargs : dict
: return : Policies
: rtype : list
: rtype : list | None
: raises KeycloakAuthorizationConfigError : In case of bad authorization configuration
: raises KeycloakInvalidTokenError : In case of bad token
"""
@ -1509,8 +1720,8 @@ class KeycloakOpenID:
self ,
token : str ,
method_token_info : str = " introspect " , # noqa: S107
* * kwargs : dict ,
) - > list :
* * kwargs : Any , # noqa: ANN401
) - > list | None :
"""
Get permission by user token asynchronously .
@ -1521,7 +1732,7 @@ class KeycloakOpenID:
: param kwargs : parameters for decode
: type kwargs : dict
: returns : permissions list
: rtype : list
: rtype : list | None
: raises KeycloakAuthorizationConfigError : In case of bad authorization configuration
: raises KeycloakInvalidTokenError : In case of bad token
"""
@ -1552,7 +1763,7 @@ class KeycloakOpenID:
self ,
token : str ,
permissions : str = " " ,
* * extra_payload : dict ,
* * extra_payload : Any , # noqa: ANN401
) - > list :
"""
Get UMA permissions by user token with requested permissions asynchronously .
@ -1582,9 +1793,9 @@ class KeycloakOpenID:
* * extra_payload ,
}
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
content_type = self . connection . headers . get ( " Content-Type " )
content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/x-www-form-urlencoded " )
data_raw = await self . connection . a_raw_post ( URL_TOKEN . format ( * * params_path ) , data = payload )
(
@ -1597,9 +1808,17 @@ class KeycloakOpenID:
if orig_bearer is not None
else self . connection . del_param_headers ( " Authorization " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , list ) :
msg = (
" Unexpected response type. Expected ' list ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
async def a_has_uma_access ( self , token : str , permissions : list ) - > AuthStatus :
return res
async def a_has_uma_access ( self , token : str , permissions : str ) - > AuthStatus :
"""
Determine whether user has uma permissions with specified user token asynchronously .
@ -1660,9 +1879,9 @@ class KeycloakOpenID:
: rtype : dict
"""
params_path = { " realm-name " : self . realm_name }
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
orig_content_type = self . connection . headers . get ( " Content-Type " )
orig_content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/json " )
data_raw = await self . connection . a_raw_post (
URL_CLIENT_REGISTRATION . format ( * * params_path ) ,
@ -1678,7 +1897,15 @@ class KeycloakOpenID:
if orig_content_type is not None
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_device ( self , scope : str = " " ) - > dict :
"""
@ -1706,9 +1933,17 @@ class KeycloakOpenID:
payload = self . _add_secret_key ( payload )
data_raw = await self . connection . a_raw_post ( URL_DEVICE . format ( * * params_path ) , data = payload )
return raise_error_from_response ( data_raw , KeycloakPostError )
res = raise_error_from_response ( data_raw , KeycloakPostError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res
async def a_update_client ( self , token : str , client_id : str , payload : dict ) - > bytes :
async def a_update_client ( self , token : str , client_id : str , payload : dict ) - > dict :
"""
Update a client asynchronously .
@ -1722,12 +1957,12 @@ class KeycloakOpenID:
: param payload : ClientRepresentation
: type payload : dict
: return : Client Representation
: rtype : bytes
: rtype : dict
"""
params_path = { " realm-name " : self . realm_name , " client-id " : client_id }
orig_bearer = self . connection . headers . get ( " Authorization " )
orig_bearer = ( self . connection . headers or { } ) . get ( " Authorization " )
self . connection . add_param_headers ( " Authorization " , " Bearer " + token )
orig_content_type = self . connection . headers . get ( " Content-Type " )
orig_content_type = ( self . connection . headers or { } ) . get ( " Content-Type " )
self . connection . add_param_headers ( " Content-Type " , " application/json " )
# Keycloak complains if the clientId is not set in the payload
@ -1748,4 +1983,12 @@ class KeycloakOpenID:
if orig_content_type is not None
else self . connection . del_param_headers ( " Content-Type " )
)
return raise_error_from_response ( data_raw , KeycloakPutError )
res = raise_error_from_response ( data_raw , KeycloakPutError )
if not isinstance ( res , dict ) :
msg = (
" Unexpected response type. Expected ' dict ' , received "
f " ' {type(res)} ' , value ' {res} ' . "
)
raise TypeError ( msg )
return res