diff --git a/README.md b/README.md index 42a4824..692ce48 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,10 @@ permissions = keycloak_openid.get_permissions(token['access_token'], method_toke token = keycloak_openid.token("user", "password") permissions = keycloak_openid.uma_permissions(token['access_token']) +# Get UMA-permissions by token with specific resource and scope requested +token = keycloak_openid.token("user", "password") +permissions = keycloak_openid.uma_permissions(token['access_token'], permissions="Resource#Scope") + # Get auth status for a specific resource and scope by token token = keycloak_openid.token("user", "password") auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope") diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index e4378da..f0b73a6 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -33,6 +33,7 @@ from .exceptions import ( KeycloakDeprecationError, KeycloakGetError, KeycloakInvalidTokenError, + KeycloakPostError, KeycloakRPTNotFound, raise_error_from_response, ) @@ -49,8 +50,6 @@ from .urls_patterns import ( URL_WELL_KNOWN, ) -SAME_AS_CLIENT = object() - class KeycloakOpenID: """ @@ -457,55 +456,47 @@ class KeycloakOpenID: return list(set(permissions)) - def uma_permissions( - self, token, permissions, audience=SAME_AS_CLIENT, response_mode="permissions" - ): + def uma_permissions(self, token, permissions=""): """ + Get UMA permissions by user token with requested permissions + The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be invoked by confidential clients. http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint - :param token: user token - :param permissions: + :param permissions: list of uma permissions list(resource:scope) requested by the user :return: permissions list """ - if audience is SAME_AS_CLIENT: - audience = self.client_id - permission = build_permission_param(permissions) params_path = {"realm-name": self.realm_name} payload = { "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", "permission": permission, - "response_mode": response_mode, - "audience": audience, - "Authorization": "Bearer " + token, + "response_mode": "permissions", + "audience": self.client_id, } self.connection.add_param_headers("Authorization", "Bearer " + token) - try: - data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) - finally: - self.connection.del_param_headers("Authorization") + data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) - return raise_error_from_response(data_raw, KeycloakGetError) + return raise_error_from_response(data_raw, KeycloakPostError) def has_uma_access(self, token, permissions): """ - Get permission by user token + Determine whether user has uma permissions with specified user token :param token: user token - :param permissions: dict(resource:scope) - :return: result bool + :param permissions: list of uma permissions (resource:scope) + :return: auth status """ needed = build_permission_param(permissions) try: granted = self.uma_permissions(token, permissions) - except (KeycloakGetError, KeycloakAuthenticationError) as e: + except (KeycloakPostError, KeycloakAuthenticationError) as e: if e.response_code == 403: return AuthStatus( is_logged_in=True, is_authorized=False, missing_permissions=needed diff --git a/src/keycloak/uma_permissions.py b/src/keycloak/uma_permissions.py index 5d023dc..5653c76 100644 --- a/src/keycloak/uma_permissions.py +++ b/src/keycloak/uma_permissions.py @@ -38,10 +38,20 @@ class UMAPermission: """ - def __init__(self, *, resource="", scope=""): + def __init__(self, permission=None, resource="", scope=""): self.resource = resource self.scope = scope + if permission: + if not isinstance(permission, UMAPermission): + raise PermissionDefinitionError( + "can't determine if '{}' is a resource or scope".format(permission) + ) + if permission.resource: + self.resource = str(permission.resource) + if permission.scope: + self.scope = str(permission.scope) + def __str__(self): scope = self.scope if scope: @@ -57,41 +67,50 @@ class UMAPermission: def __hash__(self) -> int: return hash(str(self)) - def __call__(self, *args, resource="", scope="") -> object: + def __call__(self, permission=None, resource="", scope="") -> object: result_resource = self.resource result_scope = self.scope - for arg in args: - if not isinstance(arg, UMAPermission): - raise PermissionDefinitionError( - "can't determine if '{}' is a resource or scope".format(arg) - ) - if arg.resource: - result_resource = str(arg.resource) - if arg.scope: - result_scope = str(arg.scope) - if resource: result_resource = str(resource) if scope: result_scope = str(scope) + if permission: + if not isinstance(permission, UMAPermission): + raise PermissionDefinitionError( + "can't determine if '{}' is a resource or scope".format(permission) + ) + if permission.resource: + result_resource = str(permission.resource) + if permission.scope: + result_scope = str(permission.scope) + return UMAPermission(resource=result_resource, scope=result_scope) class Resource(UMAPermission): + """An UMAPermission Resource class to conveniently assembly permissions. + The class itself is callable, and will return the assembled permission. + """ + def __init__(self, resource): super().__init__(resource=resource) class Scope(UMAPermission): + """An UMAPermission Scope class to conveniently assembly permissions. + The class itself is callable, and will return the assembled permission. + """ + def __init__(self, scope): super().__init__(scope=scope) class AuthStatus: """A class that represents the authorization/login status of a user associated with a token. - This has to evaluate to True if and only if the user is properly authorized for the requested resource.""" + This has to evaluate to True if and only if the user is properly authorized + for the requested resource.""" def __init__(self, is_logged_in, is_authorized, missing_permissions): self.is_logged_in = is_logged_in @@ -102,7 +121,12 @@ class AuthStatus: return self.is_authorized def __repr__(self): - return f"AuthStatus(is_authorized={self.is_authorized}, is_logged_in={self.is_logged_in}, missing_permissions={self.missing_permissions})" + return ( + f"AuthStatus(" + f"is_authorized={self.is_authorized}, " + f"is_logged_in={self.is_logged_in}, " + f"missing_permissions={self.missing_permissions})" + ) def build_permission_param(permissions): @@ -117,29 +141,42 @@ def build_permission_param(permissions): """ if permissions is None or permissions == "": return set() - if isinstance(permissions, (str, UMAPermission)): + if isinstance(permissions, str): return set((permissions,)) + if isinstance(permissions, UMAPermission): + return set((str(permissions),)) try: # treat as dictionary of permissions result = set() for resource, scopes in permissions.items(): + print(f"resource={resource}scopes={scopes}") if scopes is None: result.add(resource) - elif isinstance(scopes, (str, UMAPermission)): + elif isinstance(scopes, str): result.add("{}#{}".format(resource, scopes)) else: - for scope in scopes: - if not isinstance(scope, (str, UMAPermission)): - raise KeycloakPermissionFormatError( - "misbuilt permission {}".format(permissions) - ) - result.add("{}#{}".format(resource, scope)) + try: + for scope in scopes: + if not isinstance(scope, str): + raise KeycloakPermissionFormatError( + "misbuilt permission {}".format(permissions) + ) + result.add("{}#{}".format(resource, scope)) + except TypeError: + raise KeycloakPermissionFormatError( + "misbuilt permission {}".format(permissions) + ) return result except AttributeError: pass try: # treat as any other iterable of permissions - return set(permissions) + result = set() + for permission in permissions: + if not isinstance(permission, (str, UMAPermission)): + raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions)) + result.add(str(permission)) + return result except TypeError: pass raise KeycloakPermissionFormatError("misbuilt permission {}".format(permissions)) diff --git a/tests/test_uma_permissions.py b/tests/test_uma_permissions.py index d26830b..09d7147 100644 --- a/tests/test_uma_permissions.py +++ b/tests/test_uma_permissions.py @@ -14,11 +14,12 @@ # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import pytest import re -from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError -from keycloak.uma_permissions import build_permission_param, Resource, Scope +import pytest + +from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError +from keycloak.uma_permissions import Resource, Scope, build_permission_param def test_resource_with_scope_obj(): @@ -45,24 +46,12 @@ def test_scope_resource_str(): assert s(resource=r) == "Resource1#Scope1" -def test_resource_scope_dict(): - r = Resource("Resource1") - s = {"scope": "Scope1"} - assert r(**s) == "Resource1#Scope1" - - -def test_scope_resource_dict(): - r = {"resource": "Resource1"} - s = Scope("Scope1") - assert s(**r) == "Resource1#Scope1" - - def test_resource_scope_list(): r = Resource("Resource1") s = ["Scope1"] with pytest.raises(PermissionDefinitionError) as err: - r(*s) - assert err.match("can't determine if 'Scope1' is a resource or scope") + r(s) + assert err.match(re.escape("can't determine if '['Scope1']' is a resource or scope")) def test_build_permission_none(): @@ -119,6 +108,16 @@ def test_build_permission_tuple_dict_str_list_str2(): ) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"} +def test_build_permission_uma(): + assert build_permission_param(Resource("res1")(Scope("scope1"))) == {"res1#scope1"} + + +def test_build_permission_uma_list(): + assert build_permission_param( + [Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))] + ) == {"res1#scope1", "res1#scope2"} + + def test_build_permission_misbuilt_dict_str_list_list_str(): with pytest.raises(KeycloakPermissionFormatError) as err: build_permission_param({"res1": [["scope1", "scope2"]]}) @@ -127,17 +126,23 @@ def test_build_permission_misbuilt_dict_str_list_list_str(): def test_build_permission_misbuilt_list_list_str(): with pytest.raises(KeycloakPermissionFormatError) as err: - build_permission_param([["scope1", "scope2"]]) + print(build_permission_param([["scope1", "scope2"]])) assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]")) def test_build_permission_misbuilt_list_set_str(): with pytest.raises(KeycloakPermissionFormatError) as err: build_permission_param([{"scope1", "scope2"}]) - assert err.match(re.escape("misbuilt permission [{'scope1', 'scope2'}]")) + assert err.match("misbuilt permission.*") def test_build_permission_misbuilt_set_set_str(): with pytest.raises(KeycloakPermissionFormatError) as err: build_permission_param([{"scope1"}]) assert err.match(re.escape("misbuilt permission [{'scope1'}]")) + + +def test_build_permission_misbuilt_dict_non_iterable(): + with pytest.raises(KeycloakPermissionFormatError) as err: + build_permission_param({"res1": 5}) + assert err.match(re.escape("misbuilt permission {'res1': 5}"))