|
|
|
@ -31,6 +31,7 @@ from __future__ import annotations |
|
|
|
|
|
|
|
import json |
|
|
|
import pathlib |
|
|
|
from typing import Any |
|
|
|
|
|
|
|
import aiofiles |
|
|
|
from jwcrypto import jwk, jwt |
|
|
|
@ -100,7 +101,7 @@ class KeycloakOpenID: |
|
|
|
verify: bool | str = True, |
|
|
|
custom_headers: dict | None = None, |
|
|
|
proxies: dict | None = None, |
|
|
|
timeout: int = 60, |
|
|
|
timeout: int | None = 60, |
|
|
|
cert: str | tuple | None = None, |
|
|
|
max_retries: int = 1, |
|
|
|
pool_maxsize: int | None = None, |
|
|
|
@ -166,7 +167,7 @@ class KeycloakOpenID: |
|
|
|
self._client_id = value |
|
|
|
|
|
|
|
@property |
|
|
|
def client_secret_key(self) -> str: |
|
|
|
def client_secret_key(self) -> str | None: |
|
|
|
""" |
|
|
|
Get the client secret key. |
|
|
|
|
|
|
|
@ -176,7 +177,7 @@ class KeycloakOpenID: |
|
|
|
return self._client_secret_key |
|
|
|
|
|
|
|
@client_secret_key.setter |
|
|
|
def client_secret_key(self, value: str) -> None: |
|
|
|
def client_secret_key(self, value: str | None) -> None: |
|
|
|
self._client_secret_key = value |
|
|
|
|
|
|
|
@property |
|
|
|
@ -246,7 +247,7 @@ class KeycloakOpenID: |
|
|
|
""" |
|
|
|
return self.client_id + "/" + role |
|
|
|
|
|
|
|
def _token_info(self, token: str, method_token_info: str, **kwargs: dict) -> dict: |
|
|
|
def _token_info(self, token: str, method_token_info: str, **kwargs: Any) -> dict: # noqa: ANN401 |
|
|
|
""" |
|
|
|
Getter for the token data. |
|
|
|
|
|
|
|
@ -279,7 +280,15 @@ class KeycloakOpenID: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_WELL_KNOWN.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
f"Unexpected response type on well_known. Expected 'dict', received '{type(res)}'" |
|
|
|
f", value: {res}" |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
def auth_url( |
|
|
|
self, |
|
|
|
@ -325,15 +334,15 @@ class KeycloakOpenID: |
|
|
|
|
|
|
|
def token( |
|
|
|
self, |
|
|
|
username: str = "", |
|
|
|
password: str = "", |
|
|
|
username: str | None = "", |
|
|
|
password: str | None = "", |
|
|
|
grant_type: str = "password", |
|
|
|
code: str = "", |
|
|
|
redirect_uri: str = "", |
|
|
|
totp: int | None = None, |
|
|
|
scope: str = "openid", |
|
|
|
code_verifier: str | None = None, |
|
|
|
**extra: dict, |
|
|
|
**extra: Any, # noqa: ANN401 |
|
|
|
) -> dict: |
|
|
|
""" |
|
|
|
Retrieve user token. |
|
|
|
@ -385,7 +394,7 @@ class KeycloakOpenID: |
|
|
|
payload["totp"] = totp |
|
|
|
|
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
content_type = self.connection.headers.get("Content-Type") |
|
|
|
content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
|
( |
|
|
|
@ -393,7 +402,15 @@ class KeycloakOpenID: |
|
|
|
if content_type |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
f"Unexpected response type from 'token'. Expected 'dict', received '{type(res)}'" |
|
|
|
f", value {res}." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
def refresh_token(self, refresh_token: str, grant_type: str = "refresh_token") -> dict: |
|
|
|
""" |
|
|
|
@ -420,7 +437,7 @@ class KeycloakOpenID: |
|
|
|
"refresh_token": refresh_token, |
|
|
|
} |
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
content_type = self.connection.headers.get("Content-Type") |
|
|
|
content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
|
( |
|
|
|
@ -428,7 +445,16 @@ class KeycloakOpenID: |
|
|
|
if content_type |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type from refresh_token. " |
|
|
|
f"Expected 'dict', received '{type(res)}'" |
|
|
|
f", value: {res}." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
def exchange_token( |
|
|
|
self, |
|
|
|
@ -480,7 +506,7 @@ class KeycloakOpenID: |
|
|
|
"scope": scope, |
|
|
|
} |
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
content_type = self.connection.headers.get("Content-Type") |
|
|
|
content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
|
( |
|
|
|
@ -488,7 +514,15 @@ class KeycloakOpenID: |
|
|
|
if content_type |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type from exchange_token. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'" |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
def userinfo(self, token: str) -> dict: |
|
|
|
""" |
|
|
|
@ -504,7 +538,7 @@ class KeycloakOpenID: |
|
|
|
:returns: Userinfo object |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path)) |
|
|
|
@ -513,26 +547,42 @@ class KeycloakOpenID: |
|
|
|
if orig_bearer is not None |
|
|
|
else self.connection.del_param_headers("Authorization") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type from userinfo. Expected 'dict', " |
|
|
|
f"received '{type(res)}', value: '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
def logout(self, refresh_token: str) -> bytes: |
|
|
|
return res |
|
|
|
|
|
|
|
def logout(self, refresh_token: str) -> dict: |
|
|
|
""" |
|
|
|
Log out the authenticated user. |
|
|
|
|
|
|
|
:param refresh_token: Refresh token from Keycloak |
|
|
|
:type refresh_token: str |
|
|
|
:returns: Keycloak server response |
|
|
|
:rtype: bytes |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
payload = {"client_id": self.client_id, "refresh_token": refresh_token} |
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload) |
|
|
|
return raise_error_from_response( |
|
|
|
res = raise_error_from_response( |
|
|
|
data_raw, |
|
|
|
KeycloakPostError, |
|
|
|
expected_codes=[HTTP_NO_CONTENT], |
|
|
|
) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type from logout. Expected 'dict', " |
|
|
|
f"received '{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
def certs(self) -> dict: |
|
|
|
""" |
|
|
|
@ -549,7 +599,15 @@ class KeycloakOpenID: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_CERTS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type from certs. Expected 'dict', " |
|
|
|
f"received '{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
def public_key(self) -> str: |
|
|
|
""" |
|
|
|
@ -562,7 +620,15 @@ class KeycloakOpenID: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_REALM.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"] |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type from public_key. Expected 'dict', " |
|
|
|
f"received '{type(res)}', value '{res}'" |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res["public_key"] |
|
|
|
|
|
|
|
def entitlement(self, token: str, resource_server_id: str) -> dict: |
|
|
|
""" |
|
|
|
@ -581,7 +647,7 @@ class KeycloakOpenID: |
|
|
|
:returns: Entitlements |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id} |
|
|
|
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path)) |
|
|
|
@ -592,9 +658,24 @@ class KeycloakOpenID: |
|
|
|
) |
|
|
|
|
|
|
|
if data_raw.status_code in {HTTP_NOT_FOUND, HTTP_NOT_ALLOWED}: |
|
|
|
return raise_error_from_response(data_raw, KeycloakDeprecationError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakDeprecationError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', " |
|
|
|
f"received '{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
return res |
|
|
|
|
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', " |
|
|
|
f"received '{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover |
|
|
|
return res |
|
|
|
|
|
|
|
def introspect( |
|
|
|
self, |
|
|
|
@ -629,7 +710,7 @@ class KeycloakOpenID: |
|
|
|
if token_type_hint == "requesting_party_token": # noqa: S105 |
|
|
|
if rpt: |
|
|
|
payload.update({"token": rpt, "token_type_hint": token_type_hint}) |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
bearer_changed = True |
|
|
|
else: |
|
|
|
@ -645,10 +726,19 @@ class KeycloakOpenID: |
|
|
|
if orig_bearer is not None |
|
|
|
else self.connection.del_param_headers("Authorization") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
|
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def _verify_token(token: str, key: jwk.JWK | jwk.JWKSet | None, **kwargs: dict) -> dict: |
|
|
|
def _verify_token(token: str, key: jwk.JWK | jwk.JWKSet | None, **kwargs: Any) -> dict: # noqa: ANN401 |
|
|
|
""" |
|
|
|
Decode and optionally validate a token. |
|
|
|
|
|
|
|
@ -669,12 +759,13 @@ class KeycloakOpenID: |
|
|
|
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|
|
|
full_jwt.leeway = leeway |
|
|
|
full_jwt.validate(key) |
|
|
|
return jwt.json_decode(full_jwt.claims) |
|
|
|
return jwt.json_decode(full_jwt.claims) # pyright: ignore[reportAttributeAccessIssue] |
|
|
|
|
|
|
|
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|
|
|
full_jwt.token.objects["valid"] = True |
|
|
|
return json.loads(full_jwt.token.payload.decode("utf-8")) |
|
|
|
|
|
|
|
def decode_token(self, token: str, validate: bool = True, **kwargs: dict) -> dict: |
|
|
|
def decode_token(self, token: str, validate: bool = True, **kwargs: Any) -> dict: # noqa: ANN401 |
|
|
|
""" |
|
|
|
Decode user token. |
|
|
|
|
|
|
|
@ -727,8 +818,8 @@ class KeycloakOpenID: |
|
|
|
self, |
|
|
|
token: str, |
|
|
|
method_token_info: str = "introspect", # noqa: S107 |
|
|
|
**kwargs: dict, |
|
|
|
) -> list: |
|
|
|
**kwargs: Any, # noqa: ANN401 |
|
|
|
) -> list | None: |
|
|
|
""" |
|
|
|
Get policies by user token. |
|
|
|
|
|
|
|
@ -771,8 +862,8 @@ class KeycloakOpenID: |
|
|
|
self, |
|
|
|
token: str, |
|
|
|
method_token_info: str = "introspect", # noqa: S107 |
|
|
|
**kwargs: dict, |
|
|
|
) -> list: |
|
|
|
**kwargs: Any, # noqa: ANN401 |
|
|
|
) -> list | None: |
|
|
|
""" |
|
|
|
Get permission by user token. |
|
|
|
|
|
|
|
@ -811,7 +902,7 @@ class KeycloakOpenID: |
|
|
|
|
|
|
|
return list(set(permissions)) |
|
|
|
|
|
|
|
def uma_permissions(self, token: str, permissions: str = "", **extra_payload: dict) -> list: |
|
|
|
def uma_permissions(self, token: str, permissions: str = "", **extra_payload: Any) -> list: # noqa: ANN401 |
|
|
|
""" |
|
|
|
Get UMA permissions by user token with requested permissions. |
|
|
|
|
|
|
|
@ -840,9 +931,9 @@ class KeycloakOpenID: |
|
|
|
**extra_payload, |
|
|
|
} |
|
|
|
|
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
content_type = self.connection.headers.get("Content-Type") |
|
|
|
content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
|
( |
|
|
|
@ -855,9 +946,17 @@ class KeycloakOpenID: |
|
|
|
if orig_bearer is not None |
|
|
|
else self.connection.del_param_headers("Authorization") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, list): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'list', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
def has_uma_access(self, token: str, permissions: list) -> AuthStatus: |
|
|
|
return res |
|
|
|
|
|
|
|
def has_uma_access(self, token: str, permissions: str) -> AuthStatus: |
|
|
|
""" |
|
|
|
Determine whether user has uma permissions with specified user token. |
|
|
|
|
|
|
|
@ -918,9 +1017,9 @@ class KeycloakOpenID: |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
orig_content_type = self.connection.headers.get("Content-Type") |
|
|
|
orig_content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/json") |
|
|
|
data_raw = self.connection.raw_post( |
|
|
|
URL_CLIENT_REGISTRATION.format(**params_path), |
|
|
|
@ -936,7 +1035,15 @@ class KeycloakOpenID: |
|
|
|
if orig_content_type is not None |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
def device(self, scope: str = "") -> dict: |
|
|
|
""" |
|
|
|
@ -964,9 +1071,17 @@ class KeycloakOpenID: |
|
|
|
|
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
data_raw = self.connection.raw_post(URL_DEVICE.format(**params_path), data=payload) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
def update_client(self, token: str, client_id: str, payload: dict) -> bytes: |
|
|
|
def update_client(self, token: str, client_id: str, payload: dict) -> dict: |
|
|
|
""" |
|
|
|
Update a client. |
|
|
|
|
|
|
|
@ -983,9 +1098,9 @@ class KeycloakOpenID: |
|
|
|
:rtype: bytes |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "client-id": client_id} |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
orig_content_type = self.connection.headers.get("Content-Type") |
|
|
|
orig_content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/json") |
|
|
|
|
|
|
|
# Keycloak complains if the clientId is not set in the payload |
|
|
|
@ -1006,9 +1121,17 @@ class KeycloakOpenID: |
|
|
|
if orig_content_type is not None |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPutError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPutError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def _a_token_info(self, token: str, method_token_info: str, **kwargs: dict) -> dict: |
|
|
|
async def _a_token_info(self, token: str, method_token_info: str, **kwargs: Any) -> dict: # noqa: ANN401 |
|
|
|
""" |
|
|
|
Asynchronous getter for the token data. |
|
|
|
|
|
|
|
@ -1041,7 +1164,15 @@ class KeycloakOpenID: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = await self.connection.a_raw_get(URL_WELL_KNOWN.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_auth_url( |
|
|
|
self, |
|
|
|
@ -1087,15 +1218,15 @@ class KeycloakOpenID: |
|
|
|
|
|
|
|
async def a_token( |
|
|
|
self, |
|
|
|
username: str = "", |
|
|
|
password: str = "", |
|
|
|
username: str | None = "", |
|
|
|
password: str | None = "", |
|
|
|
grant_type: str = "password", |
|
|
|
code: str = "", |
|
|
|
redirect_uri: str = "", |
|
|
|
totp: int | None = None, |
|
|
|
scope: str = "openid", |
|
|
|
code_verifier: str | None = None, |
|
|
|
**extra: dict, |
|
|
|
**extra: Any, # noqa: ANN401 |
|
|
|
) -> dict: |
|
|
|
""" |
|
|
|
Retrieve user token asynchronously. |
|
|
|
@ -1147,7 +1278,7 @@ class KeycloakOpenID: |
|
|
|
payload["totp"] = totp |
|
|
|
|
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
content_type = self.connection.headers.get("Content-Type") |
|
|
|
content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|
|
|
data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
|
( |
|
|
|
@ -1155,7 +1286,15 @@ class KeycloakOpenID: |
|
|
|
if content_type |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_refresh_token(self, refresh_token: str, grant_type: str = "refresh_token") -> dict: |
|
|
|
""" |
|
|
|
@ -1182,7 +1321,7 @@ class KeycloakOpenID: |
|
|
|
"refresh_token": refresh_token, |
|
|
|
} |
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
content_type = self.connection.headers.get("Content-Type") |
|
|
|
content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|
|
|
data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
|
( |
|
|
|
@ -1190,7 +1329,15 @@ class KeycloakOpenID: |
|
|
|
if content_type |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_exchange_token( |
|
|
|
self, |
|
|
|
@ -1242,7 +1389,7 @@ class KeycloakOpenID: |
|
|
|
"scope": scope, |
|
|
|
} |
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
content_type = self.connection.headers.get("Content-Type") |
|
|
|
content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|
|
|
data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
|
( |
|
|
|
@ -1250,7 +1397,15 @@ class KeycloakOpenID: |
|
|
|
if content_type |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_userinfo(self, token: str) -> dict: |
|
|
|
""" |
|
|
|
@ -1266,7 +1421,7 @@ class KeycloakOpenID: |
|
|
|
:returns: Userinfo object |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = await self.connection.a_raw_get(URL_USERINFO.format(**params_path)) |
|
|
|
@ -1275,26 +1430,42 @@ class KeycloakOpenID: |
|
|
|
if orig_bearer is not None |
|
|
|
else self.connection.del_param_headers("Authorization") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
async def a_logout(self, refresh_token: str) -> bytes: |
|
|
|
return res |
|
|
|
|
|
|
|
async def a_logout(self, refresh_token: str) -> dict: |
|
|
|
""" |
|
|
|
Log out the authenticated user asynchronously. |
|
|
|
|
|
|
|
:param refresh_token: Refresh token from Keycloak |
|
|
|
:type refresh_token: str |
|
|
|
:returns: Keycloak server response |
|
|
|
:rtype: bytes |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
payload = {"client_id": self.client_id, "refresh_token": refresh_token} |
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
data_raw = await self.connection.a_raw_post(URL_LOGOUT.format(**params_path), data=payload) |
|
|
|
return raise_error_from_response( |
|
|
|
res = raise_error_from_response( |
|
|
|
data_raw, |
|
|
|
KeycloakPostError, |
|
|
|
expected_codes=[HTTP_NO_CONTENT], |
|
|
|
) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_certs(self) -> dict: |
|
|
|
""" |
|
|
|
@ -1311,7 +1482,15 @@ class KeycloakOpenID: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = await self.connection.a_raw_get(URL_CERTS.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_public_key(self) -> str: |
|
|
|
""" |
|
|
|
@ -1324,7 +1503,15 @@ class KeycloakOpenID: |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = await self.connection.a_raw_get(URL_REALM.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"] |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res["public_key"] |
|
|
|
|
|
|
|
async def a_entitlement(self, token: str, resource_server_id: str) -> dict: |
|
|
|
""" |
|
|
|
@ -1343,7 +1530,7 @@ class KeycloakOpenID: |
|
|
|
:returns: Entitlements |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id} |
|
|
|
data_raw = await self.connection.a_raw_get(URL_ENTITLEMENT.format(**params_path)) |
|
|
|
@ -1354,9 +1541,25 @@ class KeycloakOpenID: |
|
|
|
) |
|
|
|
|
|
|
|
if data_raw.status_code in [HTTP_NOT_FOUND, HTTP_NOT_ALLOWED]: |
|
|
|
return raise_error_from_response(data_raw, KeycloakDeprecationError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakDeprecationError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_introspect( |
|
|
|
self, |
|
|
|
@ -1391,7 +1594,7 @@ class KeycloakOpenID: |
|
|
|
if token_type_hint == "requesting_party_token": # noqa: S105 |
|
|
|
if rpt: |
|
|
|
payload.update({"token": rpt, "token_type_hint": token_type_hint}) |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
bearer_changed = True |
|
|
|
else: |
|
|
|
@ -1410,9 +1613,17 @@ class KeycloakOpenID: |
|
|
|
if orig_bearer is not None |
|
|
|
else self.connection.del_param_headers("Authorization") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
async def a_decode_token(self, token: str, validate: bool = True, **kwargs: dict) -> dict: |
|
|
|
return res |
|
|
|
|
|
|
|
async def a_decode_token(self, token: str, validate: bool = True, **kwargs: Any) -> dict: # noqa: ANN401 |
|
|
|
""" |
|
|
|
Decode user token asynchronously. |
|
|
|
|
|
|
|
@ -1465,8 +1676,8 @@ class KeycloakOpenID: |
|
|
|
self, |
|
|
|
token: str, |
|
|
|
method_token_info: str = "introspect", # noqa: S107 |
|
|
|
**kwargs: dict, |
|
|
|
) -> list: |
|
|
|
**kwargs: Any, # noqa: ANN401 |
|
|
|
) -> list | None: |
|
|
|
""" |
|
|
|
Get policies by user token asynchronously. |
|
|
|
|
|
|
|
@ -1477,7 +1688,7 @@ class KeycloakOpenID: |
|
|
|
:param kwargs: Additional keyword arguments |
|
|
|
:type kwargs: dict |
|
|
|
:return: Policies |
|
|
|
:rtype: list |
|
|
|
:rtype: list | None |
|
|
|
:raises KeycloakAuthorizationConfigError: In case of bad authorization configuration |
|
|
|
:raises KeycloakInvalidTokenError: In case of bad token |
|
|
|
""" |
|
|
|
@ -1509,8 +1720,8 @@ class KeycloakOpenID: |
|
|
|
self, |
|
|
|
token: str, |
|
|
|
method_token_info: str = "introspect", # noqa: S107 |
|
|
|
**kwargs: dict, |
|
|
|
) -> list: |
|
|
|
**kwargs: Any, # noqa: ANN401 |
|
|
|
) -> list | None: |
|
|
|
""" |
|
|
|
Get permission by user token asynchronously. |
|
|
|
|
|
|
|
@ -1521,7 +1732,7 @@ class KeycloakOpenID: |
|
|
|
:param kwargs: parameters for decode |
|
|
|
:type kwargs: dict |
|
|
|
:returns: permissions list |
|
|
|
:rtype: list |
|
|
|
:rtype: list | None |
|
|
|
:raises KeycloakAuthorizationConfigError: In case of bad authorization configuration |
|
|
|
:raises KeycloakInvalidTokenError: In case of bad token |
|
|
|
""" |
|
|
|
@ -1552,7 +1763,7 @@ class KeycloakOpenID: |
|
|
|
self, |
|
|
|
token: str, |
|
|
|
permissions: str = "", |
|
|
|
**extra_payload: dict, |
|
|
|
**extra_payload: Any, # noqa: ANN401 |
|
|
|
) -> list: |
|
|
|
""" |
|
|
|
Get UMA permissions by user token with requested permissions asynchronously. |
|
|
|
@ -1582,9 +1793,9 @@ class KeycloakOpenID: |
|
|
|
**extra_payload, |
|
|
|
} |
|
|
|
|
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
content_type = self.connection.headers.get("Content-Type") |
|
|
|
content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") |
|
|
|
data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
|
( |
|
|
|
@ -1597,9 +1808,17 @@ class KeycloakOpenID: |
|
|
|
if orig_bearer is not None |
|
|
|
else self.connection.del_param_headers("Authorization") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, list): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'list', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
async def a_has_uma_access(self, token: str, permissions: list) -> AuthStatus: |
|
|
|
return res |
|
|
|
|
|
|
|
async def a_has_uma_access(self, token: str, permissions: str) -> AuthStatus: |
|
|
|
""" |
|
|
|
Determine whether user has uma permissions with specified user token asynchronously. |
|
|
|
|
|
|
|
@ -1660,9 +1879,9 @@ class KeycloakOpenID: |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
orig_content_type = self.connection.headers.get("Content-Type") |
|
|
|
orig_content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/json") |
|
|
|
data_raw = await self.connection.a_raw_post( |
|
|
|
URL_CLIENT_REGISTRATION.format(**params_path), |
|
|
|
@ -1678,7 +1897,15 @@ class KeycloakOpenID: |
|
|
|
if orig_content_type is not None |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_device(self, scope: str = "") -> dict: |
|
|
|
""" |
|
|
|
@ -1706,9 +1933,17 @@ class KeycloakOpenID: |
|
|
|
|
|
|
|
payload = self._add_secret_key(payload) |
|
|
|
data_raw = await self.connection.a_raw_post(URL_DEVICE.format(**params_path), data=payload) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |
|
|
|
|
|
|
|
async def a_update_client(self, token: str, client_id: str, payload: dict) -> bytes: |
|
|
|
async def a_update_client(self, token: str, client_id: str, payload: dict) -> dict: |
|
|
|
""" |
|
|
|
Update a client asynchronously. |
|
|
|
|
|
|
|
@ -1722,12 +1957,12 @@ class KeycloakOpenID: |
|
|
|
:param payload: ClientRepresentation |
|
|
|
:type payload: dict |
|
|
|
:return: Client Representation |
|
|
|
:rtype: bytes |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "client-id": client_id} |
|
|
|
orig_bearer = self.connection.headers.get("Authorization") |
|
|
|
orig_bearer = (self.connection.headers or {}).get("Authorization") |
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
|
orig_content_type = self.connection.headers.get("Content-Type") |
|
|
|
orig_content_type = (self.connection.headers or {}).get("Content-Type") |
|
|
|
self.connection.add_param_headers("Content-Type", "application/json") |
|
|
|
|
|
|
|
# Keycloak complains if the clientId is not set in the payload |
|
|
|
@ -1748,4 +1983,12 @@ class KeycloakOpenID: |
|
|
|
if orig_content_type is not None |
|
|
|
else self.connection.del_param_headers("Content-Type") |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPutError) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakPutError) |
|
|
|
if not isinstance(res, dict): |
|
|
|
msg = ( |
|
|
|
"Unexpected response type. Expected 'dict', received " |
|
|
|
f"'{type(res)}', value '{res}'." |
|
|
|
) |
|
|
|
raise TypeError(msg) |
|
|
|
|
|
|
|
return res |