diff --git a/keycloak/exceptions.py b/keycloak/exceptions.py index 67da62a..025dd0d 100644 --- a/keycloak/exceptions.py +++ b/keycloak/exceptions.py @@ -56,6 +56,7 @@ class KeycloakOperationError(KeycloakError): class KeycloakDeprecationError(KeycloakError): pass + class KeycloakGetError(KeycloakOperationError): pass @@ -76,6 +77,10 @@ class KeycloakInvalidTokenError(KeycloakOperationError): pass +class KeycloakPermissionFormatError(KeycloakOperationError): + pass + + def raise_error_from_response(response, error, expected_codes=None, skip_exists=False): if expected_codes is None: expected_codes = [200, 201, 204] diff --git a/keycloak/keycloak_openid.py b/keycloak/keycloak_openid.py index 1d6ed28..2beeb12 100644 --- a/keycloak/keycloak_openid.py +++ b/keycloak/keycloak_openid.py @@ -27,8 +27,8 @@ from jose import jwt from .authorization import Authorization from .connection import ConnectionManager -from .exceptions import raise_error_from_response, KeycloakGetError, \ - KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError, KeycloakDeprecationError +from .exceptions import KeycloakPermissionFormatError, raise_error_from_response, KeycloakGetError, \ + KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError, KeycloakDeprecationError, KeycloakAuthenticationError from .urls_patterns import ( URL_REALM, URL_AUTH, @@ -41,6 +41,24 @@ from .urls_patterns import ( URL_INTROSPECT ) +SAME_AS_CLIENT = object() + + +class AuthStatus: + """A class that represents the authorization/login status of a user associated with a token. + This has to evaluate to True if and only if the user is properly authorized for the requested resource.""" + + def __init__(self, is_logged_in, is_authorized, missing_permissions): + self.is_logged_in = is_logged_in + self.is_authorized = is_authorized + self.missing_permissions = missing_permissions + + def __bool__(self): + return self.is_authorized + + def __repr__(self): + return f"AuthStatus(is_authorized={self.is_authorized}, is_logged_in={self.is_logged_in}, missing_permissions={self.missing_permissions})" + class KeycloakOpenID: @@ -431,3 +449,103 @@ class KeycloakOpenID: permissions += policy.permissions return list(set(permissions)) + + def UMA_permissions(self, token, permissions, audience=SAME_AS_CLIENT, response_mode="permissions"): + """ + The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be + invoked by confidential clients. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + + :param token: user token + :param permissions: + :return: permissions list + """ + + if audience is SAME_AS_CLIENT: + audience = self.client_id + + permission = build_permission_param(permissions) + + params_path = {"realm-name": self.realm_name} + payload = {"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", "permission": permission, + "response_mode": response_mode, "audience": audience, "Authorization": "Bearer "+token} + + self.connection.add_param_headers("Authorization", "Bearer " + token) + try: + data_raw = self.connection.raw_post( + URL_TOKEN.format(**params_path), data=payload) + finally: + self.connection.del_param_headers("Authorization") + + return raise_error_from_response(data_raw, KeycloakGetError) + + def has_UMA_access(self, token, permissions): + """ + Get permission by user token + + :param token: user token + :param permissions: dict(resource:scope) + :return: result bool + """ + needed = build_permission_param(permissions) + try: + granted = self.UMA_permissions(token, permissions) + except (KeycloakGetError, KeycloakAuthenticationError) as e: + if e.response_code == 403: + return AuthStatus(is_logged_in=True, is_authorized=False, missing_permissions=needed) + elif e.response_code == 401: + return AuthStatus(is_logged_in=False, is_authorized=False, missing_permissions=needed) + raise + + for resource_struct in granted: + resource = resource_struct["rsname"] + scopes = resource_struct.get("scopes", None) + if not scopes: + needed.discard(resource) + continue + for scope in scopes: + needed.discard("{}#{}".format(resource, scope)) + + return AuthStatus(is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed) + + +def build_permission_param(permissions): + """ + Transform permissions to a set, so they are usable for requests + + :param permissions: either str (resource#scope), + iterable[str] (resource#scope), + dict[str,str] (resource: scope), + dict[str,iterable[str]] (resource: scopes) + :return: result bool + """ + if permissions is None or permissions == "": + return set() + if isinstance(permissions, str): + return set((permissions,)) + + try: # treat as dictionary of permissions + result = set() + for resource, scopes in permissions.items(): + if scopes is None: + result.add(resource) + elif isinstance(scopes, str): + result.add("{}#{}".format(resource, scopes)) + else: + for scope in scopes: + if not isinstance(scope, str): + raise KeycloakPermissionFormatError( + 'misbuilt permission {}'.format(permissions)) + result.add("{}#{}".format(resource, scope)) + return result + except AttributeError: + pass + + try: # treat as any other iterable of permissions + return set(permissions) + except TypeError: + pass + raise KeycloakPermissionFormatError( + 'misbuilt permission {}'.format(permissions)) diff --git a/keycloak/tests/test_openid.py b/keycloak/tests/test_openid.py new file mode 100644 index 0000000..f574cb2 --- /dev/null +++ b/keycloak/tests/test_openid.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2017 Marcos Pereira +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . +from ..keycloak_openid import * +from ..exceptions import * +try: + import unittest +except ImportError: + import unittest2 as unittest + +from collections import namedtuple + + +class Success(Exception): + """Used as stand-in for an actual exception for tests that are meant to succeed. + This exception should never be raised.""" + + +class TestOpenID(unittest.TestCase): + def test_build_permission_param(self): + test = namedtuple("test", + ["name", "permission", "result", "error"]) + tests = [ + test("None", None, set(), Success), + test("empty str", "", set(), Success), + test("empty list", [], set(), Success), + test("empty tuple", (), set(), Success), + test("empty set", set(), set(), Success), + test("empty dict", {}, set(), Success), + + test("str", "resource1", {"resource1"}, Success), + + test("list[str]", ["res1#scope1", "res1#scope2"], + {"res1#scope1", "res1#scope2"}, Success), + + test("tuple[str]", ("res1#scope1", "res1#scope2"), + {"res1#scope1", "res1#scope2"}, Success), + + test("set[str]", {"res1#scope1", "res1#scope2"}, + {"res1#scope1", "res1#scope2"}, Success), + + test("dict[str,str]", {"res1": "scope1"}, + {"res1#scope1"}, Success), + test("dict[str,list[str]]", {"res1": ["scope1", "scope2"]}, + {"res1#scope1", "res1#scope2"}, Success), + test("dict[str,list[str]] 2", {"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]}, + {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}, Success), + + test("misbuilt: dict[str,list[list[str]]]", { + "res1": [["scope1", "scope2"]]}, None, KeycloakPermissionFormatError), + test("misbuilt: list[list[str]]", + [["scope1", "scope2"]], None, KeycloakPermissionFormatError), + test("misbuilt: list[set[str]]", + [{"scope1", "scope2"}], None, KeycloakPermissionFormatError), + test("misbuilt: set[set[str]]", + [{"scope1"}], None, KeycloakPermissionFormatError), + ] + + for case in tests: + with self.subTest(case.name): + msg = f'in case "{case.name}"' + try: + if not case.error is Success: + with self.assertRaises(case.error, msg=msg): + build_permission_param(case.permission) + else: + self.assertEqual( + build_permission_param(case.permission), + case.result, msg=msg) + except AssertionError: + raise + except Exception as e: + self.fail( + f'unexpected exception "{e}": {msg}')