Browse Source

feat: added UMA-permission request functionality

pull/322/head
Merle Nerger 3 years ago
committed by Jackson Kwok
parent
commit
8dafb4ec30
No known key found for this signature in database GPG Key ID: BB88FE9481A0273D
  1. 9
      README.md
  2. 8
      src/keycloak/exceptions.py
  3. 76
      src/keycloak/keycloak_openid.py
  4. 145
      src/keycloak/uma_permissions.py
  5. 143
      tests/test_uma_permissions.py

9
README.md

@ -110,6 +110,15 @@ keycloak_openid.load_authorization_config("example-authz-config.json")
policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode', key=KEYCLOAK_PUBLIC_KEY)
permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect')
# Get UMA-permissions by token
token = keycloak_openid.token("user", "password")
permissions = keycloak_openid.uma_permissions(token['access_token'])
# Get auth status for a specific resource and scope by token
token = keycloak_openid.token("user", "password")
auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope")
# KEYCLOAK ADMIN
from keycloak import KeycloakAdmin

8
src/keycloak/exceptions.py

@ -88,6 +88,14 @@ class KeycloakInvalidTokenError(KeycloakOperationError):
pass
class KeycloakPermissionFormatError(KeycloakOperationError):
pass
class PermissionDefinitionError(Exception):
pass
def raise_error_from_response(response, error, expected_codes=None, skip_exists=False):
if expected_codes is None:
expected_codes = [200, 201, 204]

76
src/keycloak/keycloak_openid.py

@ -28,6 +28,7 @@ from jose import jwt
from .authorization import Authorization
from .connection import ConnectionManager
from .exceptions import (
KeycloakAuthenticationError,
KeycloakAuthorizationConfigError,
KeycloakDeprecationError,
KeycloakGetError,
@ -35,6 +36,7 @@ from .exceptions import (
KeycloakRPTNotFound,
raise_error_from_response,
)
from .uma_permissions import AuthStatus, build_permission_param
from .urls_patterns import (
URL_AUTH,
URL_CERTS,
@ -47,6 +49,8 @@ from .urls_patterns import (
URL_WELL_KNOWN,
)
SAME_AS_CLIENT = object()
class KeycloakOpenID:
"""
@ -452,3 +456,75 @@ class KeycloakOpenID:
permissions += policy.permissions
return list(set(permissions))
def uma_permissions(
self, token, permissions, audience=SAME_AS_CLIENT, response_mode="permissions"
):
"""
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param token: user token
:param permissions:
:return: permissions list
"""
if audience is SAME_AS_CLIENT:
audience = self.client_id
permission = build_permission_param(permissions)
params_path = {"realm-name": self.realm_name}
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": permission,
"response_mode": response_mode,
"audience": audience,
"Authorization": "Bearer " + token,
}
self.connection.add_param_headers("Authorization", "Bearer " + token)
try:
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
finally:
self.connection.del_param_headers("Authorization")
return raise_error_from_response(data_raw, KeycloakGetError)
def has_uma_access(self, token, permissions):
"""
Get permission by user token
:param token: user token
:param permissions: dict(resource:scope)
:return: result bool
"""
needed = build_permission_param(permissions)
try:
granted = self.uma_permissions(token, permissions)
except (KeycloakGetError, KeycloakAuthenticationError) as e:
if e.response_code == 403:
return AuthStatus(
is_logged_in=True, is_authorized=False, missing_permissions=needed
)
elif e.response_code == 401:
return AuthStatus(
is_logged_in=False, is_authorized=False, missing_permissions=needed
)
raise
for resource_struct in granted:
resource = resource_struct["rsname"]
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes:
needed.discard("{}#{}".format(resource, scope))
return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
)

145
src/keycloak/uma_permissions.py

@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
class UMAPermission:
"""A class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
Usage example:
>>> r = Resource("Users")
>>> s = Scope("delete")
>>> permission = r(s)
>>> print(permission)
'Users#delete'
"""
def __init__(self, *, resource="", scope=""):
self.resource = resource
self.scope = scope
def __str__(self):
scope = self.scope
if scope:
scope = "#" + scope
return "{}{}".format(self.resource, scope)
def __eq__(self, __o: object) -> bool:
return str(self) == str(__o)
def __repr__(self) -> str:
return self.__str__()
def __hash__(self) -> int:
return hash(str(self))
def __call__(self, *args, resource="", scope="") -> object:
result_resource = self.resource
result_scope = self.scope
for arg in args:
if not isinstance(arg, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(arg)
)
if arg.resource:
result_resource = str(arg.resource)
if arg.scope:
result_scope = str(arg.scope)
if resource:
result_resource = str(resource)
if scope:
result_scope = str(scope)
return UMAPermission(resource=result_resource, scope=result_scope)
class Resource(UMAPermission):
def __init__(self, resource):
super().__init__(resource=resource)
class Scope(UMAPermission):
def __init__(self, scope):
super().__init__(scope=scope)
class AuthStatus:
"""A class that represents the authorization/login status of a user associated with a token.
This has to evaluate to True if and only if the user is properly authorized for the requested resource."""
def __init__(self, is_logged_in, is_authorized, missing_permissions):
self.is_logged_in = is_logged_in
self.is_authorized = is_authorized
self.missing_permissions = missing_permissions
def __bool__(self):
return self.is_authorized
def __repr__(self):
return f"AuthStatus(is_authorized={self.is_authorized}, is_logged_in={self.is_logged_in}, missing_permissions={self.missing_permissions})"
def build_permission_param(permissions):
"""
Transform permissions to a set, so they are usable for requests
:param permissions: either str (resource#scope),
iterable[str] (resource#scope),
dict[str,str] (resource: scope),
dict[str,iterable[str]] (resource: scopes)
:return: result bool
"""
if permissions is None or permissions == "":
return set()
if isinstance(permissions, (str, UMAPermission)):
return set((permissions,))
try: # treat as dictionary of permissions
result = set()
for resource, scopes in permissions.items():
if scopes is None:
result.add(resource)
elif isinstance(scopes, (str, UMAPermission)):
result.add("{}#{}".format(resource, scopes))
else:
for scope in scopes:
if not isinstance(scope, (str, UMAPermission)):
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
result.add("{}#{}".format(resource, scope))
return result
except AttributeError:
pass
try: # treat as any other iterable of permissions
return set(permissions)
except TypeError:
pass
raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions))

143
tests/test_uma_permissions.py

@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import re
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
from keycloak.uma_permissions import build_permission_param, Resource, Scope
def test_resource_with_scope_obj():
r = Resource("Resource1")
s = Scope("Scope1")
assert r(s) == "Resource1#Scope1"
def test_scope_with_resource_obj():
r = Resource("Resource1")
s = Scope("Scope1")
assert s(r) == "Resource1#Scope1"
def test_resource_scope_str():
r = Resource("Resource1")
s = "Scope1"
assert r(scope=s) == "Resource1#Scope1"
def test_scope_resource_str():
r = "Resource1"
s = Scope("Scope1")
assert s(resource=r) == "Resource1#Scope1"
def test_resource_scope_dict():
r = Resource("Resource1")
s = {"scope": "Scope1"}
assert r(**s) == "Resource1#Scope1"
def test_scope_resource_dict():
r = {"resource": "Resource1"}
s = Scope("Scope1")
assert s(**r) == "Resource1#Scope1"
def test_resource_scope_list():
r = Resource("Resource1")
s = ["Scope1"]
with pytest.raises(PermissionDefinitionError) as err:
r(*s)
assert err.match("can't determine if 'Scope1' is a resource or scope")
def test_build_permission_none():
assert build_permission_param(None) == set()
def test_build_permission_empty_str():
assert build_permission_param("") == set()
def test_build_permission_empty_list():
assert build_permission_param([]) == set()
def test_build_permission_empty_tuple():
assert build_permission_param(()) == set()
def test_build_permission_empty_set():
assert build_permission_param(set()) == set()
def test_build_permission_empty_dict():
assert build_permission_param({}) == set()
def test_build_permission_str():
assert build_permission_param("resource1") == {"resource1"}
def test_build_permission_list_str():
assert build_permission_param(["res1#scope1", "res1#scope2"]) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_str():
assert build_permission_param(("res1#scope1", "res1#scope2")) == {"res1#scope1", "res1#scope2"}
def test_build_permission_set_str():
assert build_permission_param({"res1#scope1", "res1#scope2"}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_str():
assert build_permission_param({"res1": "scope1"}) == {"res1#scope1"}
def test_build_permission_tuple_dict_str_list_str():
assert build_permission_param({"res1": ["scope1", "scope2"]}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_list_str2():
assert build_permission_param(
{"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]}
) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}
def test_build_permission_misbuilt_dict_str_list_list_str():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": [["scope1", "scope2"]]})
assert err.match(re.escape("misbuilt permission {'res1': [['scope1', 'scope2']]}"))
def test_build_permission_misbuilt_list_list_str():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([["scope1", "scope2"]])
assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]"))
def test_build_permission_misbuilt_list_set_str():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1", "scope2"}])
assert err.match(re.escape("misbuilt permission [{'scope1', 'scope2'}]"))
def test_build_permission_misbuilt_set_set_str():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1"}])
assert err.match(re.escape("misbuilt permission [{'scope1'}]"))
Loading…
Cancel
Save