From 275b4504c1102cb065cfbcd33559638e77c42de1 Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Tue, 28 Feb 2023 01:45:27 +0530 Subject: [PATCH] refactor: Refactor keycloak uma client to use openid connection manager --- src/keycloak/keycloak_uma.py | 77 +++++++++--------------------------- tests/conftest.py | 35 ++++++++++++---- tests/test_keycloak_uma.py | 51 +++++++++--------------- 3 files changed, 63 insertions(+), 100 deletions(-) diff --git a/src/keycloak/keycloak_uma.py b/src/keycloak/keycloak_uma.py index a066567..aaf57f7 100644 --- a/src/keycloak/keycloak_uma.py +++ b/src/keycloak/keycloak_uma.py @@ -29,7 +29,6 @@ https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html import json from urllib.parse import quote_plus -from .connection import ConnectionManager from .exceptions import ( KeycloakDeleteError, KeycloakGetError, @@ -37,50 +36,30 @@ from .exceptions import ( KeycloakPutError, raise_error_from_response, ) +from .keycloak_openid import KeycloakOpenIDConnectionManager from .urls_patterns import URL_UMA_WELL_KNOWN class KeycloakUMA: """Keycloak UMA client. - :param server_url: Keycloak server url - :param client_id: client id - :param realm_name: realm name - :param client_secret_key: client secret key - :param verify: True if want check connection SSL - :param custom_headers: dict of custom header to pass to each HTML request - :param proxies: dict of proxies to sent the request by. - :param timeout: connection timeout in seconds + :param connection: OpenID connection manager """ - def __init__( - self, server_url, realm_name, verify=True, custom_headers=None, proxies=None, timeout=60 - ): + def __init__(self, connection: KeycloakOpenIDConnectionManager): """Init method. - :param server_url: Keycloak server url - :type server_url: str - :param realm_name: realm name - :type realm_name: str - :param verify: True if want check connection SSL - :type verify: bool - :param custom_headers: dict of custom header to pass to each HTML request - :type custom_headers: dict - :param proxies: dict of proxies to sent the request by. - :type proxies: dict - :param timeout: connection timeout in seconds - :type timeout: int + :param connection: OpenID connection manager + :type connection: KeycloakOpenIDConnectionManager """ - self.realm_name = realm_name - headers = custom_headers if custom_headers is not None else dict() - headers.update({"Content-Type": "application/json"}) - self.connection = ConnectionManager( - base_url=server_url, headers=headers, timeout=timeout, verify=verify, proxies=proxies - ) + self.connection = connection + custom_headers = self.connection.custom_headers or {} + custom_headers.update({"Content-Type": "application/json"}) + self.connection.custom_headers = custom_headers self._well_known = None def _fetch_well_known(self): - params_path = {"realm-name": self.realm_name} + params_path = {"realm-name": self.connection.realm_name} data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) @@ -102,9 +81,6 @@ class KeycloakUMA: """ return url.format(**{k: quote_plus(v) for k, v in kwargs.items()}) - def _add_bearer_token_header(self, token): - self.connection.add_param_headers("Authorization", "Bearer " + token) - @property def uma_well_known(self): """Get the well_known UMA2 config. @@ -117,7 +93,7 @@ class KeycloakUMA: self._well_known = self._fetch_well_known() return self._well_known - def resource_set_create(self, token, payload): + def resource_set_create(self, payload): """Create a resource set. Spec @@ -126,20 +102,17 @@ class KeycloakUMA: ResourceRepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation - :param token: client access token - :type token: str :param payload: ResourceRepresentation :type payload: dict :return: ResourceRepresentation with the _id property assigned :rtype: dict """ - self._add_bearer_token_header(token) data_raw = self.connection.raw_post( self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) - def resource_set_update(self, token, resource_id, payload): + def resource_set_update(self, resource_id, payload): """Update a resource set. Spec @@ -148,8 +121,6 @@ class KeycloakUMA: ResourceRepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation - :param token: client access token - :type token: str :param resource_id: id of the resource :type resource_id: str :param payload: ResourceRepresentation @@ -157,14 +128,13 @@ class KeycloakUMA: :return: Response dict (empty) :rtype: dict """ - self._add_bearer_token_header(token) url = self.format_url( self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id ) data_raw = self.connection.raw_put(url, data=json.dumps(payload)) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) - def resource_set_read(self, token, resource_id): + def resource_set_read(self, resource_id): """Read a resource set. Spec @@ -173,56 +143,47 @@ class KeycloakUMA: ResourceRepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation - :param token: client access token - :type token: str :param resource_id: id of the resource :type resource_id: str :return: ResourceRepresentation :rtype: dict """ - self._add_bearer_token_header(token) url = self.format_url( self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id ) data_raw = self.connection.raw_get(url) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) - def resource_set_delete(self, token, resource_id): + def resource_set_delete(self, resource_id): """Delete a resource set. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set - :param token: client access token - :type token: str :param resource_id: id of the resource :type resource_id: str :return: Response dict (empty) :rtype: dict """ - self._add_bearer_token_header(token) url = self.format_url( self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id ) data_raw = self.connection.raw_delete(url) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - def resource_set_list_ids(self, token): + def resource_set_list_ids(self): """List all resource set ids. Spec https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets - :param token: client access token - :type token: str :return: List of ids :rtype: List[str] """ - self._add_bearer_token_header(token) data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"]) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) - def resource_set_list(self, token): + def resource_set_list(self): """List all resource sets. Spec @@ -231,11 +192,9 @@ class KeycloakUMA: ResourceRepresentation https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation - :param token: client access token - :type token: str :yields: Iterator over a list of ResourceRepresentations :rtype: Iterator[dict] """ - for resource_id in self.resource_set_list_ids(token): - resource = self.resource_set_read(token, resource_id) + for resource_id in self.resource_set_list_ids(): + resource = self.resource_set_read(resource_id) yield resource diff --git a/tests/conftest.py b/tests/conftest.py index 18cb6a3..d518c31 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ import ipaddress import os import uuid from datetime import datetime, timedelta +from typing import Tuple import pytest from cryptography import x509 @@ -13,6 +14,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakUMA +from keycloak.keycloak_openid import KeycloakOpenIDConnectionManager class KeycloakTestEnv(object): @@ -478,17 +480,34 @@ def selfsigned_cert(): @pytest.fixture -def uma(env: KeycloakTestEnv, realm: str): +def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): """Fixture for initialized KeycloakUMA class. - :param env: Keycloak test environment - :type env: KeycloakTestEnv - :param realm: Keycloak realm - :type realm: str + :param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + :yields: Keycloak OpenID connection manager + :rtype: KeycloakOpenIDConnectionManager + """ + oid, _, _ = oid_with_credentials_authz + connection = KeycloakOpenIDConnectionManager( + server_url=oid.connection.base_url, + realm_name=oid.realm_name, + client_id=oid.client_id, + client_secret_key=oid.client_secret_key, + timeout=60, + ) + yield connection + + +@pytest.fixture +def uma(oid_connection_with_authz: KeycloakOpenIDConnectionManager): + """Fixture for initialized KeycloakUMA class. + + :param oid_connection_with_authz: Keycloak open id connection with pre-configured authz client + :type oid_connection_with_authz: KeycloakOpenIDConnectionManager :yields: Keycloak OpenID client :rtype: KeycloakOpenID """ + connection = oid_connection_with_authz # Return UMA - yield KeycloakUMA( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name=realm - ) + yield KeycloakUMA(connection=connection) diff --git a/tests/test_keycloak_uma.py b/tests/test_keycloak_uma.py index 2a1dde7..51ff3f3 100644 --- a/tests/test_keycloak_uma.py +++ b/tests/test_keycloak_uma.py @@ -1,32 +1,28 @@ """Test module for KeycloakUMA.""" import re -from typing import Tuple import pytest -from keycloak import KeycloakOpenID -from keycloak.connection import ConnectionManager from keycloak.exceptions import ( KeycloakDeleteError, KeycloakGetError, KeycloakPostError, KeycloakPutError, ) +from keycloak.keycloak_openid import KeycloakOpenIDConnectionManager from keycloak.keycloak_uma import KeycloakUMA -def test_keycloak_uma_init(env): +def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnectionManager): """Test KeycloakUMA's init method. - :param env: Environment fixture - :type env: KeycloakTestEnv + :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz + :type oid_connection_with_authz: KeycloakOpenIDConnectionManager """ - uma = KeycloakUMA( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name="master" - ) + connection = oid_connection_with_authz + uma = KeycloakUMA(connection=connection) - assert uma.realm_name == "master" - assert isinstance(uma.connection, ConnectionManager) + assert isinstance(uma.connection, KeycloakOpenIDConnectionManager) # should initially be empty assert uma._well_known is None assert uma.uma_well_known @@ -47,23 +43,14 @@ def test_uma_well_known(uma: KeycloakUMA): assert key in res -def test_uma_resource_sets( - uma: KeycloakUMA, oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] -): +def test_uma_resource_sets(uma: KeycloakUMA): """Test resource sets. :param uma: Keycloak UMA client :type uma: KeycloakUMA - :param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials - :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] """ - oid, _, _ = oid_with_credentials_authz - - token = oid.token(grant_type="client_credentials") - access_token = token["access_token"] - # Check that only the default resource is present - resource_sets = uma.resource_set_list(access_token) + resource_sets = uma.resource_set_list() resource_set_list = list(resource_sets) assert len(resource_set_list) == 1, resource_set_list assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] @@ -74,14 +61,14 @@ def test_uma_resource_sets( "scopes": ["test:read", "test:write"], "type": "urn:test", } - created_resource = uma.resource_set_create(access_token, resource_to_create) + created_resource = uma.resource_set_create(resource_to_create) assert created_resource assert created_resource["_id"], created_resource assert set(resource_to_create).issubset(set(created_resource)), created_resource # Test create the same resource set with pytest.raises(KeycloakPostError) as err: - uma.resource_set_create(access_token, resource_to_create) + uma.resource_set_create(resource_to_create) assert err.match( re.escape( '409: b\'{"error":"invalid_request","error_description":' @@ -90,31 +77,29 @@ def test_uma_resource_sets( ) # Test get resource set - latest_resource = uma.resource_set_read(access_token, created_resource["_id"]) + latest_resource = uma.resource_set_read(created_resource["_id"]) assert latest_resource["name"] == created_resource["name"] # Test update resource set latest_resource["name"] = "New Resource Name" - res = uma.resource_set_update(access_token, created_resource["_id"], latest_resource) + res = uma.resource_set_update(created_resource["_id"], latest_resource) assert res == dict(), res - updated_resource = uma.resource_set_read(access_token, created_resource["_id"]) + updated_resource = uma.resource_set_read(created_resource["_id"]) assert updated_resource["name"] == "New Resource Name" # Test update resource set fail with pytest.raises(KeycloakPutError) as err: - uma.resource_set_update( - token=access_token, resource_id=created_resource["_id"], payload={"wrong": "payload"} - ) + uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"}) assert err.match('400: b\'{"error":"Unrecognized field') # Test delete resource set - res = uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"]) + res = uma.resource_set_delete(resource_id=created_resource["_id"]) assert res == dict(), res with pytest.raises(KeycloakGetError) as err: - uma.resource_set_read(access_token, created_resource["_id"]) + uma.resource_set_read(created_resource["_id"]) err.match("404: b''") # Test delete fail with pytest.raises(KeycloakDeleteError) as err: - uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"]) + uma.resource_set_delete(resource_id=created_resource["_id"]) assert err.match("404: b''")