Browse Source

refactor: Remove auto_refresh_token in favour of automatic refresh on expiry

pull/415/head
nuwang 2 years ago
parent
commit
56712fe308
  1. 32
      poetry.lock
  2. 1
      pyproject.toml
  3. 10
      src/keycloak/keycloak_admin.py
  4. 52
      src/keycloak/keycloak_openid.py
  5. 18
      tests/conftest.py
  6. 123
      tests/test_keycloak_admin.py

32
poetry.lock

@ -598,6 +598,21 @@ files = [
flake8 = ">=3"
pydocstyle = ">=2.1"
[[package]]
name = "freezegun"
version = "1.2.2"
description = "Let your Python tests travel through time"
category = "dev"
optional = false
python-versions = ">=3.6"
files = [
{file = "freezegun-1.2.2-py3-none-any.whl", hash = "sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f"},
{file = "freezegun-1.2.2.tar.gz", hash = "sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446"},
]
[package.dependencies]
python-dateutil = ">=2.7"
[[package]]
name = "identify"
version = "2.5.18"
@ -1275,6 +1290,21 @@ pytest = ">=4.6"
[package.extras]
testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtualenv"]
[[package]]
name = "python-dateutil"
version = "2.8.2"
description = "Extensions to the standard Python datetime module"
category = "dev"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
files = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
{file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
]
[package.dependencies]
six = ">=1.5"
[[package]]
name = "python-jose"
version = "3.3.0"
@ -2109,4 +2139,4 @@ docs = ["Sphinx", "alabaster", "commonmark", "m2r2", "mock", "readthedocs-sphinx
[metadata]
lock-version = "2.0"
python-versions = "^3.7"
content-hash = "45f461f05bdc8da0d12a858c57782cc3a24fb4fb91175e6e0a290cd0304c10e9"
content-hash = "70bb30bae9ff3d8b6c54553f755b2f31725701f379ae9aeb4a2a5658d2f6d51a"

1
pyproject.toml

@ -73,6 +73,7 @@ cryptography = "^37.0.4"
codespell = "^2.1.0"
darglint = "^1.8.1"
twine = "^4.0.2"
freezegun = "^1.2.2"
[build-system]
requires = ["poetry-core>=1.0.0"]

10
src/keycloak/keycloak_admin.py

@ -81,6 +81,7 @@ class KeycloakAdmin:
PAGE_SIZE = 100
_auto_refresh_token = None
_connection = None
def __init__(
@ -146,8 +147,8 @@ class KeycloakAdmin:
user_realm_name=user_realm_name,
custom_headers=custom_headers,
timeout=timeout,
auto_refresh_token=auto_refresh_token or [],
)
self.auto_refresh_token = auto_refresh_token
@property
@deprecation.deprecated(
@ -450,7 +451,7 @@ class KeycloakAdmin:
:returns: List of methods for automatic token refresh
:rtype: list
"""
return self.connection.auto_refresh_token
return self._auto_refresh_token
@auto_refresh_token.setter
@deprecation.deprecated(
@ -460,7 +461,7 @@ class KeycloakAdmin:
details="Use the self.connection.custom_headers property instead",
)
def auto_refresh_token(self, value):
self.connection.auto_refresh_token = value
self._auto_refresh_token = value or []
def __fetch_all(self, url, query=None):
"""Paginate over get requests.
@ -572,9 +573,6 @@ class KeycloakAdmin:
:return: RealmRepresentation
:rtype: dict
"""
import debugpy
debugpy.breakpoint()
params_path = {"realm-name": realm_name}
data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])

52
src/keycloak/keycloak_openid.py

@ -29,7 +29,6 @@ class to handle authentication and token manipulation.
import json
from datetime import datetime, timedelta
from typing import Iterable
from jose import jwt
@ -698,7 +697,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
_client_id = None
_verify = None
_client_secret_key = None
_auto_refresh_token = None
_connection = None
_custom_headers = None
_user_realm_name = None
@ -717,7 +715,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
auto_refresh_token=None,
timeout=60,
):
"""Init method.
@ -745,12 +742,12 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
:type custom_headers: dict
:param user_realm_name: The realm name of the user, if different from realm_name
:type user_realm_name: str
:param auto_refresh_token: list of methods that allows automatic token refresh.
Ex: ['get', 'put', 'post', 'delete']
:type auto_refresh_token: list
:param timeout: connection timeout in seconds
:type timeout: int
"""
# token is renewed when it hits 90% of its lifetime. This is to account for any possible
# clock skew.
self.token_renewal_fraction = 0.9
self.server_url = server_url
self.username = username
self.password = password
@ -760,12 +757,8 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
self.client_id = client_id
self.verify = verify
self.client_secret_key = client_secret_key
self.auto_refresh_token = auto_refresh_token or []
self.user_realm_name = user_realm_name
self.timeout = timeout
# token is renewed when it hits 90% of its lifetime. This is to account for any possible
# clock skew.
self.token_renewal_fraction = 0.9
if self.token is None:
self.get_token()
@ -929,31 +922,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
# merge custom headers to main headers
self.headers.update(self.custom_headers)
@property
def auto_refresh_token(self):
"""Get auto refresh token.
:returns: List of methods for automatic token refresh
:rtype: list
"""
return self._auto_refresh_token
@auto_refresh_token.setter
def auto_refresh_token(self, value):
allowed_methods = {"get", "post", "put", "delete"}
if not isinstance(value, Iterable):
raise TypeError(
"Expected a list of strings among {allowed}".format(allowed=allowed_methods)
)
if not all(method in allowed_methods for method in value):
raise TypeError(
"Unexpected method in auto_refresh_token, accepted methods are {allowed}".format(
allowed=allowed_methods
)
)
self._auto_refresh_token = value
def get_token(self):
"""Get admin token.
@ -995,7 +963,7 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
:raises KeycloakPostError: In case the refresh token request failed.
"""
refresh_token = self.token.get("refresh_token", None)
refresh_token = self.token.get("refresh_token", None) if self.token else None
if refresh_token is None:
self.get_token()
else:
@ -1033,9 +1001,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
"""
self._refresh_if_required()
r = super().raw_get(*args, **kwargs)
if "get" in self.auto_refresh_token and r.status_code == 401:
self.refresh_token()
return super().raw_get(*args, **kwargs)
return r
def raw_post(self, *args, **kwargs):
@ -1053,9 +1018,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
"""
self._refresh_if_required()
r = super().raw_post(*args, **kwargs)
if "post" in self.auto_refresh_token and r.status_code == 401:
self.refresh_token()
return super().raw_post(*args, **kwargs)
return r
def raw_put(self, *args, **kwargs):
@ -1073,9 +1035,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
"""
self._refresh_if_required()
r = super().raw_put(*args, **kwargs)
if "put" in self.auto_refresh_token and r.status_code == 401:
self.refresh_token()
return super().raw_put(*args, **kwargs)
return r
def raw_delete(self, *args, **kwargs):
@ -1093,7 +1052,4 @@ class KeycloakOpenIDConnectionManager(ConnectionManager):
"""
self._refresh_if_required()
r = super().raw_delete(*args, **kwargs)
if "delete" in self.auto_refresh_token and r.status_code == 401:
self.refresh_token()
return super().raw_delete(*args, **kwargs)
return r

18
tests/conftest.py

@ -6,6 +6,7 @@ import uuid
from datetime import datetime, timedelta
from typing import Tuple
import freezegun
import pytest
from cryptography import x509
from cryptography.hazmat.backends import default_backend
@ -152,6 +153,23 @@ def admin(env: KeycloakTestEnv):
)
@pytest.fixture
@freezegun.freeze_time("2023-02-25 10:00:00")
def admin_frozen(env: KeycloakTestEnv):
"""Fixture for initialized KeycloakAdmin class, with time frozen.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
:returns: Keycloak admin
:rtype: KeycloakAdmin
"""
return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
)
@pytest.fixture
def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for initialized KeycloakOpenID class.

123
tests/test_keycloak_admin.py

@ -3,7 +3,9 @@
import copy
from typing import Tuple
import freezegun
import pytest
from dateutil import parser as datetime_parser
import keycloak
from keycloak import KeycloakAdmin, KeycloakOpenID
@ -22,31 +24,6 @@ def test_keycloak_version():
assert keycloak.__version__, keycloak.__version__
def test_keycloak_admin_bad_init(env):
"""Test keycloak admin bad init.
:param env: Environment fixture
:type env: KeycloakTestEnv
"""
with pytest.raises(TypeError) as err:
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
auto_refresh_token=1,
)
assert err.match("Expected a list of strings")
with pytest.raises(TypeError) as err:
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
auto_refresh_token=["patch"],
)
assert err.match("Unexpected method in auto_refresh_token")
def test_keycloak_admin_init(env):
"""Test keycloak admin init.
@ -2187,16 +2164,17 @@ def test_events(admin: KeycloakAdmin, realm: str):
assert events == list()
def test_auto_refresh(admin: KeycloakAdmin, realm: str):
@freezegun.freeze_time("2023-02-25 10:00:00")
def test_auto_refresh(admin_frozen: KeycloakAdmin, realm: str):
"""Test auto refresh token.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param admin_frozen: Keycloak Admin client with time frozen in place
:type admin_frozen: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin = admin_frozen
# Test get refresh
admin.auto_refresh_token = list()
admin.connection.custom_headers = {
"Authorization": "Bearer bad",
"Content-Type": "application/json",
@ -2206,65 +2184,46 @@ def test_auto_refresh(admin: KeycloakAdmin, realm: str):
admin.get_realm(realm_name=realm)
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get"]
del admin.token["refresh_token"]
assert admin.get_realm(realm_name=realm)
# Test bad refresh token
admin.connection.custom_headers = {
"Authorization": "Bearer bad",
"Content-Type": "application/json",
}
admin.token["refresh_token"] = "bad"
with pytest.raises(KeycloakPostError) as err:
admin.get_realm(realm_name="test-refresh")
assert err.match(
'400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\''
)
admin.realm_name = "master"
admin.get_token()
admin.realm_name = realm
# Freeze time to simulate the access token expiring
with freezegun.freeze_time("2023-02-25 10:05:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:05:00")
assert admin.get_realm(realm_name=realm)
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:05:00")
# Test bad refresh token, but first make sure access token has expired again
with freezegun.freeze_time("2023-02-25 10:10:00"):
admin.connection.custom_headers = {"Content-Type": "application/json"}
admin.connection.token["refresh_token"] = "bad"
with pytest.raises(KeycloakPostError) as err:
admin.get_realm(realm_name="test-refresh")
assert err.match(
'400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\''
)
admin.connection.get_token()
# Test post refresh
admin.connection.custom_headers = {
"Authorization": "Bearer bad",
"Content-Type": "application/json",
}
with pytest.raises(KeycloakAuthenticationError) as err:
admin.create_realm(payload={"realm": "test-refresh"})
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post"]
admin.realm_name = "master"
admin.user_logout(user_id=admin.get_user_id(username=admin.username))
assert admin.create_realm(payload={"realm": "test-refresh"}) == b""
admin.realm_name = realm
with freezegun.freeze_time("2023-02-25 10:15:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:15:00")
admin.connection.token = None
assert admin.create_realm(payload={"realm": "test-refresh"}) == b""
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:15:00")
# Test update refresh
admin.connection.custom_headers = {
"Authorization": "Bearer bad",
"Content-Type": "application/json",
}
with pytest.raises(KeycloakAuthenticationError) as err:
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post", "put"]
assert (
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict()
)
with freezegun.freeze_time("2023-02-25 10:25:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00")
admin.connection.token = None
assert (
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
== dict()
)
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00")
# Test delete refresh
admin.connection.custom_headers = {
"Authorization": "Bearer bad",
"Content-Type": "application/json",
}
with pytest.raises(KeycloakAuthenticationError) as err:
admin.delete_realm(realm_name="test-refresh")
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post", "put", "delete"]
assert admin.delete_realm(realm_name="test-refresh") == dict()
with freezegun.freeze_time("2023-02-25 10:35:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:35:00")
admin.connection.token = None
assert admin.delete_realm(realm_name="test-refresh") == dict()
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00")
def test_get_required_actions(admin: KeycloakAdmin, realm: str):

Loading…
Cancel
Save