Browse Source

Merge pull request #322 from knackjax/authz

Authz
pull/324/head v0.29.0
Richard Nemeth 3 years ago
committed by GitHub
parent
commit
f08ec55da3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      README.md
  2. 8
      src/keycloak/exceptions.py
  3. 67
      src/keycloak/keycloak_openid.py
  4. 182
      src/keycloak/uma_permissions.py
  5. 148
      tests/test_uma_permissions.py

13
README.md

@ -110,6 +110,19 @@ keycloak_openid.load_authorization_config("example-authz-config.json")
policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode', key=KEYCLOAK_PUBLIC_KEY) policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode', key=KEYCLOAK_PUBLIC_KEY)
permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect') permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect')
# Get UMA-permissions by token
token = keycloak_openid.token("user", "password")
permissions = keycloak_openid.uma_permissions(token['access_token'])
# Get UMA-permissions by token with specific resource and scope requested
token = keycloak_openid.token("user", "password")
permissions = keycloak_openid.uma_permissions(token['access_token'], permissions="Resource#Scope")
# Get auth status for a specific resource and scope by token
token = keycloak_openid.token("user", "password")
auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope")
# KEYCLOAK ADMIN # KEYCLOAK ADMIN
from keycloak import KeycloakAdmin from keycloak import KeycloakAdmin

8
src/keycloak/exceptions.py

@ -88,6 +88,14 @@ class KeycloakInvalidTokenError(KeycloakOperationError):
pass pass
class KeycloakPermissionFormatError(KeycloakOperationError):
pass
class PermissionDefinitionError(Exception):
pass
def raise_error_from_response(response, error, expected_codes=None, skip_exists=False): def raise_error_from_response(response, error, expected_codes=None, skip_exists=False):
if expected_codes is None: if expected_codes is None:
expected_codes = [200, 201, 204] expected_codes = [200, 201, 204]

67
src/keycloak/keycloak_openid.py

@ -28,13 +28,16 @@ from jose import jwt
from .authorization import Authorization from .authorization import Authorization
from .connection import ConnectionManager from .connection import ConnectionManager
from .exceptions import ( from .exceptions import (
KeycloakAuthenticationError,
KeycloakAuthorizationConfigError, KeycloakAuthorizationConfigError,
KeycloakDeprecationError, KeycloakDeprecationError,
KeycloakGetError, KeycloakGetError,
KeycloakInvalidTokenError, KeycloakInvalidTokenError,
KeycloakPostError,
KeycloakRPTNotFound, KeycloakRPTNotFound,
raise_error_from_response, raise_error_from_response,
) )
from .uma_permissions import AuthStatus, build_permission_param
from .urls_patterns import ( from .urls_patterns import (
URL_AUTH, URL_AUTH,
URL_CERTS, URL_CERTS,
@ -452,3 +455,67 @@ class KeycloakOpenID:
permissions += policy.permissions permissions += policy.permissions
return list(set(permissions)) return list(set(permissions))
def uma_permissions(self, token, permissions=""):
"""
Get UMA permissions by user token with requested permissions
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param token: user token
:param permissions: list of uma permissions list(resource:scope) requested by the user
:return: permissions list
"""
permission = build_permission_param(permissions)
params_path = {"realm-name": self.realm_name}
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": permission,
"response_mode": "permissions",
"audience": self.client_id,
}
self.connection.add_param_headers("Authorization", "Bearer " + token)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
def has_uma_access(self, token, permissions):
"""
Determine whether user has uma permissions with specified user token
:param token: user token
:param permissions: list of uma permissions (resource:scope)
:return: auth status
"""
needed = build_permission_param(permissions)
try:
granted = self.uma_permissions(token, permissions)
except (KeycloakPostError, KeycloakAuthenticationError) as e:
if e.response_code == 403:
return AuthStatus(
is_logged_in=True, is_authorized=False, missing_permissions=needed
)
elif e.response_code == 401:
return AuthStatus(
is_logged_in=False, is_authorized=False, missing_permissions=needed
)
raise
for resource_struct in granted:
resource = resource_struct["rsname"]
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes:
needed.discard("{}#{}".format(resource, scope))
return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
)

182
src/keycloak/uma_permissions.py

@ -0,0 +1,182 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
class UMAPermission:
"""A class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
Usage example:
>>> r = Resource("Users")
>>> s = Scope("delete")
>>> permission = r(s)
>>> print(permission)
'Users#delete'
"""
def __init__(self, permission=None, resource="", scope=""):
self.resource = resource
self.scope = scope
if permission:
if not isinstance(permission, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(permission)
)
if permission.resource:
self.resource = str(permission.resource)
if permission.scope:
self.scope = str(permission.scope)
def __str__(self):
scope = self.scope
if scope:
scope = "#" + scope
return "{}{}".format(self.resource, scope)
def __eq__(self, __o: object) -> bool:
return str(self) == str(__o)
def __repr__(self) -> str:
return self.__str__()
def __hash__(self) -> int:
return hash(str(self))
def __call__(self, permission=None, resource="", scope="") -> object:
result_resource = self.resource
result_scope = self.scope
if resource:
result_resource = str(resource)
if scope:
result_scope = str(scope)
if permission:
if not isinstance(permission, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(permission)
)
if permission.resource:
result_resource = str(permission.resource)
if permission.scope:
result_scope = str(permission.scope)
return UMAPermission(resource=result_resource, scope=result_scope)
class Resource(UMAPermission):
"""An UMAPermission Resource class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
"""
def __init__(self, resource):
super().__init__(resource=resource)
class Scope(UMAPermission):
"""An UMAPermission Scope class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
"""
def __init__(self, scope):
super().__init__(scope=scope)
class AuthStatus:
"""A class that represents the authorization/login status of a user associated with a token.
This has to evaluate to True if and only if the user is properly authorized
for the requested resource."""
def __init__(self, is_logged_in, is_authorized, missing_permissions):
self.is_logged_in = is_logged_in
self.is_authorized = is_authorized
self.missing_permissions = missing_permissions
def __bool__(self):
return self.is_authorized
def __repr__(self):
return (
f"AuthStatus("
f"is_authorized={self.is_authorized}, "
f"is_logged_in={self.is_logged_in}, "
f"missing_permissions={self.missing_permissions})"
)
def build_permission_param(permissions):
"""
Transform permissions to a set, so they are usable for requests
:param permissions: either str (resource#scope),
iterable[str] (resource#scope),
dict[str,str] (resource: scope),
dict[str,iterable[str]] (resource: scopes)
:return: result bool
"""
if permissions is None or permissions == "":
return set()
if isinstance(permissions, str):
return set((permissions,))
if isinstance(permissions, UMAPermission):
return set((str(permissions),))
try: # treat as dictionary of permissions
result = set()
for resource, scopes in permissions.items():
print(f"resource={resource}scopes={scopes}")
if scopes is None:
result.add(resource)
elif isinstance(scopes, str):
result.add("{}#{}".format(resource, scopes))
else:
try:
for scope in scopes:
if not isinstance(scope, str):
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
result.add("{}#{}".format(resource, scope))
except TypeError:
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
return result
except AttributeError:
pass
try: # treat as any other iterable of permissions
result = set()
for permission in permissions:
if not isinstance(permission, (str, UMAPermission)):
raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions))
result.add(str(permission))
return result
except TypeError:
pass
raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions))

148
tests/test_uma_permissions.py

@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import pytest
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
from keycloak.uma_permissions import Resource, Scope, build_permission_param
def test_resource_with_scope_obj():
r = Resource("Resource1")
s = Scope("Scope1")
assert r(s) == "Resource1#Scope1"
def test_scope_with_resource_obj():
r = Resource("Resource1")
s = Scope("Scope1")
assert s(r) == "Resource1#Scope1"
def test_resource_scope_str():
r = Resource("Resource1")
s = "Scope1"
assert r(scope=s) == "Resource1#Scope1"
def test_scope_resource_str():
r = "Resource1"
s = Scope("Scope1")
assert s(resource=r) == "Resource1#Scope1"
def test_resource_scope_list():
r = Resource("Resource1")
s = ["Scope1"]
with pytest.raises(PermissionDefinitionError) as err:
r(s)
assert err.match(re.escape("can't determine if '['Scope1']' is a resource or scope"))
def test_build_permission_none():
assert build_permission_param(None) == set()
def test_build_permission_empty_str():
assert build_permission_param("") == set()
def test_build_permission_empty_list():
assert build_permission_param([]) == set()
def test_build_permission_empty_tuple():
assert build_permission_param(()) == set()
def test_build_permission_empty_set():
assert build_permission_param(set()) == set()
def test_build_permission_empty_dict():
assert build_permission_param({}) == set()
def test_build_permission_str():
assert build_permission_param("resource1") == {"resource1"}
def test_build_permission_list_str():
assert build_permission_param(["res1#scope1", "res1#scope2"]) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_str():
assert build_permission_param(("res1#scope1", "res1#scope2")) == {"res1#scope1", "res1#scope2"}
def test_build_permission_set_str():
assert build_permission_param({"res1#scope1", "res1#scope2"}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_str():
assert build_permission_param({"res1": "scope1"}) == {"res1#scope1"}
def test_build_permission_tuple_dict_str_list_str():
assert build_permission_param({"res1": ["scope1", "scope2"]}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_list_str2():
assert build_permission_param(
{"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]}
) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}
def test_build_permission_uma():
assert build_permission_param(Resource("res1")(Scope("scope1"))) == {"res1#scope1"}
def test_build_permission_uma_list():
assert build_permission_param(
[Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))]
) == {"res1#scope1", "res1#scope2"}
def test_build_permission_misbuilt_dict_str_list_list_str():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": [["scope1", "scope2"]]})
assert err.match(re.escape("misbuilt permission {'res1': [['scope1', 'scope2']]}"))
def test_build_permission_misbuilt_list_list_str():
with pytest.raises(KeycloakPermissionFormatError) as err:
print(build_permission_param([["scope1", "scope2"]]))
assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]"))
def test_build_permission_misbuilt_list_set_str():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1", "scope2"}])
assert err.match("misbuilt permission.*")
def test_build_permission_misbuilt_set_set_str():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1"}])
assert err.match(re.escape("misbuilt permission [{'scope1'}]"))
def test_build_permission_misbuilt_dict_non_iterable():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": 5})
assert err.match(re.escape("misbuilt permission {'res1': 5}"))
Loading…
Cancel
Save