diff --git a/README.md b/README.md index 8704e1d..692ce48 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,19 @@ keycloak_openid.load_authorization_config("example-authz-config.json") policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode', key=KEYCLOAK_PUBLIC_KEY) permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect') +# Get UMA-permissions by token +token = keycloak_openid.token("user", "password") +permissions = keycloak_openid.uma_permissions(token['access_token']) + +# Get UMA-permissions by token with specific resource and scope requested +token = keycloak_openid.token("user", "password") +permissions = keycloak_openid.uma_permissions(token['access_token'], permissions="Resource#Scope") + +# Get auth status for a specific resource and scope by token +token = keycloak_openid.token("user", "password") +auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope") + + # KEYCLOAK ADMIN from keycloak import KeycloakAdmin diff --git a/src/keycloak/exceptions.py b/src/keycloak/exceptions.py index a9c1b2b..925c937 100644 --- a/src/keycloak/exceptions.py +++ b/src/keycloak/exceptions.py @@ -88,6 +88,14 @@ class KeycloakInvalidTokenError(KeycloakOperationError): pass +class KeycloakPermissionFormatError(KeycloakOperationError): + pass + + +class PermissionDefinitionError(Exception): + pass + + def raise_error_from_response(response, error, expected_codes=None, skip_exists=False): if expected_codes is None: expected_codes = [200, 201, 204] diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 4205b0b..f0b73a6 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -28,13 +28,16 @@ from jose import jwt from .authorization import Authorization from .connection import ConnectionManager from .exceptions import ( + KeycloakAuthenticationError, KeycloakAuthorizationConfigError, KeycloakDeprecationError, KeycloakGetError, KeycloakInvalidTokenError, + KeycloakPostError, KeycloakRPTNotFound, raise_error_from_response, ) +from .uma_permissions import AuthStatus, build_permission_param from .urls_patterns import ( URL_AUTH, URL_CERTS, @@ -452,3 +455,67 @@ class KeycloakOpenID: permissions += policy.permissions return list(set(permissions)) + + def uma_permissions(self, token, permissions=""): + """ + Get UMA permissions by user token with requested permissions + + The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be + invoked by confidential clients. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + :param token: user token + :param permissions: list of uma permissions list(resource:scope) requested by the user + :return: permissions list + """ + + permission = build_permission_param(permissions) + + params_path = {"realm-name": self.realm_name} + payload = { + "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", + "permission": permission, + "response_mode": "permissions", + "audience": self.client_id, + } + + self.connection.add_param_headers("Authorization", "Bearer " + token) + data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) + + return raise_error_from_response(data_raw, KeycloakPostError) + + def has_uma_access(self, token, permissions): + """ + Determine whether user has uma permissions with specified user token + + :param token: user token + :param permissions: list of uma permissions (resource:scope) + :return: auth status + """ + needed = build_permission_param(permissions) + try: + granted = self.uma_permissions(token, permissions) + except (KeycloakPostError, KeycloakAuthenticationError) as e: + if e.response_code == 403: + return AuthStatus( + is_logged_in=True, is_authorized=False, missing_permissions=needed + ) + elif e.response_code == 401: + return AuthStatus( + is_logged_in=False, is_authorized=False, missing_permissions=needed + ) + raise + + for resource_struct in granted: + resource = resource_struct["rsname"] + scopes = resource_struct.get("scopes", None) + if not scopes: + needed.discard(resource) + continue + for scope in scopes: + needed.discard("{}#{}".format(resource, scope)) + + return AuthStatus( + is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed + ) diff --git a/src/keycloak/uma_permissions.py b/src/keycloak/uma_permissions.py new file mode 100644 index 0000000..5653c76 --- /dev/null +++ b/src/keycloak/uma_permissions.py @@ -0,0 +1,182 @@ +# -*- coding: utf-8 -*- +# +# The MIT License (MIT) +# +# Copyright (C) 2017 Marcos Pereira +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError + + +class UMAPermission: + """A class to conveniently assembly permissions. + The class itself is callable, and will return the assembled permission. + + Usage example: + + >>> r = Resource("Users") + >>> s = Scope("delete") + >>> permission = r(s) + >>> print(permission) + 'Users#delete' + + """ + + def __init__(self, permission=None, resource="", scope=""): + self.resource = resource + self.scope = scope + + if permission: + if not isinstance(permission, UMAPermission): + raise PermissionDefinitionError( + "can't determine if '{}' is a resource or scope".format(permission) + ) + if permission.resource: + self.resource = str(permission.resource) + if permission.scope: + self.scope = str(permission.scope) + + def __str__(self): + scope = self.scope + if scope: + scope = "#" + scope + return "{}{}".format(self.resource, scope) + + def __eq__(self, __o: object) -> bool: + return str(self) == str(__o) + + def __repr__(self) -> str: + return self.__str__() + + def __hash__(self) -> int: + return hash(str(self)) + + def __call__(self, permission=None, resource="", scope="") -> object: + result_resource = self.resource + result_scope = self.scope + + if resource: + result_resource = str(resource) + if scope: + result_scope = str(scope) + + if permission: + if not isinstance(permission, UMAPermission): + raise PermissionDefinitionError( + "can't determine if '{}' is a resource or scope".format(permission) + ) + if permission.resource: + result_resource = str(permission.resource) + if permission.scope: + result_scope = str(permission.scope) + + return UMAPermission(resource=result_resource, scope=result_scope) + + +class Resource(UMAPermission): + """An UMAPermission Resource class to conveniently assembly permissions. + The class itself is callable, and will return the assembled permission. + """ + + def __init__(self, resource): + super().__init__(resource=resource) + + +class Scope(UMAPermission): + """An UMAPermission Scope class to conveniently assembly permissions. + The class itself is callable, and will return the assembled permission. + """ + + def __init__(self, scope): + super().__init__(scope=scope) + + +class AuthStatus: + """A class that represents the authorization/login status of a user associated with a token. + This has to evaluate to True if and only if the user is properly authorized + for the requested resource.""" + + def __init__(self, is_logged_in, is_authorized, missing_permissions): + self.is_logged_in = is_logged_in + self.is_authorized = is_authorized + self.missing_permissions = missing_permissions + + def __bool__(self): + return self.is_authorized + + def __repr__(self): + return ( + f"AuthStatus(" + f"is_authorized={self.is_authorized}, " + f"is_logged_in={self.is_logged_in}, " + f"missing_permissions={self.missing_permissions})" + ) + + +def build_permission_param(permissions): + """ + Transform permissions to a set, so they are usable for requests + + :param permissions: either str (resource#scope), + iterable[str] (resource#scope), + dict[str,str] (resource: scope), + dict[str,iterable[str]] (resource: scopes) + :return: result bool + """ + if permissions is None or permissions == "": + return set() + if isinstance(permissions, str): + return set((permissions,)) + if isinstance(permissions, UMAPermission): + return set((str(permissions),)) + + try: # treat as dictionary of permissions + result = set() + for resource, scopes in permissions.items(): + print(f"resource={resource}scopes={scopes}") + if scopes is None: + result.add(resource) + elif isinstance(scopes, str): + result.add("{}#{}".format(resource, scopes)) + else: + try: + for scope in scopes: + if not isinstance(scope, str): + raise KeycloakPermissionFormatError( + "misbuilt permission {}".format(permissions) + ) + result.add("{}#{}".format(resource, scope)) + except TypeError: + raise KeycloakPermissionFormatError( + "misbuilt permission {}".format(permissions) + ) + return result + except AttributeError: + pass + + try: # treat as any other iterable of permissions + result = set() + for permission in permissions: + if not isinstance(permission, (str, UMAPermission)): + raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions)) + result.add(str(permission)) + return result + except TypeError: + pass + raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions)) diff --git a/tests/test_uma_permissions.py b/tests/test_uma_permissions.py new file mode 100644 index 0000000..09d7147 --- /dev/null +++ b/tests/test_uma_permissions.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2017 Marcos Pereira +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . +import re + +import pytest + +from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError +from keycloak.uma_permissions import Resource, Scope, build_permission_param + + +def test_resource_with_scope_obj(): + r = Resource("Resource1") + s = Scope("Scope1") + assert r(s) == "Resource1#Scope1" + + +def test_scope_with_resource_obj(): + r = Resource("Resource1") + s = Scope("Scope1") + assert s(r) == "Resource1#Scope1" + + +def test_resource_scope_str(): + r = Resource("Resource1") + s = "Scope1" + assert r(scope=s) == "Resource1#Scope1" + + +def test_scope_resource_str(): + r = "Resource1" + s = Scope("Scope1") + assert s(resource=r) == "Resource1#Scope1" + + +def test_resource_scope_list(): + r = Resource("Resource1") + s = ["Scope1"] + with pytest.raises(PermissionDefinitionError) as err: + r(s) + assert err.match(re.escape("can't determine if '['Scope1']' is a resource or scope")) + + +def test_build_permission_none(): + assert build_permission_param(None) == set() + + +def test_build_permission_empty_str(): + assert build_permission_param("") == set() + + +def test_build_permission_empty_list(): + assert build_permission_param([]) == set() + + +def test_build_permission_empty_tuple(): + assert build_permission_param(()) == set() + + +def test_build_permission_empty_set(): + assert build_permission_param(set()) == set() + + +def test_build_permission_empty_dict(): + assert build_permission_param({}) == set() + + +def test_build_permission_str(): + assert build_permission_param("resource1") == {"resource1"} + + +def test_build_permission_list_str(): + assert build_permission_param(["res1#scope1", "res1#scope2"]) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_tuple_str(): + assert build_permission_param(("res1#scope1", "res1#scope2")) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_set_str(): + assert build_permission_param({"res1#scope1", "res1#scope2"}) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_tuple_dict_str_str(): + assert build_permission_param({"res1": "scope1"}) == {"res1#scope1"} + + +def test_build_permission_tuple_dict_str_list_str(): + assert build_permission_param({"res1": ["scope1", "scope2"]}) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_tuple_dict_str_list_str2(): + assert build_permission_param( + {"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]} + ) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"} + + +def test_build_permission_uma(): + assert build_permission_param(Resource("res1")(Scope("scope1"))) == {"res1#scope1"} + + +def test_build_permission_uma_list(): + assert build_permission_param( + [Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))] + ) == {"res1#scope1", "res1#scope2"} + + +def test_build_permission_misbuilt_dict_str_list_list_str(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param({"res1": [["scope1", "scope2"]]}) + assert err.match(re.escape("misbuilt permission {'res1': [['scope1', 'scope2']]}")) + + +def test_build_permission_misbuilt_list_list_str(): + with pytest.raises(KeycloakPermissionFormatError) as err: + print(build_permission_param([["scope1", "scope2"]])) + assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]")) + + +def test_build_permission_misbuilt_list_set_str(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param([{"scope1", "scope2"}]) + assert err.match("misbuilt permission.*") + + +def test_build_permission_misbuilt_set_set_str(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param([{"scope1"}]) + assert err.match(re.escape("misbuilt permission [{'scope1'}]")) + + +def test_build_permission_misbuilt_dict_non_iterable(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param({"res1": 5}) + assert err.match(re.escape("misbuilt permission {'res1': 5}"))