|
|
@ -47,6 +47,17 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
PAGE_SIZE = 100 |
|
|
|
|
|
|
|
_server_url = None |
|
|
|
_username = None |
|
|
|
_password = None |
|
|
|
_realm_name = None |
|
|
|
_client_id = None |
|
|
|
_verify = None |
|
|
|
_client_secret_key = None |
|
|
|
_auto_refresh_token = None |
|
|
|
_connection = None |
|
|
|
_token = None |
|
|
|
|
|
|
|
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli', verify=True, client_secret_key=None, |
|
|
|
auto_refresh_token=None): |
|
|
|
""" |
|
|
@ -60,30 +71,18 @@ class KeycloakAdmin: |
|
|
|
:param client_secret_key: client secret key |
|
|
|
:param auto_refresh_token: list of methods that allows automatic token refresh. ex: ['get', 'put', 'post', 'delete'] |
|
|
|
""" |
|
|
|
self._server_url = server_url |
|
|
|
self._username = username |
|
|
|
self._password = password |
|
|
|
self._realm_name = realm_name |
|
|
|
self._client_id = client_id |
|
|
|
self._verify = verify |
|
|
|
self._client_secret_key = client_secret_key |
|
|
|
self._auto_refresh_token = auto_refresh_token or [] |
|
|
|
self.server_url = server_url |
|
|
|
self.username = username |
|
|
|
self.password = password |
|
|
|
self.realm_name = realm_name |
|
|
|
self.client_id = client_id |
|
|
|
self.verify = verify |
|
|
|
self.client_secret_key = client_secret_key |
|
|
|
self.auto_refresh_token = auto_refresh_token or [] |
|
|
|
|
|
|
|
# Get token Admin |
|
|
|
self.get_token() |
|
|
|
self.keycloak_openid = KeycloakOpenID(server_url=self.server_url, client_id=self.client_id, |
|
|
|
realm_name=self.realm_name, verify=self.verify, |
|
|
|
client_secret_key=self.client_secret_key) |
|
|
|
|
|
|
|
grant_type = ["password"] |
|
|
|
if client_secret_key: |
|
|
|
grant_type = ["client_credentials"] |
|
|
|
self._token = self.keycloak_openid.token(username, password, grant_type=grant_type) |
|
|
|
self._connection = ConnectionManager(base_url=server_url, |
|
|
|
headers={'Authorization': 'Bearer ' + self.token.get('access_token'), |
|
|
|
'Content-Type': 'application/json'}, |
|
|
|
timeout=60, |
|
|
|
verify=verify) |
|
|
|
|
|
|
|
@property |
|
|
|
def server_url(self): |
|
|
@ -166,8 +165,8 @@ class KeycloakAdmin: |
|
|
|
allowed_methods = {'get', 'post', 'put', 'delete'} |
|
|
|
if not isinstance(value, Iterable): |
|
|
|
raise TypeError('Expected a list of strings among {allowed}'.format(allowed=allowed_methods)) |
|
|
|
if not any(method not in allowed_methods for method in value): |
|
|
|
raise TypeError('Unexpected method, accepted methods are {allowed}'.format(allowed=allowed_methods)) |
|
|
|
if not all(method in allowed_methods for method in value): |
|
|
|
raise TypeError('Unexpected method in auto_refresh_token, accepted methods are {allowed}'.format(allowed=allowed_methods)) |
|
|
|
|
|
|
|
self._auto_refresh_token = value |
|
|
|
|
|
|
|