|
|
@ -31,11 +31,9 @@ import json |
|
|
|
from builtins import isinstance |
|
|
|
from typing import Optional |
|
|
|
|
|
|
|
import deprecation |
|
|
|
from requests_toolbelt import MultipartEncoder |
|
|
|
|
|
|
|
from . import urls_patterns |
|
|
|
from ._version import __version__ |
|
|
|
from .exceptions import ( |
|
|
|
KeycloakDeleteError, |
|
|
|
KeycloakGetError, |
|
|
@ -73,9 +71,6 @@ class KeycloakAdmin: |
|
|
|
:type custom_headers: dict |
|
|
|
:param user_realm_name: The realm name of the user, if different from realm_name |
|
|
|
:type user_realm_name: str |
|
|
|
:param auto_refresh_token: list of methods that allows automatic token refresh. |
|
|
|
Ex: ['get', 'put', 'post', 'delete'] |
|
|
|
:type auto_refresh_token: list |
|
|
|
:param timeout: connection timeout in seconds |
|
|
|
:type timeout: int |
|
|
|
:param connection: A KeycloakOpenIDConnection as an alternative to individual params. |
|
|
@ -84,9 +79,6 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
PAGE_SIZE = 100 |
|
|
|
|
|
|
|
_auto_refresh_token = None |
|
|
|
_connection: Optional[KeycloakOpenIDConnection] = None |
|
|
|
|
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
server_url=None, |
|
|
@ -100,7 +92,6 @@ class KeycloakAdmin: |
|
|
|
client_secret_key=None, |
|
|
|
custom_headers=None, |
|
|
|
user_realm_name=None, |
|
|
|
auto_refresh_token=None, |
|
|
|
timeout=60, |
|
|
|
connection: Optional[KeycloakOpenIDConnection] = None, |
|
|
|
): |
|
|
@ -130,9 +121,6 @@ class KeycloakAdmin: |
|
|
|
:type custom_headers: dict |
|
|
|
:param user_realm_name: The realm name of the user, if different from realm_name |
|
|
|
:type user_realm_name: str |
|
|
|
:param auto_refresh_token: list of methods that allows automatic token refresh. |
|
|
|
Ex: ['get', 'put', 'post', 'delete'] |
|
|
|
:type auto_refresh_token: list |
|
|
|
:param timeout: connection timeout in seconds |
|
|
|
:type timeout: int |
|
|
|
:param connection: An OpenID Connection as an alternative to individual params. |
|
|
@ -152,58 +140,6 @@ class KeycloakAdmin: |
|
|
|
custom_headers=custom_headers, |
|
|
|
timeout=timeout, |
|
|
|
) |
|
|
|
if auto_refresh_token is not None: |
|
|
|
self.auto_refresh_token = auto_refresh_token |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.server_url property instead", |
|
|
|
) |
|
|
|
def server_url(self): |
|
|
|
"""Get server url. |
|
|
|
|
|
|
|
:returns: Keycloak server url |
|
|
|
:rtype: str |
|
|
|
""" |
|
|
|
return self.connection.server_url |
|
|
|
|
|
|
|
@server_url.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.server_url property instead", |
|
|
|
) |
|
|
|
def server_url(self, value): |
|
|
|
self.connection.server_url = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.realm_name property instead", |
|
|
|
) |
|
|
|
def realm_name(self): |
|
|
|
"""Get realm name. |
|
|
|
|
|
|
|
:returns: Realm name |
|
|
|
:rtype: str |
|
|
|
""" |
|
|
|
return self.connection.realm_name |
|
|
|
|
|
|
|
@realm_name.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.realm_name property instead", |
|
|
|
) |
|
|
|
def realm_name(self, value): |
|
|
|
self.connection.realm_name = value |
|
|
|
|
|
|
|
@property |
|
|
|
def connection(self) -> KeycloakOpenIDConnection: |
|
|
@ -218,256 +154,6 @@ class KeycloakAdmin: |
|
|
|
def connection(self, value: KeycloakOpenIDConnection) -> None: |
|
|
|
self._connection = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.client_id property instead", |
|
|
|
) |
|
|
|
def client_id(self): |
|
|
|
"""Get client id. |
|
|
|
|
|
|
|
:returns: Client id |
|
|
|
:rtype: str |
|
|
|
""" |
|
|
|
return self.connection.client_id |
|
|
|
|
|
|
|
@client_id.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.client_id property instead", |
|
|
|
) |
|
|
|
def client_id(self, value): |
|
|
|
self.connection.client_id = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.client_secret_key property instead", |
|
|
|
) |
|
|
|
def client_secret_key(self): |
|
|
|
"""Get client secret key. |
|
|
|
|
|
|
|
:returns: Client secret key |
|
|
|
:rtype: str |
|
|
|
""" |
|
|
|
return self.connection.client_secret_key |
|
|
|
|
|
|
|
@client_secret_key.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.client_secret_key property instead", |
|
|
|
) |
|
|
|
def client_secret_key(self, value): |
|
|
|
self.connection.client_secret_key = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.verify property instead", |
|
|
|
) |
|
|
|
def verify(self): |
|
|
|
"""Get verify. |
|
|
|
|
|
|
|
:returns: Verify indicator |
|
|
|
:rtype: bool |
|
|
|
""" |
|
|
|
return self.connection.verify |
|
|
|
|
|
|
|
@verify.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.verify property instead", |
|
|
|
) |
|
|
|
def verify(self, value): |
|
|
|
self.connection.verify = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.username property instead", |
|
|
|
) |
|
|
|
def username(self): |
|
|
|
"""Get username. |
|
|
|
|
|
|
|
:returns: Admin username |
|
|
|
:rtype: str |
|
|
|
""" |
|
|
|
return self.connection.username |
|
|
|
|
|
|
|
@username.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.username property instead", |
|
|
|
) |
|
|
|
def username(self, value): |
|
|
|
self.connection.username = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.password property instead", |
|
|
|
) |
|
|
|
def password(self): |
|
|
|
"""Get password. |
|
|
|
|
|
|
|
:returns: Admin password |
|
|
|
:rtype: str |
|
|
|
""" |
|
|
|
return self.connection.password |
|
|
|
|
|
|
|
@password.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.password property instead", |
|
|
|
) |
|
|
|
def password(self, value): |
|
|
|
self.connection.password = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.totp property instead", |
|
|
|
) |
|
|
|
def totp(self): |
|
|
|
"""Get totp. |
|
|
|
|
|
|
|
:returns: TOTP |
|
|
|
:rtype: str |
|
|
|
""" |
|
|
|
return self.connection.totp |
|
|
|
|
|
|
|
@totp.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.totp property instead", |
|
|
|
) |
|
|
|
def totp(self, value): |
|
|
|
self.connection.totp = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.token property instead", |
|
|
|
) |
|
|
|
def token(self): |
|
|
|
"""Get token. |
|
|
|
|
|
|
|
:returns: Access and refresh token |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
return self.connection.token |
|
|
|
|
|
|
|
@token.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.token property instead", |
|
|
|
) |
|
|
|
def token(self, value): |
|
|
|
self.connection.token = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.user_realm_name property instead", |
|
|
|
) |
|
|
|
def user_realm_name(self): |
|
|
|
"""Get user realm name. |
|
|
|
|
|
|
|
:returns: User realm name |
|
|
|
:rtype: str |
|
|
|
""" |
|
|
|
return self.connection.user_realm_name |
|
|
|
|
|
|
|
@user_realm_name.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.user_realm_name property instead", |
|
|
|
) |
|
|
|
def user_realm_name(self, value): |
|
|
|
self.connection.user_realm_name = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.custom_headers property instead", |
|
|
|
) |
|
|
|
def custom_headers(self): |
|
|
|
"""Get custom headers. |
|
|
|
|
|
|
|
:returns: Custom headers |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
return self.connection.custom_headers |
|
|
|
|
|
|
|
@custom_headers.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.custom_headers property instead", |
|
|
|
) |
|
|
|
def custom_headers(self, value): |
|
|
|
self.connection.custom_headers = value |
|
|
|
|
|
|
|
@property |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Auto-refresh will be implicitly set for all requests", |
|
|
|
) |
|
|
|
def auto_refresh_token(self): |
|
|
|
"""Get auto refresh token. |
|
|
|
|
|
|
|
:returns: List of methods for automatic token refresh |
|
|
|
:rtype: list |
|
|
|
""" |
|
|
|
return self._auto_refresh_token |
|
|
|
|
|
|
|
@auto_refresh_token.setter |
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Auto-refresh will be implicitly set for all requests", |
|
|
|
) |
|
|
|
def auto_refresh_token(self, value): |
|
|
|
self._auto_refresh_token = value or [] |
|
|
|
|
|
|
|
def __fetch_all(self, url, query=None): |
|
|
|
"""Paginate over get requests. |
|
|
|
|
|
|
@ -4095,120 +3781,6 @@ class KeycloakAdmin: |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) |
|
|
|
|
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.raw_get function instead", |
|
|
|
) |
|
|
|
def raw_get(self, *args, **kwargs): |
|
|
|
"""Call connection.raw_get. |
|
|
|
|
|
|
|
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token |
|
|
|
and try *get* once more. |
|
|
|
|
|
|
|
:param args: Additional arguments |
|
|
|
:type args: tuple |
|
|
|
:param kwargs: Additional keyword arguments |
|
|
|
:type kwargs: dict |
|
|
|
:returns: Response |
|
|
|
:rtype: Response |
|
|
|
""" |
|
|
|
return self.connection.raw_get(*args, **kwargs) |
|
|
|
|
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.raw_post function instead", |
|
|
|
) |
|
|
|
def raw_post(self, *args, **kwargs): |
|
|
|
"""Call connection.raw_post. |
|
|
|
|
|
|
|
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token |
|
|
|
and try *post* once more. |
|
|
|
|
|
|
|
:param args: Additional arguments |
|
|
|
:type args: tuple |
|
|
|
:param kwargs: Additional keyword arguments |
|
|
|
:type kwargs: dict |
|
|
|
:returns: Response |
|
|
|
:rtype: Response |
|
|
|
""" |
|
|
|
return self.connection.raw_post(*args, **kwargs) |
|
|
|
|
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.raw_put function instead", |
|
|
|
) |
|
|
|
def raw_put(self, *args, **kwargs): |
|
|
|
"""Call connection.raw_put. |
|
|
|
|
|
|
|
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token |
|
|
|
and try *put* once more. |
|
|
|
|
|
|
|
:param args: Additional arguments |
|
|
|
:type args: tuple |
|
|
|
:param kwargs: Additional keyword arguments |
|
|
|
:type kwargs: dict |
|
|
|
:returns: Response |
|
|
|
:rtype: Response |
|
|
|
""" |
|
|
|
return self.connection.raw_put(*args, **kwargs) |
|
|
|
|
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.raw_delete function instead", |
|
|
|
) |
|
|
|
def raw_delete(self, *args, **kwargs): |
|
|
|
"""Call connection.raw_delete. |
|
|
|
|
|
|
|
If auto_refresh is set for *delete* and *access_token* is expired, |
|
|
|
it will refresh the token and try *delete* once more. |
|
|
|
|
|
|
|
:param args: Additional arguments |
|
|
|
:type args: tuple |
|
|
|
:param kwargs: Additional keyword arguments |
|
|
|
:type kwargs: dict |
|
|
|
:returns: Response |
|
|
|
:rtype: Response |
|
|
|
""" |
|
|
|
return self.connection.raw_delete(*args, **kwargs) |
|
|
|
|
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.get_token function instead", |
|
|
|
) |
|
|
|
def get_token(self): |
|
|
|
"""Get admin token. |
|
|
|
|
|
|
|
The admin token is then set in the `token` attribute. |
|
|
|
|
|
|
|
:returns: token |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
return self.connection.get_token() |
|
|
|
|
|
|
|
@deprecation.deprecated( |
|
|
|
deprecated_in="2.13.0", |
|
|
|
removed_in="4.0.0", |
|
|
|
current_version=__version__, |
|
|
|
details="Use the connection.refresh_token function instead", |
|
|
|
) |
|
|
|
def refresh_token(self): |
|
|
|
"""Refresh the token. |
|
|
|
|
|
|
|
:returns: token |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
return self.connection.refresh_token() |
|
|
|
|
|
|
|
def get_client_all_sessions(self, client_id): |
|
|
|
"""Get sessions associated with the client. |
|
|
|
|
|
|
@ -4368,8 +3940,8 @@ class KeycloakAdmin: |
|
|
|
:return: Keycloak server response |
|
|
|
:rtype: bytes |
|
|
|
""" |
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
data_raw = self.raw_post( |
|
|
|
params_path = {"realm-name": self.connection.realm_name, "id": client_id} |
|
|
|
data_raw = self.connection.raw_post( |
|
|
|
urls_patterns.URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path), |
|
|
|
data=json.dumps(payload), |
|
|
|
) |
|
|
|