Browse Source

fix: added fixes based on feedback

pull/322/head
Jackson Kwok 3 years ago
parent
commit
8c3b1b62ca
No known key found for this signature in database GPG Key ID: BB88FE9481A0273D
  1. 4
      README.md
  2. 35
      src/keycloak/keycloak_openid.py
  3. 83
      src/keycloak/uma_permissions.py
  4. 43
      tests/test_uma_permissions.py

4
README.md

@ -114,6 +114,10 @@ permissions = keycloak_openid.get_permissions(token['access_token'], method_toke
token = keycloak_openid.token("user", "password") token = keycloak_openid.token("user", "password")
permissions = keycloak_openid.uma_permissions(token['access_token']) permissions = keycloak_openid.uma_permissions(token['access_token'])
# Get UMA-permissions by token with specific resource and scope requested
token = keycloak_openid.token("user", "password")
permissions = keycloak_openid.uma_permissions(token['access_token'], permissions="Resource#Scope")
# Get auth status for a specific resource and scope by token # Get auth status for a specific resource and scope by token
token = keycloak_openid.token("user", "password") token = keycloak_openid.token("user", "password")
auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope") auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope")

35
src/keycloak/keycloak_openid.py

@ -33,6 +33,7 @@ from .exceptions import (
KeycloakDeprecationError, KeycloakDeprecationError,
KeycloakGetError, KeycloakGetError,
KeycloakInvalidTokenError, KeycloakInvalidTokenError,
KeycloakPostError,
KeycloakRPTNotFound, KeycloakRPTNotFound,
raise_error_from_response, raise_error_from_response,
) )
@ -49,8 +50,6 @@ from .urls_patterns import (
URL_WELL_KNOWN, URL_WELL_KNOWN,
) )
SAME_AS_CLIENT = object()
class KeycloakOpenID: class KeycloakOpenID:
""" """
@ -457,55 +456,47 @@ class KeycloakOpenID:
return list(set(permissions)) return list(set(permissions))
def uma_permissions(
self, token, permissions, audience=SAME_AS_CLIENT, response_mode="permissions"
):
def uma_permissions(self, token, permissions=""):
""" """
Get UMA permissions by user token with requested permissions
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients. invoked by confidential clients.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param token: user token :param token: user token
:param permissions:
:param permissions: list of uma permissions list(resource:scope) requested by the user
:return: permissions list :return: permissions list
""" """
if audience is SAME_AS_CLIENT:
audience = self.client_id
permission = build_permission_param(permissions) permission = build_permission_param(permissions)
params_path = {"realm-name": self.realm_name} params_path = {"realm-name": self.realm_name}
payload = { payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": permission, "permission": permission,
"response_mode": response_mode,
"audience": audience,
"Authorization": "Bearer " + token,
"response_mode": "permissions",
"audience": self.client_id,
} }
self.connection.add_param_headers("Authorization", "Bearer " + token) self.connection.add_param_headers("Authorization", "Bearer " + token)
try:
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
finally:
self.connection.del_param_headers("Authorization")
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
return raise_error_from_response(data_raw, KeycloakPostError)
def has_uma_access(self, token, permissions): def has_uma_access(self, token, permissions):
""" """
Get permission by user token
Determine whether user has uma permissions with specified user token
:param token: user token :param token: user token
:param permissions: dict(resource:scope)
:return: result bool
:param permissions: list of uma permissions (resource:scope)
:return: auth status
""" """
needed = build_permission_param(permissions) needed = build_permission_param(permissions)
try: try:
granted = self.uma_permissions(token, permissions) granted = self.uma_permissions(token, permissions)
except (KeycloakGetError, KeycloakAuthenticationError) as e:
except (KeycloakPostError, KeycloakAuthenticationError) as e:
if e.response_code == 403: if e.response_code == 403:
return AuthStatus( return AuthStatus(
is_logged_in=True, is_authorized=False, missing_permissions=needed is_logged_in=True, is_authorized=False, missing_permissions=needed

83
src/keycloak/uma_permissions.py

@ -38,10 +38,20 @@ class UMAPermission:
""" """
def __init__(self, *, resource="", scope=""):
def __init__(self, permission=None, resource="", scope=""):
self.resource = resource self.resource = resource
self.scope = scope self.scope = scope
if permission:
if not isinstance(permission, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(permission)
)
if permission.resource:
self.resource = str(permission.resource)
if permission.scope:
self.scope = str(permission.scope)
def __str__(self): def __str__(self):
scope = self.scope scope = self.scope
if scope: if scope:
@ -57,41 +67,50 @@ class UMAPermission:
def __hash__(self) -> int: def __hash__(self) -> int:
return hash(str(self)) return hash(str(self))
def __call__(self, *args, resource="", scope="") -> object:
def __call__(self, permission=None, resource="", scope="") -> object:
result_resource = self.resource result_resource = self.resource
result_scope = self.scope result_scope = self.scope
for arg in args:
if not isinstance(arg, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(arg)
)
if arg.resource:
result_resource = str(arg.resource)
if arg.scope:
result_scope = str(arg.scope)
if resource: if resource:
result_resource = str(resource) result_resource = str(resource)
if scope: if scope:
result_scope = str(scope) result_scope = str(scope)
if permission:
if not isinstance(permission, UMAPermission):
raise PermissionDefinitionError(
"can't determine if '{}' is a resource or scope".format(permission)
)
if permission.resource:
result_resource = str(permission.resource)
if permission.scope:
result_scope = str(permission.scope)
return UMAPermission(resource=result_resource, scope=result_scope) return UMAPermission(resource=result_resource, scope=result_scope)
class Resource(UMAPermission): class Resource(UMAPermission):
"""An UMAPermission Resource class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
"""
def __init__(self, resource): def __init__(self, resource):
super().__init__(resource=resource) super().__init__(resource=resource)
class Scope(UMAPermission): class Scope(UMAPermission):
"""An UMAPermission Scope class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
"""
def __init__(self, scope): def __init__(self, scope):
super().__init__(scope=scope) super().__init__(scope=scope)
class AuthStatus: class AuthStatus:
"""A class that represents the authorization/login status of a user associated with a token. """A class that represents the authorization/login status of a user associated with a token.
This has to evaluate to True if and only if the user is properly authorized for the requested resource."""
This has to evaluate to True if and only if the user is properly authorized
for the requested resource."""
def __init__(self, is_logged_in, is_authorized, missing_permissions): def __init__(self, is_logged_in, is_authorized, missing_permissions):
self.is_logged_in = is_logged_in self.is_logged_in = is_logged_in
@ -102,7 +121,12 @@ class AuthStatus:
return self.is_authorized return self.is_authorized
def __repr__(self): def __repr__(self):
return f"AuthStatus(is_authorized={self.is_authorized}, is_logged_in={self.is_logged_in}, missing_permissions={self.missing_permissions})"
return (
f"AuthStatus("
f"is_authorized={self.is_authorized}, "
f"is_logged_in={self.is_logged_in}, "
f"missing_permissions={self.missing_permissions})"
)
def build_permission_param(permissions): def build_permission_param(permissions):
@ -117,29 +141,42 @@ def build_permission_param(permissions):
""" """
if permissions is None or permissions == "": if permissions is None or permissions == "":
return set() return set()
if isinstance(permissions, (str, UMAPermission)):
if isinstance(permissions, str):
return set((permissions,)) return set((permissions,))
if isinstance(permissions, UMAPermission):
return set((str(permissions),))
try: # treat as dictionary of permissions try: # treat as dictionary of permissions
result = set() result = set()
for resource, scopes in permissions.items(): for resource, scopes in permissions.items():
print(f"resource={resource}scopes={scopes}")
if scopes is None: if scopes is None:
result.add(resource) result.add(resource)
elif isinstance(scopes, (str, UMAPermission)):
elif isinstance(scopes, str):
result.add("{}#{}".format(resource, scopes)) result.add("{}#{}".format(resource, scopes))
else: else:
for scope in scopes:
if not isinstance(scope, (str, UMAPermission)):
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
result.add("{}#{}".format(resource, scope))
try:
for scope in scopes:
if not isinstance(scope, str):
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
result.add("{}#{}".format(resource, scope))
except TypeError:
raise KeycloakPermissionFormatError(
"misbuilt permission {}".format(permissions)
)
return result return result
except AttributeError: except AttributeError:
pass pass
try: # treat as any other iterable of permissions try: # treat as any other iterable of permissions
return set(permissions)
result = set()
for permission in permissions:
if not isinstance(permission, (str, UMAPermission)):
raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions))
result.add(str(permission))
return result
except TypeError: except TypeError:
pass pass
raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions)) raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions))

43
tests/test_uma_permissions.py

@ -14,11 +14,12 @@
# #
# You should have received a copy of the GNU Lesser General Public License # You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import re import re
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
from keycloak.uma_permissions import build_permission_param, Resource, Scope
import pytest
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
from keycloak.uma_permissions import Resource, Scope, build_permission_param
def test_resource_with_scope_obj(): def test_resource_with_scope_obj():
@ -45,24 +46,12 @@ def test_scope_resource_str():
assert s(resource=r) == "Resource1#Scope1" assert s(resource=r) == "Resource1#Scope1"
def test_resource_scope_dict():
r = Resource("Resource1")
s = {"scope": "Scope1"}
assert r(**s) == "Resource1#Scope1"
def test_scope_resource_dict():
r = {"resource": "Resource1"}
s = Scope("Scope1")
assert s(**r) == "Resource1#Scope1"
def test_resource_scope_list(): def test_resource_scope_list():
r = Resource("Resource1") r = Resource("Resource1")
s = ["Scope1"] s = ["Scope1"]
with pytest.raises(PermissionDefinitionError) as err: with pytest.raises(PermissionDefinitionError) as err:
r(*s)
assert err.match("can't determine if 'Scope1' is a resource or scope")
r(s)
assert err.match(re.escape("can't determine if '['Scope1']' is a resource or scope"))
def test_build_permission_none(): def test_build_permission_none():
@ -119,6 +108,16 @@ def test_build_permission_tuple_dict_str_list_str2():
) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"} ) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}
def test_build_permission_uma():
assert build_permission_param(Resource("res1")(Scope("scope1"))) == {"res1#scope1"}
def test_build_permission_uma_list():
assert build_permission_param(
[Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))]
) == {"res1#scope1", "res1#scope2"}
def test_build_permission_misbuilt_dict_str_list_list_str(): def test_build_permission_misbuilt_dict_str_list_list_str():
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": [["scope1", "scope2"]]}) build_permission_param({"res1": [["scope1", "scope2"]]})
@ -127,17 +126,23 @@ def test_build_permission_misbuilt_dict_str_list_list_str():
def test_build_permission_misbuilt_list_list_str(): def test_build_permission_misbuilt_list_list_str():
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([["scope1", "scope2"]])
print(build_permission_param([["scope1", "scope2"]]))
assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]")) assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]"))
def test_build_permission_misbuilt_list_set_str(): def test_build_permission_misbuilt_list_set_str():
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1", "scope2"}]) build_permission_param([{"scope1", "scope2"}])
assert err.match(re.escape("misbuilt permission [{'scope1', 'scope2'}]"))
assert err.match("misbuilt permission.*")
def test_build_permission_misbuilt_set_set_str(): def test_build_permission_misbuilt_set_set_str():
with pytest.raises(KeycloakPermissionFormatError) as err: with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1"}]) build_permission_param([{"scope1"}])
assert err.match(re.escape("misbuilt permission [{'scope1'}]")) assert err.match(re.escape("misbuilt permission [{'scope1'}]"))
def test_build_permission_misbuilt_dict_non_iterable():
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": 5})
assert err.match(re.escape("misbuilt permission {'res1': 5}"))
Loading…
Cancel
Save