From 8dafb4ec30b7203f2c918a6b52c3e53f19ec72ab Mon Sep 17 00:00:00 2001 From: Merle Nerger Date: Tue, 12 Apr 2022 11:38:16 +0200 Subject: [PATCH] feat: added UMA-permission request functionality --- README.md | 9 ++ src/keycloak/exceptions.py | 8 ++ src/keycloak/keycloak_openid.py | 76 +++++++++++++++++ src/keycloak/uma_permissions.py | 145 ++++++++++++++++++++++++++++++++ tests/test_uma_permissions.py | 143 +++++++++++++++++++++++++++++++ 5 files changed, 381 insertions(+) create mode 100644 src/keycloak/uma_permissions.py create mode 100644 tests/test_uma_permissions.py diff --git a/README.md b/README.md index 8704e1d..42a4824 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,15 @@ keycloak_openid.load_authorization_config("example-authz-config.json") policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode', key=KEYCLOAK_PUBLIC_KEY) permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect') +# Get UMA-permissions by token +token = keycloak_openid.token("user", "password") +permissions = keycloak_openid.uma_permissions(token['access_token']) + +# Get auth status for a specific resource and scope by token +token = keycloak_openid.token("user", "password") +auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope") + + # KEYCLOAK ADMIN from keycloak import KeycloakAdmin diff --git a/src/keycloak/exceptions.py b/src/keycloak/exceptions.py index a9c1b2b..925c937 100644 --- a/src/keycloak/exceptions.py +++ b/src/keycloak/exceptions.py @@ -88,6 +88,14 @@ class KeycloakInvalidTokenError(KeycloakOperationError): pass +class KeycloakPermissionFormatError(KeycloakOperationError): + pass + + +class PermissionDefinitionError(Exception): + pass + + def raise_error_from_response(response, error, expected_codes=None, skip_exists=False): if expected_codes is None: expected_codes = [200, 201, 204] diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 4205b0b..e4378da 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -28,6 +28,7 @@ from jose import jwt from .authorization import Authorization from .connection import ConnectionManager from .exceptions import ( + KeycloakAuthenticationError, KeycloakAuthorizationConfigError, KeycloakDeprecationError, KeycloakGetError, @@ -35,6 +36,7 @@ from .exceptions import ( KeycloakRPTNotFound, raise_error_from_response, ) +from .uma_permissions import AuthStatus, build_permission_param from .urls_patterns import ( URL_AUTH, URL_CERTS, @@ -47,6 +49,8 @@ from .urls_patterns import ( URL_WELL_KNOWN, ) +SAME_AS_CLIENT = object() + class KeycloakOpenID: """ @@ -452,3 +456,75 @@ class KeycloakOpenID: permissions += policy.permissions return list(set(permissions)) + + def uma_permissions( + self, token, permissions, audience=SAME_AS_CLIENT, response_mode="permissions" + ): + """ + The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be + invoked by confidential clients. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + + :param token: user token + :param permissions: + :return: permissions list + """ + + if audience is SAME_AS_CLIENT: + audience = self.client_id + + permission = build_permission_param(permissions) + + params_path = {"realm-name": self.realm_name} + payload = { + "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", + "permission": permission, + "response_mode": response_mode, + "audience": audience, + "Authorization": "Bearer " + token, + } + + self.connection.add_param_headers("Authorization", "Bearer " + token) + try: + data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) + finally: + self.connection.del_param_headers("Authorization") + + return raise_error_from_response(data_raw, KeycloakGetError) + + def has_uma_access(self, token, permissions): + """ + Get permission by user token + + :param token: user token + :param permissions: dict(resource:scope) + :return: result bool + """ + needed = build_permission_param(permissions) + try: + granted = self.uma_permissions(token, permissions) + except (KeycloakGetError, KeycloakAuthenticationError) as e: + if e.response_code == 403: + return AuthStatus( + is_logged_in=True, is_authorized=False, missing_permissions=needed + ) + elif e.response_code == 401: + return AuthStatus( + is_logged_in=False, is_authorized=False, missing_permissions=needed + ) + raise + + for resource_struct in granted: + resource = resource_struct["rsname"] + scopes = resource_struct.get("scopes", None) + if not scopes: + needed.discard(resource) + continue + for scope in scopes: + needed.discard("{}#{}".format(resource, scope)) + + return AuthStatus( + is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed + ) diff --git a/src/keycloak/uma_permissions.py b/src/keycloak/uma_permissions.py new file mode 100644 index 0000000..5d023dc --- /dev/null +++ b/src/keycloak/uma_permissions.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# +# The MIT License (MIT) +# +# Copyright (C) 2017 Marcos Pereira +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError + + +class UMAPermission: + """A class to conveniently assembly permissions. + The class itself is callable, and will return the assembled permission. + + Usage example: + + >>> r = Resource("Users") + >>> s = Scope("delete") + >>> permission = r(s) + >>> print(permission) + 'Users#delete' + + """ + + def __init__(self, *, resource="", scope=""): + self.resource = resource + self.scope = scope + + def __str__(self): + scope = self.scope + if scope: + scope = "#" + scope + return "{}{}".format(self.resource, scope) + + def __eq__(self, __o: object) -> bool: + return str(self) == str(__o) + + def __repr__(self) -> str: + return self.__str__() + + def __hash__(self) -> int: + return hash(str(self)) + + def __call__(self, *args, resource="", scope="") -> object: + result_resource = self.resource + result_scope = self.scope + + for arg in args: + if not isinstance(arg, UMAPermission): + raise PermissionDefinitionError( + "can't determine if '{}' is a resource or scope".format(arg) + ) + if arg.resource: + result_resource = str(arg.resource) + if arg.scope: + result_scope = str(arg.scope) + + if resource: + result_resource = str(resource) + if scope: + result_scope = str(scope) + + return UMAPermission(resource=result_resource, scope=result_scope) + + +class Resource(UMAPermission): + def __init__(self, resource): + super().__init__(resource=resource) + + +class Scope(UMAPermission): + def __init__(self, scope): + super().__init__(scope=scope) + + +class AuthStatus: + """A class that represents the authorization/login status of a user associated with a token. + This has to evaluate to True if and only if the user is properly authorized for the requested resource.""" + + def __init__(self, is_logged_in, is_authorized, missing_permissions): + self.is_logged_in = is_logged_in + self.is_authorized = is_authorized + self.missing_permissions = missing_permissions + + def __bool__(self): + return self.is_authorized + + def __repr__(self): + return f"AuthStatus(is_authorized={self.is_authorized}, is_logged_in={self.is_logged_in}, missing_permissions={self.missing_permissions})" + + +def build_permission_param(permissions): + """ + Transform permissions to a set, so they are usable for requests + + :param permissions: either str (resource#scope), + iterable[str] (resource#scope), + dict[str,str] (resource: scope), + dict[str,iterable[str]] (resource: scopes) + :return: result bool + """ + if permissions is None or permissions == "": + return set() + if isinstance(permissions, (str, UMAPermission)): + return set((permissions,)) + + try: # treat as dictionary of permissions + result = set() + for resource, scopes in permissions.items(): + if scopes is None: + result.add(resource) + elif isinstance(scopes, (str, UMAPermission)): + result.add("{}#{}".format(resource, scopes)) + else: + for scope in scopes: + if not isinstance(scope, (str, UMAPermission)): + raise KeycloakPermissionFormatError( + "misbuilt permission {}".format(permissions) + ) + result.add("{}#{}".format(resource, scope)) + return result + except AttributeError: + pass + + try: # treat as any other iterable of permissions + return set(permissions) + except TypeError: + pass + raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions)) diff --git a/tests/test_uma_permissions.py b/tests/test_uma_permissions.py new file mode 100644 index 0000000..d26830b --- /dev/null +++ b/tests/test_uma_permissions.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2017 Marcos Pereira +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . +import pytest +import re +from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError +from keycloak.uma_permissions import build_permission_param, Resource, Scope + + + +def test_resource_with_scope_obj(): + r = Resource("Resource1") + s = Scope("Scope1") + assert r(s) == "Resource1#Scope1" + + +def test_scope_with_resource_obj(): + r = Resource("Resource1") + s = Scope("Scope1") + assert s(r) == "Resource1#Scope1" + + +def test_resource_scope_str(): + r = Resource("Resource1") + s = "Scope1" + assert r(scope=s) == "Resource1#Scope1" + + +def test_scope_resource_str(): + r = "Resource1" + s = Scope("Scope1") + assert s(resource=r) == "Resource1#Scope1" + + +def test_resource_scope_dict(): + r = Resource("Resource1") + s = {"scope": "Scope1"} + assert r(**s) == "Resource1#Scope1" + + +def test_scope_resource_dict(): + r = {"resource": "Resource1"} + s = Scope("Scope1") + assert s(**r) == "Resource1#Scope1" + + +def test_resource_scope_list(): + r = Resource("Resource1") + s = ["Scope1"] + with pytest.raises(PermissionDefinitionError) as err: + r(*s) + assert err.match("can't determine if 'Scope1' is a resource or scope") + + +def test_build_permission_none(): + assert build_permission_param(None) == set() + + +def test_build_permission_empty_str(): + assert build_permission_param("") == set() + + +def test_build_permission_empty_list(): + assert build_permission_param([]) == set() + + +def test_build_permission_empty_tuple(): + assert build_permission_param(()) == set() + + +def test_build_permission_empty_set(): + assert build_permission_param(set()) == set() + + +def test_build_permission_empty_dict(): + assert build_permission_param({}) == set() + + +def test_build_permission_str(): + assert build_permission_param("resource1") == {"resource1"} + + +def test_build_permission_list_str(): + assert build_permission_param(["res1#scope1", "res1#scope2"]) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_tuple_str(): + assert build_permission_param(("res1#scope1", "res1#scope2")) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_set_str(): + assert build_permission_param({"res1#scope1", "res1#scope2"}) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_tuple_dict_str_str(): + assert build_permission_param({"res1": "scope1"}) == {"res1#scope1"} + + +def test_build_permission_tuple_dict_str_list_str(): + assert build_permission_param({"res1": ["scope1", "scope2"]}) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_tuple_dict_str_list_str2(): + assert build_permission_param( + {"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]} + ) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"} + + +def test_build_permission_misbuilt_dict_str_list_list_str(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param({"res1": [["scope1", "scope2"]]}) + assert err.match(re.escape("misbuilt permission {'res1': [['scope1', 'scope2']]}")) + + +def test_build_permission_misbuilt_list_list_str(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param([["scope1", "scope2"]]) + assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]")) + + +def test_build_permission_misbuilt_list_set_str(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param([{"scope1", "scope2"}]) + assert err.match(re.escape("misbuilt permission [{'scope1', 'scope2'}]")) + + +def test_build_permission_misbuilt_set_set_str(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param([{"scope1"}]) + assert err.match(re.escape("misbuilt permission [{'scope1'}]"))