diff --git a/src/keycloak/__init__.py b/src/keycloak/__init__.py index 694e53d..7e49c8f 100644 --- a/src/keycloak/__init__.py +++ b/src/keycloak/__init__.py @@ -42,6 +42,7 @@ from .exceptions import ( ) from .keycloak_admin import KeycloakAdmin from .keycloak_openid import KeycloakOpenID +from .keycloak_uma import KeycloakUMA __all__ = [ "__version__", @@ -61,4 +62,5 @@ __all__ = [ "KeycloakSecretNotFound", "KeycloakAdmin", "KeycloakOpenID", + "KeycloakUMA", ] diff --git a/src/keycloak/keycloak_uma.py b/src/keycloak/keycloak_uma.py new file mode 100644 index 0000000..a066567 --- /dev/null +++ b/src/keycloak/keycloak_uma.py @@ -0,0 +1,241 @@ +# -*- coding: utf-8 -*- +# +# The MIT License (MIT) +# +# Copyright (C) 2017 Marcos Pereira +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +"""Keycloak UMA module. + +The module contains a UMA compatible client for keycloak: +https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html +""" +import json +from urllib.parse import quote_plus + +from .connection import ConnectionManager +from .exceptions import ( + KeycloakDeleteError, + KeycloakGetError, + KeycloakPostError, + KeycloakPutError, + raise_error_from_response, +) +from .urls_patterns import URL_UMA_WELL_KNOWN + + +class KeycloakUMA: + """Keycloak UMA client. + + :param server_url: Keycloak server url + :param client_id: client id + :param realm_name: realm name + :param client_secret_key: client secret key + :param verify: True if want check connection SSL + :param custom_headers: dict of custom header to pass to each HTML request + :param proxies: dict of proxies to sent the request by. + :param timeout: connection timeout in seconds + """ + + def __init__( + self, server_url, realm_name, verify=True, custom_headers=None, proxies=None, timeout=60 + ): + """Init method. + + :param server_url: Keycloak server url + :type server_url: str + :param realm_name: realm name + :type realm_name: str + :param verify: True if want check connection SSL + :type verify: bool + :param custom_headers: dict of custom header to pass to each HTML request + :type custom_headers: dict + :param proxies: dict of proxies to sent the request by. + :type proxies: dict + :param timeout: connection timeout in seconds + :type timeout: int + """ + self.realm_name = realm_name + headers = custom_headers if custom_headers is not None else dict() + headers.update({"Content-Type": "application/json"}) + self.connection = ConnectionManager( + base_url=server_url, headers=headers, timeout=timeout, verify=verify, proxies=proxies + ) + self._well_known = None + + def _fetch_well_known(self): + params_path = {"realm-name": self.realm_name} + data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + @staticmethod + def format_url(url, **kwargs): + """Substitute url path parameters. + + Given a parameterized url string, returns the string after url encoding and substituting + the given params. For example, + `format_url("https://myserver/{my_resource}/{id}", my_resource="hello world", id="myid")` + would produce `https://myserver/hello+world/myid`. + + :param url: url string to format + :type url: str + :param kwargs: dict containing kwargs to substitute + :type kwargs: dict + :return: formatted string + :rtype: str + """ + return url.format(**{k: quote_plus(v) for k, v in kwargs.items()}) + + def _add_bearer_token_header(self, token): + self.connection.add_param_headers("Authorization", "Bearer " + token) + + @property + def uma_well_known(self): + """Get the well_known UMA2 config. + + :returns: It lists endpoints and other configuration options relevant + :rtype: dict + """ + # per instance cache + if not self._well_known: + self._well_known = self._fetch_well_known() + return self._well_known + + def resource_set_create(self, token, payload): + """Create a resource set. + + Spec + https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1 + + ResourceRepresentation + https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation + + :param token: client access token + :type token: str + :param payload: ResourceRepresentation + :type payload: dict + :return: ResourceRepresentation with the _id property assigned + :rtype: dict + """ + self._add_bearer_token_header(token) + data_raw = self.connection.raw_post( + self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + + def resource_set_update(self, token, resource_id, payload): + """Update a resource set. + + Spec + https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set + + ResourceRepresentation + https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation + + :param token: client access token + :type token: str + :param resource_id: id of the resource + :type resource_id: str + :param payload: ResourceRepresentation + :type payload: dict + :return: Response dict (empty) + :rtype: dict + """ + self._add_bearer_token_header(token) + url = self.format_url( + self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id + ) + data_raw = self.connection.raw_put(url, data=json.dumps(payload)) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + def resource_set_read(self, token, resource_id): + """Read a resource set. + + Spec + https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set + + ResourceRepresentation + https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation + + :param token: client access token + :type token: str + :param resource_id: id of the resource + :type resource_id: str + :return: ResourceRepresentation + :rtype: dict + """ + self._add_bearer_token_header(token) + url = self.format_url( + self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id + ) + data_raw = self.connection.raw_get(url) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) + + def resource_set_delete(self, token, resource_id): + """Delete a resource set. + + Spec + https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set + + :param token: client access token + :type token: str + :param resource_id: id of the resource + :type resource_id: str + :return: Response dict (empty) + :rtype: dict + """ + self._add_bearer_token_header(token) + url = self.format_url( + self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id + ) + data_raw = self.connection.raw_delete(url) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + def resource_set_list_ids(self, token): + """List all resource set ids. + + Spec + https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets + + :param token: client access token + :type token: str + :return: List of ids + :rtype: List[str] + """ + self._add_bearer_token_header(token) + data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"]) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) + + def resource_set_list(self, token): + """List all resource sets. + + Spec + https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets + + ResourceRepresentation + https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation + + :param token: client access token + :type token: str + :yields: Iterator over a list of ResourceRepresentations + :rtype: Iterator[dict] + """ + for resource_id in self.resource_set_list_ids(token): + resource = self.resource_set_read(token, resource_id) + yield resource diff --git a/src/keycloak/uma_permissions.py b/src/keycloak/uma_permissions.py index eadcd72..1560dd5 100644 --- a/src/keycloak/uma_permissions.py +++ b/src/keycloak/uma_permissions.py @@ -27,7 +27,7 @@ from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinit class UMAPermission: - """A class to conveniently assembly permissions. + """A class to conveniently assemble permissions. The class itself is callable, and will return the assembled permission. @@ -143,7 +143,7 @@ class UMAPermission: class Resource(UMAPermission): - """An UMAPermission Resource class to conveniently assembly permissions. + """A UMAPermission Resource class to conveniently assemble permissions. The class itself is callable, and will return the assembled permission. @@ -161,7 +161,7 @@ class Resource(UMAPermission): class Scope(UMAPermission): - """An UMAPermission Scope class to conveniently assembly permissions. + """A UMAPermission Scope class to conveniently assemble permissions. The class itself is callable, and will return the assembled permission. diff --git a/src/keycloak/urls_patterns.py b/src/keycloak/urls_patterns.py index d8c5f0f..4fedee1 100644 --- a/src/keycloak/urls_patterns.py +++ b/src/keycloak/urls_patterns.py @@ -25,7 +25,8 @@ # OPENID URLS URL_REALM = "realms/{realm-name}" -URL_WELL_KNOWN = "realms/{realm-name}/.well-known/openid-configuration" +URL_WELL_KNOWN_BASE = "realms/{realm-name}/.well-known" +URL_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/openid-configuration" URL_TOKEN = "realms/{realm-name}/protocol/openid-connect/token" URL_USERINFO = "realms/{realm-name}/protocol/openid-connect/userinfo" URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout" @@ -208,3 +209,7 @@ URL_ADMIN_ATTACK_DETECTION = "admin/realms/{realm-name}/attack-detection/brute-f URL_ADMIN_ATTACK_DETECTION_USER = ( "admin/realms/{realm-name}/attack-detection/brute-force/users/{id}" ) + + +# UMA URLS +URL_UMA_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/uma2-configuration" diff --git a/tests/conftest.py b/tests/conftest.py index e0d93ea..18cb6a3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,7 @@ from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID -from keycloak import KeycloakAdmin, KeycloakOpenID +from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakUMA class KeycloakTestEnv(object): @@ -475,3 +475,20 @@ def selfsigned_cert(): ) return cert_pem, key_pem + + +@pytest.fixture +def uma(env: KeycloakTestEnv, realm: str): + """Fixture for initialized KeycloakUMA class. + + :param env: Keycloak test environment + :type env: KeycloakTestEnv + :param realm: Keycloak realm + :type realm: str + :yields: Keycloak OpenID client + :rtype: KeycloakOpenID + """ + # Return UMA + yield KeycloakUMA( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name=realm + ) diff --git a/tests/test_keycloak_uma.py b/tests/test_keycloak_uma.py new file mode 100644 index 0000000..2a1dde7 --- /dev/null +++ b/tests/test_keycloak_uma.py @@ -0,0 +1,120 @@ +"""Test module for KeycloakUMA.""" +import re +from typing import Tuple + +import pytest + +from keycloak import KeycloakOpenID +from keycloak.connection import ConnectionManager +from keycloak.exceptions import ( + KeycloakDeleteError, + KeycloakGetError, + KeycloakPostError, + KeycloakPutError, +) +from keycloak.keycloak_uma import KeycloakUMA + + +def test_keycloak_uma_init(env): + """Test KeycloakUMA's init method. + + :param env: Environment fixture + :type env: KeycloakTestEnv + """ + uma = KeycloakUMA( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name="master" + ) + + assert uma.realm_name == "master" + assert isinstance(uma.connection, ConnectionManager) + # should initially be empty + assert uma._well_known is None + assert uma.uma_well_known + # should be cached after first reference + assert uma._well_known is not None + + +def test_uma_well_known(uma: KeycloakUMA): + """Test the well_known method. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + res = uma.uma_well_known + assert res is not None + assert res != dict() + for key in ["resource_registration_endpoint"]: + assert key in res + + +def test_uma_resource_sets( + uma: KeycloakUMA, oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] +): + """Test resource sets. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + :param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, _, _ = oid_with_credentials_authz + + token = oid.token(grant_type="client_credentials") + access_token = token["access_token"] + + # Check that only the default resource is present + resource_sets = uma.resource_set_list(access_token) + resource_set_list = list(resource_sets) + assert len(resource_set_list) == 1, resource_set_list + assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] + + # Test create resource set + resource_to_create = { + "name": "mytest", + "scopes": ["test:read", "test:write"], + "type": "urn:test", + } + created_resource = uma.resource_set_create(access_token, resource_to_create) + assert created_resource + assert created_resource["_id"], created_resource + assert set(resource_to_create).issubset(set(created_resource)), created_resource + + # Test create the same resource set + with pytest.raises(KeycloakPostError) as err: + uma.resource_set_create(access_token, resource_to_create) + assert err.match( + re.escape( + '409: b\'{"error":"invalid_request","error_description":' + '"Resource with name [mytest] already exists."}\'' + ) + ) + + # Test get resource set + latest_resource = uma.resource_set_read(access_token, created_resource["_id"]) + assert latest_resource["name"] == created_resource["name"] + + # Test update resource set + latest_resource["name"] = "New Resource Name" + res = uma.resource_set_update(access_token, created_resource["_id"], latest_resource) + assert res == dict(), res + updated_resource = uma.resource_set_read(access_token, created_resource["_id"]) + assert updated_resource["name"] == "New Resource Name" + + # Test update resource set fail + with pytest.raises(KeycloakPutError) as err: + uma.resource_set_update( + token=access_token, resource_id=created_resource["_id"], payload={"wrong": "payload"} + ) + assert err.match('400: b\'{"error":"Unrecognized field') + + # Test delete resource set + res = uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"]) + assert res == dict(), res + with pytest.raises(KeycloakGetError) as err: + uma.resource_set_read(access_token, created_resource["_id"]) + err.match("404: b''") + + # Test delete fail + with pytest.raises(KeycloakDeleteError) as err: + uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"]) + assert err.match("404: b''")