Browse Source

feat: implement client for revoking consents/offline access, with async and improved testing

pull/644/head
Chris Oldham 2 weeks ago
parent
commit
af89d0aacf
  1. 50
      src/keycloak/keycloak_admin.py
  2. 1
      src/keycloak/urls_patterns.py
  3. 104
      tests/test_keycloak_admin.py

50
src/keycloak/keycloak_admin.py

@ -1524,6 +1524,30 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
def revoke_consent(self, user_id: str, client_id: str) -> dict | bytes:
"""
Revoke consent and offline tokens for particular client from user.
:param user_id: User id
:type user_id: str
:param client_id: Client id
:type client_id: str
"""
params_path = {
"realm-name": self.connection.realm_name,
"id": user_id,
"client-id": client_id,
}
data_raw = self.connection.raw_delete(
urls_patterns.URL_ADMIN_USER_CONSENT.format(**params_path),
)
return raise_error_from_response(
data_raw,
KeycloakDeleteError,
expected_codes=[HTTP_NO_CONTENT],
)
def get_user_social_logins(self, user_id: str) -> list:
"""
Get user social logins.
@ -6742,7 +6766,7 @@ class KeycloakAdmin:
async def a_user_consents(self, user_id: str) -> list:
"""
Get consents granted asynchronously by the user.
Asynchronously get consents granted by the user.
UserConsentRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_userconsentrepresentation
@ -6758,6 +6782,30 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_revoke_consent(self, user_id: str, client_id: str) -> dict | bytes:
"""
Asynchronously revoke consent and offline tokens for particular client from user.
:param user_id: User id
:type user_id: str
:param client_id: Client id
:type client_id: str
"""
params_path = {
"realm-name": self.connection.realm_name,
"id": user_id,
"client-id": client_id,
}
data_raw = await self.connection.a_raw_delete(
urls_patterns.URL_ADMIN_USER_CONSENT.format(**params_path),
)
return raise_error_from_response(
data_raw,
KeycloakDeleteError,
expected_codes=[HTTP_NO_CONTENT],
)
async def a_get_user_social_logins(self, user_id: str) -> list:
"""
Get user social logins asynchronously.

1
src/keycloak/urls_patterns.py

@ -46,6 +46,7 @@ URL_ADMIN_USERS = "admin/realms/{realm-name}/users"
URL_ADMIN_USERS_COUNT = "admin/realms/{realm-name}/users/count"
URL_ADMIN_USER = "admin/realms/{realm-name}/users/{id}"
URL_ADMIN_USER_CONSENTS = "admin/realms/{realm-name}/users/{id}/consents"
URL_ADMIN_USER_CONSENT = URL_ADMIN_USER_CONSENTS + "/{client-id}"
URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-actions-email"
URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password" # noqa: S105

104
tests/test_keycloak_admin.py

@ -31,6 +31,7 @@ from tests.conftest import KeycloakTestEnv
CLIENT_NOT_FOUND_REGEX = '404: b\'{"error":"Client not found".*}\''
CLIENT_SCOPE_NOT_FOUND_REGEX = '404: b\'{"error":"Client scope not found".*}\''
CONSENT_NOT_FOUND_REGEX = '404: b\'{"error":"Consent nor offline token not found".*}\''
COULD_NOT_FIND_ROLE_REGEX = '404: b\'{"error":"Could not find role".*}\''
COULD_NOT_FIND_ROLE_WITH_ID_REGEX = '404: b\'{"error":"Could not find role with id".*}\''
HTTP_404_REGEX = '404: b\'{"error":"HTTP 404 Not Found".*}\''
@ -3548,6 +3549,57 @@ def test_refresh_token(admin: KeycloakAdmin) -> None:
admin.connection.refresh_token()
def test_consents(
admin: KeycloakAdmin, oid_with_credentials: tuple[KeycloakOpenID, str, str]
) -> None:
"""
Test getting and revoking offline access via the consents API.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
# Use offline access as ersatz consent
offline_token = oid.token(username, password, scope="offline_access")
decoded_access_token = oid.decode_token(token=offline_token["access_token"])
user_id = decoded_access_token["sub"]
# Test get consents/offline access
res = admin.user_consents(user_id=user_id)
assert len(res) == 1, res
assert "additionalGrants" in res[0], res[0]
assert res[0]["additionalGrants"][0].get("key") == "Offline Token", res[0]
# Test get consents fail
with pytest.raises(KeycloakGetError) as err:
admin.user_consents(user_id="non-existent-id")
assert err.match(USER_NOT_FOUND_REGEX)
# Test revoke fails
with pytest.raises(KeycloakDeleteError) as err:
admin.revoke_consent(user_id="non-existent-id", client_id=oid.client_id)
assert err.match(USER_NOT_FOUND_REGEX)
with pytest.raises(KeycloakDeleteError) as err:
admin.revoke_consent(user_id=user_id, client_id="non-existent-client")
assert err.match(CLIENT_NOT_FOUND_REGEX)
# Test revoke offline access
res = admin.revoke_consent(user_id=user_id, client_id=oid.client_id)
assert res == {}, res
res = admin.user_consents(user_id=user_id)
assert len(res) == 0, res
# Test re-revoke fails
with pytest.raises(KeycloakDeleteError) as err:
admin.revoke_consent(user_id=user_id, client_id=oid.client_id)
assert err.match(CONSENT_NOT_FOUND_REGEX)
# async function start
@ -7199,6 +7251,58 @@ async def test_a_refresh_token(admin: KeycloakAdmin) -> None:
admin.connection.refresh_token()
@pytest.mark.asyncio
async def test_a_consents(
admin: KeycloakAdmin, oid_with_credentials: tuple[KeycloakOpenID, str, str]
) -> None:
"""
Test getting and revoking offline access via the consents API.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
# Use offline access as ersatz consent
offline_token = await oid.a_token(username, password, scope="offline_access")
decoded_access_token = await oid.a_decode_token(token=offline_token["access_token"])
user_id = decoded_access_token["sub"]
# Test get consents/offline access
res = await admin.a_user_consents(user_id=user_id)
assert len(res) == 1, res
assert "additionalGrants" in res[0], res[0]
assert res[0]["additionalGrants"][0].get("key") == "Offline Token", res[0]
# Test get consents fail
with pytest.raises(KeycloakGetError) as err:
await admin.a_user_consents(user_id="non-existent-id")
assert err.match(USER_NOT_FOUND_REGEX)
# Test revoke fails
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_revoke_consent(user_id="non-existent-id", client_id=oid.client_id)
assert err.match(USER_NOT_FOUND_REGEX)
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_revoke_consent(user_id=user_id, client_id="non-existent-client")
assert err.match(CLIENT_NOT_FOUND_REGEX)
# Test revoke offline access
res = await admin.a_revoke_consent(user_id=user_id, client_id=oid.client_id)
assert res == {}, res
res = await admin.a_user_consents(user_id=user_id)
assert len(res) == 0, res
# Test re-revoke fails
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_revoke_consent(user_id=user_id, client_id=oid.client_id)
assert err.match(CONSENT_NOT_FOUND_REGEX)
def test_counter_part() -> None:
"""Test that each function has its async counter part."""
admin_methods = [func for func in dir(KeycloakAdmin) if callable(getattr(KeycloakAdmin, func))]

Loading…
Cancel
Save