diff --git a/src/keycloak/keycloak_uma.py b/src/keycloak/keycloak_uma.py index d143bfd..f323e2a 100644 --- a/src/keycloak/keycloak_uma.py +++ b/src/keycloak/keycloak_uma.py @@ -27,8 +27,10 @@ The module contains a UMA compatible client for keycloak: https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html """ import json +from typing import Iterable from urllib.parse import quote_plus +from .connection import ConnectionManager from .exceptions import ( KeycloakDeleteError, KeycloakGetError, @@ -37,6 +39,7 @@ from .exceptions import ( raise_error_from_response, ) from .openid_connection import KeycloakOpenIDConnection +from .uma_permissions import UMAPermission from .urls_patterns import URL_UMA_WELL_KNOWN @@ -171,16 +174,62 @@ class KeycloakUMA: data_raw = self.connection.raw_delete(url) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def resource_set_list_ids(self): - """List all resource set ids. + def resource_set_list_ids( + self, + name: str = "", + exact_name: bool = False, + uri: str = "", + owner: str = "", + resource_type: str = "", + scope: str = "", + first: int = 0, + maximum: int = -1, + ): + """Query for list of resource set ids. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets + :param name: query resource name + :type name: str + :param exact_name: query exact match for resource name + :type exact_name: bool + :param uri: query resource uri + :type uri: str + :param owner: query resource owner + :type owner: str + :param resource_type: query resource type + :type resource_type: str + :param scope: query resource scope + :type scope: str + :param first: index of first matching resource to return + :type first: int + :param maximum: maximum number of resources to return (-1 for all) + :type maximum: int :return: List of ids :rtype: List[str] """ - data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"]) + query = dict() + if name: + query["name"] = name + if exact_name: + query["exactName"] = "true" + if uri: + query["uri"] = uri + if owner: + query["owner"] = owner + if resource_type: + query["type"] = resource_type + if scope: + query["scope"] = scope + if first > 0: + query["first"] = first + if maximum >= 0: + query["max"] = maximum + + data_raw = self.connection.raw_get( + self.uma_well_known["resource_registration_endpoint"], **query + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) def resource_set_list(self): @@ -198,3 +247,171 @@ class KeycloakUMA: for resource_id in self.resource_set_list_ids(): resource = self.resource_set_read(resource_id) yield resource + + def permission_ticket_create(self, permissions: Iterable[UMAPermission]): + """Create a permission ticket. + + :param permissions: Iterable of uma permissions to validate the token against + :type permissions: Iterable[UMAPermission] + :returns: Keycloak decision + :rtype: boolean + :raises KeycloakPostError: In case permission resource not found + """ + resources = dict() + for permission in permissions: + resource_id = getattr(permission, "resource_id", None) + + if resource_id is None: + resource_ids = self.resource_set_list_ids( + exact_name=True, name=permission.resource, first=0, maximum=1 + ) + + if not resource_ids: + raise KeycloakPostError("Invalid resource specified") + + setattr(permission, "resource_id", resource_ids[0]) + + resources.setdefault(resource_id, set()) + if permission.scope: + resources[resource_id].add(permission.scope) + + payload = [ + {"resource_id": resource_id, "resource_scopes": list(scopes)} + for resource_id, scopes in resources.items() + ] + + data_raw = self.connection.raw_post( + self.uma_well_known["permission_endpoint"], data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError) + + def permissions_check(self, token, permissions: Iterable[UMAPermission]): + """Check UMA permissions by user token with requested permissions. + + The token endpoint is used to check UMA permissions from Keycloak. It can only be + invoked by confidential clients. + + https://www.keycloak.org/docs/latest/authorization_services/#_service_authorization_api + + :param token: user token + :type token: str + :param permissions: Iterable of uma permissions to validate the token against + :type permissions: Iterable[UMAPermission] + :returns: Keycloak decision + :rtype: boolean + """ + payload = { + "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", + "permission": ",".join(str(permission) for permission in permissions), + "response_mode": "decision", + "audience": self.connection.client_id, + } + + # Everyone always has the null set of permissions + # However keycloak cannot evaluate the null set + if len(payload["permission"]) == 0: + return True + + connection = ConnectionManager(self.connection.base_url) + connection.add_param_headers("Authorization", "Bearer " + token) + connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") + data_raw = connection.raw_post(self.uma_well_known["token_endpoint"], data=payload) + try: + data = raise_error_from_response(data_raw, KeycloakPostError) + except KeycloakPostError: + return False + return data.get("result", False) + + def policy_resource_create(self, resource_id, payload): + """Create permission policy for resource. + + Supports name, description, scopes, roles, groups, clients + + https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource + + :param resource_id: _id of resource + :type resource_id: str + :param payload: permission configuration + :type payload: dict + :return: PermissionRepresentation + :rtype: dict + """ + data_raw = self.connection.raw_post( + self.uma_well_known["policy_endpoint"] + f"/{resource_id}", data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError) + + def policy_update(self, policy_id, payload): + """Update permission policy. + + https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource + https://www.keycloak.org/docs-api/21.0.1/rest-api/index.html#_policyrepresentation + + :param policy_id: id of policy permission + :type policy_id: str + :param payload: policy permission configuration + :type payload: dict + :return: PermissionRepresentation + :rtype: dict + """ + data_raw = self.connection.raw_put( + self.uma_well_known["policy_endpoint"] + f"/{policy_id}", data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError) + + def policy_delete(self, policy_id): + """Delete permission policy. + + https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission + https://www.keycloak.org/docs-api/21.0.1/rest-api/index.html#_policyrepresentation + + :param policy_id: id of permission policy + :type policy_id: str + :return: PermissionRepresentation + :rtype: dict + """ + data_raw = self.connection.raw_delete( + self.uma_well_known["policy_endpoint"] + f"/{policy_id}" + ) + return raise_error_from_response(data_raw, KeycloakDeleteError) + + def policy_query( + self, + resource: str = "", + name: str = "", + scope: str = "", + first: int = 0, + maximum: int = -1, + ): + """Query permission policies. + + https://www.keycloak.org/docs/latest/authorization_services/#querying-permission + + :param resource: query resource id + :type resource: str + :param name: query resource name + :type name: str + :param scope: query resource scope + :type scope: str + :param first: index of first matching resource to return + :type first: int + :param maximum: maximum number of resources to return (-1 for all) + :type maximum: int + :return: List of ids + :return: List of ids + :rtype: List[str] + """ + query = dict() + if name: + query["name"] = name + if resource: + query["resource"] = resource + if scope: + query["scope"] = scope + if first > 0: + query["first"] = first + if maximum >= 0: + query["max"] = maximum + + data_raw = self.connection.raw_get(self.uma_well_known["policy_endpoint"], **query) + return raise_error_from_response(data_raw, KeycloakGetError) diff --git a/tests/test_keycloak_uma.py b/tests/test_keycloak_uma.py index 9808804..b234310 100644 --- a/tests/test_keycloak_uma.py +++ b/tests/test_keycloak_uma.py @@ -3,13 +3,14 @@ import re import pytest -from keycloak import KeycloakOpenIDConnection, KeycloakUMA +from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA from keycloak.exceptions import ( KeycloakDeleteError, KeycloakGetError, KeycloakPostError, KeycloakPutError, ) +from keycloak.uma_permissions import UMAPermission def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection): @@ -54,6 +55,40 @@ def test_uma_resource_sets(uma: KeycloakUMA): assert len(resource_set_list) == 1, resource_set_list assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] + # Test query for resource sets + resource_set_list_ids = uma.resource_set_list_ids() + assert len(resource_set_list_ids) == 1 + + resource_set_list_ids2 = uma.resource_set_list_ids(name="Default") + assert resource_set_list_ids2 == resource_set_list_ids + + resource_set_list_ids2 = uma.resource_set_list_ids(name="Default Resource") + assert resource_set_list_ids2 == resource_set_list_ids + + resource_set_list_ids = uma.resource_set_list_ids(name="Default", exact_name=True) + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = uma.resource_set_list_ids(first=1) + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = uma.resource_set_list_ids(scope="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = uma.resource_set_list_ids(owner="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = uma.resource_set_list_ids(resource_type="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = uma.resource_set_list_ids(name="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = uma.resource_set_list_ids(uri="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = uma.resource_set_list_ids(maximum=0) + assert len(resource_set_list_ids) == 0 + # Test create resource set resource_to_create = { "name": "mytest", @@ -102,3 +137,175 @@ def test_uma_resource_sets(uma: KeycloakUMA): with pytest.raises(KeycloakDeleteError) as err: uma.resource_set_delete(resource_id=created_resource["_id"]) assert err.match("404: b''") + + +def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin): + """Test policies. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + # Create some required test data + resource_to_create = { + "name": "mytest", + "scopes": ["test:read", "test:write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + created_resource = uma.resource_set_create(resource_to_create) + group_id = admin.create_group({"name": "UMAPolicyGroup"}) + role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"}) + other_client_id = admin.create_client({"name": "UMAOtherClient"}) + client = admin.get_client(other_client_id) + + resource_id = created_resource["_id"] + + # Create a role policy + policy_to_create = { + "name": "TestPolicyRole", + "description": "Test resource policy description", + "scopes": ["test:read", "test:write"], + "roles": ["roleUMAPolicy"], + } + policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + # Create a client policy + policy_to_create = { + "name": "TestPolicyClient", + "description": "Test resource policy description", + "scopes": ["test:read"], + "clients": [client["clientId"]], + } + policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + policy_to_create = { + "name": "TestPolicyGroup", + "description": "Test resource policy description", + "scopes": ["test:read"], + "groups": ["/UMAPolicyGroup"], + } + policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + policies = uma.policy_query() + assert len(policies) == 3 + + policies = uma.policy_query(name="TestPolicyGroup") + assert len(policies) == 1 + + policy_id = policy["id"] + uma.policy_delete(policy_id) + with pytest.raises(KeycloakDeleteError) as err: + uma.policy_delete(policy_id) + assert err.match( + '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\'' + ) + + policies = uma.policy_query() + assert len(policies) == 2 + + policy = policies[0] + uma.policy_update(policy_id=policy["id"], payload=policy) + + policies = uma.policy_query() + assert len(policies) == 2 + + policies = uma.policy_query(name="Invalid") + assert len(policies) == 0 + policies = uma.policy_query(scope="Invalid") + assert len(policies) == 0 + policies = uma.policy_query(resource="Invalid") + assert len(policies) == 0 + policies = uma.policy_query(first=3) + assert len(policies) == 0 + policies = uma.policy_query(maximum=0) + assert len(policies) == 0 + + policies = uma.policy_query(name=policy["name"]) + assert len(policies) == 1 + policies = uma.policy_query(scope=policy["scopes"][0]) + assert len(policies) == 2 + policies = uma.policy_query(resource=resource_id) + assert len(policies) == 2 + + uma.resource_set_delete(resource_id) + admin.delete_client(other_client_id) + admin.delete_realm_role(role_id) + admin.delete_group(group_id) + + +def test_uma_access(uma: KeycloakUMA): + """Test permission access checks. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + resource_to_create = { + "name": "mytest", + "scopes": ["read", "write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + resource = uma.resource_set_create(resource_to_create) + + policy_to_create = { + "name": "TestPolicy", + "description": "Test resource policy description", + "scopes": [resource_to_create["scopes"][0]], + "clients": [uma.connection.client_id], + } + uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) + + token = uma.connection.token + permissions = list() + assert uma.permissions_check(token["access_token"], permissions) + + permissions.append(UMAPermission(resource=resource_to_create["name"])) + assert uma.permissions_check(token["access_token"], permissions) + + permissions.append(UMAPermission(resource="not valid")) + assert not uma.permissions_check(token["access_token"], permissions) + uma.resource_set_delete(resource["_id"]) + + +def test_uma_permission_ticket(uma: KeycloakUMA): + """Test permission ticket generation. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + resource_to_create = { + "name": "mytest", + "scopes": ["read", "write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + resource = uma.resource_set_create(resource_to_create) + + policy_to_create = { + "name": "TestPolicy", + "description": "Test resource policy description", + "scopes": [resource_to_create["scopes"][0]], + "clients": [uma.connection.client_id], + } + uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) + permissions = ( + UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]), + ) + response = uma.permission_ticket_create(permissions) + + rpt = uma.connection.keycloak_openid.token( + grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"] + ) + assert rpt + assert "access_token" in rpt + + permissions = (UMAPermission(resource="invalid"),) + with pytest.raises(KeycloakPostError): + uma.permission_ticket_create(permissions) + + uma.resource_set_delete(resource["_id"])