Browse Source

feat: Async feature (#566)

* chore: add async client to connection

* chore: add async client to keycloak openid

* chore: add async client to keycloak uma

* chore: add async client and methods to keycloak admin

* chore: add async tests for connection and uma class

* chore: add async tests for keycloak openid class

* chore: add async tests for keycloak admin class

* chore: update poetry lock

* chore: update poetry lock

* fix: poetry files

* fix: lint issues

* fix: conftest fix

* fix: lint test fix

* fix: lint test fix

* fix: lint test fix

* fix: lint test fix

* fix: lint test fix

* fix: added setuptools

* fix: delete request fix and test cases fix

* fix: email test case

* fix: email test case for older versions

* fix: set correct content type on token endpoint

* fix: async on missing calls

* test: updated tests

* chore: deps

* fix: preserve original bearer

* fix: dont set bearer in refresh token directly

* fix: default content type

* fix: content type for initial access token

* fix: content type for async initial access token

* chore: add divergence test

* chore: add divergence test for uma and conneciton class

* chore: add docs for async module

* fix: sphinx error fixes

* test: verify signature

* test: final divergence tests

---------

Co-authored-by: Richard Nemeth <ryshoooo@gmail.com>
pull/569/head v4.1.0
pyaiman 8 months ago
committed by GitHub
parent
commit
81025e085d
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      .gitignore
  2. 1
      docs/source/getting_started.rst
  3. 408
      docs/source/modules/async.rst
  4. 294
      poetry.lock
  5. 11
      pyproject.toml
  6. 108
      src/keycloak/connection.py
  7. 4139
      src/keycloak/keycloak_admin.py
  8. 726
      src/keycloak/keycloak_openid.py
  9. 374
      src/keycloak/keycloak_uma.py
  10. 130
      src/keycloak/openid_connection.py
  11. 59
      tests/test_connection.py
  12. 3123
      tests/test_keycloak_admin.py
  13. 513
      tests/test_keycloak_openid.py
  14. 316
      tests/test_keycloak_uma.py
  15. 1
      tox.env

3
.gitignore

@ -108,3 +108,6 @@ main2.py
s3air-authz-config.json
.vscode
_build
test.py

1
docs/source/getting_started.rst

@ -13,3 +13,4 @@ For more details, see :ref:`api`.
modules/openid_client
modules/admin
modules/uma
modules/async

408
docs/source/modules/async.rst

@ -0,0 +1,408 @@
.. admin:
Use Python Keycloak Asynchronously
==================================
Asynchronous admin client
-------------------------
Configure admin client
------------------------
.. code-block:: python
admin = KeycloakAdmin(
server_url="http://localhost:8080/",
username='example-admin',
password='secret',
realm_name="master",
user_realm_name="only_if_other_realm_than_master")
Configure admin client with connection
-----------------------------------------
.. code-block:: python
from keycloak import KeycloakAdmin
from keycloak import KeycloakOpenIDConnection
keycloak_connection = KeycloakOpenIDConnection(
server_url="http://localhost:8080/",
username='example-admin',
password='secret',
realm_name="master",
user_realm_name="only_if_other_realm_than_master",
client_id="my_client",
client_secret_key="client-secret",
verify=True)
keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
Create user asynchronously
----------------------------
.. code-block:: python
new_user = await keycloak_admin.a_create_user({"email": "example@example.com",
"username": "example@example.com",
"enabled": True,
"firstName": "Example",
"lastName": "Example"})
Add user asynchronously and raise exception if username already exists
-----------------------------------------------------------------------
The exist_ok currently defaults to True for backwards compatibility reasons.
.. code-block:: python
new_user = await keycloak_admin.a_create_user({"email": "example@example.com",
"username": "example@example.com",
"enabled": True,
"firstName": "Example",
"lastName": "Example"},
exist_ok=False)
Add user asynchronously and set password
----------------------------------------
.. code-block:: python
new_user = await keycloak_admin.a_create_user({"email": "example@example.com",
"username": "example@example.com",
"enabled": True,
"firstName": "Example",
"lastName": "Example",
"credentials": [{"value": "secret","type": "password",}]})
Add user asynchronous and specify a locale
-------------------------------------------
.. code-block:: python
new_user = await keycloak_admin.a_create_user({"email": "example@example.fr",
"username": "example@example.fr",
"enabled": True,
"firstName": "Example",
"lastName": "Example",
"attributes": {
"locale": ["fr"]
}})
Asynchronous User counter
------------------------------
.. code-block:: python
count_users = await keycloak_admin.a_users_count()
Get users Returns a list of users asynchronously, filtered according to query parameters
-----------------------------------------------------------------------------------------
.. code-block:: python
users = await keycloak_admin.a_get_users({})
Get user ID asynchronously from username
-----------------------------------------
.. code-block:: python
user_id_keycloak = await keycloak_admin.a_get_user_id("username-keycloak")
Get user asynchronously
------------------------------
.. code-block:: python
user = await keycloak_admin.a_get_user("user-id-keycloak")
Update user asynchronously
------------------------------
.. code-block:: python
response = await keycloak_admin.a_update_user(user_id="user-id-keycloak",
payload={'firstName': 'Example Update'})
Update user password asynchronously
------------------------------------
.. code-block:: python
response = await keycloak_admin.a_set_user_password(user_id="user-id-keycloak", password="secret", temporary=True)
Get user credentials asynchronously
------------------------------------
.. code-block:: python
credentials = await keycloak_admin.a_get_credentials(user_id='user_id')
Get user credential asynchronously by ID
-----------------------------------------
.. code-block:: python
credential = await keycloak_admin.a_get_credential(user_id='user_id', credential_id='credential_id')
Delete user credential asynchronously
---------------------------------------
.. code-block:: python
response = await keycloak_admin.a_delete_credential(user_id='user_id', credential_id='credential_id')
Delete User asynchronously
------------------------------
.. code-block:: python
response = await keycloak_admin.a_delete_user(user_id="user-id-keycloak")
Get consents granted asynchronously by the user
------------------------------------------------
.. code-block:: python
consents = await keycloak_admin.a_consents_user(user_id="user-id-keycloak")
Send user action asynchronously
---------------------------------
.. code-block:: python
response = await keycloak_admin.a_send_update_account(user_id="user-id-keycloak",
payload=['UPDATE_PASSWORD'])
Send verify email asynchronously
----------------------------------
.. code-block:: python
response = await keycloak_admin.a_send_verify_email(user_id="user-id-keycloak")
Get sessions associated asynchronously with the user
-----------------------------------------------------
.. code-block:: python
sessions = await keycloak_admin.a_get_sessions(user_id="user-id-keycloak")
Asynchronous OpenID Client
===========================
Asynchronous Configure client OpenID
-------------------------------------
.. code-block:: python
from keycloak import KeycloakOpenID
# Configure client
# For versions older than 18 /auth/ must be added at the end of the server_url.
keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/",
client_id="example_client",
realm_name="example_realm",
client_secret_key="secret")
Get .well_know asynchronously
------------------------------
.. code-block:: python
config_well_known = await keycloak_openid.a_well_known()
Get code asynchronously with OAuth authorization request
---------------------------------------------------------
.. code-block:: python
auth_url = await keycloak_openid.a_auth_url(
redirect_uri="your_call_back_url",
scope="email",
state="your_state_info")
Get access token asynchronously with code
----------------------------------------------
.. code-block:: python
access_token = await keycloak_openid.a_token(
grant_type='authorization_code',
code='the_code_you_get_from_auth_url_callback',
redirect_uri="your_call_back_url")
Get access asynchronously token with user and password
-------------------------------------------------------
.. code-block:: python
token = await keycloak_openid.a_token("user", "password")
token = await keycloak_openid.a_token("user", "password", totp="012345")
Get token asynchronously using Token Exchange
----------------------------------------------
.. code-block:: python
token = await keycloak_openid.a_exchange_token(token['access_token'],
"my_client", "other_client", "some_user")
Refresh token asynchronously
----------------------------------------------
.. code-block:: python
token = await keycloak_openid.a_refresh_token(token['refresh_token'])
Get UserInfo asynchronously
----------------------------------------------
.. code-block:: python
userinfo = await keycloak_openid.a_userinfo(token['access_token'])
Logout asynchronously
----------------------------------------------
.. code-block:: python
await keycloak_openid.a_logout(token['refresh_token'])
Get certs asynchronously
----------------------------------------------
.. code-block:: python
certs = await keycloak_openid.a_certs()
Introspect RPT asynchronously
----------------------------------------------
.. code-block:: python
token_rpt_info = await keycloak_openid.a_introspect(await keycloak_openid.a_introspect(token['access_token'],
rpt=rpt['rpt'],
token_type_hint="requesting_party_token"))
Introspect token asynchronously
----------------------------------------------
.. code-block:: python
token_info = await keycloak_openid.a_introspect(token['access_token'])
Decode token asynchronously
----------------------------------------------
.. code-block:: python
token_info = await keycloak_openid.a_decode_token(token['access_token'])
# Without validation
token_info = await keycloak_openid.a_decode_token(token['access_token'], validate=False)
Get UMA-permissions asynchronously by token
----------------------------------------------
.. code-block:: python
token = await keycloak_openid.a_token("user", "password")
permissions = await keycloak_openid.a_uma_permissions(token['access_token'])
Get UMA-permissions asynchronously by token with specific resource and scope requested
---------------------------------------------------------------------------------------
.. code-block:: python
token = await keycloak_openid.a_token("user", "password")
permissions = await keycloak_openid.a_uma_permissions(token['access_token'], permissions="Resource#Scope")
Get auth status asynchronously for a specific resource and scope by token
--------------------------------------------------------------------------
.. code-block:: python
token = await keycloak_openid.a_token("user", "password")
auth_status = await keycloak_openid.a_has_uma_access(token['access_token'], "Resource#Scope")
Asynchronous UMA
========================
Asynchronous Configure client UMA
----------------------------------
.. code-block:: python
from keycloak import KeycloakOpenIDConnection
from keycloak import KeycloakUMA
keycloak_connection = KeycloakOpenIDConnection(
server_url="http://localhost:8080/",
realm_name="master",
client_id="my_client",
client_secret_key="client-secret")
keycloak_uma = KeycloakUMA(connection=keycloak_connection)
Create a resource set asynchronously
---------------------------------------
.. code-block:: python
resource_set = await keycloak_uma.a_resource_set_create({
"name": "example_resource",
"scopes": ["example:read", "example:write"],
"type": "urn:example"})
List resource sets asynchronously
----------------------------------
.. code-block:: python
resource_sets = await uma.a_resource_set_list()
Get resource set asynchronously
--------------------------------
.. code-block:: python
latest_resource = await uma.a_resource_set_read(resource_set["_id"])
Update resource set asynchronously
-------------------------------------
.. code-block:: python
latest_resource["name"] = "New Resource Name"
await uma.a_resource_set_update(resource_set["_id"], latest_resource)
Delete resource set asynchronously
------------------------------------
.. code-block:: python
await uma.a_resource_set_delete(resource_id=resource_set["_id"])

294
poetry.lock

@ -11,6 +11,28 @@ files = [
{file = "alabaster-0.7.13.tar.gz", hash = "sha256:a27a4a084d5e690e16e01e03ad2b2e552c61a65469419b907243193de1a84ae2"},
]
[[package]]
name = "anyio"
version = "4.4.0"
description = "High level compatibility layer for multiple asynchronous event loop implementations"
optional = false
python-versions = ">=3.8"
files = [
{file = "anyio-4.4.0-py3-none-any.whl", hash = "sha256:c1b2d8f46a8a812513012e1107cb0e68c17159a7a594208005a57dc776e1bdc7"},
{file = "anyio-4.4.0.tar.gz", hash = "sha256:5aadc6a1bbb7cdb0bede386cac5e2940f5e2ff3aa20277e991cf028e0585ce94"},
]
[package.dependencies]
exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""}
idna = ">=2.8"
sniffio = ">=1.1"
typing-extensions = {version = ">=4.1", markers = "python_version < \"3.11\""}
[package.extras]
doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"]
test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"]
trio = ["trio (>=0.23)"]
[[package]]
name = "argcomplete"
version = "3.3.0"
@ -39,6 +61,17 @@ files = [
[package.dependencies]
typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.11\""}
[[package]]
name = "async-property"
version = "0.2.2"
description = "Python decorator for async properties."
optional = false
python-versions = "*"
files = [
{file = "async_property-0.2.2-py2.py3-none-any.whl", hash = "sha256:8924d792b5843994537f8ed411165700b27b2bd966cefc4daeefc1253442a9d7"},
{file = "async_property-0.2.2.tar.gz", hash = "sha256:17d9bd6ca67e27915a75d92549df64b5c7174e9dc806b30a3934dc4ff0506380"},
]
[[package]]
name = "babel"
version = "2.15.0"
@ -58,18 +91,18 @@ dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"]
[[package]]
name = "backports-tarfile"
version = "1.1.1"
version = "1.2.0"
description = "Backport of CPython tarfile module"
optional = false
python-versions = ">=3.8"
files = [
{file = "backports.tarfile-1.1.1-py3-none-any.whl", hash = "sha256:73e0179647803d3726d82e76089d01d8549ceca9bace469953fcb4d97cf2d417"},
{file = "backports_tarfile-1.1.1.tar.gz", hash = "sha256:9c2ef9696cb73374f7164e17fc761389393ca76777036f5aad42e8b93fcd8009"},
{file = "backports.tarfile-1.2.0-py3-none-any.whl", hash = "sha256:77e284d754527b01fb1e6fa8a1afe577858ebe4e9dad8919e34c862cb399bc34"},
{file = "backports_tarfile-1.2.0.tar.gz", hash = "sha256:d75e02c268746e1b8144c278978b6e98e85de6ad16f8e4b0844a154557eca991"},
]
[package.extras]
docs = ["furo", "jaraco.packaging (>=9.3)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
testing = ["jaraco.test", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)"]
testing = ["jaraco.test", "pytest (!=8.0.*)", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)"]
[[package]]
name = "black"
@ -340,13 +373,13 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "codespell"
version = "2.2.6"
version = "2.3.0"
description = "Codespell"
optional = false
python-versions = ">=3.8"
files = [
{file = "codespell-2.2.6-py3-none-any.whl", hash = "sha256:9ee9a3e5df0990604013ac2a9f22fa8e57669c827124a2e961fe8a1da4cacc07"},
{file = "codespell-2.2.6.tar.gz", hash = "sha256:a8c65d8eb3faa03deabab6b3bbe798bea72e1799c7e9e955d57eca4096abcff9"},
{file = "codespell-2.3.0-py3-none-any.whl", hash = "sha256:a9c7cef2501c9cfede2110fd6d4e5e62296920efe9abfb84648df866e47f58d1"},
{file = "codespell-2.3.0.tar.gz", hash = "sha256:360c7d10f75e65f67bad720af7007e1060a5d395670ec11a7ed1fed9dd17471f"},
]
[package.extras]
@ -406,63 +439,63 @@ test = ["flake8 (==3.7.8)", "hypothesis (==3.55.3)"]
[[package]]
name = "coverage"
version = "7.5.1"
version = "7.5.3"
description = "Code coverage measurement for Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "coverage-7.5.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c0884920835a033b78d1c73b6d3bbcda8161a900f38a488829a83982925f6c2e"},
{file = "coverage-7.5.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:39afcd3d4339329c5f58de48a52f6e4e50f6578dd6099961cf22228feb25f38f"},
{file = "coverage-7.5.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4a7b0ceee8147444347da6a66be737c9d78f3353b0681715b668b72e79203e4a"},
{file = "coverage-7.5.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4a9ca3f2fae0088c3c71d743d85404cec8df9be818a005ea065495bedc33da35"},
{file = "coverage-7.5.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5fd215c0c7d7aab005221608a3c2b46f58c0285a819565887ee0b718c052aa4e"},
{file = "coverage-7.5.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:4bf0655ab60d754491004a5efd7f9cccefcc1081a74c9ef2da4735d6ee4a6223"},
{file = "coverage-7.5.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:61c4bf1ba021817de12b813338c9be9f0ad5b1e781b9b340a6d29fc13e7c1b5e"},
{file = "coverage-7.5.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:db66fc317a046556a96b453a58eced5024af4582a8dbdc0c23ca4dbc0d5b3146"},
{file = "coverage-7.5.1-cp310-cp310-win32.whl", hash = "sha256:b016ea6b959d3b9556cb401c55a37547135a587db0115635a443b2ce8f1c7228"},
{file = "coverage-7.5.1-cp310-cp310-win_amd64.whl", hash = "sha256:df4e745a81c110e7446b1cc8131bf986157770fa405fe90e15e850aaf7619bc8"},
{file = "coverage-7.5.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:796a79f63eca8814ca3317a1ea443645c9ff0d18b188de470ed7ccd45ae79428"},
{file = "coverage-7.5.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4fc84a37bfd98db31beae3c2748811a3fa72bf2007ff7902f68746d9757f3746"},
{file = "coverage-7.5.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6175d1a0559986c6ee3f7fccfc4a90ecd12ba0a383dcc2da30c2b9918d67d8a3"},
{file = "coverage-7.5.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fc81d5878cd6274ce971e0a3a18a8803c3fe25457165314271cf78e3aae3aa2"},
{file = "coverage-7.5.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:556cf1a7cbc8028cb60e1ff0be806be2eded2daf8129b8811c63e2b9a6c43bca"},
{file = "coverage-7.5.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:9981706d300c18d8b220995ad22627647be11a4276721c10911e0e9fa44c83e8"},
{file = "coverage-7.5.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:d7fed867ee50edf1a0b4a11e8e5d0895150e572af1cd6d315d557758bfa9c057"},
{file = "coverage-7.5.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:ef48e2707fb320c8f139424a596f5b69955a85b178f15af261bab871873bb987"},
{file = "coverage-7.5.1-cp311-cp311-win32.whl", hash = "sha256:9314d5678dcc665330df5b69c1e726a0e49b27df0461c08ca12674bcc19ef136"},
{file = "coverage-7.5.1-cp311-cp311-win_amd64.whl", hash = "sha256:5fa567e99765fe98f4e7d7394ce623e794d7cabb170f2ca2ac5a4174437e90dd"},
{file = "coverage-7.5.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:b6cf3764c030e5338e7f61f95bd21147963cf6aa16e09d2f74f1fa52013c1206"},
{file = "coverage-7.5.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2ec92012fefebee89a6b9c79bc39051a6cb3891d562b9270ab10ecfdadbc0c34"},
{file = "coverage-7.5.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:16db7f26000a07efcf6aea00316f6ac57e7d9a96501e990a36f40c965ec7a95d"},
{file = "coverage-7.5.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:beccf7b8a10b09c4ae543582c1319c6df47d78fd732f854ac68d518ee1fb97fa"},
{file = "coverage-7.5.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8748731ad392d736cc9ccac03c9845b13bb07d020a33423fa5b3a36521ac6e4e"},
{file = "coverage-7.5.1-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:7352b9161b33fd0b643ccd1f21f3a3908daaddf414f1c6cb9d3a2fd618bf2572"},
{file = "coverage-7.5.1-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:7a588d39e0925f6a2bff87154752481273cdb1736270642aeb3635cb9b4cad07"},
{file = "coverage-7.5.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:68f962d9b72ce69ea8621f57551b2fa9c70509af757ee3b8105d4f51b92b41a7"},
{file = "coverage-7.5.1-cp312-cp312-win32.whl", hash = "sha256:f152cbf5b88aaeb836127d920dd0f5e7edff5a66f10c079157306c4343d86c19"},
{file = "coverage-7.5.1-cp312-cp312-win_amd64.whl", hash = "sha256:5a5740d1fb60ddf268a3811bcd353de34eb56dc24e8f52a7f05ee513b2d4f596"},
{file = "coverage-7.5.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e2213def81a50519d7cc56ed643c9e93e0247f5bbe0d1247d15fa520814a7cd7"},
{file = "coverage-7.5.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5037f8fcc2a95b1f0e80585bd9d1ec31068a9bcb157d9750a172836e98bc7a90"},
{file = "coverage-7.5.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5c3721c2c9e4c4953a41a26c14f4cef64330392a6d2d675c8b1db3b645e31f0e"},
{file = "coverage-7.5.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ca498687ca46a62ae590253fba634a1fe9836bc56f626852fb2720f334c9e4e5"},
{file = "coverage-7.5.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0cdcbc320b14c3e5877ee79e649677cb7d89ef588852e9583e6b24c2e5072661"},
{file = "coverage-7.5.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:57e0204b5b745594e5bc14b9b50006da722827f0b8c776949f1135677e88d0b8"},
{file = "coverage-7.5.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:8fe7502616b67b234482c3ce276ff26f39ffe88adca2acf0261df4b8454668b4"},
{file = "coverage-7.5.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:9e78295f4144f9dacfed4f92935fbe1780021247c2fabf73a819b17f0ccfff8d"},
{file = "coverage-7.5.1-cp38-cp38-win32.whl", hash = "sha256:1434e088b41594baa71188a17533083eabf5609e8e72f16ce8c186001e6b8c41"},
{file = "coverage-7.5.1-cp38-cp38-win_amd64.whl", hash = "sha256:0646599e9b139988b63704d704af8e8df7fa4cbc4a1f33df69d97f36cb0a38de"},
{file = "coverage-7.5.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:4cc37def103a2725bc672f84bd939a6fe4522310503207aae4d56351644682f1"},
{file = "coverage-7.5.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:fc0b4d8bfeabd25ea75e94632f5b6e047eef8adaed0c2161ada1e922e7f7cece"},
{file = "coverage-7.5.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0d0a0f5e06881ecedfe6f3dd2f56dcb057b6dbeb3327fd32d4b12854df36bf26"},
{file = "coverage-7.5.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9735317685ba6ec7e3754798c8871c2f49aa5e687cc794a0b1d284b2389d1bd5"},
{file = "coverage-7.5.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d21918e9ef11edf36764b93101e2ae8cc82aa5efdc7c5a4e9c6c35a48496d601"},
{file = "coverage-7.5.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:c3e757949f268364b96ca894b4c342b41dc6f8f8b66c37878aacef5930db61be"},
{file = "coverage-7.5.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:79afb6197e2f7f60c4824dd4b2d4c2ec5801ceb6ba9ce5d2c3080e5660d51a4f"},
{file = "coverage-7.5.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:d1d0d98d95dd18fe29dc66808e1accf59f037d5716f86a501fc0256455219668"},
{file = "coverage-7.5.1-cp39-cp39-win32.whl", hash = "sha256:1cc0fe9b0b3a8364093c53b0b4c0c2dd4bb23acbec4c9240b5f284095ccf7981"},
{file = "coverage-7.5.1-cp39-cp39-win_amd64.whl", hash = "sha256:dde0070c40ea8bb3641e811c1cfbf18e265d024deff6de52c5950677a8fb1e0f"},
{file = "coverage-7.5.1-pp38.pp39.pp310-none-any.whl", hash = "sha256:6537e7c10cc47c595828b8a8be04c72144725c383c4702703ff4e42e44577312"},
{file = "coverage-7.5.1.tar.gz", hash = "sha256:54de9ef3a9da981f7af93eafde4ede199e0846cd819eb27c88e2b712aae9708c"},
{file = "coverage-7.5.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a6519d917abb15e12380406d721e37613e2a67d166f9fb7e5a8ce0375744cd45"},
{file = "coverage-7.5.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:aea7da970f1feccf48be7335f8b2ca64baf9b589d79e05b9397a06696ce1a1ec"},
{file = "coverage-7.5.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:923b7b1c717bd0f0f92d862d1ff51d9b2b55dbbd133e05680204465f454bb286"},
{file = "coverage-7.5.3-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:62bda40da1e68898186f274f832ef3e759ce929da9a9fd9fcf265956de269dbc"},
{file = "coverage-7.5.3-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d8b7339180d00de83e930358223c617cc343dd08e1aa5ec7b06c3a121aec4e1d"},
{file = "coverage-7.5.3-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:25a5caf742c6195e08002d3b6c2dd6947e50efc5fc2c2205f61ecb47592d2d83"},
{file = "coverage-7.5.3-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:05ac5f60faa0c704c0f7e6a5cbfd6f02101ed05e0aee4d2822637a9e672c998d"},
{file = "coverage-7.5.3-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:239a4e75e09c2b12ea478d28815acf83334d32e722e7433471fbf641c606344c"},
{file = "coverage-7.5.3-cp310-cp310-win32.whl", hash = "sha256:a5812840d1d00eafae6585aba38021f90a705a25b8216ec7f66aebe5b619fb84"},
{file = "coverage-7.5.3-cp310-cp310-win_amd64.whl", hash = "sha256:33ca90a0eb29225f195e30684ba4a6db05dbef03c2ccd50b9077714c48153cac"},
{file = "coverage-7.5.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:f81bc26d609bf0fbc622c7122ba6307993c83c795d2d6f6f6fd8c000a770d974"},
{file = "coverage-7.5.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7cec2af81f9e7569280822be68bd57e51b86d42e59ea30d10ebdbb22d2cb7232"},
{file = "coverage-7.5.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:55f689f846661e3f26efa535071775d0483388a1ccfab899df72924805e9e7cd"},
{file = "coverage-7.5.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:50084d3516aa263791198913a17354bd1dc627d3c1639209640b9cac3fef5807"},
{file = "coverage-7.5.3-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:341dd8f61c26337c37988345ca5c8ccabeff33093a26953a1ac72e7d0103c4fb"},
{file = "coverage-7.5.3-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:ab0b028165eea880af12f66086694768f2c3139b2c31ad5e032c8edbafca6ffc"},
{file = "coverage-7.5.3-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:5bc5a8c87714b0c67cfeb4c7caa82b2d71e8864d1a46aa990b5588fa953673b8"},
{file = "coverage-7.5.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:38a3b98dae8a7c9057bd91fbf3415c05e700a5114c5f1b5b0ea5f8f429ba6614"},
{file = "coverage-7.5.3-cp311-cp311-win32.whl", hash = "sha256:fcf7d1d6f5da887ca04302db8e0e0cf56ce9a5e05f202720e49b3e8157ddb9a9"},
{file = "coverage-7.5.3-cp311-cp311-win_amd64.whl", hash = "sha256:8c836309931839cca658a78a888dab9676b5c988d0dd34ca247f5f3e679f4e7a"},
{file = "coverage-7.5.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:296a7d9bbc598e8744c00f7a6cecf1da9b30ae9ad51c566291ff1314e6cbbed8"},
{file = "coverage-7.5.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:34d6d21d8795a97b14d503dcaf74226ae51eb1f2bd41015d3ef332a24d0a17b3"},
{file = "coverage-7.5.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8e317953bb4c074c06c798a11dbdd2cf9979dbcaa8ccc0fa4701d80042d4ebf1"},
{file = "coverage-7.5.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:705f3d7c2b098c40f5b81790a5fedb274113373d4d1a69e65f8b68b0cc26f6db"},
{file = "coverage-7.5.3-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b1196e13c45e327d6cd0b6e471530a1882f1017eb83c6229fc613cd1a11b53cd"},
{file = "coverage-7.5.3-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:015eddc5ccd5364dcb902eaecf9515636806fa1e0d5bef5769d06d0f31b54523"},
{file = "coverage-7.5.3-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:fd27d8b49e574e50caa65196d908f80e4dff64d7e592d0c59788b45aad7e8b35"},
{file = "coverage-7.5.3-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:33fc65740267222fc02975c061eb7167185fef4cc8f2770267ee8bf7d6a42f84"},
{file = "coverage-7.5.3-cp312-cp312-win32.whl", hash = "sha256:7b2a19e13dfb5c8e145c7a6ea959485ee8e2204699903c88c7d25283584bfc08"},
{file = "coverage-7.5.3-cp312-cp312-win_amd64.whl", hash = "sha256:0bbddc54bbacfc09b3edaec644d4ac90c08ee8ed4844b0f86227dcda2d428fcb"},
{file = "coverage-7.5.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:f78300789a708ac1f17e134593f577407d52d0417305435b134805c4fb135adb"},
{file = "coverage-7.5.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:b368e1aee1b9b75757942d44d7598dcd22a9dbb126affcbba82d15917f0cc155"},
{file = "coverage-7.5.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f836c174c3a7f639bded48ec913f348c4761cbf49de4a20a956d3431a7c9cb24"},
{file = "coverage-7.5.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:244f509f126dc71369393ce5fea17c0592c40ee44e607b6d855e9c4ac57aac98"},
{file = "coverage-7.5.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c4c2872b3c91f9baa836147ca33650dc5c172e9273c808c3c3199c75490e709d"},
{file = "coverage-7.5.3-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:dd4b3355b01273a56b20c219e74e7549e14370b31a4ffe42706a8cda91f19f6d"},
{file = "coverage-7.5.3-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:f542287b1489c7a860d43a7d8883e27ca62ab84ca53c965d11dac1d3a1fab7ce"},
{file = "coverage-7.5.3-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:75e3f4e86804023e991096b29e147e635f5e2568f77883a1e6eed74512659ab0"},
{file = "coverage-7.5.3-cp38-cp38-win32.whl", hash = "sha256:c59d2ad092dc0551d9f79d9d44d005c945ba95832a6798f98f9216ede3d5f485"},
{file = "coverage-7.5.3-cp38-cp38-win_amd64.whl", hash = "sha256:fa21a04112c59ad54f69d80e376f7f9d0f5f9123ab87ecd18fbb9ec3a2beed56"},
{file = "coverage-7.5.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:f5102a92855d518b0996eb197772f5ac2a527c0ec617124ad5242a3af5e25f85"},
{file = "coverage-7.5.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:d1da0a2e3b37b745a2b2a678a4c796462cf753aebf94edcc87dcc6b8641eae31"},
{file = "coverage-7.5.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8383a6c8cefba1b7cecc0149415046b6fc38836295bc4c84e820872eb5478b3d"},
{file = "coverage-7.5.3-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9aad68c3f2566dfae84bf46295a79e79d904e1c21ccfc66de88cd446f8686341"},
{file = "coverage-7.5.3-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2e079c9ec772fedbade9d7ebc36202a1d9ef7291bc9b3a024ca395c4d52853d7"},
{file = "coverage-7.5.3-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:bde997cac85fcac227b27d4fb2c7608a2c5f6558469b0eb704c5726ae49e1c52"},
{file = "coverage-7.5.3-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:990fb20b32990b2ce2c5f974c3e738c9358b2735bc05075d50a6f36721b8f303"},
{file = "coverage-7.5.3-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:3d5a67f0da401e105753d474369ab034c7bae51a4c31c77d94030d59e41df5bd"},
{file = "coverage-7.5.3-cp39-cp39-win32.whl", hash = "sha256:e08c470c2eb01977d221fd87495b44867a56d4d594f43739a8028f8646a51e0d"},
{file = "coverage-7.5.3-cp39-cp39-win_amd64.whl", hash = "sha256:1d2a830ade66d3563bb61d1e3c77c8def97b30ed91e166c67d0632c018f380f0"},
{file = "coverage-7.5.3-pp38.pp39.pp310-none-any.whl", hash = "sha256:3538d8fb1ee9bdd2e2692b3b18c22bb1c19ffbefd06880f5ac496e42d7bb3884"},
{file = "coverage-7.5.3.tar.gz", hash = "sha256:04aefca5190d1dc7a53a4c1a5a7f8568811306d7a8ee231c42fb69215571944f"},
]
[package.dependencies]
@ -577,8 +610,11 @@ name = "docutils"
version = "0.20.1"
description = "Docutils -- Python Documentation Utilities"
optional = false
python-versions = "*"
files = []
python-versions = ">=3.7"
files = [
{file = "docutils-0.20.1-py3-none-any.whl", hash = "sha256:96f387a2c5562db4476f09f13bbab2192e764cac08ebbf3a34a95d9b1e4a59d6"},
{file = "docutils-0.20.1.tar.gz", hash = "sha256:f08a4e276c3a1583a86dce3e34aba3fe04d02bba2dd51ed16106244e8a923e3b"},
]
[[package]]
name = "exceptiongroup"
@ -655,6 +691,62 @@ files = [
[package.dependencies]
python-dateutil = ">=2.7"
[[package]]
name = "h11"
version = "0.14.0"
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
optional = false
python-versions = ">=3.7"
files = [
{file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
{file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
]
[[package]]
name = "httpcore"
version = "1.0.5"
description = "A minimal low-level HTTP client."
optional = false
python-versions = ">=3.8"
files = [
{file = "httpcore-1.0.5-py3-none-any.whl", hash = "sha256:421f18bac248b25d310f3cacd198d55b8e6125c107797b609ff9b7a6ba7991b5"},
{file = "httpcore-1.0.5.tar.gz", hash = "sha256:34a38e2f9291467ee3b44e89dd52615370e152954ba21721378a87b2960f7a61"},
]
[package.dependencies]
certifi = "*"
h11 = ">=0.13,<0.15"
[package.extras]
asyncio = ["anyio (>=4.0,<5.0)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
trio = ["trio (>=0.22.0,<0.26.0)"]
[[package]]
name = "httpx"
version = "0.27.0"
description = "The next generation HTTP client."
optional = false
python-versions = ">=3.8"
files = [
{file = "httpx-0.27.0-py3-none-any.whl", hash = "sha256:71d5465162c13681bff01ad59b2cc68dd838ea1f10e51574bac27103f00c91a5"},
{file = "httpx-0.27.0.tar.gz", hash = "sha256:a0cb88a46f32dc874e04ee956e4c2764aba2aa228f650b06788ba6bda2962ab5"},
]
[package.dependencies]
anyio = "*"
certifi = "*"
httpcore = "==1.*"
idna = "*"
sniffio = "*"
[package.extras]
brotli = ["brotli", "brotlicffi"]
cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
[[package]]
name = "identify"
version = "2.5.36"
@ -1070,18 +1162,15 @@ files = [
[[package]]
name = "nodeenv"
version = "1.8.0"
version = "1.9.0"
description = "Node.js virtual environment builder"
optional = false
python-versions = ">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*"
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
files = [
{file = "nodeenv-1.8.0-py2.py3-none-any.whl", hash = "sha256:df865724bb3c3adc86b3876fa209771517b0cfe596beff01a92700e0e8be4cec"},
{file = "nodeenv-1.8.0.tar.gz", hash = "sha256:d51e0c37e64fbf47d017feac3145cdbb58836d7eee8c6f6d3b6880c5456227d2"},
{file = "nodeenv-1.9.0-py2.py3-none-any.whl", hash = "sha256:508ecec98f9f3330b636d4448c0f1a56fc68017c68f1e7857ebc52acf0eb879a"},
{file = "nodeenv-1.9.0.tar.gz", hash = "sha256:07f144e90dae547bf0d4ee8da0ee42664a42a04e02ed68e06324348dafe4bdb1"},
]
[package.dependencies]
setuptools = "*"
[[package]]
name = "packaging"
version = "24.0"
@ -1106,13 +1195,13 @@ files = [
[[package]]
name = "pkginfo"
version = "1.10.0"
version = "1.11.0"
description = "Query metadata from sdists / bdists / installed packages."
optional = false
python-versions = ">=3.6"
python-versions = ">=3.8"
files = [
{file = "pkginfo-1.10.0-py3-none-any.whl", hash = "sha256:889a6da2ed7ffc58ab5b900d888ddce90bce912f2d2de1dc1c26f4cb9fe65097"},
{file = "pkginfo-1.10.0.tar.gz", hash = "sha256:5df73835398d10db79f8eecd5cd86b1f6d29317589ea70796994d49399af6297"},
{file = "pkginfo-1.11.0-py3-none-any.whl", hash = "sha256:6d4998d1cd42c297af72cc0eab5f5bab1d356fb8a55b828fa914173f8bc1ba05"},
{file = "pkginfo-1.11.0.tar.gz", hash = "sha256:dba885aa82e31e80d615119874384923f4e011c2a39b0c4b7104359e36cb7087"},
]
[package.extras]
@ -1286,6 +1375,24 @@ tomli = {version = ">=1", markers = "python_version < \"3.11\""}
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
[[package]]
name = "pytest-asyncio"
version = "0.23.7"
description = "Pytest support for asyncio"
optional = false
python-versions = ">=3.8"
files = [
{file = "pytest_asyncio-0.23.7-py3-none-any.whl", hash = "sha256:009b48127fbe44518a547bddd25611551b0e43ccdbf1e67d12479f569832c20b"},
{file = "pytest_asyncio-0.23.7.tar.gz", hash = "sha256:5f5c72948f4c49e7db4f29f2521d4031f1c27f86e57b046126654083d4770268"},
]
[package.dependencies]
pytest = ">=7.0.0,<9"
[package.extras]
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"]
[[package]]
name = "pytest-cov"
version = "5.0.0"
@ -1467,13 +1574,13 @@ sphinx = ">=1.3.1"
[[package]]
name = "requests"
version = "2.32.2"
version = "2.32.3"
description = "Python HTTP for Humans."
optional = false
python-versions = ">=3.8"
files = [
{file = "requests-2.32.2-py3-none-any.whl", hash = "sha256:fc06670dd0ed212426dfeb94fc1b983d917c4f9847c863f313c9dfaaffb7c23c"},
{file = "requests-2.32.2.tar.gz", hash = "sha256:dd951ff5ecf3e3b3aa26b40703ba77495dab41da839ae72ef3c8e5d8e2433289"},
{file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"},
{file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"},
]
[package.dependencies]
@ -1574,6 +1681,17 @@ files = [
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
]
[[package]]
name = "sniffio"
version = "1.3.1"
description = "Sniff out which async library your code is running under"
optional = false
python-versions = ">=3.7"
files = [
{file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"},
{file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"},
]
[[package]]
name = "snowballstemmer"
version = "2.2.0"
@ -1622,13 +1740,13 @@ test = ["cython", "filelock", "html5lib", "pytest (>=4.6)"]
[[package]]
name = "sphinx-autoapi"
version = "3.1.0"
version = "3.1.1"
description = "Sphinx API documentation generator"
optional = false
python-versions = ">=3.8"
files = [
{file = "sphinx_autoapi-3.1.0-py2.py3-none-any.whl", hash = "sha256:b102ded12ff5397ff6f9536065644c0a01a203b1d53dac07419c267fd771367f"},
{file = "sphinx_autoapi-3.1.0.tar.gz", hash = "sha256:c5455191c36af19e0de73dd52e15feb04a37ca4439fa5e8d77f1941768c15d32"},
{file = "sphinx_autoapi-3.1.1-py2.py3-none-any.whl", hash = "sha256:ff202754c38e119fe60b9336fb3cdbd0b78f8623753fa9151aed42a7052506b3"},
{file = "sphinx_autoapi-3.1.1.tar.gz", hash = "sha256:b5f6e3c61cd86c0cdb7ee77a9d580c0fd116726c5b29cdcb1f1d5f30a5bca1bd"},
]
[package.dependencies]
@ -1852,13 +1970,13 @@ urllib3 = ">=1.26.0"
[[package]]
name = "typing-extensions"
version = "4.11.0"
version = "4.12.0"
description = "Backported and Experimental Type Hints for Python 3.8+"
optional = false
python-versions = ">=3.8"
files = [
{file = "typing_extensions-4.11.0-py3-none-any.whl", hash = "sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a"},
{file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"},
{file = "typing_extensions-4.12.0-py3-none-any.whl", hash = "sha256:b349c66bea9016ac22978d800cfff206d5f9816951f12a7d0ec5578b0a819594"},
{file = "typing_extensions-4.12.0.tar.gz", hash = "sha256:8cbcdc8606ebcb0d95453ad7dc5065e6237b6aa230a31e81d0f440c30fed5fd8"},
]
[[package]]
@ -1925,20 +2043,20 @@ test = ["pytest (>=6.0.0)", "setuptools (>=65)"]
[[package]]
name = "zipp"
version = "3.18.2"
version = "3.19.1"
description = "Backport of pathlib-compatible object wrapper for zip files"
optional = false
python-versions = ">=3.8"
files = [
{file = "zipp-3.18.2-py3-none-any.whl", hash = "sha256:dce197b859eb796242b0622af1b8beb0a722d52aa2f57133ead08edd5bf5374e"},
{file = "zipp-3.18.2.tar.gz", hash = "sha256:6278d9ddbcfb1f1089a88fde84481528b07b0e10474e09dcfe53dad4069fa059"},
{file = "zipp-3.19.1-py3-none-any.whl", hash = "sha256:2828e64edb5386ea6a52e7ba7cdb17bb30a73a858f5eb6eb93d8d36f5ea26091"},
{file = "zipp-3.19.1.tar.gz", hash = "sha256:35427f6d5594f4acf82d25541438348c26736fa9b3afa2754bcd63cdb99d8e8f"},
]
[package.extras]
docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
testing = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.8,<4.0"
content-hash = "ec75648f8f24c61cf48f630e25deedf5b2a94b0774b63c2ee1682d6208e1a538"
content-hash = "d595876f098ba3ac3601772c153cc547ec1f7029d52d1f87ebc77968349109ab"

11
pyproject.toml

@ -33,7 +33,9 @@ python = ">=3.8,<4.0"
requests = ">=2.20.0"
requests-toolbelt = ">=0.6.0"
deprecation = ">=2.1.0"
jwcrypto = "^1.5.4"
jwcrypto = ">=1.5.4"
httpx = ">=0.23.2"
async-property = ">=0.2.2"
[tool.poetry.group.docs.dependencies]
alabaster = ">=0.7.0"
@ -44,11 +46,13 @@ sphinx-rtd-theme = ">=1.0.0"
readthedocs-sphinx-ext = ">=2.1.9"
m2r2 = ">=0.3.2"
sphinx-autoapi = ">=3.0.0"
setuptools = ">=70.0.0"
[tool.poetry.group.dev.dependencies]
tox = ">=4.0.0"
pytest = ">=7.1.2"
pytest-cov = ">=3.0.0"
pytest-asyncio = ">=0.23.7"
wheel = ">=0.38.4"
pre-commit = ">=3.5.0"
isort = ">=5.10.1"
@ -61,6 +65,11 @@ codespell = ">=2.1.0"
darglint = ">=1.8.1"
twine = ">=4.0.2"
freezegun = ">=1.2.2"
docutils = "<0.21"
[[tool.poetry.source]]
name = "PyPI"
priority = "primary"
[build-system]
requires = ["poetry-core>=1.0.0"]

108
src/keycloak/connection.py

@ -28,6 +28,7 @@ try:
except ImportError: # pragma: no cover
from urlparse import urljoin
import httpx
import requests
from requests.adapters import HTTPAdapter
@ -86,6 +87,15 @@ class ConnectionManager(object):
if proxies:
self._s.proxies.update(proxies)
self.async_s = httpx.AsyncClient(verify=verify, proxies=proxies)
self.async_s.auth = None # don't let requests add auth headers
self.async_s.transport = httpx.AsyncHTTPTransport(retries=1)
async def aclose(self):
"""Close the async connection on delete."""
if hasattr(self, "_s"):
await self.async_s.aclose()
def __del__(self):
"""Del method."""
if hasattr(self, "_s"):
@ -271,7 +281,7 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return self._s.delete(
r = self._s.delete(
urljoin(self.base_url, path),
params=kwargs,
data=data or dict(),
@ -279,5 +289,101 @@ class ConnectionManager(object):
timeout=self.timeout,
verify=self.verify,
)
return r
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
async def a_raw_get(self, path, **kwargs):
"""Submit get request to the path.
:param path: Path for request.
:type path: str
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return await self.async_s.get(
urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
async def a_raw_post(self, path, data, **kwargs):
"""Submit post request to the path.
:param path: Path for request.
:type path: str
:param data: Payload for request.
:type data: dict
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return await self.async_s.request(
method="POST",
url=urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
async def a_raw_put(self, path, data, **kwargs):
"""Submit put request to the path.
:param path: Path for request.
:type path: str
:param data: Payload for request.
:type data: dict
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return await self.async_s.put(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
async def a_raw_delete(self, path, data=None, **kwargs):
"""Submit delete request to the path.
:param path: Path for request.
:type path: str
:param data: Payload for request.
:type data: dict | None
:param kwargs: Additional arguments
:type kwargs: dict
:returns: Response the request.
:rtype: Response
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return await self.async_s.request(
method="DELETE",
url=urljoin(self.base_url, path),
data=data or dict(),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
)
except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e)

4139
src/keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

726
src/keycloak/keycloak_openid.py

@ -315,7 +315,14 @@ class KeycloakOpenID:
payload["totp"] = totp
payload = self._add_secret_key(payload)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
(
self.connection.add_param_headers("Content-Type", content_type)
if content_type
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def refresh_token(self, refresh_token, grant_type=["refresh_token"]):
@ -342,7 +349,14 @@ class KeycloakOpenID:
"refresh_token": refresh_token,
}
payload = self._add_secret_key(payload)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
(
self.connection.add_param_headers("Content-Type", content_type)
if content_type
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def exchange_token(
@ -394,7 +408,14 @@ class KeycloakOpenID:
"scope": scope,
}
payload = self._add_secret_key(payload)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
(
self.connection.add_param_headers("Content-Type", content_type)
if content_type
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def userinfo(self, token):
@ -410,9 +431,15 @@ class KeycloakOpenID:
:returns: Userinfo object
:rtype: dict
"""
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path))
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakGetError)
def logout(self, refresh_token):
@ -473,9 +500,15 @@ class KeycloakOpenID:
:returns: Entitlements
:rtype: dict
"""
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path))
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
if data_raw.status_code == 404 or data_raw.status_code == 405:
return raise_error_from_response(data_raw, KeycloakDeprecationError)
@ -504,16 +537,26 @@ class KeycloakOpenID:
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "token": token}
bearer_changed = False
orig_bearer = None
if token_type_hint == "requesting_party_token":
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
bearer_changed = True
else:
raise KeycloakRPTNotFound("Can't found RPT.")
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
if bearer_changed:
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def decode_token(self, token, validate: bool = True, **kwargs):
@ -609,7 +652,7 @@ class KeycloakOpenID:
return list(set(policies))
def get_permissions(self, token, method_token_info="introspect", **kwargs):
"""Get permission by user token.
"""Get permission by user token .
:param token: user token
:type token: str
@ -624,7 +667,7 @@ class KeycloakOpenID:
"""
if not self.authorization.policies:
raise KeycloakAuthorizationConfigError(
"Keycloak settings not found. Load Authorization Keycloak settings."
"Keycloak settings not found. Load Authorization Keycloak settings ."
)
token_info = self._token_info(token, method_token_info, **kwargs)
@ -671,8 +714,21 @@ class KeycloakOpenID:
"audience": self.client_id,
}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
(
self.connection.add_param_headers("Content-Type", content_type)
if content_type
else self.connection.del_param_headers("Content-Type")
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def has_uma_access(self, token, permissions):
@ -728,11 +784,23 @@ class KeycloakOpenID:
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
orig_content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/json")
data_raw = self.connection.raw_post(
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
(
self.connection.add_param_headers("Content-Type", orig_content_type)
if orig_content_type is not None
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def device(self):
@ -776,7 +844,9 @@ class KeycloakOpenID:
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "client-id": client_id}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
orig_content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/json")
# Keycloak complains if the clientId is not set in the payload
@ -786,4 +856,656 @@ class KeycloakOpenID:
data_raw = self.connection.raw_put(
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload)
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
(
self.connection.add_param_headers("Content-Type", orig_content_type)
if orig_content_type is not None
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPutError)
async def a_well_known(self):
"""Get the well_known object asynchronously.
The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
data_raw = await self.connection.a_raw_get(URL_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_auth_url(self, redirect_uri, scope="email", state=""):
"""Get authorization URL endpoint asynchronously.
:param redirect_uri: Redirect url to receive oauth code
:type redirect_uri: str
:param scope: Scope of authorization request, split with the blank space
:type scope: str
:param state: State will be returned to the redirect_uri
:type state: str
:returns: Authorization URL Full Build
:rtype: str
"""
params_path = {
"authorization-endpoint": (await self.a_well_known())["authorization_endpoint"],
"client-id": self.client_id,
"redirect-uri": redirect_uri,
"scope": scope,
"state": state,
}
return URL_AUTH.format(**params_path)
async def a_token(
self,
username="",
password="",
grant_type=["password"],
code="",
redirect_uri="",
totp=None,
scope="openid",
**extra
):
"""Retrieve user token asynchronously.
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
what flow is used. The token endpoint is also used to obtain new access tokens
when they expire.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param username: Username
:type username: str
:param password: Password
:type password: str
:param grant_type: Grant type
:type grant_type: str
:param code: Code
:type code: str
:param redirect_uri: Redirect URI
:type redirect_uri: str
:param totp: Time-based one-time password
:type totp: int
:param scope: Scope, defaults to openid
:type scope: str
:param extra: Additional extra arguments
:type extra: dict
:returns: Keycloak token
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {
"username": username,
"password": password,
"client_id": self.client_id,
"grant_type": grant_type,
"code": code,
"redirect_uri": redirect_uri,
"scope": scope,
}
if extra:
payload.update(extra)
if totp:
payload["totp"] = totp
payload = self._add_secret_key(payload)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload)
(
self.connection.add_param_headers("Content-Type", content_type)
if content_type
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_refresh_token(self, refresh_token, grant_type=["refresh_token"]):
"""Refresh the user token asynchronously.
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
what flow is used. The token endpoint is also used to obtain new access tokens
when they expire.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param refresh_token: Refresh token from Keycloak
:type refresh_token: str
:param grant_type: Grant type
:type grant_type: str
:returns: New token
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {
"client_id": self.client_id,
"grant_type": grant_type,
"refresh_token": refresh_token,
}
payload = self._add_secret_key(payload)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload)
(
self.connection.add_param_headers("Content-Type", content_type)
if content_type
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_exchange_token(
self,
token: str,
audience: Optional[str] = None,
subject: Optional[str] = None,
subject_token_type: Optional[str] = None,
subject_issuer: Optional[str] = None,
requested_issuer: Optional[str] = None,
requested_token_type: str = "urn:ietf:params:oauth:token-type:refresh_token",
scope: str = "openid",
) -> dict:
"""Exchange user token asynchronously.
Use a token to obtain an entirely different token. See
https://www.keycloak.org/docs/latest/securing_apps/index.html#_token-exchange
:param token: Access token
:type token: str
:param audience: Audience
:type audience: str
:param subject: Subject
:type subject: str
:param subject_token_type: Token Type specification
:type subject_token_type: Optional[str]
:param subject_issuer: Issuer
:type subject_issuer: Optional[str]
:param requested_issuer: Issuer
:type requested_issuer: Optional[str]
:param requested_token_type: Token type specification
:type requested_token_type: str
:param scope: Scope, defaults to openid
:type scope: str
:returns: Exchanged token
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {
"grant_type": ["urn:ietf:params:oauth:grant-type:token-exchange"],
"client_id": self.client_id,
"subject_token": token,
"subject_token_type": subject_token_type,
"subject_issuer": subject_issuer,
"requested_token_type": requested_token_type,
"audience": audience,
"requested_subject": subject,
"requested_issuer": requested_issuer,
"scope": scope,
}
payload = self._add_secret_key(payload)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload)
(
self.connection.add_param_headers("Content-Type", content_type)
if content_type
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_userinfo(self, token):
"""Get the user info object asynchronously.
The userinfo endpoint returns standard claims about the authenticated user,
and is protected by a bearer token.
http://openid.net/specs/openid-connect-core-1_0.html#UserInfo
:param token: Access token
:type token: str
:returns: Userinfo object
:rtype: dict
"""
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name}
data_raw = await self.connection.a_raw_get(URL_USERINFO.format(**params_path))
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_logout(self, refresh_token):
"""Log out the authenticated user asynchronously.
:param refresh_token: Refresh token from Keycloak
:type refresh_token: str
:returns: Keycloak server response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "refresh_token": refresh_token}
payload = self._add_secret_key(payload)
data_raw = await self.connection.a_raw_post(URL_LOGOUT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
async def a_certs(self):
"""Get certificates asynchronously.
The certificate endpoint returns the public keys enabled by the realm, encoded as a
JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled
for verifying tokens.
https://tools.ietf.org/html/rfc7517
:returns: Certificates
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
data_raw = await self.connection.a_raw_get(URL_CERTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_public_key(self):
"""Retrieve the public key asynchronously.
The public key is exposed by the realm page directly.
:returns: The public key
:rtype: str
"""
params_path = {"realm-name": self.realm_name}
data_raw = await self.connection.a_raw_get(URL_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"]
async def a_entitlement(self, token, resource_server_id):
"""Get entitlements from the token asynchronously.
Client applications can use a specific endpoint to obtain a special security token
called a requesting party token (RPT). This token consists of all the entitlements
(or permissions) for a user as a result of the evaluation of the permissions and
authorization policies associated with the resources being requested. With an RPT,
client applications can gain access to protected resources at the resource server.
:param token: Access token
:type token: str
:param resource_server_id: Resource server ID
:type resource_server_id: str
:returns: Entitlements
:rtype: dict
"""
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = await self.connection.a_raw_get(URL_ENTITLEMENT.format(**params_path))
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
if data_raw.status_code == 404 or data_raw.status_code == 405:
return raise_error_from_response(data_raw, KeycloakDeprecationError)
return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover
async def a_introspect(self, token, rpt=None, token_type_hint=None):
"""Introspect the user token asynchronously.
The introspection endpoint is used to retrieve the active state of a token.
It is can only be invoked by confidential clients.
https://tools.ietf.org/html/rfc7662
:param token: Access token
:type token: str
:param rpt: Requesting party token
:type rpt: str
:param token_type_hint: Token type hint
:type token_type_hint: str
:returns: Token info
:rtype: dict
:raises KeycloakRPTNotFound: In case of RPT not specified
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "token": token}
orig_bearer = None
bearer_changed = False
if token_type_hint == "requesting_party_token":
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
bearer_changed = True
else:
raise KeycloakRPTNotFound("Can't found RPT.")
payload = self._add_secret_key(payload)
data_raw = await self.connection.a_raw_post(
URL_INTROSPECT.format(**params_path), data=payload
)
if bearer_changed:
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_decode_token(self, token, validate: bool = True, **kwargs):
"""Decode user token asynchronously.
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
structure that represents a cryptographic key. This specification
also defines a JWK Set JSON data structure that represents a set of
JWKs. Cryptographic algorithms and identifiers for use with this
specification are described in the separate JSON Web Algorithms (JWA)
specification and IANA registries established by that specification.
https://tools.ietf.org/html/rfc7517
:param token: Keycloak token
:type token: str
:param validate: Determines whether the token should be validated with the public key.
Defaults to True.
:type validate: bool
:param kwargs: Additional keyword arguments for jwcrypto's JWT object
:type kwargs: dict
:returns: Decoded token
:rtype: dict
"""
if validate:
if "key" not in kwargs:
key = (
"-----BEGIN PUBLIC KEY-----\n"
+ self.public_key()
+ "\n-----END PUBLIC KEY-----"
)
key = jwk.JWK.from_pem(key.encode("utf-8"))
kwargs["key"] = key
full_jwt = jwt.JWT(jwt=token, **kwargs)
return jwt.json_decode(full_jwt.claims)
else:
full_jwt = jwt.JWT(jwt=token, **kwargs)
full_jwt.token.objects["valid"] = True
return json.loads(full_jwt.token.payload.decode("utf-8"))
async def a_load_authorization_config(self, path):
"""Load Keycloak settings (authorization) asynchronously.
:param path: settings file (json)
:type path: str
"""
with open(path, "r") as fp:
authorization_json = json.load(fp)
self.authorization.load_config(authorization_json)
async def a_get_policies(self, token, method_token_info="introspect", **kwargs):
"""Get policies by user token asynchronously.
:param token: User token
:type token: str
:param method_token_info: Method for token info decoding
:type method_token_info: str
:param kwargs: Additional keyword arguments
:type kwargs: dict
:return: Policies
:rtype: dict
:raises KeycloakAuthorizationConfigError: In case of bad authorization configuration
:raises KeycloakInvalidTokenError: In case of bad token
"""
if not self.authorization.policies:
raise KeycloakAuthorizationConfigError(
"Keycloak settings not found. Load Authorization Keycloak settings."
)
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
policies = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
policies.append(policy)
return list(set(policies))
async def a_get_permissions(self, token, method_token_info="introspect", **kwargs):
"""Get permission by user token asynchronously.
:param token: user token
:type token: str
:param method_token_info: Decode token method
:type method_token_info: str
:param kwargs: parameters for decode
:type kwargs: dict
:returns: permissions list
:rtype: list
:raises KeycloakAuthorizationConfigError: In case of bad authorization configuration
:raises KeycloakInvalidTokenError: In case of bad token
"""
if not self.authorization.policies:
raise KeycloakAuthorizationConfigError(
"Keycloak settings not found. Load Authorization Keycloak settings."
)
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
permissions = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
permissions += policy.permissions
return list(set(permissions))
async def a_uma_permissions(self, token, permissions=""):
"""Get UMA permissions by user token with requested permissions asynchronously.
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param token: user token
:type token: str
:param permissions: list of uma permissions list(resource:scope) requested by the user
:type permissions: str
:returns: Keycloak server response
:rtype: dict
"""
permission = build_permission_param(permissions)
params_path = {"realm-name": self.realm_name}
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": permission,
"response_mode": "permissions",
"audience": self.client_id,
}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
(
self.connection.add_param_headers("Content-Type", content_type)
if content_type
else self.connection.del_param_headers("Content-Type")
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_has_uma_access(self, token, permissions):
"""Determine whether user has uma permissions with specified user token asynchronously.
:param token: user token
:type token: str
:param permissions: list of uma permissions (resource:scope)
:type permissions: str
:return: Authentication status
:rtype: AuthStatus
:raises KeycloakAuthenticationError: In case of failed authentication
:raises KeycloakPostError: In case of failed request to Keycloak
"""
needed = build_permission_param(permissions)
try:
granted = await self.a_uma_permissions(token, permissions)
except (KeycloakPostError, KeycloakAuthenticationError) as e:
if e.response_code == 403: # pragma: no cover
return AuthStatus(
is_logged_in=True, is_authorized=False, missing_permissions=needed
)
elif e.response_code == 401:
return AuthStatus(
is_logged_in=False, is_authorized=False, missing_permissions=needed
)
raise
for resource_struct in granted:
resource = resource_struct["rsname"]
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes: # pragma: no cover
needed.discard("{}#{}".format(resource, scope))
return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
)
async def a_register_client(self, token: str, payload: dict):
"""Create a client asynchronously.
ClientRepresentation:
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation
:param token: Initial access token
:type token: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
orig_content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/json")
data_raw = await self.connection.a_raw_post(
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
(
self.connection.add_param_headers("Content-Type", orig_content_type)
if orig_content_type is not None
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_device(self):
"""Get device authorization grant asynchronously.
The device endpoint is used to obtain a user code verification and user authentication.
The response contains a device_code, user_code, verification_uri,
verification_uri_complete, expires_in (lifetime in seconds for device_code
and user_code), and polling interval.
Users can either follow the verification_uri and enter the user_code or
follow the verification_uri_complete.
After authenticating with valid credentials, users can obtain tokens using the
"urn:ietf:params:oauth:grant-type:device_code" grant_type and the device_code.
https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow
https://github.com/keycloak/keycloak-community/blob/main/design/oauth2-device-authorization-grant.md#how-to-try-it
:returns: Device Authorization Response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id}
payload = self._add_secret_key(payload)
data_raw = await self.connection.a_raw_post(URL_DEVICE.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_update_client(self, token: str, client_id: str, payload: dict):
"""Update a client asynchronously.
ClientRepresentation:
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation
:param token: registration access token
:type token: str
:param client_id: Keycloak client id
:type client_id: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "client-id": client_id}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
orig_content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/json")
# Keycloak complains if the clientId is not set in the payload
if "clientId" not in payload:
payload["clientId"] = client_id
data_raw = await self.connection.a_raw_put(
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload)
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
(
self.connection.add_param_headers("Content-Type", orig_content_type)
if orig_content_type is not None
else self.connection.del_param_headers("Content-Type")
)
return raise_error_from_response(data_raw, KeycloakPutError)

374
src/keycloak/keycloak_uma.py

@ -30,6 +30,8 @@ import json
from typing import Iterable
from urllib.parse import quote_plus
from async_property import async_property
from .connection import ConnectionManager
from .exceptions import (
KeycloakDeleteError,
@ -56,9 +58,6 @@ class KeycloakUMA:
:type connection: KeycloakOpenIDConnection
"""
self.connection = connection
custom_headers = self.connection.custom_headers or {}
custom_headers.update({"Content-Type": "application/json"})
self.connection.custom_headers = custom_headers
self._well_known = None
def _fetch_well_known(self):
@ -84,6 +83,24 @@ class KeycloakUMA:
"""
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
@staticmethod
async def a_format_url(url, **kwargs):
"""Substitute url path parameters.
Given a parameterized url string, returns the string after url encoding and substituting
the given params. For example,
`format_url("https://myserver/{my_resource}/{id}", my_resource="hello world", id="myid")`
would produce `https://myserver/hello+world/myid`.
:param url: url string to format
:type url: str
:param kwargs: dict containing kwargs to substitute
:type kwargs: dict
:return: formatted string
:rtype: str
"""
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
@property
def uma_well_known(self):
"""Get the well_known UMA2 config.
@ -96,6 +113,17 @@ class KeycloakUMA:
self._well_known = self._fetch_well_known()
return self._well_known
@async_property
async def a_uma_well_known(self):
"""Get the well_known UMA2 config async.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
"""
if not self._well_known:
self._well_known = await self.a__fetch_well_known()
return self._well_known
def resource_set_create(self, payload):
"""Create a resource set.
@ -415,3 +443,343 @@ class KeycloakUMA:
data_raw = self.connection.raw_get(self.uma_well_known["policy_endpoint"], **query)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a__fetch_well_known(self):
"""Get the well_known UMA2 config async.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = await self.connection.a_raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_resource_set_create(self, payload):
"""Create a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
ResourceRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation
:param payload: ResourceRepresentation
:type payload: dict
:return: ResourceRepresentation with the _id property assigned
:rtype: dict
"""
data_raw = await self.connection.a_raw_post(
(await self.a_uma_well_known)["resource_registration_endpoint"],
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
async def a_resource_set_update(self, resource_id, payload):
"""Update a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
ResourceRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation
:param resource_id: id of the resource
:type resource_id: str
:param payload: ResourceRepresentation
:type payload: dict
:return: Response dict (empty)
:rtype: dict
"""
url = self.format_url(
(await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}",
id=resource_id,
)
data_raw = await self.connection.a_raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
async def a_resource_set_read(self, resource_id):
"""Read a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
ResourceRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation
:param resource_id: id of the resource
:type resource_id: str
:return: ResourceRepresentation
:rtype: dict
"""
url = self.format_url(
(await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}",
id=resource_id,
)
data_raw = await self.connection.a_raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
async def a_resource_set_delete(self, resource_id):
"""Delete a resource set asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
:param resource_id: id of the resource
:type resource_id: str
:return: Response dict (empty)
:rtype: dict
"""
url = self.format_url(
(await self.a_uma_well_known)["resource_registration_endpoint"] + "/{id}",
id=resource_id,
)
data_raw = await self.connection.a_raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
async def a_resource_set_list_ids(
self,
name: str = "",
exact_name: bool = False,
uri: str = "",
owner: str = "",
resource_type: str = "",
scope: str = "",
first: int = 0,
maximum: int = -1,
):
"""Query for list of resource set ids asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
:param name: query resource name
:type name: str
:param exact_name: query exact match for resource name
:type exact_name: bool
:param uri: query resource uri
:type uri: str
:param owner: query resource owner
:type owner: str
:param resource_type: query resource type
:type resource_type: str
:param scope: query resource scope
:type scope: str
:param first: index of first matching resource to return
:type first: int
:param maximum: maximum number of resources to return (-1 for all)
:type maximum: int
:return: List of ids
:rtype: List[str]
"""
query = dict()
if name:
query["name"] = name
if exact_name:
query["exactName"] = "true"
if uri:
query["uri"] = uri
if owner:
query["owner"] = owner
if resource_type:
query["type"] = resource_type
if scope:
query["scope"] = scope
if first > 0:
query["first"] = first
if maximum >= 0:
query["max"] = maximum
data_raw = await self.connection.a_raw_get(
(await self.a_uma_well_known)["resource_registration_endpoint"], **query
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
async def a_resource_set_list(self):
"""List all resource sets asynchronously.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
ResourceRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_resourcerepresentation
:yields: Iterator over a list of ResourceRepresentations
:rtype: Iterator[dict]
"""
for resource_id in await self.a_resource_set_list_ids():
resource = await self.a_resource_set_read(resource_id)
yield resource
async def a_permission_ticket_create(self, permissions: Iterable[UMAPermission]):
"""Create a permission ticket asynchronously.
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
:returns: Keycloak decision
:rtype: boolean
:raises KeycloakPostError: In case permission resource not found
"""
resources = dict()
for permission in permissions:
resource_id = getattr(permission, "resource_id", None)
if resource_id is None:
resource_ids = await self.a_resource_set_list_ids(
exact_name=True, name=permission.resource, first=0, maximum=1
)
if not resource_ids:
raise KeycloakPostError("Invalid resource specified")
setattr(permission, "resource_id", resource_ids[0])
resources.setdefault(resource_id, set())
if permission.scope:
resources[resource_id].add(permission.scope)
payload = [
{"resource_id": resource_id, "resource_scopes": list(scopes)}
for resource_id, scopes in resources.items()
]
data_raw = await self.connection.a_raw_post(
(await self.a_uma_well_known)["permission_endpoint"], data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_permissions_check(self, token, permissions: Iterable[UMAPermission]):
"""Check UMA permissions by user token with requested permissions asynchronously.
The token endpoint is used to check UMA permissions from Keycloak. It can only be
invoked by confidential clients.
https://www.keycloak.org/docs/latest/authorization_services/#_service_authorization_api
:param token: user token
:type token: str
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
:returns: Keycloak decision
:rtype: boolean
"""
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": ",".join(str(permission) for permission in permissions),
"response_mode": "decision",
"audience": self.connection.client_id,
}
# Everyone always has the null set of permissions
# However keycloak cannot evaluate the null set
if len(payload["permission"]) == 0:
return True
connection = ConnectionManager(self.connection.base_url)
connection.add_param_headers("Authorization", "Bearer " + token)
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = await connection.a_raw_post(
(await self.a_uma_well_known)["token_endpoint"], data=payload
)
try:
data = raise_error_from_response(data_raw, KeycloakPostError)
except KeycloakPostError:
return False
return data.get("result", False)
async def a_policy_resource_create(self, resource_id, payload):
"""Create permission policy for resource asynchronously.
Supports name, description, scopes, roles, groups, clients
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
:param resource_id: _id of resource
:type resource_id: str
:param payload: permission configuration
:type payload: dict
:return: PermissionRepresentation
:rtype: dict
"""
data_raw = await self.connection.a_raw_post(
(await self.a_uma_well_known)["policy_endpoint"] + f"/{resource_id}",
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_policy_update(self, policy_id, payload):
"""Update permission policy asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#associating-a-permission-with-a-resource
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
:param policy_id: id of policy permission
:type policy_id: str
:param payload: policy permission configuration
:type payload: dict
:return: PermissionRepresentation
:rtype: dict
"""
data_raw = await self.connection.a_raw_put(
(await self.a_uma_well_known)["policy_endpoint"] + f"/{policy_id}",
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPutError)
async def a_policy_delete(self, policy_id):
"""Delete permission policy asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#removing-a-permission
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_policyrepresentation
:param policy_id: id of permission policy
:type policy_id: str
:return: PermissionRepresentation
:rtype: dict
"""
data_raw = await self.connection.a_raw_delete(
(await self.a_uma_well_known)["policy_endpoint"] + f"/{policy_id}"
)
return raise_error_from_response(data_raw, KeycloakDeleteError)
async def a_policy_query(
self,
resource: str = "",
name: str = "",
scope: str = "",
first: int = 0,
maximum: int = -1,
):
"""Query permission policies asynchronously.
https://www.keycloak.org/docs/latest/authorization_services/#querying-permission
:param resource: query resource id
:type resource: str
:param name: query resource name
:type name: str
:param scope: query resource scope
:type scope: str
:param first: index of first matching resource to return
:type first: int
:param maximum: maximum number of resources to return (-1 for all)
:type maximum: int
:return: List of ids
:return: List of ids
:rtype: List[str]
"""
query = dict()
if name:
query["name"] = name
if resource:
query["resource"] = resource
if scope:
query["scope"] = scope
if first > 0:
query["first"] = first
if maximum >= 0:
query["max"] = maximum
data_raw = await self.connection.a_raw_get(
(await self.a_uma_well_known)["policy_endpoint"], **query
)
return raise_error_from_response(data_raw, KeycloakGetError)

130
src/keycloak/openid_connection.py

@ -103,6 +103,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
# token is renewed when it hits 90% of its lifetime. This is to account for any possible
# clock skew.
self.token_lifetime_fraction = 0.9
self.headers = {}
self.server_url = server_url
self.username = username
self.password = password
@ -114,18 +115,8 @@ class KeycloakOpenIDConnection(ConnectionManager):
self.client_secret_key = client_secret_key
self.user_realm_name = user_realm_name
self.timeout = timeout
self.headers = {}
self.custom_headers = custom_headers
if self.token is None:
self.get_token()
if self.token is not None:
self.headers = {
**self.headers,
"Authorization": "Bearer " + self.token.get("access_token"),
"Content-Type": "application/json",
}
self.headers = {**self.headers, "Content-Type": "application/json"}
super().__init__(
base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify
@ -237,6 +228,8 @@ class KeycloakOpenIDConnection(ConnectionManager):
self._expires_at = datetime.now() + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
)
if value is not None:
self.add_param_headers("Authorization", "Bearer " + value.get("access_token"))
@property
def expires_at(self):
@ -345,8 +338,6 @@ class KeycloakOpenIDConnection(ConnectionManager):
else:
raise
self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token"))
def _refresh_if_required(self):
if datetime.now() >= self.expires_at:
self.refresh_token()
@ -418,3 +409,116 @@ class KeycloakOpenIDConnection(ConnectionManager):
self._refresh_if_required()
r = super().raw_delete(*args, **kwargs)
return r
async def a_get_token(self):
"""Get admin token.
The admin token is then set in the `token` attribute.
"""
grant_type = []
if self.username and self.password:
grant_type.append("password")
elif self.client_secret_key:
grant_type.append("client_credentials")
if grant_type:
self.token = await self.keycloak_openid.a_token(
self.username, self.password, grant_type=grant_type, totp=self.totp
)
else:
self.token = None
async def a_refresh_token(self):
"""Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed.
"""
refresh_token = self.token.get("refresh_token", None) if self.token else None
if refresh_token is None:
await self.a_get_token()
else:
try:
self.token = await self.keycloak_openid.a_refresh_token(refresh_token)
except KeycloakPostError as e:
list_errors = [
b"Refresh token expired",
b"Token is not active",
b"Session not active",
]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
await self.a_get_token()
else:
raise
async def a__refresh_if_required(self):
"""Refresh the token if it is expired."""
if datetime.now() >= self.expires_at:
await self.a_refresh_token()
async def a_raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
await self.a__refresh_if_required()
r = await super().a_raw_get(*args, **kwargs)
return r
async def a_raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
await self.a__refresh_if_required()
r = await super().a_raw_post(*args, **kwargs)
return r
async def a_raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
await self.a__refresh_if_required()
r = await super().a_raw_put(*args, **kwargs)
return r
async def a_raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
await self.a__refresh_if_required()
r = await super().a_raw_delete(*args, **kwargs)
return r

59
tests/test_connection.py

@ -1,5 +1,7 @@
"""Connection test module."""
from inspect import iscoroutinefunction, signature
import pytest
from keycloak.connection import ConnectionManager
@ -9,9 +11,9 @@ from keycloak.exceptions import KeycloakConnectionError
def test_connection_proxy():
"""Test proxies of connection manager."""
cm = ConnectionManager(
base_url="http://test.test", proxies={"http://test.test": "localhost:8080"}
base_url="http://test.test", proxies={"http://test.test": "http://localhost:8080"}
)
assert cm._s.proxies == {"http://test.test": "localhost:8080"}
assert cm._s.proxies == {"http://test.test": "http://localhost:8080"}
def test_headers():
@ -39,3 +41,56 @@ def test_bad_connection():
cm.raw_post(path="bad", data={})
with pytest.raises(KeycloakConnectionError):
cm.raw_put(path="bad", data={})
@pytest.mark.asyncio
async def a_test_bad_connection():
"""Test bad connection."""
cm = ConnectionManager(base_url="http://not.real.domain")
with pytest.raises(KeycloakConnectionError):
await cm.a_raw_get(path="bad")
with pytest.raises(KeycloakConnectionError):
await cm.a_raw_delete(path="bad")
with pytest.raises(KeycloakConnectionError):
await cm.a_raw_post(path="bad", data={})
with pytest.raises(KeycloakConnectionError):
await cm.a_raw_put(path="bad", data={})
def test_counter_part():
"""Test that each function has its async counter part."""
con_methods = [
func for func in dir(ConnectionManager) if callable(getattr(ConnectionManager, func))
]
sync_methods = [
method
for method in con_methods
if not method.startswith("a_") and not method.startswith("_")
]
async_methods = [
method for method in con_methods if iscoroutinefunction(getattr(ConnectionManager, method))
]
for method in sync_methods:
if method in [
"aclose",
"add_param_headers",
"del_param_headers",
"clean_headers",
"exist_param_headers",
"param_headers",
]:
continue
async_method = f"a_{method}"
assert (async_method in con_methods) is True
sync_sign = signature(getattr(ConnectionManager, method))
async_sign = signature(getattr(ConnectionManager, async_method))
assert sync_sign.parameters == async_sign.parameters
for async_method in async_methods:
if async_method in ["aclose"]:
continue
if async_method[2:].startswith("_"):
continue
assert async_method[2:] in sync_methods

3123
tests/test_keycloak_admin.py
File diff suppressed because it is too large
View File

513
tests/test_keycloak_openid.py

@ -1,5 +1,6 @@
"""Test module for KeycloakOpenID."""
from inspect import iscoroutinefunction, signature
from typing import Tuple
from unittest import mock
@ -487,3 +488,515 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"expires_in": 600,
"interval": 5,
}
# async function start
@pytest.mark.asyncio
async def test_a_well_known(oid: KeycloakOpenID):
"""Test the well_known method.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
res = await oid.a_well_known()
assert res is not None
assert res != dict()
for key in [
"acr_values_supported",
"authorization_encryption_alg_values_supported",
"authorization_encryption_enc_values_supported",
"authorization_endpoint",
"authorization_signing_alg_values_supported",
"backchannel_authentication_endpoint",
"backchannel_authentication_request_signing_alg_values_supported",
"backchannel_logout_session_supported",
"backchannel_logout_supported",
"backchannel_token_delivery_modes_supported",
"check_session_iframe",
"claim_types_supported",
"claims_parameter_supported",
"claims_supported",
"code_challenge_methods_supported",
"device_authorization_endpoint",
"end_session_endpoint",
"frontchannel_logout_session_supported",
"frontchannel_logout_supported",
"grant_types_supported",
"id_token_encryption_alg_values_supported",
"id_token_encryption_enc_values_supported",
"id_token_signing_alg_values_supported",
"introspection_endpoint",
"introspection_endpoint_auth_methods_supported",
"introspection_endpoint_auth_signing_alg_values_supported",
"issuer",
"jwks_uri",
"mtls_endpoint_aliases",
"pushed_authorization_request_endpoint",
"registration_endpoint",
"request_object_encryption_alg_values_supported",
"request_object_encryption_enc_values_supported",
"request_object_signing_alg_values_supported",
"request_parameter_supported",
"request_uri_parameter_supported",
"require_pushed_authorization_requests",
"require_request_uri_registration",
"response_modes_supported",
"response_types_supported",
"revocation_endpoint",
"revocation_endpoint_auth_methods_supported",
"revocation_endpoint_auth_signing_alg_values_supported",
"scopes_supported",
"subject_types_supported",
"tls_client_certificate_bound_access_tokens",
"token_endpoint",
"token_endpoint_auth_methods_supported",
"token_endpoint_auth_signing_alg_values_supported",
"userinfo_encryption_alg_values_supported",
"userinfo_encryption_enc_values_supported",
"userinfo_endpoint",
"userinfo_signing_alg_values_supported",
]:
assert key in res
@pytest.mark.asyncio
async def test_a_auth_url(env, oid: KeycloakOpenID):
"""Test the auth_url method.
:param env: Environment fixture
:type env: KeycloakTestEnv
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
res = await oid.a_auth_url(redirect_uri="http://test.test/*")
assert (
res
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
+ f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code"
+ "&redirect_uri=http://test.test/*&scope=email&state="
)
@pytest.mark.asyncio
async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test the token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
assert token == {
"access_token": mock.ANY,
"expires_in": mock.ANY,
"id_token": mock.ANY,
"not-before-policy": 0,
"refresh_expires_in": mock.ANY,
"refresh_token": mock.ANY,
"scope": mock.ANY,
"session_state": mock.ANY,
"token_type": "Bearer",
}
# Test with dummy totp
token = await oid.a_token(username=username, password=password, totp="123456")
assert token == {
"access_token": mock.ANY,
"expires_in": mock.ANY,
"id_token": mock.ANY,
"not-before-policy": 0,
"refresh_expires_in": mock.ANY,
"refresh_token": mock.ANY,
"scope": mock.ANY,
"session_state": mock.ANY,
"token_type": "Bearer",
}
# Test with extra param
token = await oid.a_token(username=username, password=password, extra_param="foo")
assert token == {
"access_token": mock.ANY,
"expires_in": mock.ANY,
"id_token": mock.ANY,
"not-before-policy": 0,
"refresh_expires_in": mock.ANY,
"refresh_token": mock.ANY,
"scope": mock.ANY,
"session_state": mock.ANY,
"token_type": "Bearer",
}
@pytest.mark.asyncio
async def test_a_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test the exchange token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
# Verify existing user
oid, username, password = oid_with_credentials
# Allow impersonation
await admin.a_change_current_realm(oid.realm_name)
await admin.a_assign_client_role(
user_id=await admin.a_get_user_id(username=username),
client_id=await admin.a_get_client_id(client_id="realm-management"),
roles=[
await admin.a_get_client_role(
client_id=admin.get_client_id(client_id="realm-management"),
role_name="impersonation",
)
],
)
token = await oid.a_token(username=username, password=password)
assert await oid.a_userinfo(token=token["access_token"]) == {
"email": f"{username}@test.test",
"email_verified": True,
"family_name": "last",
"given_name": "first",
"name": "first last",
"preferred_username": username,
"sub": mock.ANY,
}
# Exchange token with the new user
new_token = oid.exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username
)
assert await oid.a_userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test",
"email_verified": True,
"family_name": "last",
"given_name": "first",
"name": "first last",
"preferred_username": username,
"sub": mock.ANY,
}
assert token != new_token
@pytest.mark.asyncio
async def test_a_logout(oid_with_credentials):
"""Test logout.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
assert await oid.a_userinfo(token=token["access_token"]) != dict()
assert await oid.a_logout(refresh_token=token["refresh_token"]) == dict()
with pytest.raises(KeycloakAuthenticationError):
await oid.a_userinfo(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_certs(oid: KeycloakOpenID):
"""Test certificates.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
assert len((await oid.a_certs())["keys"]) == 2
@pytest.mark.asyncio
async def test_a_public_key(oid: KeycloakOpenID):
"""Test public key.
:param oid: Keycloak OpenID client
:type oid: KeycloakOpenID
"""
assert await oid.a_public_key() is not None
@pytest.mark.asyncio
async def test_a_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test entitlement.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
resource_server_id = admin.get_client_authz_resources(
client_id=admin.get_client_id(oid.client_id)
)[0]["_id"]
with pytest.raises(KeycloakDeprecationError):
await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id)
@pytest.mark.asyncio
async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test introspect.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
assert (await oid.a_introspect(token=token["access_token"]))["active"]
assert await oid.a_introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
) == {"active": False}
with pytest.raises(KeycloakRPTNotFound):
await oid.a_introspect(
token=token["access_token"], token_type_hint="requesting_party_token"
)
@pytest.mark.asyncio
async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials
token = await oid.a_token(username=username, password=password)
decoded_access_token = await oid.a_decode_token(token=token["access_token"])
decoded_access_token_2 = await oid.a_decode_token(token=token["access_token"], validate=False)
decoded_refresh_token = await oid.a_decode_token(token=token["refresh_token"], validate=False)
assert decoded_access_token == decoded_access_token_2
assert decoded_access_token["preferred_username"] == username, decoded_access_token
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
@pytest.mark.asyncio
async def test_a_load_authorization_config(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
):
"""Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
assert "test-authz-rb-policy" in oid.authorization.policies
assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy)
assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role)
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2
assert isinstance(
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission
)
@pytest.mark.asyncio
async def test_a_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test has UMA access.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
assert (
str(await oid.a_has_uma_access(token=token["access_token"], permissions=""))
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
assert (
str(
await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource")
)
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
with pytest.raises(KeycloakPostError):
await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist")
await oid.a_logout(refresh_token=token["refresh_token"])
assert (
str(await oid.a_has_uma_access(token=token["access_token"], permissions=""))
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
)
assert (
str(
await oid.a_has_uma_access(
token=admin.connection.token["access_token"], permissions="Default Resource"
)
)
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})"
)
@pytest.mark.asyncio
async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
with pytest.raises(KeycloakAuthorizationConfigError):
await oid.a_get_policies(token=token["access_token"])
await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
assert await oid.a_get_policies(token=token["access_token"]) is None
orig_client_id = oid.client_id
oid.client_id = "account"
assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == []
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
] == ["Policy: test (role)"]
assert [
repr(x)
for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode")
] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id
await oid.a_logout(refresh_token=token["refresh_token"])
with pytest.raises(KeycloakInvalidTokenError):
await oid.a_get_policies(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
with pytest.raises(KeycloakAuthorizationConfigError):
await oid.a_get_permissions(token=token["access_token"])
await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
assert await oid.a_get_permissions(token=token["access_token"]) is None
orig_client_id = oid.client_id
oid.client_id = "account"
assert (
await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == []
)
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
policy.add_permission(
permission=Permission(
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS"
)
)
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode"
)
] == ["Permission: test-perm (resource)"]
assert [
repr(x)
for x in await oid.a_get_permissions(
token=token["access_token"], method_token_info="decode"
)
] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id
await oid.a_logout(refresh_token=token["refresh_token"])
with pytest.raises(KeycloakInvalidTokenError):
await oid.a_get_permissions(token=token["access_token"])
@pytest.mark.asyncio
async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test UMA permissions.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
token = await oid.a_token(username=username, password=password)
assert len(await oid.a_uma_permissions(token=token["access_token"])) == 1
assert (await oid.a_uma_permissions(token=token["access_token"]))[0][
"rsname"
] == "Default Resource"
@pytest.mark.asyncio
async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]):
"""Test device authorization flow.
:param oid_with_credentials_device: Keycloak OpenID client with pre-configured user
credentials and device authorization flow enabled
:type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]
"""
oid, _, _ = oid_with_credentials_device
res = await oid.a_device()
assert res == {
"device_code": mock.ANY,
"user_code": mock.ANY,
"verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device",
"verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/"
+ f"device?user_code={res['user_code']}",
"expires_in": 600,
"interval": 5,
}
def test_counter_part():
"""Test that each function has its async counter part."""
openid_methods = [
func for func in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, func))
]
sync_methods = [
method
for method in openid_methods
if not method.startswith("a_") and not method.startswith("_")
]
async_methods = [
method for method in openid_methods if iscoroutinefunction(getattr(KeycloakOpenID, method))
]
for method in sync_methods:
async_method = f"a_{method}"
assert (async_method in openid_methods) is True
sync_sign = signature(getattr(KeycloakOpenID, method))
async_sign = signature(getattr(KeycloakOpenID, async_method))
assert sync_sign.parameters == async_sign.parameters
for async_method in async_methods:
if async_method[2:].startswith("_"):
continue
assert async_method[2:] in sync_methods

316
tests/test_keycloak_uma.py

@ -1,6 +1,7 @@
"""Test module for KeycloakUMA."""
import re
from inspect import iscoroutinefunction, signature
import pytest
@ -310,3 +311,318 @@ def test_uma_permission_ticket(uma: KeycloakUMA):
uma.permission_ticket_create(permissions)
uma.resource_set_delete(resource["_id"])
# async function start
@pytest.mark.asyncio
async def test_a_uma_well_known(uma: KeycloakUMA):
"""Test the well_known method.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
res = uma.uma_well_known
assert res is not None
assert res != dict()
for key in ["resource_registration_endpoint"]:
assert key in res
@pytest.mark.asyncio
async def test_a_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
# Check that only the default resource is present
resource_sets = uma.resource_set_list()
resource_set_list = list(resource_sets)
assert len(resource_set_list) == 1, resource_set_list
assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
# Test query for resource sets
resource_set_list_ids = await uma.a_resource_set_list_ids()
assert len(resource_set_list_ids) == 1
resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default")
assert resource_set_list_ids2 == resource_set_list_ids
resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default Resource")
assert resource_set_list_ids2 == resource_set_list_ids
resource_set_list_ids = await uma.a_resource_set_list_ids(name="Default", exact_name=True)
assert len(resource_set_list_ids) == 0
resource_set_list_ids = await uma.a_resource_set_list_ids(first=1)
assert len(resource_set_list_ids) == 0
resource_set_list_ids = await uma.a_resource_set_list_ids(scope="Invalid")
assert len(resource_set_list_ids) == 0
resource_set_list_ids = await uma.a_resource_set_list_ids(owner="Invalid")
assert len(resource_set_list_ids) == 0
resource_set_list_ids = await uma.a_resource_set_list_ids(resource_type="Invalid")
assert len(resource_set_list_ids) == 0
resource_set_list_ids = await uma.a_resource_set_list_ids(name="Invalid")
assert len(resource_set_list_ids) == 0
resource_set_list_ids = await uma.a_resource_set_list_ids(uri="Invalid")
assert len(resource_set_list_ids) == 0
resource_set_list_ids = await uma.a_resource_set_list_ids(maximum=0)
assert len(resource_set_list_ids) == 0
# Test create resource set
resource_to_create = {
"name": "mytest",
"scopes": ["test:read", "test:write"],
"type": "urn:test",
}
created_resource = await uma.a_resource_set_create(resource_to_create)
assert created_resource
assert created_resource["_id"], created_resource
assert set(resource_to_create).issubset(set(created_resource)), created_resource
# Test create the same resource set
with pytest.raises(KeycloakPostError) as err:
await uma.a_resource_set_create(resource_to_create)
assert err.match(
re.escape(
'409: b\'{"error":"invalid_request","error_description":'
'"Resource with name [mytest] already exists."}\''
)
)
# Test get resource set
latest_resource = await uma.a_resource_set_read(created_resource["_id"])
assert latest_resource["name"] == created_resource["name"]
# Test update resource set
latest_resource["name"] = "New Resource Name"
res = await uma.a_resource_set_update(created_resource["_id"], latest_resource)
assert res == dict(), res
updated_resource = await uma.a_resource_set_read(created_resource["_id"])
assert updated_resource["name"] == "New Resource Name"
# Test update resource set fail
with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
assert err.match('400: b\'{"error":"Unrecognized field')
# Test delete resource set
res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
assert res == dict(), res
with pytest.raises(KeycloakGetError) as err:
await uma.a_resource_set_read(created_resource["_id"])
err.match("404: b''")
# Test delete fail
with pytest.raises(KeycloakDeleteError) as err:
await uma.a_resource_set_delete(resource_id=created_resource["_id"])
assert err.match("404: b''")
@pytest.mark.asyncio
async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
"""Test policies.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
# Create some required test data
resource_to_create = {
"name": "mytest",
"scopes": ["test:read", "test:write"],
"type": "urn:test",
"ownerManagedAccess": True,
}
created_resource = await uma.a_resource_set_create(resource_to_create)
group_id = admin.create_group({"name": "UMAPolicyGroup"})
role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
other_client_id = admin.create_client({"name": "UMAOtherClient"})
client = admin.get_client(other_client_id)
resource_id = created_resource["_id"]
# Create a role policy
policy_to_create = {
"name": "TestPolicyRole",
"description": "Test resource policy description",
"scopes": ["test:read", "test:write"],
"roles": ["roleUMAPolicy"],
}
policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
assert policy
# Create a client policy
policy_to_create = {
"name": "TestPolicyClient",
"description": "Test resource policy description",
"scopes": ["test:read"],
"clients": [client["clientId"]],
}
policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
assert policy
policy_to_create = {
"name": "TestPolicyGroup",
"description": "Test resource policy description",
"scopes": ["test:read"],
"groups": ["/UMAPolicyGroup"],
}
policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
assert policy
policies = await uma.a_policy_query()
assert len(policies) == 3
policies = await uma.a_policy_query(name="TestPolicyGroup")
assert len(policies) == 1
policy_id = policy["id"]
await uma.a_policy_delete(policy_id)
with pytest.raises(KeycloakDeleteError) as err:
await uma.a_policy_delete(policy_id)
assert err.match(
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
)
policies = await uma.a_policy_query()
assert len(policies) == 2
policy = policies[0]
await uma.a_policy_update(policy_id=policy["id"], payload=policy)
policies = await uma.a_policy_query()
assert len(policies) == 2
policies = await uma.a_policy_query(name="Invalid")
assert len(policies) == 0
policies = await uma.a_policy_query(scope="Invalid")
assert len(policies) == 0
policies = await uma.a_policy_query(resource="Invalid")
assert len(policies) == 0
policies = await uma.a_policy_query(first=3)
assert len(policies) == 0
policies = await uma.a_policy_query(maximum=0)
assert len(policies) == 0
policies = await uma.a_policy_query(name=policy["name"])
assert len(policies) == 1
policies = await uma.a_policy_query(scope=policy["scopes"][0])
assert len(policies) == 2
policies = await uma.a_policy_query(resource=resource_id)
assert len(policies) == 2
await uma.a_resource_set_delete(resource_id)
await admin.a_delete_client(other_client_id)
await admin.a_delete_realm_role(role_id)
await admin.a_delete_group(group_id)
@pytest.mark.asyncio
async def test_a_uma_access(uma: KeycloakUMA):
"""Test permission access checks.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
resource_to_create = {
"name": "mytest",
"scopes": ["read", "write"],
"type": "urn:test",
"ownerManagedAccess": True,
}
resource = await uma.a_resource_set_create(resource_to_create)
policy_to_create = {
"name": "TestPolicy",
"description": "Test resource policy description",
"scopes": [resource_to_create["scopes"][0]],
"clients": [uma.connection.client_id],
}
await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
token = uma.connection.token
permissions = list()
assert await uma.a_permissions_check(token["access_token"], permissions)
permissions.append(UMAPermission(resource=resource_to_create["name"]))
assert await uma.a_permissions_check(token["access_token"], permissions)
permissions.append(UMAPermission(resource="not valid"))
assert not await uma.a_permissions_check(token["access_token"], permissions)
uma.resource_set_delete(resource["_id"])
@pytest.mark.asyncio
async def test_a_uma_permission_ticket(uma: KeycloakUMA):
"""Test permission ticket generation.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
resource_to_create = {
"name": "mytest",
"scopes": ["read", "write"],
"type": "urn:test",
"ownerManagedAccess": True,
}
resource = await uma.a_resource_set_create(resource_to_create)
policy_to_create = {
"name": "TestPolicy",
"description": "Test resource policy description",
"scopes": [resource_to_create["scopes"][0]],
"clients": [uma.connection.client_id],
}
await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
permissions = (
UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
)
response = await uma.a_permission_ticket_create(permissions)
rpt = await uma.connection.keycloak_openid.a_token(
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
)
assert rpt
assert "access_token" in rpt
permissions = (UMAPermission(resource="invalid"),)
with pytest.raises(KeycloakPostError):
uma.permission_ticket_create(permissions)
await uma.a_resource_set_delete(resource["_id"])
def test_counter_part():
"""Test that each function has its async counter part."""
uma_methods = [func for func in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, func))]
sync_methods = [
method
for method in uma_methods
if not method.startswith("a_") and not method.startswith("_")
]
async_methods = [
method for method in uma_methods if iscoroutinefunction(getattr(KeycloakUMA, method))
]
for method in sync_methods:
async_method = f"a_{method}"
assert (async_method in uma_methods) is True
sync_sign = signature(getattr(KeycloakUMA, method))
async_sign = signature(getattr(KeycloakUMA, async_method))
assert sync_sign.parameters == async_sign.parameters
for async_method in async_methods:
if async_method[2:].startswith("_"):
continue
assert async_method[2:] in sync_methods

1
tox.env

@ -2,3 +2,4 @@ KEYCLOAK_ADMIN=admin
KEYCLOAK_ADMIN_PASSWORD=admin
KEYCLOAK_HOST={env:KEYCLOAK_HOST:localhost}
KEYCLOAK_PORT=8081
KEYCLOAK_DOCKER_IMAGE_TAG={env:KEYCLOAK_DOCKER_IMAGE_TAG:latest}
Loading…
Cancel
Save