From af89d0aacf1651f9bb15b8ffcc0677b0ee6578c0 Mon Sep 17 00:00:00 2001 From: Chris Oldham Date: Wed, 9 Apr 2025 06:45:36 +0100 Subject: [PATCH] feat: implement client for revoking consents/offline access, with async and improved testing --- src/keycloak/keycloak_admin.py | 50 +++++++++++++++- src/keycloak/urls_patterns.py | 1 + tests/test_keycloak_admin.py | 104 +++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 1 deletion(-) diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index a240b6c..3af3319 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -1524,6 +1524,30 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakGetError) + def revoke_consent(self, user_id: str, client_id: str) -> dict | bytes: + """ + Revoke consent and offline tokens for particular client from user. + + :param user_id: User id + :type user_id: str + :param client_id: Client id + :type client_id: str + + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "client-id": client_id, + } + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_USER_CONSENT.format(**params_path), + ) + return raise_error_from_response( + data_raw, + KeycloakDeleteError, + expected_codes=[HTTP_NO_CONTENT], + ) + def get_user_social_logins(self, user_id: str) -> list: """ Get user social logins. @@ -6742,7 +6766,7 @@ class KeycloakAdmin: async def a_user_consents(self, user_id: str) -> list: """ - Get consents granted asynchronously by the user. + Asynchronously get consents granted by the user. UserConsentRepresentation https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_userconsentrepresentation @@ -6758,6 +6782,30 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakGetError) + async def a_revoke_consent(self, user_id: str, client_id: str) -> dict | bytes: + """ + Asynchronously revoke consent and offline tokens for particular client from user. + + :param user_id: User id + :type user_id: str + :param client_id: Client id + :type client_id: str + + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": user_id, + "client-id": client_id, + } + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_USER_CONSENT.format(**params_path), + ) + return raise_error_from_response( + data_raw, + KeycloakDeleteError, + expected_codes=[HTTP_NO_CONTENT], + ) + async def a_get_user_social_logins(self, user_id: str) -> list: """ Get user social logins asynchronously. diff --git a/src/keycloak/urls_patterns.py b/src/keycloak/urls_patterns.py index d878647..aa36810 100644 --- a/src/keycloak/urls_patterns.py +++ b/src/keycloak/urls_patterns.py @@ -46,6 +46,7 @@ URL_ADMIN_USERS = "admin/realms/{realm-name}/users" URL_ADMIN_USERS_COUNT = "admin/realms/{realm-name}/users/count" URL_ADMIN_USER = "admin/realms/{realm-name}/users/{id}" URL_ADMIN_USER_CONSENTS = "admin/realms/{realm-name}/users/{id}/consents" +URL_ADMIN_USER_CONSENT = URL_ADMIN_USER_CONSENTS + "/{client-id}" URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-actions-email" URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email" URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password" # noqa: S105 diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index c5b8f88..03d7591 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -31,6 +31,7 @@ from tests.conftest import KeycloakTestEnv CLIENT_NOT_FOUND_REGEX = '404: b\'{"error":"Client not found".*}\'' CLIENT_SCOPE_NOT_FOUND_REGEX = '404: b\'{"error":"Client scope not found".*}\'' +CONSENT_NOT_FOUND_REGEX = '404: b\'{"error":"Consent nor offline token not found".*}\'' COULD_NOT_FIND_ROLE_REGEX = '404: b\'{"error":"Could not find role".*}\'' COULD_NOT_FIND_ROLE_WITH_ID_REGEX = '404: b\'{"error":"Could not find role with id".*}\'' HTTP_404_REGEX = '404: b\'{"error":"HTTP 404 Not Found".*}\'' @@ -3548,6 +3549,57 @@ def test_refresh_token(admin: KeycloakAdmin) -> None: admin.connection.refresh_token() +def test_consents( + admin: KeycloakAdmin, oid_with_credentials: tuple[KeycloakOpenID, str, str] +) -> None: + """ + Test getting and revoking offline access via the consents API. + + :param admin: Keycloak admin + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + + # Use offline access as ersatz consent + offline_token = oid.token(username, password, scope="offline_access") + decoded_access_token = oid.decode_token(token=offline_token["access_token"]) + user_id = decoded_access_token["sub"] + + # Test get consents/offline access + res = admin.user_consents(user_id=user_id) + assert len(res) == 1, res + assert "additionalGrants" in res[0], res[0] + assert res[0]["additionalGrants"][0].get("key") == "Offline Token", res[0] + + # Test get consents fail + with pytest.raises(KeycloakGetError) as err: + admin.user_consents(user_id="non-existent-id") + assert err.match(USER_NOT_FOUND_REGEX) + + # Test revoke fails + with pytest.raises(KeycloakDeleteError) as err: + admin.revoke_consent(user_id="non-existent-id", client_id=oid.client_id) + assert err.match(USER_NOT_FOUND_REGEX) + + with pytest.raises(KeycloakDeleteError) as err: + admin.revoke_consent(user_id=user_id, client_id="non-existent-client") + assert err.match(CLIENT_NOT_FOUND_REGEX) + + # Test revoke offline access + res = admin.revoke_consent(user_id=user_id, client_id=oid.client_id) + assert res == {}, res + + res = admin.user_consents(user_id=user_id) + assert len(res) == 0, res + + # Test re-revoke fails + with pytest.raises(KeycloakDeleteError) as err: + admin.revoke_consent(user_id=user_id, client_id=oid.client_id) + assert err.match(CONSENT_NOT_FOUND_REGEX) + + # async function start @@ -7199,6 +7251,58 @@ async def test_a_refresh_token(admin: KeycloakAdmin) -> None: admin.connection.refresh_token() +@pytest.mark.asyncio +async def test_a_consents( + admin: KeycloakAdmin, oid_with_credentials: tuple[KeycloakOpenID, str, str] +) -> None: + """ + Test getting and revoking offline access via the consents API. + + :param admin: Keycloak admin + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + + # Use offline access as ersatz consent + offline_token = await oid.a_token(username, password, scope="offline_access") + decoded_access_token = await oid.a_decode_token(token=offline_token["access_token"]) + user_id = decoded_access_token["sub"] + + # Test get consents/offline access + res = await admin.a_user_consents(user_id=user_id) + assert len(res) == 1, res + assert "additionalGrants" in res[0], res[0] + assert res[0]["additionalGrants"][0].get("key") == "Offline Token", res[0] + + # Test get consents fail + with pytest.raises(KeycloakGetError) as err: + await admin.a_user_consents(user_id="non-existent-id") + assert err.match(USER_NOT_FOUND_REGEX) + + # Test revoke fails + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_revoke_consent(user_id="non-existent-id", client_id=oid.client_id) + assert err.match(USER_NOT_FOUND_REGEX) + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_revoke_consent(user_id=user_id, client_id="non-existent-client") + assert err.match(CLIENT_NOT_FOUND_REGEX) + + # Test revoke offline access + res = await admin.a_revoke_consent(user_id=user_id, client_id=oid.client_id) + assert res == {}, res + + res = await admin.a_user_consents(user_id=user_id) + assert len(res) == 0, res + + # Test re-revoke fails + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_revoke_consent(user_id=user_id, client_id=oid.client_id) + assert err.match(CONSENT_NOT_FOUND_REGEX) + + def test_counter_part() -> None: """Test that each function has its async counter part.""" admin_methods = [func for func in dir(KeycloakAdmin) if callable(getattr(KeycloakAdmin, func))]