Browse Source

fix: Refactor auto refresh (#415)

* refactor: Factor our OpenIdConnectionManager class and deprecate old methods

* refactor: Refactor keycloak uma client to use openid connection manager

* fix: Perform token renewal at 90% of lifetime

* refactor: Add optional openid connection constructor param to keycloak admin

* refactor: Remove auto_refresh_token in favour of automatic refresh on expiry

* refactor: move KeycloakOpenIDConnectionManager to a separate file

* docs: uma additions and fixes

* refactor: rename token_renewal_fraction->token_lifetime_fraction

* refactor: shorten KeycloakOpenIDConnectionManager->KeycloakOpenIDConnection

* docs: incorporate review comments
pull/420/head v2.13.2
Nuwan Goonasekera 2 years ago
committed by GitHub
parent
commit
7bbf4e15b7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 53
      README.md
  2. 47
      poetry.lock
  3. 2
      pyproject.toml
  4. 2
      src/keycloak/__init__.py
  5. 804
      src/keycloak/keycloak_admin.py
  6. 77
      src/keycloak/keycloak_uma.py
  7. 408
      src/keycloak/openid_connection.py
  8. 54
      tests/conftest.py
  9. 141
      tests/test_keycloak_admin.py
  10. 3
      tests/test_keycloak_openid.py
  11. 52
      tests/test_keycloak_uma.py

53
README.md

@ -69,7 +69,7 @@ keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/auth/",
realm_name="example_realm", realm_name="example_realm",
client_secret_key="secret") client_secret_key="secret")
# Get WellKnow
# Get WellKnown
config_well_known = keycloak_openid.well_known() config_well_known = keycloak_openid.well_known()
# Get Code With Oauth Authorization Request # Get Code With Oauth Authorization Request
@ -142,14 +142,19 @@ auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Sc
# KEYCLOAK ADMIN # KEYCLOAK ADMIN
from keycloak import KeycloakAdmin from keycloak import KeycloakAdmin
from keycloak import KeycloakOpenIDConnection
keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/",
username='example-admin',
password='secret',
realm_name="master",
user_realm_name="only_if_other_realm_than_master",
client_secret_key="client-secret",
verify=True)
keycloak_connection = KeycloakOpenIDConnection(
server_url="http://localhost:8080/",
username='example-admin',
password='secret',
realm_name="master",
user_realm_name="only_if_other_realm_than_master",
client_id="my_client",
client_secret_key="client-secret",
verify=True)
keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
# Add user # Add user
new_user = keycloak_admin.create_user({"email": "example@example.com", new_user = keycloak_admin.create_user({"email": "example@example.com",
@ -344,4 +349,36 @@ keycloak_admin.get_users() # Get user in main realm
keycloak_admin.realm_name = "demo" # Change realm to 'demo' keycloak_admin.realm_name = "demo" # Change realm to 'demo'
keycloak_admin.get_users() # Get users in realm 'demo' keycloak_admin.get_users() # Get users in realm 'demo'
keycloak_admin.create_user(...) # Creates a new user in 'demo' keycloak_admin.create_user(...) # Creates a new user in 'demo'
# KEYCLOAK UMA
from keycloak import KeycloakOpenIDConnection
from keycloak import KeycloakUMA
keycloak_connection = KeycloakOpenIDConnection(
server_url="http://localhost:8080/",
realm_name="master",
client_id="my_client",
client_secret_key="client-secret")
keycloak_uma = KeycloakUMA(connection=keycloak_connection)
# Create a resource set
resource_set = keycloak_uma.resource_set_create({
"name": "example_resource",
"scopes": ["example:read", "example:write"],
"type": "urn:example"})
# List resource sets
resource_sets = uma.resource_set_list()
# get resource set
latest_resource = uma.resource_set_read(resource_set["_id"])
# update resource set
latest_resource["name"] = "New Resource Name"
uma.resource_set_update(resource_set["_id"], latest_resource)
# delete resource set
uma.resource_set_delete(resource_id=resource_set["_id"])
``` ```

47
poetry.lock

@ -475,6 +475,21 @@ files = [
{file = "decli-0.5.2.tar.gz", hash = "sha256:f2cde55034a75c819c630c7655a844c612f2598c42c21299160465df6ad463ad"}, {file = "decli-0.5.2.tar.gz", hash = "sha256:f2cde55034a75c819c630c7655a844c612f2598c42c21299160465df6ad463ad"},
] ]
[[package]]
name = "deprecation"
version = "2.1.0"
description = "A library to handle automated deprecations"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "deprecation-2.1.0-py2.py3-none-any.whl", hash = "sha256:a10811591210e1fb0e768a8c25517cabeabcba6f0bf96564f8ff45189f90b14a"},
{file = "deprecation-2.1.0.tar.gz", hash = "sha256:72b3bde64e5d778694b0cf68178aed03d15e15477116add3fb773e581f9518ff"},
]
[package.dependencies]
packaging = "*"
[[package]] [[package]]
name = "distlib" name = "distlib"
version = "0.3.6" version = "0.3.6"
@ -583,6 +598,21 @@ files = [
flake8 = ">=3" flake8 = ">=3"
pydocstyle = ">=2.1" pydocstyle = ">=2.1"
[[package]]
name = "freezegun"
version = "1.2.2"
description = "Let your Python tests travel through time"
category = "dev"
optional = false
python-versions = ">=3.6"
files = [
{file = "freezegun-1.2.2-py3-none-any.whl", hash = "sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f"},
{file = "freezegun-1.2.2.tar.gz", hash = "sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446"},
]
[package.dependencies]
python-dateutil = ">=2.7"
[[package]] [[package]]
name = "identify" name = "identify"
version = "2.5.18" version = "2.5.18"
@ -1260,6 +1290,21 @@ pytest = ">=4.6"
[package.extras] [package.extras]
testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtualenv"] testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtualenv"]
[[package]]
name = "python-dateutil"
version = "2.8.2"
description = "Extensions to the standard Python datetime module"
category = "dev"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
files = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
{file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
]
[package.dependencies]
six = ">=1.5"
[[package]] [[package]]
name = "python-jose" name = "python-jose"
version = "3.3.0" version = "3.3.0"
@ -2094,4 +2139,4 @@ docs = ["Sphinx", "alabaster", "commonmark", "m2r2", "mock", "readthedocs-sphinx
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.7" python-versions = "^3.7"
content-hash = "8d76b155adddd2eacd0304397b33465d5c67f09165d8de641c71f5ce7b979be2"
content-hash = "70bb30bae9ff3d8b6c54553f755b2f31725701f379ae9aeb4a2a5658d2f6d51a"

2
pyproject.toml

@ -43,6 +43,7 @@ readthedocs-sphinx-ext = {version = "^2.1.9", optional = true}
m2r2 = {version = "^0.3.2", optional = true} m2r2 = {version = "^0.3.2", optional = true}
sphinx-autoapi = {version = "^2.0.0", optional = true} sphinx-autoapi = {version = "^2.0.0", optional = true}
requests-toolbelt = "^0.10.1" requests-toolbelt = "^0.10.1"
deprecation = "^2.1.0"
[tool.poetry.extras] [tool.poetry.extras]
docs = [ docs = [
@ -72,6 +73,7 @@ cryptography = "^37.0.4"
codespell = "^2.1.0" codespell = "^2.1.0"
darglint = "^1.8.1" darglint = "^1.8.1"
twine = "^4.0.2" twine = "^4.0.2"
freezegun = "^1.2.2"
[build-system] [build-system]
requires = ["poetry-core>=1.0.0"] requires = ["poetry-core>=1.0.0"]

2
src/keycloak/__init__.py

@ -43,6 +43,7 @@ from .exceptions import (
from .keycloak_admin import KeycloakAdmin from .keycloak_admin import KeycloakAdmin
from .keycloak_openid import KeycloakOpenID from .keycloak_openid import KeycloakOpenID
from .keycloak_uma import KeycloakUMA from .keycloak_uma import KeycloakUMA
from .openid_connection import KeycloakOpenIDConnection
__all__ = [ __all__ = [
"__version__", "__version__",
@ -62,5 +63,6 @@ __all__ = [
"KeycloakSecretNotFound", "KeycloakSecretNotFound",
"KeycloakAdmin", "KeycloakAdmin",
"KeycloakOpenID", "KeycloakOpenID",
"KeycloakOpenIDConnection",
"KeycloakUMA", "KeycloakUMA",
] ]

804
src/keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

77
src/keycloak/keycloak_uma.py

@ -29,7 +29,6 @@ https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html
import json import json
from urllib.parse import quote_plus from urllib.parse import quote_plus
from .connection import ConnectionManager
from .exceptions import ( from .exceptions import (
KeycloakDeleteError, KeycloakDeleteError,
KeycloakGetError, KeycloakGetError,
@ -37,50 +36,30 @@ from .exceptions import (
KeycloakPutError, KeycloakPutError,
raise_error_from_response, raise_error_from_response,
) )
from .openid_connection import KeycloakOpenIDConnection
from .urls_patterns import URL_UMA_WELL_KNOWN from .urls_patterns import URL_UMA_WELL_KNOWN
class KeycloakUMA: class KeycloakUMA:
"""Keycloak UMA client. """Keycloak UMA client.
:param server_url: Keycloak server url
:param client_id: client id
:param realm_name: realm name
:param client_secret_key: client secret key
:param verify: True if want check connection SSL
:param custom_headers: dict of custom header to pass to each HTML request
:param proxies: dict of proxies to sent the request by.
:param timeout: connection timeout in seconds
:param connection: OpenID connection manager
""" """
def __init__(
self, server_url, realm_name, verify=True, custom_headers=None, proxies=None, timeout=60
):
def __init__(self, connection: KeycloakOpenIDConnection):
"""Init method. """Init method.
:param server_url: Keycloak server url
:type server_url: str
:param realm_name: realm name
:type realm_name: str
:param verify: True if want check connection SSL
:type verify: bool
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param proxies: dict of proxies to sent the request by.
:type proxies: dict
:param timeout: connection timeout in seconds
:type timeout: int
:param connection: OpenID connection manager
:type connection: KeycloakOpenIDConnection
""" """
self.realm_name = realm_name
headers = custom_headers if custom_headers is not None else dict()
headers.update({"Content-Type": "application/json"})
self.connection = ConnectionManager(
base_url=server_url, headers=headers, timeout=timeout, verify=verify, proxies=proxies
)
self.connection = connection
custom_headers = self.connection.custom_headers or {}
custom_headers.update({"Content-Type": "application/json"})
self.connection.custom_headers = custom_headers
self._well_known = None self._well_known = None
def _fetch_well_known(self): def _fetch_well_known(self):
params_path = {"realm-name": self.realm_name}
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path)) data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
@ -102,9 +81,6 @@ class KeycloakUMA:
""" """
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()}) return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
def _add_bearer_token_header(self, token):
self.connection.add_param_headers("Authorization", "Bearer " + token)
@property @property
def uma_well_known(self): def uma_well_known(self):
"""Get the well_known UMA2 config. """Get the well_known UMA2 config.
@ -117,7 +93,7 @@ class KeycloakUMA:
self._well_known = self._fetch_well_known() self._well_known = self._fetch_well_known()
return self._well_known return self._well_known
def resource_set_create(self, token, payload):
def resource_set_create(self, payload):
"""Create a resource set. """Create a resource set.
Spec Spec
@ -126,20 +102,17 @@ class KeycloakUMA:
ResourceRepresentation ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param payload: ResourceRepresentation :param payload: ResourceRepresentation
:type payload: dict :type payload: dict
:return: ResourceRepresentation with the _id property assigned :return: ResourceRepresentation with the _id property assigned
:rtype: dict :rtype: dict
""" """
self._add_bearer_token_header(token)
data_raw = self.connection.raw_post( data_raw = self.connection.raw_post(
self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload) self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload)
) )
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def resource_set_update(self, token, resource_id, payload):
def resource_set_update(self, resource_id, payload):
"""Update a resource set. """Update a resource set.
Spec Spec
@ -148,8 +121,6 @@ class KeycloakUMA:
ResourceRepresentation ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param resource_id: id of the resource :param resource_id: id of the resource
:type resource_id: str :type resource_id: str
:param payload: ResourceRepresentation :param payload: ResourceRepresentation
@ -157,14 +128,13 @@ class KeycloakUMA:
:return: Response dict (empty) :return: Response dict (empty)
:rtype: dict :rtype: dict
""" """
self._add_bearer_token_header(token)
url = self.format_url( url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
) )
data_raw = self.connection.raw_put(url, data=json.dumps(payload)) data_raw = self.connection.raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
def resource_set_read(self, token, resource_id):
def resource_set_read(self, resource_id):
"""Read a resource set. """Read a resource set.
Spec Spec
@ -173,56 +143,47 @@ class KeycloakUMA:
ResourceRepresentation ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param resource_id: id of the resource :param resource_id: id of the resource
:type resource_id: str :type resource_id: str
:return: ResourceRepresentation :return: ResourceRepresentation
:rtype: dict :rtype: dict
""" """
self._add_bearer_token_header(token)
url = self.format_url( url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
) )
data_raw = self.connection.raw_get(url) data_raw = self.connection.raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_delete(self, token, resource_id):
def resource_set_delete(self, resource_id):
"""Delete a resource set. """Delete a resource set.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
:param token: client access token
:type token: str
:param resource_id: id of the resource :param resource_id: id of the resource
:type resource_id: str :type resource_id: str
:return: Response dict (empty) :return: Response dict (empty)
:rtype: dict :rtype: dict
""" """
self._add_bearer_token_header(token)
url = self.format_url( url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
) )
data_raw = self.connection.raw_delete(url) data_raw = self.connection.raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def resource_set_list_ids(self, token):
def resource_set_list_ids(self):
"""List all resource set ids. """List all resource set ids.
Spec Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
:param token: client access token
:type token: str
:return: List of ids :return: List of ids
:rtype: List[str] :rtype: List[str]
""" """
self._add_bearer_token_header(token)
data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"]) data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_list(self, token):
def resource_set_list(self):
"""List all resource sets. """List all resource sets.
Spec Spec
@ -231,11 +192,9 @@ class KeycloakUMA:
ResourceRepresentation ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:yields: Iterator over a list of ResourceRepresentations :yields: Iterator over a list of ResourceRepresentations
:rtype: Iterator[dict] :rtype: Iterator[dict]
""" """
for resource_id in self.resource_set_list_ids(token):
resource = self.resource_set_read(token, resource_id)
for resource_id in self.resource_set_list_ids():
resource = self.resource_set_read(resource_id)
yield resource yield resource

408
src/keycloak/openid_connection.py

@ -0,0 +1,408 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak OpenID Connection Manager module.
The module contains mainly the implementation of KeycloakOpenIDConnection class.
This is an extension of the ConnectionManager class, and handles the automatic refresh
of openid tokens when required.
"""
from datetime import datetime, timedelta
from .connection import ConnectionManager
from .exceptions import KeycloakPostError
from .keycloak_openid import KeycloakOpenID
class KeycloakOpenIDConnection(ConnectionManager):
"""A class to help with OpenID connections which can auto refresh tokens.
:param object: _description_
:type object: _type_
"""
_server_url = None
_username = None
_password = None
_totp = None
_realm_name = None
_client_id = None
_verify = None
_client_secret_key = None
_connection = None
_custom_headers = None
_user_realm_name = None
_expires_at = None
def __init__(
self,
server_url,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
verify=True,
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
timeout=60,
):
"""Init method.
:param server_url: Keycloak server url
:type server_url: str
:param username: admin username
:type username: str
:param password: admin password
:type password: str
:param token: access and refresh tokens
:type token: dict
:param totp: Time based OTP
:type totp: str
:param realm_name: realm name
:type realm_name: str
:param client_id: client id
:type client_id: str
:param verify: True if want check connection SSL
:type verify: bool
:param client_secret_key: client secret key
(optional, required only for access type confidential)
:type client_secret_key: str
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param user_realm_name: The realm name of the user, if different from realm_name
:type user_realm_name: str
:param timeout: connection timeout in seconds
:type timeout: int
"""
# token is renewed when it hits 90% of its lifetime. This is to account for any possible
# clock skew.
self.token_lifetime_fraction = 0.9
self.server_url = server_url
self.username = username
self.password = password
self.token = token
self.totp = totp
self.realm_name = realm_name
self.client_id = client_id
self.verify = verify
self.client_secret_key = client_secret_key
self.user_realm_name = user_realm_name
self.timeout = timeout
if self.token is None:
self.get_token()
self.headers = (
{
"Authorization": "Bearer " + self.token.get("access_token"),
"Content-Type": "application/json",
}
if self.token is not None
else {}
)
self.custom_headers = custom_headers
super().__init__(
base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify
)
@property
def server_url(self):
"""Get server url.
:returns: Keycloak server url
:rtype: str
"""
return self.base_url
@server_url.setter
def server_url(self, value):
self.base_url = value
@property
def realm_name(self):
"""Get realm name.
:returns: Realm name
:rtype: str
"""
return self._realm_name
@realm_name.setter
def realm_name(self, value):
self._realm_name = value
@property
def client_id(self):
"""Get client id.
:returns: Client id
:rtype: str
"""
return self._client_id
@client_id.setter
def client_id(self, value):
self._client_id = value
@property
def client_secret_key(self):
"""Get client secret key.
:returns: Client secret key
:rtype: str
"""
return self._client_secret_key
@client_secret_key.setter
def client_secret_key(self, value):
self._client_secret_key = value
@property
def username(self):
"""Get username.
:returns: Admin username
:rtype: str
"""
return self._username
@username.setter
def username(self, value):
self._username = value
@property
def password(self):
"""Get password.
:returns: Admin password
:rtype: str
"""
return self._password
@password.setter
def password(self, value):
self._password = value
@property
def totp(self):
"""Get totp.
:returns: TOTP
:rtype: str
"""
return self._totp
@totp.setter
def totp(self, value):
self._totp = value
@property
def token(self):
"""Get token.
:returns: Access and refresh token
:rtype: dict
"""
return self._token
@token.setter
def token(self, value):
self._token = value
self._expires_at = datetime.now() + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
)
@property
def expires_at(self):
"""Get token expiry time.
:returns: Datetime at which the current token will expire
:rtype: datetime
"""
return self._expires_at
@property
def user_realm_name(self):
"""Get user realm name.
:returns: User realm name
:rtype: str
"""
return self._user_realm_name
@user_realm_name.setter
def user_realm_name(self, value):
self._user_realm_name = value
@property
def custom_headers(self):
"""Get custom headers.
:returns: Custom headers
:rtype: dict
"""
return self._custom_headers
@custom_headers.setter
def custom_headers(self, value):
self._custom_headers = value
if self.custom_headers is not None:
# merge custom headers to main headers
self.headers.update(self.custom_headers)
def get_token(self):
"""Get admin token.
The admin token is then set in the `token` attribute.
"""
if self.user_realm_name:
token_realm_name = self.user_realm_name
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
self.keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=token_realm_name,
verify=self.verify,
client_secret_key=self.client_secret_key,
timeout=self.timeout,
)
grant_type = []
if self.client_secret_key:
if self.user_realm_name:
self.realm_name = self.user_realm_name
grant_type.append("client_credentials")
elif self.username and self.password:
grant_type.append("password")
if grant_type:
self.token = self.keycloak_openid.token(
self.username, self.password, grant_type=grant_type, totp=self.totp
)
else:
self.token = None
def refresh_token(self):
"""Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed.
"""
refresh_token = self.token.get("refresh_token", None) if self.token else None
if refresh_token is None:
self.get_token()
else:
try:
self.token = self.keycloak_openid.refresh_token(refresh_token)
except KeycloakPostError as e:
list_errors = [
b"Refresh token expired",
b"Token is not active",
b"Session not active",
]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
self.get_token()
else:
raise
self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token"))
def _refresh_if_required(self):
if datetime.now() >= self.expires_at:
self.refresh_token()
def raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_get(*args, **kwargs)
return r
def raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_post(*args, **kwargs)
return r
def raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_put(*args, **kwargs)
return r
def raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_delete(*args, **kwargs)
return r

54
tests/conftest.py

@ -4,7 +4,9 @@ import ipaddress
import os import os
import uuid import uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Tuple
import freezegun
import pytest import pytest
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
@ -12,7 +14,7 @@ from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID from cryptography.x509.oid import NameOID
from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakUMA
from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakOpenIDConnection, KeycloakUMA
class KeycloakTestEnv(object): class KeycloakTestEnv(object):
@ -150,6 +152,23 @@ def admin(env: KeycloakTestEnv):
) )
@pytest.fixture
@freezegun.freeze_time("2023-02-25 10:00:00")
def admin_frozen(env: KeycloakTestEnv):
"""Fixture for initialized KeycloakAdmin class, with time frozen.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
:returns: Keycloak admin
:rtype: KeycloakAdmin
"""
return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
)
@pytest.fixture @pytest.fixture
def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin): def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for initialized KeycloakOpenID class. """Fixture for initialized KeycloakOpenID class.
@ -478,17 +497,34 @@ def selfsigned_cert():
@pytest.fixture @pytest.fixture
def uma(env: KeycloakTestEnv, realm: str):
def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Fixture for initialized KeycloakUMA class. """Fixture for initialized KeycloakUMA class.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
:param realm: Keycloak realm
:type realm: str
:param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
:yields: Keycloak OpenID connection manager
:rtype: KeycloakOpenIDConnection
"""
oid, _, _ = oid_with_credentials_authz
connection = KeycloakOpenIDConnection(
server_url=oid.connection.base_url,
realm_name=oid.realm_name,
client_id=oid.client_id,
client_secret_key=oid.client_secret_key,
timeout=60,
)
yield connection
@pytest.fixture
def uma(oid_connection_with_authz: KeycloakOpenIDConnection):
"""Fixture for initialized KeycloakUMA class.
:param oid_connection_with_authz: Keycloak open id connection with pre-configured authz client
:type oid_connection_with_authz: KeycloakOpenIDConnection
:yields: Keycloak OpenID client :yields: Keycloak OpenID client
:rtype: KeycloakOpenID :rtype: KeycloakOpenID
""" """
connection = oid_connection_with_authz
# Return UMA # Return UMA
yield KeycloakUMA(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name=realm
)
yield KeycloakUMA(connection=connection)

141
tests/test_keycloak_admin.py

@ -3,7 +3,9 @@
import copy import copy
from typing import Tuple from typing import Tuple
import freezegun
import pytest import pytest
from dateutil import parser as datetime_parser
import keycloak import keycloak
from keycloak import KeycloakAdmin, KeycloakOpenID from keycloak import KeycloakAdmin, KeycloakOpenID
@ -22,31 +24,6 @@ def test_keycloak_version():
assert keycloak.__version__, keycloak.__version__ assert keycloak.__version__, keycloak.__version__
def test_keycloak_admin_bad_init(env):
"""Test keycloak admin bad init.
:param env: Environment fixture
:type env: KeycloakTestEnv
"""
with pytest.raises(TypeError) as err:
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
auto_refresh_token=1,
)
assert err.match("Expected a list of strings")
with pytest.raises(TypeError) as err:
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
auto_refresh_token=["patch"],
)
assert err.match("Unexpected method in auto_refresh_token")
def test_keycloak_admin_init(env): def test_keycloak_admin_init(env):
"""Test keycloak admin init. """Test keycloak admin init.
@ -2187,94 +2164,66 @@ def test_events(admin: KeycloakAdmin, realm: str):
assert events == list() assert events == list()
def test_auto_refresh(admin: KeycloakAdmin, realm: str):
@freezegun.freeze_time("2023-02-25 10:00:00")
def test_auto_refresh(admin_frozen: KeycloakAdmin, realm: str):
"""Test auto refresh token. """Test auto refresh token.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param admin_frozen: Keycloak Admin client with time frozen in place
:type admin_frozen: KeycloakAdmin
:param realm: Keycloak realm :param realm: Keycloak realm
:type realm: str :type realm: str
""" """
admin = admin_frozen
# Test get refresh # Test get refresh
admin.auto_refresh_token = list()
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
admin.connection.custom_headers = {
"Authorization": "Bearer bad",
"Content-Type": "application/json",
}
with pytest.raises(KeycloakAuthenticationError) as err: with pytest.raises(KeycloakAuthenticationError) as err:
admin.get_realm(realm_name=realm) admin.get_realm(realm_name=realm)
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get"]
del admin.token["refresh_token"]
assert admin.get_realm(realm_name=realm)
# Test bad refresh token
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
admin.token["refresh_token"] = "bad"
with pytest.raises(KeycloakPostError) as err:
admin.get_realm(realm_name="test-refresh")
assert err.match(
'400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\''
)
admin.realm_name = "master"
admin.get_token()
admin.realm_name = realm
# Freeze time to simulate the access token expiring
with freezegun.freeze_time("2023-02-25 10:05:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:05:00")
assert admin.get_realm(realm_name=realm)
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:05:00")
# Test bad refresh token, but first make sure access token has expired again
with freezegun.freeze_time("2023-02-25 10:10:00"):
admin.connection.custom_headers = {"Content-Type": "application/json"}
admin.connection.token["refresh_token"] = "bad"
with pytest.raises(KeycloakPostError) as err:
admin.get_realm(realm_name="test-refresh")
assert err.match(
'400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\''
)
admin.connection.get_token()
# Test post refresh # Test post refresh
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
with pytest.raises(KeycloakAuthenticationError) as err:
admin.create_realm(payload={"realm": "test-refresh"})
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post"]
admin.realm_name = "master"
admin.user_logout(user_id=admin.get_user_id(username=admin.username))
assert admin.create_realm(payload={"realm": "test-refresh"}) == b""
admin.realm_name = realm
with freezegun.freeze_time("2023-02-25 10:15:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:15:00")
admin.connection.token = None
assert admin.create_realm(payload={"realm": "test-refresh"}) == b""
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:15:00")
# Test update refresh # Test update refresh
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
with pytest.raises(KeycloakAuthenticationError) as err:
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post", "put"]
assert (
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict()
)
with freezegun.freeze_time("2023-02-25 10:25:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00")
admin.connection.token = None
assert (
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
== dict()
)
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00")
# Test delete refresh # Test delete refresh
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
with pytest.raises(KeycloakAuthenticationError) as err:
admin.delete_realm(realm_name="test-refresh")
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post", "put", "delete"]
assert admin.delete_realm(realm_name="test-refresh") == dict()
with freezegun.freeze_time("2023-02-25 10:35:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:35:00")
admin.connection.token = None
assert admin.delete_realm(realm_name="test-refresh") == dict()
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00")
def test_get_required_actions(admin: KeycloakAdmin, realm: str): def test_get_required_actions(admin: KeycloakAdmin, realm: str):

3
tests/test_keycloak_openid.py

@ -4,6 +4,7 @@ from unittest import mock
import pytest import pytest
from keycloak import KeycloakAdmin, KeycloakOpenID
from keycloak.authorization import Authorization from keycloak.authorization import Authorization
from keycloak.authorization.permission import Permission from keycloak.authorization.permission import Permission
from keycloak.authorization.policy import Policy from keycloak.authorization.policy import Policy
@ -17,8 +18,6 @@ from keycloak.exceptions import (
KeycloakPostError, KeycloakPostError,
KeycloakRPTNotFound, KeycloakRPTNotFound,
) )
from keycloak.keycloak_admin import KeycloakAdmin
from keycloak.keycloak_openid import KeycloakOpenID
def test_keycloak_openid_init(env): def test_keycloak_openid_init(env):

52
tests/test_keycloak_uma.py

@ -1,32 +1,27 @@
"""Test module for KeycloakUMA.""" """Test module for KeycloakUMA."""
import re import re
from typing import Tuple
import pytest import pytest
from keycloak import KeycloakOpenID
from keycloak.connection import ConnectionManager
from keycloak import KeycloakOpenIDConnection, KeycloakUMA
from keycloak.exceptions import ( from keycloak.exceptions import (
KeycloakDeleteError, KeycloakDeleteError,
KeycloakGetError, KeycloakGetError,
KeycloakPostError, KeycloakPostError,
KeycloakPutError, KeycloakPutError,
) )
from keycloak.keycloak_uma import KeycloakUMA
def test_keycloak_uma_init(env):
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
"""Test KeycloakUMA's init method. """Test KeycloakUMA's init method.
:param env: Environment fixture
:type env: KeycloakTestEnv
:param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
:type oid_connection_with_authz: KeycloakOpenIDConnection
""" """
uma = KeycloakUMA(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name="master"
)
connection = oid_connection_with_authz
uma = KeycloakUMA(connection=connection)
assert uma.realm_name == "master"
assert isinstance(uma.connection, ConnectionManager)
assert isinstance(uma.connection, KeycloakOpenIDConnection)
# should initially be empty # should initially be empty
assert uma._well_known is None assert uma._well_known is None
assert uma.uma_well_known assert uma.uma_well_known
@ -47,23 +42,14 @@ def test_uma_well_known(uma: KeycloakUMA):
assert key in res assert key in res
def test_uma_resource_sets(
uma: KeycloakUMA, oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
):
def test_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets. """Test resource sets.
:param uma: Keycloak UMA client :param uma: Keycloak UMA client
:type uma: KeycloakUMA :type uma: KeycloakUMA
:param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
""" """
oid, _, _ = oid_with_credentials_authz
token = oid.token(grant_type="client_credentials")
access_token = token["access_token"]
# Check that only the default resource is present # Check that only the default resource is present
resource_sets = uma.resource_set_list(access_token)
resource_sets = uma.resource_set_list()
resource_set_list = list(resource_sets) resource_set_list = list(resource_sets)
assert len(resource_set_list) == 1, resource_set_list assert len(resource_set_list) == 1, resource_set_list
assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
@ -74,14 +60,14 @@ def test_uma_resource_sets(
"scopes": ["test:read", "test:write"], "scopes": ["test:read", "test:write"],
"type": "urn:test", "type": "urn:test",
} }
created_resource = uma.resource_set_create(access_token, resource_to_create)
created_resource = uma.resource_set_create(resource_to_create)
assert created_resource assert created_resource
assert created_resource["_id"], created_resource assert created_resource["_id"], created_resource
assert set(resource_to_create).issubset(set(created_resource)), created_resource assert set(resource_to_create).issubset(set(created_resource)), created_resource
# Test create the same resource set # Test create the same resource set
with pytest.raises(KeycloakPostError) as err: with pytest.raises(KeycloakPostError) as err:
uma.resource_set_create(access_token, resource_to_create)
uma.resource_set_create(resource_to_create)
assert err.match( assert err.match(
re.escape( re.escape(
'409: b\'{"error":"invalid_request","error_description":' '409: b\'{"error":"invalid_request","error_description":'
@ -90,31 +76,29 @@ def test_uma_resource_sets(
) )
# Test get resource set # Test get resource set
latest_resource = uma.resource_set_read(access_token, created_resource["_id"])
latest_resource = uma.resource_set_read(created_resource["_id"])
assert latest_resource["name"] == created_resource["name"] assert latest_resource["name"] == created_resource["name"]
# Test update resource set # Test update resource set
latest_resource["name"] = "New Resource Name" latest_resource["name"] = "New Resource Name"
res = uma.resource_set_update(access_token, created_resource["_id"], latest_resource)
res = uma.resource_set_update(created_resource["_id"], latest_resource)
assert res == dict(), res assert res == dict(), res
updated_resource = uma.resource_set_read(access_token, created_resource["_id"])
updated_resource = uma.resource_set_read(created_resource["_id"])
assert updated_resource["name"] == "New Resource Name" assert updated_resource["name"] == "New Resource Name"
# Test update resource set fail # Test update resource set fail
with pytest.raises(KeycloakPutError) as err: with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(
token=access_token, resource_id=created_resource["_id"], payload={"wrong": "payload"}
)
uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
assert err.match('400: b\'{"error":"Unrecognized field') assert err.match('400: b\'{"error":"Unrecognized field')
# Test delete resource set # Test delete resource set
res = uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"])
res = uma.resource_set_delete(resource_id=created_resource["_id"])
assert res == dict(), res assert res == dict(), res
with pytest.raises(KeycloakGetError) as err: with pytest.raises(KeycloakGetError) as err:
uma.resource_set_read(access_token, created_resource["_id"])
uma.resource_set_read(created_resource["_id"])
err.match("404: b''") err.match("404: b''")
# Test delete fail # Test delete fail
with pytest.raises(KeycloakDeleteError) as err: with pytest.raises(KeycloakDeleteError) as err:
uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"])
uma.resource_set_delete(resource_id=created_resource["_id"])
assert err.match("404: b''") assert err.match("404: b''")
Loading…
Cancel
Save