|
@ -2,10 +2,9 @@ |
|
|
|
|
|
|
|
|
""" |
|
|
""" |
|
|
|
|
|
|
|
|
import json |
|
|
|
|
|
|
|
|
|
|
|
from keycloak.exceptions import raise_error_from_response, KeycloakGetError |
|
|
from keycloak.exceptions import raise_error_from_response, KeycloakGetError |
|
|
from .urls_patterns import URL_AUTH, URL_TOKEN, URL_USERINFO, URL_WELL_KNOWN, URL_LOGOUT |
|
|
|
|
|
|
|
|
from .urls_patterns import URL_AUTH, URL_TOKEN, URL_USERINFO, URL_WELL_KNOWN, URL_LOGOUT, \ |
|
|
|
|
|
URL_CERTS, URL_ENTITLEMENT |
|
|
from .connection import ConnectionManager |
|
|
from .connection import ConnectionManager |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -30,9 +29,8 @@ class Keycloak: |
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.__realm_name} |
|
|
params_path = {"realm-name": self.__realm_name} |
|
|
data_raw = self.__connection.raw_get(URL_WELL_KNOWN.format(**params_path)) |
|
|
data_raw = self.__connection.raw_get(URL_WELL_KNOWN.format(**params_path)) |
|
|
raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
return json.loads(data_raw.text) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def auth_url(self, redirect_uri): |
|
|
def auth_url(self, redirect_uri): |
|
|
""" |
|
|
""" |
|
@ -66,9 +64,7 @@ class Keycloak: |
|
|
|
|
|
|
|
|
data_raw = self.__connection.raw_post(URL_TOKEN.format(**params_path), |
|
|
data_raw = self.__connection.raw_post(URL_TOKEN.format(**params_path), |
|
|
data=payload) |
|
|
data=payload) |
|
|
raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
return json.loads(data_raw.text) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def userinfo(self, token): |
|
|
def userinfo(self, token): |
|
|
""" |
|
|
""" |
|
@ -85,9 +81,8 @@ class Keycloak: |
|
|
params_path = {"realm-name": self.__realm_name} |
|
|
params_path = {"realm-name": self.__realm_name} |
|
|
|
|
|
|
|
|
data_raw = self.__connection.raw_get(URL_USERINFO.format(**params_path)) |
|
|
data_raw = self.__connection.raw_get(URL_USERINFO.format(**params_path)) |
|
|
raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
return json.loads(data_raw.text) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def logout(self, refresh_token): |
|
|
def logout(self, refresh_token): |
|
|
""" |
|
|
""" |
|
@ -103,6 +98,47 @@ class Keycloak: |
|
|
|
|
|
|
|
|
data_raw = self.__connection.raw_post(URL_LOGOUT.format(**params_path), |
|
|
data_raw = self.__connection.raw_post(URL_LOGOUT.format(**params_path), |
|
|
data=payload) |
|
|
data=payload) |
|
|
raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
|
|
|
|
def certs(self): |
|
|
|
|
|
""" |
|
|
|
|
|
The certificate endpoint returns the public keys enabled by the realm, encoded as a |
|
|
|
|
|
JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled |
|
|
|
|
|
for verifying tokens. |
|
|
|
|
|
|
|
|
|
|
|
https://tools.ietf.org/html/rfc7517 |
|
|
|
|
|
|
|
|
|
|
|
:return: |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.__realm_name} |
|
|
|
|
|
data_raw = self.__connection.raw_get(URL_CERTS.format(**params_path)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def entitlement(self, token, resource_server_id): |
|
|
|
|
|
""" |
|
|
|
|
|
Client applications can use a specific endpoint to obtain a special security token |
|
|
|
|
|
called a requesting party token (RPT). This token consists of all the entitlements |
|
|
|
|
|
(or permissions) for a user as a result of the evaluation of the permissions and authorization |
|
|
|
|
|
policies associated with the resources being requested. With an RPT, client applications can |
|
|
|
|
|
gain access to protected resources at the resource server. |
|
|
|
|
|
|
|
|
|
|
|
:return: |
|
|
|
|
|
""" |
|
|
|
|
|
self.__connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
|
|
params_path = {"realm-name": self.__realm_name, "resource-server-id": resource_server_id} |
|
|
|
|
|
data_raw = self.__connection.raw_get(URL_ENTITLEMENT.format(**params_path)) |
|
|
|
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def instropect(self, token, token_type_hint="requesting_party_token"): |
|
|
|
|
|
""" |
|
|
|
|
|
The introspection endpoint is used to retrieve the active state of a token. It is can only be |
|
|
|
|
|
invoked by confidential clients. |
|
|
|
|
|
|
|
|
|
|
|
https://tools.ietf.org/html/rfc7662 |
|
|
|
|
|
|
|
|
|
|
|
:param token: |
|
|
|
|
|
:return: |
|
|
|
|
|
""" |
|
|
return None |
|
|
return None |
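Review bot: In the new `entitlement` method, `self.__connection.add_param_headers("Authorization", "Bearer " + token)` appears to store the caller's bearer token on the connection object shared by the whole `Keycloak` instance, and nothing in the method removes it afterwards. If the header persists (ConnectionManager lives in `.connection` and is not shown here), later calls such as `certs()`, or a request made on behalf of a different user, would go out carrying this user's token. I would send the header for this one request only, or clear it in a `finally:` once `raw_get` returns, and until then add a comment such as `# NOTE: Authorization stays set on the shared connection after this call`. Separately, the new public method is spelled `instropect`, ignores `token_type_hint` and only returns `None`; renaming it to `introspect` before release avoids locking the typo into the API.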