Browse Source

UMA: added UMA-permission request functionality

pull/297/head
Merle Nerger 3 years ago
parent
commit
244a5f3d6e
  1. 5
      keycloak/exceptions.py
  2. 122
      keycloak/keycloak_openid.py
  3. 87
      keycloak/tests/test_openid.py

5
keycloak/exceptions.py

@ -56,6 +56,7 @@ class KeycloakOperationError(KeycloakError):
class KeycloakDeprecationError(KeycloakError):
pass
class KeycloakGetError(KeycloakOperationError):
pass
@ -76,6 +77,10 @@ class KeycloakInvalidTokenError(KeycloakOperationError):
pass
class KeycloakPermissionFormatError(KeycloakOperationError):
pass
def raise_error_from_response(response, error, expected_codes=None, skip_exists=False):
if expected_codes is None:
expected_codes = [200, 201, 204]

122
keycloak/keycloak_openid.py

@ -27,8 +27,8 @@ from jose import jwt
from .authorization import Authorization
from .connection import ConnectionManager
from .exceptions import raise_error_from_response, KeycloakGetError, \
KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError, KeycloakDeprecationError
from .exceptions import KeycloakPermissionFormatError, raise_error_from_response, KeycloakGetError, \
KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError, KeycloakDeprecationError, KeycloakAuthenticationError
from .urls_patterns import (
URL_REALM,
URL_AUTH,
@ -41,6 +41,24 @@ from .urls_patterns import (
URL_INTROSPECT
)
SAME_AS_CLIENT = object()
class AuthStatus:
"""A class that represents the authorization/login status of a user associated with a token.
This has to evaluate to True if and only if the user is properly authorized for the requested resource."""
def __init__(self, is_logged_in, is_authorized, missing_permissions):
self.is_logged_in = is_logged_in
self.is_authorized = is_authorized
self.missing_permissions = missing_permissions
def __bool__(self):
return self.is_authorized
def __repr__(self):
return f"AuthStatus(is_authorized={self.is_authorized}, is_logged_in={self.is_logged_in}, missing_permissions={self.missing_permissions})"
class KeycloakOpenID:
@ -431,3 +449,103 @@ class KeycloakOpenID:
permissions += policy.permissions
return list(set(permissions))
def UMA_permissions(self, token, permissions, audience=SAME_AS_CLIENT, response_mode="permissions"):
"""
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param token: user token
:param permissions:
:return: permissions list
"""
if audience is SAME_AS_CLIENT:
audience = self.client_id
permission = build_permission_param(permissions)
params_path = {"realm-name": self.realm_name}
payload = {"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", "permission": permission,
"response_mode": response_mode, "audience": audience, "Authorization": "Bearer "+token}
self.connection.add_param_headers("Authorization", "Bearer " + token)
try:
data_raw = self.connection.raw_post(
URL_TOKEN.format(**params_path), data=payload)
finally:
self.connection.del_param_headers("Authorization")
return raise_error_from_response(data_raw, KeycloakGetError)
def has_UMA_access(self, token, permissions):
"""
Get permission by user token
:param token: user token
:param permissions: dict(resource:scope)
:return: result bool
"""
needed = build_permission_param(permissions)
try:
granted = self.UMA_permissions(token, permissions)
except (KeycloakGetError, KeycloakAuthenticationError) as e:
if e.response_code == 403:
return AuthStatus(is_logged_in=True, is_authorized=False, missing_permissions=needed)
elif e.response_code == 401:
return AuthStatus(is_logged_in=False, is_authorized=False, missing_permissions=needed)
raise
for resource_struct in granted:
resource = resource_struct["rsname"]
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes:
needed.discard("{}#{}".format(resource, scope))
return AuthStatus(is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed)
def build_permission_param(permissions):
"""
Transform permissions to a set, so they are usable for requests
:param permissions: either str (resource#scope),
iterable[str] (resource#scope),
dict[str,str] (resource: scope),
dict[str,iterable[str]] (resource: scopes)
:return: result bool
"""
if permissions is None or permissions == "":
return set()
if isinstance(permissions, str):
return set((permissions,))
try: # treat as dictionary of permissions
result = set()
for resource, scopes in permissions.items():
if scopes is None:
result.add(resource)
elif isinstance(scopes, str):
result.add("{}#{}".format(resource, scopes))
else:
for scope in scopes:
if not isinstance(scope, str):
raise KeycloakPermissionFormatError(
'misbuilt permission {}'.format(permissions))
result.add("{}#{}".format(resource, scope))
return result
except AttributeError:
pass
try: # treat as any other iterable of permissions
return set(permissions)
except TypeError:
pass
raise KeycloakPermissionFormatError(
'misbuilt permission {}'.format(permissions))

87
keycloak/tests/test_openid.py

@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..keycloak_openid import *
from ..exceptions import *
try:
import unittest
except ImportError:
import unittest2 as unittest
from collections import namedtuple
class Success(Exception):
"""Used as stand-in for an actual exception for tests that are meant to succeed.
This exception should never be raised."""
class TestOpenID(unittest.TestCase):
def test_build_permission_param(self):
test = namedtuple("test",
["name", "permission", "result", "error"])
tests = [
test("None", None, set(), Success),
test("empty str", "", set(), Success),
test("empty list", [], set(), Success),
test("empty tuple", (), set(), Success),
test("empty set", set(), set(), Success),
test("empty dict", {}, set(), Success),
test("str", "resource1", {"resource1"}, Success),
test("list[str]", ["res1#scope1", "res1#scope2"],
{"res1#scope1", "res1#scope2"}, Success),
test("tuple[str]", ("res1#scope1", "res1#scope2"),
{"res1#scope1", "res1#scope2"}, Success),
test("set[str]", {"res1#scope1", "res1#scope2"},
{"res1#scope1", "res1#scope2"}, Success),
test("dict[str,str]", {"res1": "scope1"},
{"res1#scope1"}, Success),
test("dict[str,list[str]]", {"res1": ["scope1", "scope2"]},
{"res1#scope1", "res1#scope2"}, Success),
test("dict[str,list[str]] 2", {"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]},
{"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}, Success),
test("misbuilt: dict[str,list[list[str]]]", {
"res1": [["scope1", "scope2"]]}, None, KeycloakPermissionFormatError),
test("misbuilt: list[list[str]]",
[["scope1", "scope2"]], None, KeycloakPermissionFormatError),
test("misbuilt: list[set[str]]",
[{"scope1", "scope2"}], None, KeycloakPermissionFormatError),
test("misbuilt: set[set[str]]",
[{"scope1"}], None, KeycloakPermissionFormatError),
]
for case in tests:
with self.subTest(case.name):
msg = f'in case "{case.name}"'
try:
if not case.error is Success:
with self.assertRaises(case.error, msg=msg):
build_permission_param(case.permission)
else:
self.assertEqual(
build_permission_param(case.permission),
case.result, msg=msg)
except AssertionError:
raise
except Exception as e:
self.fail(
f'unexpected exception "{e}": {msg}')
Loading…
Cancel
Save