You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

420 lines
12 KiB

# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak OpenID Connection Manager module.
The module contains mainly the implementation of KeycloakOpenIDConnection class.
This is an extension of the ConnectionManager class, and handles the automatic refresh
of openid tokens when required.
"""
from datetime import datetime, timedelta
from .connection import ConnectionManager
from .exceptions import KeycloakPostError
from .keycloak_openid import KeycloakOpenID
class KeycloakOpenIDConnection(ConnectionManager):
"""A class to help with OpenID connections which can auto refresh tokens.
:param object: _description_
:type object: _type_
"""
_server_url = None
_username = None
_password = None
_totp = None
_realm_name = None
_client_id = None
_verify = None
_client_secret_key = None
_connection = None
_custom_headers = None
_user_realm_name = None
_expires_at = None
_keycloak_openid = None
def __init__(
self,
server_url,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
verify=True,
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
timeout=60,
):
"""Init method.
:param server_url: Keycloak server url
:type server_url: str
:param username: admin username
:type username: str
:param password: admin password
:type password: str
:param token: access and refresh tokens
:type token: dict
:param totp: Time based OTP
:type totp: str
:param realm_name: realm name
:type realm_name: str
:param client_id: client id
:type client_id: str
:param verify: Boolean value to enable or disable certificate validation or a string
containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param client_secret_key: client secret key
(optional, required only for access type confidential)
:type client_secret_key: str
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param user_realm_name: The realm name of the user, if different from realm_name
:type user_realm_name: str
:param timeout: connection timeout in seconds
:type timeout: int
"""
# token is renewed when it hits 90% of its lifetime. This is to account for any possible
# clock skew.
self.token_lifetime_fraction = 0.9
self.server_url = server_url
self.username = username
self.password = password
self.token = token
self.totp = totp
self.realm_name = realm_name
self.client_id = client_id
self.verify = verify
self.client_secret_key = client_secret_key
self.user_realm_name = user_realm_name
self.timeout = timeout
self.headers = {}
self.custom_headers = custom_headers
if self.token is None:
self.get_token()
if self.token is not None:
self.headers = {
**self.headers,
"Authorization": "Bearer " + self.token.get("access_token"),
"Content-Type": "application/json",
}
super().__init__(
base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify
)
@property
def server_url(self):
"""Get server url.
:returns: Keycloak server url
:rtype: str
"""
return self.base_url
@server_url.setter
def server_url(self, value):
self.base_url = value
@property
def realm_name(self):
"""Get realm name.
:returns: Realm name
:rtype: str
"""
return self._realm_name
@realm_name.setter
def realm_name(self, value):
self._realm_name = value
@property
def client_id(self):
"""Get client id.
:returns: Client id
:rtype: str
"""
return self._client_id
@client_id.setter
def client_id(self, value):
self._client_id = value
@property
def client_secret_key(self):
"""Get client secret key.
:returns: Client secret key
:rtype: str
"""
return self._client_secret_key
@client_secret_key.setter
def client_secret_key(self, value):
self._client_secret_key = value
@property
def username(self):
"""Get username.
:returns: Admin username
:rtype: str
"""
return self._username
@username.setter
def username(self, value):
self._username = value
@property
def password(self):
"""Get password.
:returns: Admin password
:rtype: str
"""
return self._password
@password.setter
def password(self, value):
self._password = value
@property
def totp(self):
"""Get totp.
:returns: TOTP
:rtype: str
"""
return self._totp
@totp.setter
def totp(self, value):
self._totp = value
@property
def token(self):
"""Get token.
:returns: Access and refresh token
:rtype: dict
"""
return self._token
@token.setter
def token(self, value):
self._token = value
self._expires_at = datetime.now() + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
)
@property
def expires_at(self):
"""Get token expiry time.
:returns: Datetime at which the current token will expire
:rtype: datetime
"""
return self._expires_at
@property
def user_realm_name(self):
"""Get user realm name.
:returns: User realm name
:rtype: str
"""
return self._user_realm_name
@user_realm_name.setter
def user_realm_name(self, value):
self._user_realm_name = value
@property
def custom_headers(self):
"""Get custom headers.
:returns: Custom headers
:rtype: dict
"""
return self._custom_headers
@custom_headers.setter
def custom_headers(self, value):
self._custom_headers = value
if self.custom_headers is not None:
# merge custom headers to main headers
self.headers.update(self.custom_headers)
@property
def keycloak_openid(self) -> KeycloakOpenID:
"""Get the KeycloakOpenID object.
The KeycloakOpenID is used to refresh tokens
:returns: KeycloakOpenID
:rtype: KeycloakOpenID
"""
if self._keycloak_openid is None:
if self.user_realm_name:
token_realm_name = self.user_realm_name
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
self._keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=token_realm_name,
verify=self.verify,
client_secret_key=self.client_secret_key,
timeout=self.timeout,
custom_headers=self.custom_headers,
)
return self._keycloak_openid
def get_token(self):
"""Get admin token.
The admin token is then set in the `token` attribute.
"""
grant_type = []
if self.username and self.password:
grant_type.append("password")
elif self.client_secret_key:
grant_type.append("client_credentials")
if grant_type:
self.token = self.keycloak_openid.token(
self.username, self.password, grant_type=grant_type, totp=self.totp
)
else:
self.token = None
def refresh_token(self):
"""Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed.
"""
refresh_token = self.token.get("refresh_token", None) if self.token else None
if refresh_token is None:
self.get_token()
else:
try:
self.token = self.keycloak_openid.refresh_token(refresh_token)
except KeycloakPostError as e:
list_errors = [
b"Refresh token expired",
b"Token is not active",
b"Session not active",
]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
self.get_token()
else:
raise
self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token"))
def _refresh_if_required(self):
if datetime.now() >= self.expires_at:
self.refresh_token()
def raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_get(*args, **kwargs)
return r
def raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_post(*args, **kwargs)
return r
def raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_put(*args, **kwargs)
return r
def raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_delete(*args, **kwargs)
return r