Browse Source

refactor: Refactor keycloak uma client to use openid connection manager

pull/415/head
nuwang 2 years ago
parent
commit
275b4504c1
  1. 77
      src/keycloak/keycloak_uma.py
  2. 35
      tests/conftest.py
  3. 51
      tests/test_keycloak_uma.py

77
src/keycloak/keycloak_uma.py

@ -29,7 +29,6 @@ https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html
import json
from urllib.parse import quote_plus
from .connection import ConnectionManager
from .exceptions import (
KeycloakDeleteError,
KeycloakGetError,
@ -37,50 +36,30 @@ from .exceptions import (
KeycloakPutError,
raise_error_from_response,
)
from .keycloak_openid import KeycloakOpenIDConnectionManager
from .urls_patterns import URL_UMA_WELL_KNOWN
class KeycloakUMA:
"""Keycloak UMA client.
:param server_url: Keycloak server url
:param client_id: client id
:param realm_name: realm name
:param client_secret_key: client secret key
:param verify: True if want check connection SSL
:param custom_headers: dict of custom header to pass to each HTML request
:param proxies: dict of proxies to sent the request by.
:param timeout: connection timeout in seconds
:param connection: OpenID connection manager
"""
def __init__(
self, server_url, realm_name, verify=True, custom_headers=None, proxies=None, timeout=60
):
def __init__(self, connection: KeycloakOpenIDConnectionManager):
"""Init method.
:param server_url: Keycloak server url
:type server_url: str
:param realm_name: realm name
:type realm_name: str
:param verify: True if want check connection SSL
:type verify: bool
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param proxies: dict of proxies to sent the request by.
:type proxies: dict
:param timeout: connection timeout in seconds
:type timeout: int
:param connection: OpenID connection manager
:type connection: KeycloakOpenIDConnectionManager
"""
self.realm_name = realm_name
headers = custom_headers if custom_headers is not None else dict()
headers.update({"Content-Type": "application/json"})
self.connection = ConnectionManager(
base_url=server_url, headers=headers, timeout=timeout, verify=verify, proxies=proxies
)
self.connection = connection
custom_headers = self.connection.custom_headers or {}
custom_headers.update({"Content-Type": "application/json"})
self.connection.custom_headers = custom_headers
self._well_known = None
def _fetch_well_known(self):
params_path = {"realm-name": self.realm_name}
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@ -102,9 +81,6 @@ class KeycloakUMA:
"""
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
def _add_bearer_token_header(self, token):
self.connection.add_param_headers("Authorization", "Bearer " + token)
@property
def uma_well_known(self):
"""Get the well_known UMA2 config.
@ -117,7 +93,7 @@ class KeycloakUMA:
self._well_known = self._fetch_well_known()
return self._well_known
def resource_set_create(self, token, payload):
def resource_set_create(self, payload):
"""Create a resource set.
Spec
@ -126,20 +102,17 @@ class KeycloakUMA:
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param payload: ResourceRepresentation
:type payload: dict
:return: ResourceRepresentation with the _id property assigned
:rtype: dict
"""
self._add_bearer_token_header(token)
data_raw = self.connection.raw_post(
self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def resource_set_update(self, token, resource_id, payload):
def resource_set_update(self, resource_id, payload):
"""Update a resource set.
Spec
@ -148,8 +121,6 @@ class KeycloakUMA:
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param resource_id: id of the resource
:type resource_id: str
:param payload: ResourceRepresentation
@ -157,14 +128,13 @@ class KeycloakUMA:
:return: Response dict (empty)
:rtype: dict
"""
self._add_bearer_token_header(token)
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
def resource_set_read(self, token, resource_id):
def resource_set_read(self, resource_id):
"""Read a resource set.
Spec
@ -173,56 +143,47 @@ class KeycloakUMA:
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param resource_id: id of the resource
:type resource_id: str
:return: ResourceRepresentation
:rtype: dict
"""
self._add_bearer_token_header(token)
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_delete(self, token, resource_id):
def resource_set_delete(self, resource_id):
"""Delete a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
:param token: client access token
:type token: str
:param resource_id: id of the resource
:type resource_id: str
:return: Response dict (empty)
:rtype: dict
"""
self._add_bearer_token_header(token)
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def resource_set_list_ids(self, token):
def resource_set_list_ids(self):
"""List all resource set ids.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
:param token: client access token
:type token: str
:return: List of ids
:rtype: List[str]
"""
self._add_bearer_token_header(token)
data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_list(self, token):
def resource_set_list(self):
"""List all resource sets.
Spec
@ -231,11 +192,9 @@ class KeycloakUMA:
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:yields: Iterator over a list of ResourceRepresentations
:rtype: Iterator[dict]
"""
for resource_id in self.resource_set_list_ids(token):
resource = self.resource_set_read(token, resource_id)
for resource_id in self.resource_set_list_ids():
resource = self.resource_set_read(resource_id)
yield resource

35
tests/conftest.py

@ -4,6 +4,7 @@ import ipaddress
import os
import uuid
from datetime import datetime, timedelta
from typing import Tuple
import pytest
from cryptography import x509
@ -13,6 +14,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakUMA
from keycloak.keycloak_openid import KeycloakOpenIDConnectionManager
class KeycloakTestEnv(object):
@ -478,17 +480,34 @@ def selfsigned_cert():
@pytest.fixture
def uma(env: KeycloakTestEnv, realm: str):
def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Fixture for initialized KeycloakUMA class.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
:param realm: Keycloak realm
:type realm: str
:param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
:yields: Keycloak OpenID connection manager
:rtype: KeycloakOpenIDConnectionManager
"""
oid, _, _ = oid_with_credentials_authz
connection = KeycloakOpenIDConnectionManager(
server_url=oid.connection.base_url,
realm_name=oid.realm_name,
client_id=oid.client_id,
client_secret_key=oid.client_secret_key,
timeout=60,
)
yield connection
@pytest.fixture
def uma(oid_connection_with_authz: KeycloakOpenIDConnectionManager):
"""Fixture for initialized KeycloakUMA class.
:param oid_connection_with_authz: Keycloak open id connection with pre-configured authz client
:type oid_connection_with_authz: KeycloakOpenIDConnectionManager
:yields: Keycloak OpenID client
:rtype: KeycloakOpenID
"""
connection = oid_connection_with_authz
# Return UMA
yield KeycloakUMA(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name=realm
)
yield KeycloakUMA(connection=connection)

51
tests/test_keycloak_uma.py

@ -1,32 +1,28 @@
"""Test module for KeycloakUMA."""
import re
from typing import Tuple
import pytest
from keycloak import KeycloakOpenID
from keycloak.connection import ConnectionManager
from keycloak.exceptions import (
KeycloakDeleteError,
KeycloakGetError,
KeycloakPostError,
KeycloakPutError,
)
from keycloak.keycloak_openid import KeycloakOpenIDConnectionManager
from keycloak.keycloak_uma import KeycloakUMA
def test_keycloak_uma_init(env):
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnectionManager):
"""Test KeycloakUMA's init method.
:param env: Environment fixture
:type env: KeycloakTestEnv
:param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
:type oid_connection_with_authz: KeycloakOpenIDConnectionManager
"""
uma = KeycloakUMA(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name="master"
)
connection = oid_connection_with_authz
uma = KeycloakUMA(connection=connection)
assert uma.realm_name == "master"
assert isinstance(uma.connection, ConnectionManager)
assert isinstance(uma.connection, KeycloakOpenIDConnectionManager)
# should initially be empty
assert uma._well_known is None
assert uma.uma_well_known
@ -47,23 +43,14 @@ def test_uma_well_known(uma: KeycloakUMA):
assert key in res
def test_uma_resource_sets(
uma: KeycloakUMA, oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
):
def test_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
:param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, _, _ = oid_with_credentials_authz
token = oid.token(grant_type="client_credentials")
access_token = token["access_token"]
# Check that only the default resource is present
resource_sets = uma.resource_set_list(access_token)
resource_sets = uma.resource_set_list()
resource_set_list = list(resource_sets)
assert len(resource_set_list) == 1, resource_set_list
assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
@ -74,14 +61,14 @@ def test_uma_resource_sets(
"scopes": ["test:read", "test:write"],
"type": "urn:test",
}
created_resource = uma.resource_set_create(access_token, resource_to_create)
created_resource = uma.resource_set_create(resource_to_create)
assert created_resource
assert created_resource["_id"], created_resource
assert set(resource_to_create).issubset(set(created_resource)), created_resource
# Test create the same resource set
with pytest.raises(KeycloakPostError) as err:
uma.resource_set_create(access_token, resource_to_create)
uma.resource_set_create(resource_to_create)
assert err.match(
re.escape(
'409: b\'{"error":"invalid_request","error_description":'
@ -90,31 +77,29 @@ def test_uma_resource_sets(
)
# Test get resource set
latest_resource = uma.resource_set_read(access_token, created_resource["_id"])
latest_resource = uma.resource_set_read(created_resource["_id"])
assert latest_resource["name"] == created_resource["name"]
# Test update resource set
latest_resource["name"] = "New Resource Name"
res = uma.resource_set_update(access_token, created_resource["_id"], latest_resource)
res = uma.resource_set_update(created_resource["_id"], latest_resource)
assert res == dict(), res
updated_resource = uma.resource_set_read(access_token, created_resource["_id"])
updated_resource = uma.resource_set_read(created_resource["_id"])
assert updated_resource["name"] == "New Resource Name"
# Test update resource set fail
with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(
token=access_token, resource_id=created_resource["_id"], payload={"wrong": "payload"}
)
uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
assert err.match('400: b\'{"error":"Unrecognized field')
# Test delete resource set
res = uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"])
res = uma.resource_set_delete(resource_id=created_resource["_id"])
assert res == dict(), res
with pytest.raises(KeycloakGetError) as err:
uma.resource_set_read(access_token, created_resource["_id"])
uma.resource_set_read(created_resource["_id"])
err.match("404: b''")
# Test delete fail
with pytest.raises(KeycloakDeleteError) as err:
uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"])
uma.resource_set_delete(resource_id=created_resource["_id"])
assert err.match("404: b''")
Loading…
Cancel
Save