diff --git a/poetry.lock b/poetry.lock index da7e72b..738b108 100644 --- a/poetry.lock +++ b/poetry.lock @@ -598,6 +598,21 @@ files = [ flake8 = ">=3" pydocstyle = ">=2.1" +[[package]] +name = "freezegun" +version = "1.2.2" +description = "Let your Python tests travel through time" +category = "dev" +optional = false +python-versions = ">=3.6" +files = [ + {file = "freezegun-1.2.2-py3-none-any.whl", hash = "sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f"}, + {file = "freezegun-1.2.2.tar.gz", hash = "sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446"}, +] + +[package.dependencies] +python-dateutil = ">=2.7" + [[package]] name = "identify" version = "2.5.18" @@ -1275,6 +1290,21 @@ pytest = ">=4.6" [package.extras] testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtualenv"] +[[package]] +name = "python-dateutil" +version = "2.8.2" +description = "Extensions to the standard Python datetime module" +category = "dev" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +files = [ + {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, + {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, +] + +[package.dependencies] +six = ">=1.5" + [[package]] name = "python-jose" version = "3.3.0" @@ -2109,4 +2139,4 @@ docs = ["Sphinx", "alabaster", "commonmark", "m2r2", "mock", "readthedocs-sphinx [metadata] lock-version = "2.0" python-versions = "^3.7" -content-hash = "45f461f05bdc8da0d12a858c57782cc3a24fb4fb91175e6e0a290cd0304c10e9" +content-hash = "70bb30bae9ff3d8b6c54553f755b2f31725701f379ae9aeb4a2a5658d2f6d51a" diff --git a/pyproject.toml b/pyproject.toml index d4bf17b..b3e10af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,7 @@ cryptography = "^37.0.4" codespell = "^2.1.0" darglint = "^1.8.1" twine = "^4.0.2" +freezegun = "^1.2.2" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index a955dc6..065f94c 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -81,6 +81,7 @@ class KeycloakAdmin: PAGE_SIZE = 100 + _auto_refresh_token = None _connection = None def __init__( @@ -146,8 +147,8 @@ class KeycloakAdmin: user_realm_name=user_realm_name, custom_headers=custom_headers, timeout=timeout, - auto_refresh_token=auto_refresh_token or [], ) + self.auto_refresh_token = auto_refresh_token @property @deprecation.deprecated( @@ -450,7 +451,7 @@ class KeycloakAdmin: :returns: List of methods for automatic token refresh :rtype: list """ - return self.connection.auto_refresh_token + return self._auto_refresh_token @auto_refresh_token.setter @deprecation.deprecated( @@ -460,7 +461,7 @@ class KeycloakAdmin: details="Use the self.connection.custom_headers property instead", ) def auto_refresh_token(self, value): - self.connection.auto_refresh_token = value + self._auto_refresh_token = value or [] def __fetch_all(self, url, query=None): """Paginate over get requests. @@ -572,9 +573,6 @@ class KeycloakAdmin: :return: RealmRepresentation :rtype: dict """ - import debugpy - - debugpy.breakpoint() params_path = {"realm-name": realm_name} data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 19e7aac..bc16283 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -29,7 +29,6 @@ class to handle authentication and token manipulation. import json from datetime import datetime, timedelta -from typing import Iterable from jose import jwt @@ -698,7 +697,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): _client_id = None _verify = None _client_secret_key = None - _auto_refresh_token = None _connection = None _custom_headers = None _user_realm_name = None @@ -717,7 +715,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): client_secret_key=None, custom_headers=None, user_realm_name=None, - auto_refresh_token=None, timeout=60, ): """Init method. @@ -745,12 +742,12 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): :type custom_headers: dict :param user_realm_name: The realm name of the user, if different from realm_name :type user_realm_name: str - :param auto_refresh_token: list of methods that allows automatic token refresh. - Ex: ['get', 'put', 'post', 'delete'] - :type auto_refresh_token: list :param timeout: connection timeout in seconds :type timeout: int """ + # token is renewed when it hits 90% of its lifetime. This is to account for any possible + # clock skew. + self.token_renewal_fraction = 0.9 self.server_url = server_url self.username = username self.password = password @@ -760,12 +757,8 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): self.client_id = client_id self.verify = verify self.client_secret_key = client_secret_key - self.auto_refresh_token = auto_refresh_token or [] self.user_realm_name = user_realm_name self.timeout = timeout - # token is renewed when it hits 90% of its lifetime. This is to account for any possible - # clock skew. - self.token_renewal_fraction = 0.9 if self.token is None: self.get_token() @@ -929,31 +922,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): # merge custom headers to main headers self.headers.update(self.custom_headers) - @property - def auto_refresh_token(self): - """Get auto refresh token. - - :returns: List of methods for automatic token refresh - :rtype: list - """ - return self._auto_refresh_token - - @auto_refresh_token.setter - def auto_refresh_token(self, value): - allowed_methods = {"get", "post", "put", "delete"} - if not isinstance(value, Iterable): - raise TypeError( - "Expected a list of strings among {allowed}".format(allowed=allowed_methods) - ) - if not all(method in allowed_methods for method in value): - raise TypeError( - "Unexpected method in auto_refresh_token, accepted methods are {allowed}".format( - allowed=allowed_methods - ) - ) - - self._auto_refresh_token = value - def get_token(self): """Get admin token. @@ -995,7 +963,7 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): :raises KeycloakPostError: In case the refresh token request failed. """ - refresh_token = self.token.get("refresh_token", None) + refresh_token = self.token.get("refresh_token", None) if self.token else None if refresh_token is None: self.get_token() else: @@ -1033,9 +1001,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): """ self._refresh_if_required() r = super().raw_get(*args, **kwargs) - if "get" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return super().raw_get(*args, **kwargs) return r def raw_post(self, *args, **kwargs): @@ -1053,9 +1018,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): """ self._refresh_if_required() r = super().raw_post(*args, **kwargs) - if "post" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return super().raw_post(*args, **kwargs) return r def raw_put(self, *args, **kwargs): @@ -1073,9 +1035,6 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): """ self._refresh_if_required() r = super().raw_put(*args, **kwargs) - if "put" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return super().raw_put(*args, **kwargs) return r def raw_delete(self, *args, **kwargs): @@ -1093,7 +1052,4 @@ class KeycloakOpenIDConnectionManager(ConnectionManager): """ self._refresh_if_required() r = super().raw_delete(*args, **kwargs) - if "delete" in self.auto_refresh_token and r.status_code == 401: - self.refresh_token() - return super().raw_delete(*args, **kwargs) return r diff --git a/tests/conftest.py b/tests/conftest.py index d518c31..fed85e3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,7 @@ import uuid from datetime import datetime, timedelta from typing import Tuple +import freezegun import pytest from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -152,6 +153,23 @@ def admin(env: KeycloakTestEnv): ) +@pytest.fixture +@freezegun.freeze_time("2023-02-25 10:00:00") +def admin_frozen(env: KeycloakTestEnv): + """Fixture for initialized KeycloakAdmin class, with time frozen. + + :param env: Keycloak test environment + :type env: KeycloakTestEnv + :returns: Keycloak admin + :rtype: KeycloakAdmin + """ + return KeycloakAdmin( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", + username=env.KEYCLOAK_ADMIN, + password=env.KEYCLOAK_ADMIN_PASSWORD, + ) + + @pytest.fixture def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin): """Fixture for initialized KeycloakOpenID class. diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 5b97140..4b8e824 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -3,7 +3,9 @@ import copy from typing import Tuple +import freezegun import pytest +from dateutil import parser as datetime_parser import keycloak from keycloak import KeycloakAdmin, KeycloakOpenID @@ -22,31 +24,6 @@ def test_keycloak_version(): assert keycloak.__version__, keycloak.__version__ -def test_keycloak_admin_bad_init(env): - """Test keycloak admin bad init. - - :param env: Environment fixture - :type env: KeycloakTestEnv - """ - with pytest.raises(TypeError) as err: - KeycloakAdmin( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", - username=env.KEYCLOAK_ADMIN, - password=env.KEYCLOAK_ADMIN_PASSWORD, - auto_refresh_token=1, - ) - assert err.match("Expected a list of strings") - - with pytest.raises(TypeError) as err: - KeycloakAdmin( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", - username=env.KEYCLOAK_ADMIN, - password=env.KEYCLOAK_ADMIN_PASSWORD, - auto_refresh_token=["patch"], - ) - assert err.match("Unexpected method in auto_refresh_token") - - def test_keycloak_admin_init(env): """Test keycloak admin init. @@ -2187,16 +2164,17 @@ def test_events(admin: KeycloakAdmin, realm: str): assert events == list() -def test_auto_refresh(admin: KeycloakAdmin, realm: str): +@freezegun.freeze_time("2023-02-25 10:00:00") +def test_auto_refresh(admin_frozen: KeycloakAdmin, realm: str): """Test auto refresh token. - :param admin: Keycloak Admin client - :type admin: KeycloakAdmin + :param admin_frozen: Keycloak Admin client with time frozen in place + :type admin_frozen: KeycloakAdmin :param realm: Keycloak realm :type realm: str """ + admin = admin_frozen # Test get refresh - admin.auto_refresh_token = list() admin.connection.custom_headers = { "Authorization": "Bearer bad", "Content-Type": "application/json", @@ -2206,65 +2184,46 @@ def test_auto_refresh(admin: KeycloakAdmin, realm: str): admin.get_realm(realm_name=realm) assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') - admin.auto_refresh_token = ["get"] - del admin.token["refresh_token"] - assert admin.get_realm(realm_name=realm) - - # Test bad refresh token - admin.connection.custom_headers = { - "Authorization": "Bearer bad", - "Content-Type": "application/json", - } - admin.token["refresh_token"] = "bad" - with pytest.raises(KeycloakPostError) as err: - admin.get_realm(realm_name="test-refresh") - assert err.match( - '400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\'' - ) - admin.realm_name = "master" - admin.get_token() - admin.realm_name = realm + # Freeze time to simulate the access token expiring + with freezegun.freeze_time("2023-02-25 10:05:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:05:00") + assert admin.get_realm(realm_name=realm) + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:05:00") + + # Test bad refresh token, but first make sure access token has expired again + with freezegun.freeze_time("2023-02-25 10:10:00"): + admin.connection.custom_headers = {"Content-Type": "application/json"} + admin.connection.token["refresh_token"] = "bad" + with pytest.raises(KeycloakPostError) as err: + admin.get_realm(realm_name="test-refresh") + assert err.match( + '400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\'' + ) + admin.connection.get_token() # Test post refresh - admin.connection.custom_headers = { - "Authorization": "Bearer bad", - "Content-Type": "application/json", - } - with pytest.raises(KeycloakAuthenticationError) as err: - admin.create_realm(payload={"realm": "test-refresh"}) - assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') - - admin.auto_refresh_token = ["get", "post"] - admin.realm_name = "master" - admin.user_logout(user_id=admin.get_user_id(username=admin.username)) - assert admin.create_realm(payload={"realm": "test-refresh"}) == b"" - admin.realm_name = realm + with freezegun.freeze_time("2023-02-25 10:15:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:15:00") + admin.connection.token = None + assert admin.create_realm(payload={"realm": "test-refresh"}) == b"" + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:15:00") # Test update refresh - admin.connection.custom_headers = { - "Authorization": "Bearer bad", - "Content-Type": "application/json", - } - with pytest.raises(KeycloakAuthenticationError) as err: - admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) - assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') - - admin.auto_refresh_token = ["get", "post", "put"] - assert ( - admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict() - ) + with freezegun.freeze_time("2023-02-25 10:25:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00") + admin.connection.token = None + assert ( + admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) + == dict() + ) + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00") # Test delete refresh - admin.connection.custom_headers = { - "Authorization": "Bearer bad", - "Content-Type": "application/json", - } - with pytest.raises(KeycloakAuthenticationError) as err: - admin.delete_realm(realm_name="test-refresh") - assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') - - admin.auto_refresh_token = ["get", "post", "put", "delete"] - assert admin.delete_realm(realm_name="test-refresh") == dict() + with freezegun.freeze_time("2023-02-25 10:35:00"): + assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:35:00") + admin.connection.token = None + assert admin.delete_realm(realm_name="test-refresh") == dict() + assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00") def test_get_required_actions(admin: KeycloakAdmin, realm: str):