Browse Source

feat: add Keycloak UMA client (#403)

pull/409/head v2.12.0
Nuwan Goonasekera 2 years ago
committed by GitHub
parent
commit
fb84c3b67b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/keycloak/__init__.py
  2. 241
      src/keycloak/keycloak_uma.py
  3. 6
      src/keycloak/uma_permissions.py
  4. 7
      src/keycloak/urls_patterns.py
  5. 19
      tests/conftest.py
  6. 120
      tests/test_keycloak_uma.py

2
src/keycloak/__init__.py

@ -42,6 +42,7 @@ from .exceptions import (
)
from .keycloak_admin import KeycloakAdmin
from .keycloak_openid import KeycloakOpenID
from .keycloak_uma import KeycloakUMA
__all__ = [
"__version__",
@ -61,4 +62,5 @@ __all__ = [
"KeycloakSecretNotFound",
"KeycloakAdmin",
"KeycloakOpenID",
"KeycloakUMA",
]

241
src/keycloak/keycloak_uma.py

@ -0,0 +1,241 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak UMA module.
The module contains a UMA compatible client for keycloak:
https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html
"""
import json
from urllib.parse import quote_plus
from .connection import ConnectionManager
from .exceptions import (
KeycloakDeleteError,
KeycloakGetError,
KeycloakPostError,
KeycloakPutError,
raise_error_from_response,
)
from .urls_patterns import URL_UMA_WELL_KNOWN
class KeycloakUMA:
"""Keycloak UMA client.
:param server_url: Keycloak server url
:param client_id: client id
:param realm_name: realm name
:param client_secret_key: client secret key
:param verify: True if want check connection SSL
:param custom_headers: dict of custom header to pass to each HTML request
:param proxies: dict of proxies to sent the request by.
:param timeout: connection timeout in seconds
"""
def __init__(
self, server_url, realm_name, verify=True, custom_headers=None, proxies=None, timeout=60
):
"""Init method.
:param server_url: Keycloak server url
:type server_url: str
:param realm_name: realm name
:type realm_name: str
:param verify: True if want check connection SSL
:type verify: bool
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param proxies: dict of proxies to sent the request by.
:type proxies: dict
:param timeout: connection timeout in seconds
:type timeout: int
"""
self.realm_name = realm_name
headers = custom_headers if custom_headers is not None else dict()
headers.update({"Content-Type": "application/json"})
self.connection = ConnectionManager(
base_url=server_url, headers=headers, timeout=timeout, verify=verify, proxies=proxies
)
self._well_known = None
def _fetch_well_known(self):
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@staticmethod
def format_url(url, **kwargs):
"""Substitute url path parameters.
Given a parameterized url string, returns the string after url encoding and substituting
the given params. For example,
`format_url("https://myserver/{my_resource}/{id}", my_resource="hello world", id="myid")`
would produce `https://myserver/hello+world/myid`.
:param url: url string to format
:type url: str
:param kwargs: dict containing kwargs to substitute
:type kwargs: dict
:return: formatted string
:rtype: str
"""
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
def _add_bearer_token_header(self, token):
self.connection.add_param_headers("Authorization", "Bearer " + token)
@property
def uma_well_known(self):
"""Get the well_known UMA2 config.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
"""
# per instance cache
if not self._well_known:
self._well_known = self._fetch_well_known()
return self._well_known
def resource_set_create(self, token, payload):
"""Create a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param payload: ResourceRepresentation
:type payload: dict
:return: ResourceRepresentation with the _id property assigned
:rtype: dict
"""
self._add_bearer_token_header(token)
data_raw = self.connection.raw_post(
self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def resource_set_update(self, token, resource_id, payload):
"""Update a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param resource_id: id of the resource
:type resource_id: str
:param payload: ResourceRepresentation
:type payload: dict
:return: Response dict (empty)
:rtype: dict
"""
self._add_bearer_token_header(token)
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
def resource_set_read(self, token, resource_id):
"""Read a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:param resource_id: id of the resource
:type resource_id: str
:return: ResourceRepresentation
:rtype: dict
"""
self._add_bearer_token_header(token)
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_delete(self, token, resource_id):
"""Delete a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
:param token: client access token
:type token: str
:param resource_id: id of the resource
:type resource_id: str
:return: Response dict (empty)
:rtype: dict
"""
self._add_bearer_token_header(token)
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def resource_set_list_ids(self, token):
"""List all resource set ids.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
:param token: client access token
:type token: str
:return: List of ids
:rtype: List[str]
"""
self._add_bearer_token_header(token)
data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_list(self, token):
"""List all resource sets.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param token: client access token
:type token: str
:yields: Iterator over a list of ResourceRepresentations
:rtype: Iterator[dict]
"""
for resource_id in self.resource_set_list_ids(token):
resource = self.resource_set_read(token, resource_id)
yield resource

6
src/keycloak/uma_permissions.py

@ -27,7 +27,7 @@ from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinit
class UMAPermission:
"""A class to conveniently assembly permissions.
"""A class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.
@ -143,7 +143,7 @@ class UMAPermission:
class Resource(UMAPermission):
"""An UMAPermission Resource class to conveniently assembly permissions.
"""A UMAPermission Resource class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.
@ -161,7 +161,7 @@ class Resource(UMAPermission):
class Scope(UMAPermission):
"""An UMAPermission Scope class to conveniently assembly permissions.
"""A UMAPermission Scope class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.

7
src/keycloak/urls_patterns.py

@ -25,7 +25,8 @@
# OPENID URLS
URL_REALM = "realms/{realm-name}"
URL_WELL_KNOWN = "realms/{realm-name}/.well-known/openid-configuration"
URL_WELL_KNOWN_BASE = "realms/{realm-name}/.well-known"
URL_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/openid-configuration"
URL_TOKEN = "realms/{realm-name}/protocol/openid-connect/token"
URL_USERINFO = "realms/{realm-name}/protocol/openid-connect/userinfo"
URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout"
@ -208,3 +209,7 @@ URL_ADMIN_ATTACK_DETECTION = "admin/realms/{realm-name}/attack-detection/brute-f
URL_ADMIN_ATTACK_DETECTION_USER = (
"admin/realms/{realm-name}/attack-detection/brute-force/users/{id}"
)
# UMA URLS
URL_UMA_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/uma2-configuration"

19
tests/conftest.py

@ -12,7 +12,7 @@ from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from keycloak import KeycloakAdmin, KeycloakOpenID
from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakUMA
class KeycloakTestEnv(object):
@ -475,3 +475,20 @@ def selfsigned_cert():
)
return cert_pem, key_pem
@pytest.fixture
def uma(env: KeycloakTestEnv, realm: str):
"""Fixture for initialized KeycloakUMA class.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
:param realm: Keycloak realm
:type realm: str
:yields: Keycloak OpenID client
:rtype: KeycloakOpenID
"""
# Return UMA
yield KeycloakUMA(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name=realm
)

120
tests/test_keycloak_uma.py

@ -0,0 +1,120 @@
"""Test module for KeycloakUMA."""
import re
from typing import Tuple
import pytest
from keycloak import KeycloakOpenID
from keycloak.connection import ConnectionManager
from keycloak.exceptions import (
KeycloakDeleteError,
KeycloakGetError,
KeycloakPostError,
KeycloakPutError,
)
from keycloak.keycloak_uma import KeycloakUMA
def test_keycloak_uma_init(env):
"""Test KeycloakUMA's init method.
:param env: Environment fixture
:type env: KeycloakTestEnv
"""
uma = KeycloakUMA(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", realm_name="master"
)
assert uma.realm_name == "master"
assert isinstance(uma.connection, ConnectionManager)
# should initially be empty
assert uma._well_known is None
assert uma.uma_well_known
# should be cached after first reference
assert uma._well_known is not None
def test_uma_well_known(uma: KeycloakUMA):
"""Test the well_known method.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
res = uma.uma_well_known
assert res is not None
assert res != dict()
for key in ["resource_registration_endpoint"]:
assert key in res
def test_uma_resource_sets(
uma: KeycloakUMA, oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
):
"""Test resource sets.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
:param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, _, _ = oid_with_credentials_authz
token = oid.token(grant_type="client_credentials")
access_token = token["access_token"]
# Check that only the default resource is present
resource_sets = uma.resource_set_list(access_token)
resource_set_list = list(resource_sets)
assert len(resource_set_list) == 1, resource_set_list
assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
# Test create resource set
resource_to_create = {
"name": "mytest",
"scopes": ["test:read", "test:write"],
"type": "urn:test",
}
created_resource = uma.resource_set_create(access_token, resource_to_create)
assert created_resource
assert created_resource["_id"], created_resource
assert set(resource_to_create).issubset(set(created_resource)), created_resource
# Test create the same resource set
with pytest.raises(KeycloakPostError) as err:
uma.resource_set_create(access_token, resource_to_create)
assert err.match(
re.escape(
'409: b\'{"error":"invalid_request","error_description":'
'"Resource with name [mytest] already exists."}\''
)
)
# Test get resource set
latest_resource = uma.resource_set_read(access_token, created_resource["_id"])
assert latest_resource["name"] == created_resource["name"]
# Test update resource set
latest_resource["name"] = "New Resource Name"
res = uma.resource_set_update(access_token, created_resource["_id"], latest_resource)
assert res == dict(), res
updated_resource = uma.resource_set_read(access_token, created_resource["_id"])
assert updated_resource["name"] == "New Resource Name"
# Test update resource set fail
with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(
token=access_token, resource_id=created_resource["_id"], payload={"wrong": "payload"}
)
assert err.match('400: b\'{"error":"Unrecognized field')
# Test delete resource set
res = uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"])
assert res == dict(), res
with pytest.raises(KeycloakGetError) as err:
uma.resource_set_read(access_token, created_resource["_id"])
err.match("404: b''")
# Test delete fail
with pytest.raises(KeycloakDeleteError) as err:
uma.resource_set_delete(token=access_token, resource_id=created_resource["_id"])
assert err.match("404: b''")
Loading…
Cancel
Save