Browse Source

Merge pull request #1 from PilotDataPlatform/async

Updating to use async
pull/585/head
Greg McCoy 2 years ago
committed by GitHub
parent
commit
ce7099465b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CHANGELOG.md
  2. 172
      README.md
  3. 1740
      poetry.lock
  4. 4
      pyproject.toml
  5. 43
      src/keycloak/connection.py
  6. 4
      src/keycloak/exceptions.py
  7. 883
      src/keycloak/keycloak_admin.py
  8. 191
      src/keycloak/keycloak_openid.py
  9. 106
      tests/conftest.py
  10. 11
      tests/test_connection.py
  11. 1146
      tests/test_keycloak_admin.py
  12. 276
      tests/test_keycloak_openid.py

2
CHANGELOG.md

@ -1,3 +1,5 @@
## Unreleased
## v2.9.0 (2023-01-11) ## v2.9.0 (2023-01-11)
### Feat ### Feat

172
README.md

@ -1,6 +1,8 @@
[![CircleCI](https://github.com/marcospereirampj/python-keycloak/actions/workflows/daily.yaml/badge.svg)](https://github.com/marcospereirampj/python-keycloak/)
[![Documentation Status](https://readthedocs.org/projects/python-keycloak/badge/?version=latest)](http://python-keycloak.readthedocs.io/en/latest/?badge=latest) [![Documentation Status](https://readthedocs.org/projects/python-keycloak/badge/?version=latest)](http://python-keycloak.readthedocs.io/en/latest/?badge=latest)
# Async Pilot Keycloak fork
This repo is a fork of https://github.com/marcospereirampj/python-keycloak at version 2.9.0 by Indoc Research to support async.
# Python Keycloak # Python Keycloak
For review- see https://github.com/marcospereirampj/python-keycloak For review- see https://github.com/marcospereirampj/python-keycloak
@ -15,14 +17,14 @@ For review- see https://github.com/marcospereirampj/python-keycloak
### Manually ### Manually
`$ python setup.py install`
`$ pip install .`
## Dependencies ## Dependencies
python-keycloak depends on: python-keycloak depends on:
- Python 3 - Python 3
- [requests](https://requests.readthedocs.io)
- [httpx](https://www.python-httpx.org/)
- [python-jose](http://python-jose.readthedocs.io/en/latest/) - [python-jose](http://python-jose.readthedocs.io/en/latest/)
- [urllib3](https://urllib3.readthedocs.io/en/stable/) - [urllib3](https://urllib3.readthedocs.io/en/stable/)
@ -70,73 +72,76 @@ keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/auth/",
client_secret_key="secret") client_secret_key="secret")
# Get WellKnow # Get WellKnow
config_well_known = keycloak_openid.well_known()
config_well_known = await keycloak_openid.well_known()
# Get Code With Oauth Authorization Request # Get Code With Oauth Authorization Request
auth_url = keycloak_openid.auth_url(
auth_url = await keycloak_openid.auth_url(
redirect_uri="your_call_back_url", redirect_uri="your_call_back_url",
scope="email", scope="email",
state="your_state_info") state="your_state_info")
# Get Access Token With Code # Get Access Token With Code
access_token = keycloak_openid.token(
access_token = await keycloak_openid.token(
grant_type='authorization_code', grant_type='authorization_code',
code='the_code_you_get_from_auth_url_callback', code='the_code_you_get_from_auth_url_callback',
redirect_uri="your_call_back_url") redirect_uri="your_call_back_url")
# Get Token # Get Token
token = keycloak_openid.token("user", "password")
token = keycloak_openid.token("user", "password", totp="012345")
token = await keycloak_openid.token("user", "password")
token = await keycloak_openid.token("user", "password", totp="012345")
# Get token using Token Exchange # Get token using Token Exchange
token = keycloak_openid.exchange_token(token['access_token'], "my_client", "other_client", "some_user")
token = await keycloak_openid.exchange_token(token['access_token'], "my_client", "other_client", "some_user")
# Get Userinfo # Get Userinfo
userinfo = keycloak_openid.userinfo(token['access_token'])
userinfo = await keycloak_openid.userinfo(token['access_token'])
# Refresh token # Refresh token
token = keycloak_openid.refresh_token(token['refresh_token'])
token = await keycloak_openid.refresh_token(token['refresh_token'])
# Logout # Logout
keycloak_openid.logout(token['refresh_token'])
await keycloak_openid.logout(token['refresh_token'])
# Get Certs # Get Certs
certs = keycloak_openid.certs()
certs = await keycloak_openid.certs()
# Get RPT (Entitlement) # Get RPT (Entitlement)
token = keycloak_openid.token("user", "password")
rpt = keycloak_openid.entitlement(token['access_token'], "resource_id")
token = await keycloak_openid.token("user", "password")
rpt = await keycloak_openid.entitlement(token['access_token'], "resource_id")
# Instropect RPT # Instropect RPT
token_rpt_info = keycloak_openid.introspect(keycloak_openid.introspect(token['access_token'], rpt=rpt['rpt'],
token_rpt_info = await keycloak_openid.introspect(keycloak_openid.introspect(token['access_token'], rpt=rpt['rpt'],
token_type_hint="requesting_party_token")) token_type_hint="requesting_party_token"))
# Introspect Token # Introspect Token
token_info = keycloak_openid.introspect(token['access_token'])
token_info = await keycloak_openid.introspect(token['access_token'])
# Decode Token # Decode Token
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----" KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----"
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True} options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options)
token_info = await keycloak_openid.decode_token(token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options)
# Get permissions by token # Get permissions by token
token = keycloak_openid.token("user", "password")
keycloak_openid.load_authorization_config("example-authz-config.json")
policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode', key=KEYCLOAK_PUBLIC_KEY)
permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect')
token = await keycloak_openid.token("user", "password")
await keycloak_openid.load_authorization_config("example-authz-config.json")
await policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode', key=KEYCLOAK_PUBLIC_KEY)
await permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect')
# Get UMA-permissions by token # Get UMA-permissions by token
token = keycloak_openid.token("user", "password")
permissions = keycloak_openid.uma_permissions(token['access_token'])
# Currently unsupported for async version
# token = keycloak_openid.token("user", "password")
# permissions = keycloak_openid.uma_permissions(token['access_token'])
# Get UMA-permissions by token with specific resource and scope requested # Get UMA-permissions by token with specific resource and scope requested
token = keycloak_openid.token("user", "password")
permissions = keycloak_openid.uma_permissions(token['access_token'], permissions="Resource#Scope")
# Currently unsupported for async version
# token = keycloak_openid.token("user", "password")
# permissions = keycloak_openid.uma_permissions(token['access_token'], permissions="Resource#Scope")
# Get auth status for a specific resource and scope by token # Get auth status for a specific resource and scope by token
token = keycloak_openid.token("user", "password")
auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope")
# Currently unsupported for async version
# token = keycloak_openid.token("user", "password")
# auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope")
# KEYCLOAK ADMIN # KEYCLOAK ADMIN
@ -150,9 +155,10 @@ keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/",
user_realm_name="only_if_other_realm_than_master", user_realm_name="only_if_other_realm_than_master",
client_secret_key="client-secret", client_secret_key="client-secret",
verify=True) verify=True)
await keycloak_admin.connect()
# Add user # Add user
new_user = keycloak_admin.create_user({"email": "example@example.com",
new_user = await keycloak_admin.create_user({"email": "example@example.com",
"username": "example@example.com", "username": "example@example.com",
"enabled": True, "enabled": True,
"firstName": "Example", "firstName": "Example",
@ -160,7 +166,7 @@ new_user = keycloak_admin.create_user({"email": "example@example.com",
# Add user and raise exception if username already exists # Add user and raise exception if username already exists
# exist_ok currently defaults to True for backwards compatibility reasons # exist_ok currently defaults to True for backwards compatibility reasons
new_user = keycloak_admin.create_user({"email": "example@example.com",
new_user = await keycloak_admin.create_user({"email": "example@example.com",
"username": "example@example.com", "username": "example@example.com",
"enabled": True, "enabled": True,
"firstName": "Example", "firstName": "Example",
@ -168,7 +174,7 @@ new_user = keycloak_admin.create_user({"email": "example@example.com",
exist_ok=False) exist_ok=False)
# Add user and set password # Add user and set password
new_user = keycloak_admin.create_user({"email": "example@example.com",
new_user = await keycloak_admin.create_user({"email": "example@example.com",
"username": "example@example.com", "username": "example@example.com",
"enabled": True, "enabled": True,
"firstName": "Example", "firstName": "Example",
@ -176,7 +182,7 @@ new_user = keycloak_admin.create_user({"email": "example@example.com",
"credentials": [{"value": "secret","type": "password",}]}) "credentials": [{"value": "secret","type": "password",}]})
# Add user and specify a locale # Add user and specify a locale
new_user = keycloak_admin.create_user({"email": "example@example.fr",
new_user = await keycloak_admin.create_user({"email": "example@example.fr",
"username": "example@example.fr", "username": "example@example.fr",
"enabled": True, "enabled": True,
"firstName": "Example", "firstName": "Example",
@ -186,92 +192,92 @@ new_user = keycloak_admin.create_user({"email": "example@example.fr",
}}) }})
# User counter # User counter
count_users = keycloak_admin.users_count()
count_users = await keycloak_admin.users_count()
# Get users Returns a list of users, filtered according to query parameters # Get users Returns a list of users, filtered according to query parameters
users = keycloak_admin.get_users({})
users = await keycloak_admin.get_users({})
# Get user ID from username # Get user ID from username
user_id_keycloak = keycloak_admin.get_user_id("username-keycloak")
user_id_keycloak = await keycloak_admin.get_user_id("username-keycloak")
# Get User # Get User
user = keycloak_admin.get_user("user-id-keycloak")
user = await keycloak_admin.get_user("user-id-keycloak")
# Update User # Update User
response = keycloak_admin.update_user(user_id="user-id-keycloak",
response = await keycloak_admin.update_user(user_id="user-id-keycloak",
payload={'firstName': 'Example Update'}) payload={'firstName': 'Example Update'})
# Update User Password # Update User Password
response = keycloak_admin.set_user_password(user_id="user-id-keycloak", password="secret", temporary=True)
response = await keycloak_admin.set_user_password(user_id="user-id-keycloak", password="secret", temporary=True)
# Get User Credentials # Get User Credentials
credentials = keycloak_admin.get_credentials(user_id='user_id')
credentials = await keycloak_admin.get_credentials(user_id='user_id')
# Get User Credential by ID # Get User Credential by ID
credential = keycloak_admin.get_credential(user_id='user_id', credential_id='credential_id')
credential = await keycloak_admin.get_credential(user_id='user_id', credential_id='credential_id')
# Delete User Credential # Delete User Credential
response = keycloak_admin.delete_credential(user_id='user_id', credential_id='credential_id')
response = await keycloak_admin.delete_credential(user_id='user_id', credential_id='credential_id')
# Delete User # Delete User
response = keycloak_admin.delete_user(user_id="user-id-keycloak")
response = await keycloak_admin.delete_user(user_id="user-id-keycloak")
# Get consents granted by the user # Get consents granted by the user
consents = keycloak_admin.consents_user(user_id="user-id-keycloak")
consents = await keycloak_admin.consents_user(user_id="user-id-keycloak")
# Send User Action # Send User Action
response = keycloak_admin.send_update_account(user_id="user-id-keycloak",
response = await keycloak_admin.send_update_account(user_id="user-id-keycloak",
payload=['UPDATE_PASSWORD']) payload=['UPDATE_PASSWORD'])
# Send Verify Email # Send Verify Email
response = keycloak_admin.send_verify_email(user_id="user-id-keycloak")
response = await keycloak_admin.send_verify_email(user_id="user-id-keycloak")
# Get sessions associated with the user # Get sessions associated with the user
sessions = keycloak_admin.get_sessions(user_id="user-id-keycloak")
sessions = await keycloak_admin.get_sessions(user_id="user-id-keycloak")
# Get themes, social providers, auth providers, and event listeners available on this server # Get themes, social providers, auth providers, and event listeners available on this server
server_info = keycloak_admin.get_server_info()
server_info = await keycloak_admin.get_server_info()
# Get clients belonging to the realm Returns a list of clients belonging to the realm # Get clients belonging to the realm Returns a list of clients belonging to the realm
clients = keycloak_admin.get_clients()
clients = await keycloak_admin.get_clients()
# Get client - id (not client-id) from client by name # Get client - id (not client-id) from client by name
client_id = keycloak_admin.get_client_id("my-client")
client_id = await keycloak_admin.get_client_id("my-client")
# Get representation of the client - id of client (not client-id) # Get representation of the client - id of client (not client-id)
client = keycloak_admin.get_client(client_id="client_id")
client = await keycloak_admin.get_client(client_id="client_id")
# Get all roles for the realm or client # Get all roles for the realm or client
realm_roles = keycloak_admin.get_realm_roles()
realm_roles = await keycloak_admin.get_realm_roles()
# Get all roles for the client # Get all roles for the client
client_roles = keycloak_admin.get_client_roles(client_id="client_id")
client_roles = await keycloak_admin.get_client_roles(client_id="client_id")
# Get client role # Get client role
role = keycloak_admin.get_client_role(client_id="client_id", role_name="role_name")
role = await keycloak_admin.get_client_role(client_id="client_id", role_name="role_name")
# Warning: Deprecated # Warning: Deprecated
# Get client role id from name # Get client role id from name
role_id = keycloak_admin.get_client_role_id(client_id="client_id", role_name="test")
role_id = await keycloak_admin.get_client_role_id(client_id="client_id", role_name="test")
# Create client role # Create client role
keycloak_admin.create_client_role(client_role_id='client_id', payload={'name': 'roleName', 'clientRole': True})
await keycloak_admin.create_client_role(client_role_id='client_id', payload={'name': 'roleName', 'clientRole': True})
# Assign client role to user. Note that BOTH role_name and role_id appear to be required. # Assign client role to user. Note that BOTH role_name and role_id appear to be required.
keycloak_admin.assign_client_role(client_id="client_id", user_id="user_id", role_id="role_id", role_name="test")
await keycloak_admin.assign_client_role(client_id="client_id", user_id="user_id", role_id="role_id", role_name="test")
# Retrieve client roles of a user. # Retrieve client roles of a user.
keycloak_admin.get_client_roles_of_user(user_id="user_id", client_id="client_id")
await keycloak_admin.get_client_roles_of_user(user_id="user_id", client_id="client_id")
# Retrieve available client roles of a user. # Retrieve available client roles of a user.
keycloak_admin.get_available_client_roles_of_user(user_id="user_id", client_id="client_id")
await keycloak_admin.get_available_client_roles_of_user(user_id="user_id", client_id="client_id")
# Retrieve composite client roles of a user. # Retrieve composite client roles of a user.
keycloak_admin.get_composite_client_roles_of_user(user_id="user_id", client_id="client_id")
await keycloak_admin.get_composite_client_roles_of_user(user_id="user_id", client_id="client_id")
# Delete client roles of a user. # Delete client roles of a user.
keycloak_admin.delete_client_roles_of_user(client_id="client_id", user_id="user_id", roles={"id": "role-id"})
keycloak_admin.delete_client_roles_of_user(client_id="client_id", user_id="user_id", roles=[{"id": "role-id_1"}, {"id": "role-id_2"}])
await keycloak_admin.delete_client_roles_of_user(client_id="client_id", user_id="user_id", roles={"id": "role-id"})
await keycloak_admin.delete_client_roles_of_user(client_id="client_id", user_id="user_id", roles=[{"id": "role-id_1"}, {"id": "role-id_2"}])
# Get all client authorization resources # Get all client authorization resources
client_resources = get_client_authz_resources(client_id="client_id") client_resources = get_client_authz_resources(client_id="client_id")
@ -286,62 +292,62 @@ client_permissions = get_client_authz_permissions(client_id="client_id")
client_policies = get_client_authz_policies(client_id="client_id") client_policies = get_client_authz_policies(client_id="client_id")
# Create new group # Create new group
group = keycloak_admin.create_group({"name": "Example Group"})
group = await keycloak_admin.create_group({"name": "Example Group"})
# Get all groups # Get all groups
groups = keycloak_admin.get_groups()
groups = await keycloak_admin.get_groups()
# Get group # Get group
group = keycloak_admin.get_group(group_id='group_id')
group = await keycloak_admin.get_group(group_id='group_id')
# Get group by name # Get group by name
group = keycloak_admin.get_group_by_path(path='/group/subgroup', search_in_subgroups=True)
group = await keycloak_admin.get_group_by_path(path='/group/subgroup', search_in_subgroups=True)
# Function to trigger user sync from provider # Function to trigger user sync from provider
sync_users(storage_id="storage_di", action="action") sync_users(storage_id="storage_di", action="action")
# Get client role id from name # Get client role id from name
role_id = keycloak_admin.get_client_role_id(client_id=client_id, role_name="test")
role_id = await keycloak_admin.get_client_role_id(client_id=client_id, role_name="test")
# Get all roles for the realm or client # Get all roles for the realm or client
realm_roles = keycloak_admin.get_roles()
realm_roles = await keycloak_admin.get_roles()
# Assign client role to user. Note that BOTH role_name and role_id appear to be required. # Assign client role to user. Note that BOTH role_name and role_id appear to be required.
keycloak_admin.assign_client_role(client_id=client_id, user_id=user_id, role_id=role_id, role_name="test")
await keycloak_admin.assign_client_role(client_id=client_id, user_id=user_id, role_id=role_id, role_name="test")
# Assign realm roles to user # Assign realm roles to user
keycloak_admin.assign_realm_roles(user_id=user_id, roles=realm_roles)
await keycloak_admin.assign_realm_roles(user_id=user_id, roles=realm_roles)
# Assign realm roles to client's scope # Assign realm roles to client's scope
keycloak_admin.assign_realm_roles_to_client_scope(client_id=client_id, roles=realm_roles)
await keycloak_admin.assign_realm_roles_to_client_scope(client_id=client_id, roles=realm_roles)
# Get realm roles assigned to client's scope # Get realm roles assigned to client's scope
keycloak_admin.get_realm_roles_of_client_scope(client_id=client_id)
await keycloak_admin.get_realm_roles_of_client_scope(client_id=client_id)
# Remove realm roles assigned to client's scope # Remove realm roles assigned to client's scope
keycloak_admin.delete_realm_roles_of_client_scope(client_id=client_id, roles=realm_roles)
await keycloak_admin.delete_realm_roles_of_client_scope(client_id=client_id, roles=realm_roles)
another_client_id = keycloak_admin.get_client_id("my-client-2")
another_client_id = await keycloak_admin.get_client_id("my-client-2")
# Assign client roles to client's scope # Assign client roles to client's scope
keycloak_admin.assign_client_roles_to_client_scope(client_id=another_client_id, client_roles_owner_id=client_id, roles=client_roles)
await keycloak_admin.assign_client_roles_to_client_scope(client_id=another_client_id, client_roles_owner_id=client_id, roles=client_roles)
# Get client roles assigned to client's scope # Get client roles assigned to client's scope
keycloak_admin.get_client_roles_of_client_scope(client_id=another_client_id, client_roles_owner_id=client_id)
await keycloak_admin.get_client_roles_of_client_scope(client_id=another_client_id, client_roles_owner_id=client_id)
# Remove client roles assigned to client's scope # Remove client roles assigned to client's scope
keycloak_admin.delete_client_roles_of_client_scope(client_id=another_client_id, client_roles_owner_id=client_id, roles=client_roles)
await keycloak_admin.delete_client_roles_of_client_scope(client_id=another_client_id, client_roles_owner_id=client_id, roles=client_roles)
# Get all ID Providers # Get all ID Providers
idps = keycloak_admin.get_idps()
idps = await keycloak_admin.get_idps()
# Create a new Realm # Create a new Realm
keycloak_admin.create_realm(payload={"realm": "demo"}, skip_exists=False)
await keycloak_admin.create_realm(payload={"realm": "demo"}, skip_exists=False)
# Changing Realm # Changing Realm
keycloak_admin = KeycloakAdmin(realm_name="main", ...)
keycloak_admin.get_users() # Get user in main realm
keycloak_admin.realm_name = "demo" # Change realm to 'demo'
keycloak_admin.get_users() # Get users in realm 'demo'
keycloak_admin.create_user(...) # Creates a new user in 'demo'
await keycloak_admin = KeycloakAdmin(realm_name="main", ...)
await keycloak_admin.get_users() # Get user in main realm
await keycloak_admin.realm_name = "demo" # Change realm to 'demo'
await keycloak_admin.get_users() # Get users in realm 'demo'
await keycloak_admin.create_user(...) # Creates a new user in 'demo'
``` ```

1740
poetry.lock
File diff suppressed because it is too large
View File

4
pyproject.toml

@ -30,7 +30,7 @@ Documentation = "https://python-keycloak.readthedocs.io/en/latest/"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.7" python = "^3.7"
requests = "^2.20.0"
httpx = "^0.23.0"
python-jose = "^3.3.0" python-jose = "^3.3.0"
urllib3 = "^1.26.0" urllib3 = "^1.26.0"
mock = {version = "^4.0.3", optional = true} mock = {version = "^4.0.3", optional = true}
@ -42,7 +42,6 @@ sphinx-rtd-theme = {version = "^1.0.0", optional = true}
readthedocs-sphinx-ext = {version = "^2.1.9", optional = true} readthedocs-sphinx-ext = {version = "^2.1.9", optional = true}
m2r2 = {version = "^0.3.2", optional = true} m2r2 = {version = "^0.3.2", optional = true}
sphinx-autoapi = {version = "^2.0.0", optional = true} sphinx-autoapi = {version = "^2.0.0", optional = true}
requests-toolbelt = "^0.9.1"
[tool.poetry.extras] [tool.poetry.extras]
docs = [ docs = [
@ -61,6 +60,7 @@ docs = [
tox = "^3.25.0" tox = "^3.25.0"
pytest = "^7.1.2" pytest = "^7.1.2"
pytest-cov = "^3.0.0" pytest-cov = "^3.0.0"
pytest-asyncio = "0.20.3"
wheel = "^0.37.1" wheel = "^0.37.1"
pre-commit = "^2.19.0" pre-commit = "^2.19.0"
isort = "^5.10.1" isort = "^5.10.1"

43
src/keycloak/connection.py

@ -28,8 +28,7 @@ try:
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
from urlparse import urljoin from urlparse import urljoin
import requests
from requests.adapters import HTTPAdapter
import httpx
from .exceptions import KeycloakConnectionError from .exceptions import KeycloakConnectionError
@ -67,26 +66,19 @@ class ConnectionManager(object):
self.headers = headers self.headers = headers
self.timeout = timeout self.timeout = timeout
self.verify = verify self.verify = verify
self._s = requests.Session()
self._s = httpx.AsyncClient(verify=verify)
self._s.auth = lambda x: x # don't let requests add auth headers self._s.auth = lambda x: x # don't let requests add auth headers
# retry once to reset connection with Keycloak after tomcat's ConnectionTimeout # retry once to reset connection with Keycloak after tomcat's ConnectionTimeout
# see https://github.com/marcospereirampj/python-keycloak/issues/36 # see https://github.com/marcospereirampj/python-keycloak/issues/36
for protocol in ("https://", "http://"):
adapter = HTTPAdapter(max_retries=1)
# adds POST to retry whitelist
allowed_methods = set(adapter.max_retries.allowed_methods)
allowed_methods.add("POST")
adapter.max_retries.allowed_methods = frozenset(allowed_methods)
self._s.mount(protocol, adapter)
self._s.transport = httpx.AsyncHTTPTransport(retries=1)
if proxies: if proxies:
self._s.proxies.update(proxies)
self._s.proxies = proxies
def __del__(self):
async def close(self):
"""Del method.""" """Del method."""
self._s.close()
await self._s.aclose()
@property @property
def base_url(self): def base_url(self):
@ -182,7 +174,7 @@ class ConnectionManager(object):
""" """
self.headers.pop(key, None) self.headers.pop(key, None)
def raw_get(self, path, **kwargs):
async def raw_get(self, path, **kwargs):
"""Submit get request to the path. """Submit get request to the path.
:param path: Path for request. :param path: Path for request.
@ -194,17 +186,17 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server. :raises KeycloakConnectionError: HttpError Can't connect to server.
""" """
try: try:
return self._s.get(
return await self._s.request(
"GET",
urljoin(self.base_url, path), urljoin(self.base_url, path),
params=kwargs, params=kwargs,
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
verify=self.verify,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e) raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_post(self, path, data, **kwargs):
async def raw_post(self, path, data, **kwargs):
"""Submit post request to the path. """Submit post request to the path.
:param path: Path for request. :param path: Path for request.
@ -218,18 +210,18 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server. :raises KeycloakConnectionError: HttpError Can't connect to server.
""" """
try: try:
return self._s.post(
return await self._s.post(
urljoin(self.base_url, path), urljoin(self.base_url, path),
params=kwargs, params=kwargs,
data=data, data=data,
files=kwargs.get('files'),
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
verify=self.verify,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e) raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_put(self, path, data, **kwargs):
async def raw_put(self, path, data, **kwargs):
"""Submit put request to the path. """Submit put request to the path.
:param path: Path for request. :param path: Path for request.
@ -243,18 +235,17 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server. :raises KeycloakConnectionError: HttpError Can't connect to server.
""" """
try: try:
return self._s.put(
return await self._s.put(
urljoin(self.base_url, path), urljoin(self.base_url, path),
params=kwargs, params=kwargs,
data=data, data=data,
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
verify=self.verify,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e) raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_delete(self, path, data=None, **kwargs):
async def raw_delete(self, path, data=None, **kwargs):
"""Submit delete request to the path. """Submit delete request to the path.
:param path: Path for request. :param path: Path for request.
@ -268,13 +259,13 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server. :raises KeycloakConnectionError: HttpError Can't connect to server.
""" """
try: try:
return self._s.delete(
return await self._s.request(
"DELETE",
urljoin(self.base_url, path), urljoin(self.base_url, path),
params=kwargs, params=kwargs,
data=data or dict(), data=data or dict(),
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
verify=self.verify,
) )
except Exception as e: except Exception as e:
raise KeycloakConnectionError("Can't connect to server (%s)" % e) raise KeycloakConnectionError("Can't connect to server (%s)" % e)

4
src/keycloak/exceptions.py

@ -23,8 +23,6 @@
"""Keycloak custom exceptions module.""" """Keycloak custom exceptions module."""
import requests
class KeycloakError(Exception): class KeycloakError(Exception):
"""Base class for custom Keycloak errors. """Base class for custom Keycloak errors.
@ -167,7 +165,7 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
expected_codes = [200, 201, 204] expected_codes = [200, 201, 204]
if response.status_code in expected_codes: if response.status_code in expected_codes:
if response.status_code == requests.codes.no_content:
if response.status_code == 204:
return {} return {}
try: try:

883
src/keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

191
src/keycloak/keycloak_openid.py

@ -198,7 +198,7 @@ class KeycloakOpenID:
""" """
return self.client_id + "/" + role return self.client_id + "/" + role
def _token_info(self, token, method_token_info, **kwargs):
async def _token_info(self, token, method_token_info, **kwargs):
"""Getter for the token data. """Getter for the token data.
:param token: Token :param token: Token
@ -211,13 +211,13 @@ class KeycloakOpenID:
:rtype: dict :rtype: dict
""" """
if method_token_info == "introspect": if method_token_info == "introspect":
token_info = self.introspect(token)
token_info = await self.introspect(token)
else: else:
token_info = self.decode_token(token, **kwargs) token_info = self.decode_token(token, **kwargs)
return token_info return token_info
def well_known(self):
async def well_known(self):
"""Get the well_known object. """Get the well_known object.
The most important endpoint to understand is the well-known configuration The most important endpoint to understand is the well-known configuration
@ -228,10 +228,10 @@ class KeycloakOpenID:
:rtype: dict :rtype: dict
""" """
params_path = {"realm-name": self.realm_name} params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_WELL_KNOWN.format(**params_path))
data_raw = await self.connection.raw_get(URL_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def auth_url(self, redirect_uri, scope="email", state=""):
async def auth_url(self, redirect_uri, scope="email", state=""):
"""Get authorization URL endpoint. """Get authorization URL endpoint.
:param redirect_uri: Redirect url to receive oauth code :param redirect_uri: Redirect url to receive oauth code
@ -243,8 +243,9 @@ class KeycloakOpenID:
:returns: Authorization URL Full Build :returns: Authorization URL Full Build
:rtype: str :rtype: str
""" """
well_known = await self.well_known()
params_path = { params_path = {
"authorization-endpoint": self.well_known()["authorization_endpoint"],
"authorization-endpoint": well_known["authorization_endpoint"],
"client-id": self.client_id, "client-id": self.client_id,
"redirect-uri": redirect_uri, "redirect-uri": redirect_uri,
"scope": scope, "scope": scope,
@ -252,7 +253,7 @@ class KeycloakOpenID:
} }
return URL_AUTH.format(**params_path) return URL_AUTH.format(**params_path)
def token(
async def token(
self, self,
username="", username="",
password="", password="",
@ -308,10 +309,10 @@ class KeycloakOpenID:
payload["totp"] = totp payload["totp"] = totp
payload = self._add_secret_key(payload) payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
data_raw = await self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def refresh_token(self, refresh_token, grant_type=["refresh_token"]):
async def refresh_token(self, refresh_token, grant_type=["refresh_token"]):
"""Refresh the user token. """Refresh the user token.
The token endpoint is used to obtain tokens. Tokens can either be obtained by The token endpoint is used to obtain tokens. Tokens can either be obtained by
@ -335,10 +336,10 @@ class KeycloakOpenID:
"refresh_token": refresh_token, "refresh_token": refresh_token,
} }
payload = self._add_secret_key(payload) payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
data_raw = await self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def exchange_token(
async def exchange_token(
self, self,
token: str, token: str,
client_id: str, client_id: str,
@ -378,10 +379,10 @@ class KeycloakOpenID:
"scope": scope, "scope": scope,
} }
payload = self._add_secret_key(payload) payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
data_raw = await self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def userinfo(self, token):
async def userinfo(self, token):
"""Get the user info object. """Get the user info object.
The userinfo endpoint returns standard claims about the authenticated user, The userinfo endpoint returns standard claims about the authenticated user,
@ -396,10 +397,10 @@ class KeycloakOpenID:
""" """
self.connection.add_param_headers("Authorization", "Bearer " + token) self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name} params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path))
data_raw = await self.connection.raw_get(URL_USERINFO.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def logout(self, refresh_token):
async def logout(self, refresh_token):
"""Log out the authenticated user. """Log out the authenticated user.
:param refresh_token: Refresh token from Keycloak :param refresh_token: Refresh token from Keycloak
@ -410,10 +411,10 @@ class KeycloakOpenID:
params_path = {"realm-name": self.realm_name} params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "refresh_token": refresh_token} payload = {"client_id": self.client_id, "refresh_token": refresh_token}
payload = self._add_secret_key(payload) payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload)
data_raw = await self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
def certs(self):
async def certs(self):
"""Get certificates. """Get certificates.
The certificate endpoint returns the public keys enabled by the realm, encoded as a The certificate endpoint returns the public keys enabled by the realm, encoded as a
@ -426,10 +427,10 @@ class KeycloakOpenID:
:rtype: dict :rtype: dict
""" """
params_path = {"realm-name": self.realm_name} params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_CERTS.format(**params_path))
data_raw = await self.connection.raw_get(URL_CERTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def public_key(self):
async def public_key(self):
"""Retrieve the public key. """Retrieve the public key.
The public key is exposed by the realm page directly. The public key is exposed by the realm page directly.
@ -438,10 +439,10 @@ class KeycloakOpenID:
:rtype: str :rtype: str
""" """
params_path = {"realm-name": self.realm_name} params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_REALM.format(**params_path))
data_raw = await self.connection.raw_get(URL_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"] return raise_error_from_response(data_raw, KeycloakGetError)["public_key"]
def entitlement(self, token, resource_server_id):
async def entitlement(self, token, resource_server_id):
"""Get entitlements from the token. """Get entitlements from the token.
Client applications can use a specific endpoint to obtain a special security token Client applications can use a specific endpoint to obtain a special security token
@ -459,14 +460,14 @@ class KeycloakOpenID:
""" """
self.connection.add_param_headers("Authorization", "Bearer " + token) self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id} params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path))
data_raw = await self.connection.raw_get(URL_ENTITLEMENT.format(**params_path))
if data_raw.status_code == 404: if data_raw.status_code == 404:
return raise_error_from_response(data_raw, KeycloakDeprecationError) return raise_error_from_response(data_raw, KeycloakDeprecationError)
return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover
def introspect(self, token, rpt=None, token_type_hint=None):
async def introspect(self, token, rpt=None, token_type_hint=None):
"""Introspect the user token. """Introspect the user token.
The introspection endpoint is used to retrieve the active state of a token. The introspection endpoint is used to retrieve the active state of a token.
@ -497,7 +498,7 @@ class KeycloakOpenID:
payload = self._add_secret_key(payload) payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
data_raw = await self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def decode_token(self, token, key, algorithms=["RS256"], **kwargs): def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
@ -536,7 +537,7 @@ class KeycloakOpenID:
self.authorization.load_config(authorization_json) self.authorization.load_config(authorization_json)
def get_policies(self, token, method_token_info="introspect", **kwargs):
async def get_policies(self, token, method_token_info="introspect", **kwargs):
"""Get policies by user token. """Get policies by user token.
:param token: User token :param token: User token
@ -555,7 +556,7 @@ class KeycloakOpenID:
"Keycloak settings not found. Load Authorization Keycloak settings." "Keycloak settings not found. Load Authorization Keycloak settings."
) )
token_info = self._token_info(token, method_token_info, **kwargs)
token_info = await self._token_info(token, method_token_info, **kwargs)
if method_token_info == "introspect" and not token_info["active"]: if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.") raise KeycloakInvalidTokenError("Token expired or invalid.")
@ -574,7 +575,7 @@ class KeycloakOpenID:
return list(set(policies)) return list(set(policies))
def get_permissions(self, token, method_token_info="introspect", **kwargs):
async def get_permissions(self, token, method_token_info="introspect", **kwargs):
"""Get permission by user token. """Get permission by user token.
:param token: user token :param token: user token
@ -593,7 +594,7 @@ class KeycloakOpenID:
"Keycloak settings not found. Load Authorization Keycloak settings." "Keycloak settings not found. Load Authorization Keycloak settings."
) )
token_info = self._token_info(token, method_token_info, **kwargs)
token_info = await self._token_info(token, method_token_info, **kwargs)
if method_token_info == "introspect" and not token_info["active"]: if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.") raise KeycloakInvalidTokenError("Token expired or invalid.")
@ -612,70 +613,70 @@ class KeycloakOpenID:
return list(set(permissions)) return list(set(permissions))
def uma_permissions(self, token, permissions=""):
"""Get UMA permissions by user token with requested permissions.
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients.
http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
:param token: user token
:type token: str
:param permissions: list of uma permissions list(resource:scope) requested by the user
:type permissions: str
:returns: Keycloak server response
:rtype: dict
"""
permission = build_permission_param(permissions)
params_path = {"realm-name": self.realm_name}
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": permission,
"response_mode": "permissions",
"audience": self.client_id,
}
self.connection.add_param_headers("Authorization", "Bearer " + token)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
def has_uma_access(self, token, permissions):
"""Determine whether user has uma permissions with specified user token.
:param token: user token
:type token: str
:param permissions: list of uma permissions (resource:scope)
:type permissions: str
:return: Authentication status
:rtype: AuthStatus
:raises KeycloakAuthenticationError: In case of failed authentication
:raises KeycloakPostError: In case of failed request to Keycloak
"""
needed = build_permission_param(permissions)
try:
granted = self.uma_permissions(token, permissions)
except (KeycloakPostError, KeycloakAuthenticationError) as e:
if e.response_code == 403: # pragma: no cover
return AuthStatus(
is_logged_in=True, is_authorized=False, missing_permissions=needed
)
elif e.response_code == 401:
return AuthStatus(
is_logged_in=False, is_authorized=False, missing_permissions=needed
)
raise
for resource_struct in granted:
resource = resource_struct["rsname"]
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes: # pragma: no cover
needed.discard("{}#{}".format(resource, scope))
return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
)
#async def uma_permissions(self, token, permissions=""):
# """Get UMA permissions by user token with requested permissions.
# The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
# invoked by confidential clients.
# http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint
# :param token: user token
# :type token: str
# :param permissions: list of uma permissions list(resource:scope) requested by the user
# :type permissions: str
# :returns: Keycloak server response
# :rtype: dict
# """
# permission = build_permission_param(permissions)
# params_path = {"realm-name": self.realm_name}
# payload = {
# "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
# "permission": permission,
# "response_mode": "permissions",
# "audience": self.client_id,
# }
# self.connection.add_param_headers("Authorization", "Bearer " + token)
# data_raw = await self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
# return raise_error_from_response(data_raw, KeycloakPostError)
#async def has_uma_access(self, token, permissions):
# """Determine whether user has uma permissions with specified user token.
# :param token: user token
# :type token: str
# :param permissions: list of uma permissions (resource:scope)
# :type permissions: str
# :return: Authentication status
# :rtype: AuthStatus
# :raises KeycloakAuthenticationError: In case of failed authentication
# :raises KeycloakPostError: In case of failed request to Keycloak
# """
# needed = build_permission_param(permissions)
# try:
# granted = await self.uma_permissions(token, permissions)
# except (KeycloakPostError, KeycloakAuthenticationError) as e:
# if e.response_code == 403: # pragma: no cover
# return AuthStatus(
# is_logged_in=True, is_authorized=False, missing_permissions=needed
# )
# elif e.response_code == 401:
# return AuthStatus(
# is_logged_in=False, is_authorized=False, missing_permissions=needed
# )
# raise
# for resource_struct in granted:
# resource = resource_struct["rsname"]
# scopes = resource_struct.get("scopes", None)
# if not scopes:
# needed.discard(resource)
# continue
# for scope in scopes: # pragma: no cover
# needed.discard("{}#{}".format(resource, scope))
# return AuthStatus(
# is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
# )

106
tests/conftest.py

@ -6,6 +6,7 @@ import uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
import pytest import pytest
import pytest_asyncio
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
@ -134,8 +135,8 @@ def env():
return KeycloakTestEnv() return KeycloakTestEnv()
@pytest.fixture
def admin(env: KeycloakTestEnv):
@pytest_asyncio.fixture
async def admin(env: KeycloakTestEnv):
"""Fixture for initialized KeycloakAdmin class. """Fixture for initialized KeycloakAdmin class.
:param env: Keycloak test environment :param env: Keycloak test environment
@ -143,15 +144,17 @@ def admin(env: KeycloakTestEnv):
:returns: Keycloak admin :returns: Keycloak admin
:rtype: KeycloakAdmin :rtype: KeycloakAdmin
""" """
return KeycloakAdmin(
admin = KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN, username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD, password=env.KEYCLOAK_ADMIN_PASSWORD,
) )
await admin.connect()
return admin
@pytest.fixture
def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
@pytest_asyncio.fixture
async def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for initialized KeycloakOpenID class. """Fixture for initialized KeycloakOpenID class.
:param env: Keycloak test environment :param env: Keycloak test environment
@ -167,7 +170,7 @@ def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
admin.realm_name = realm admin.realm_name = realm
# Create client # Create client
client = str(uuid.uuid4()) client = str(uuid.uuid4())
client_id = admin.create_client(
client_id = await admin.create_client(
payload={ payload={
"name": client, "name": client,
"clientId": client, "clientId": client,
@ -183,11 +186,11 @@ def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
client_id=client, client_id=client,
) )
# Cleanup # Cleanup
admin.delete_client(client_id=client_id)
await admin.delete_client(client_id=client_id)
@pytest.fixture
def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
@pytest_asyncio.fixture
async def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for an initialized KeycloakOpenID class and a random user credentials. """Fixture for an initialized KeycloakOpenID class and a random user credentials.
:param env: Keycloak test environment :param env: Keycloak test environment
@ -204,7 +207,7 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
# Create client # Create client
client = str(uuid.uuid4()) client = str(uuid.uuid4())
secret = str(uuid.uuid4()) secret = str(uuid.uuid4())
client_id = admin.create_client(
client_id = await admin.create_client(
payload={ payload={
"name": client, "name": client,
"clientId": client, "clientId": client,
@ -218,7 +221,7 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
# Create user # Create user
username = str(uuid.uuid4()) username = str(uuid.uuid4())
password = str(uuid.uuid4()) password = str(uuid.uuid4())
user_id = admin.create_user(
user_id = await admin.create_user(
payload={ payload={
"username": username, "username": username,
"email": f"{username}@test.test", "email": f"{username}@test.test",
@ -239,12 +242,12 @@ def oid_with_credentials(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin)
) )
# Cleanup # Cleanup
admin.delete_client(client_id=client_id)
admin.delete_user(user_id=user_id)
await admin.delete_client(client_id=client_id)
await admin.delete_user(user_id=user_id)
@pytest.fixture
def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
@pytest_asyncio.fixture
async def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for an initialized KeycloakOpenID class and a random user credentials. """Fixture for an initialized KeycloakOpenID class and a random user credentials.
:param env: Keycloak test environment :param env: Keycloak test environment
@ -261,7 +264,7 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
# Create client # Create client
client = str(uuid.uuid4()) client = str(uuid.uuid4())
secret = str(uuid.uuid4()) secret = str(uuid.uuid4())
client_id = admin.create_client(
client_id = await admin.create_client(
payload={ payload={
"name": client, "name": client,
"clientId": client, "clientId": client,
@ -274,17 +277,20 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
"serviceAccountsEnabled": True, "serviceAccountsEnabled": True,
} }
) )
admin.create_client_authz_role_based_policy(
role = await admin.get_realm_role(role_name="offline_access")
payload = {
"name": "test-authz-rb-policy",
"roles": [{"id": role["id"]}],
}
print(payload)
await admin.create_client_authz_role_based_policy(
client_id=client_id, client_id=client_id,
payload={
"name": "test-authz-rb-policy",
"roles": [{"id": admin.get_realm_role(role_name="offline_access")["id"]}],
},
payload=payload,
) )
# Create user # Create user
username = str(uuid.uuid4()) username = str(uuid.uuid4())
password = str(uuid.uuid4()) password = str(uuid.uuid4())
user_id = admin.create_user(
user_id = await admin.create_user(
payload={ payload={
"username": username, "username": username,
"email": f"{username}@test.test", "email": f"{username}@test.test",
@ -305,12 +311,12 @@ def oid_with_credentials_authz(env: KeycloakTestEnv, realm: str, admin: Keycloak
) )
# Cleanup # Cleanup
admin.delete_client(client_id=client_id)
admin.delete_user(user_id=user_id)
await admin.delete_client(client_id=client_id)
await admin.delete_user(user_id=user_id)
@pytest.fixture
def realm(admin: KeycloakAdmin) -> str:
@pytest_asyncio.fixture
async def realm(admin: KeycloakAdmin) -> str:
"""Fixture for a new random realm. """Fixture for a new random realm.
:param admin: Keycloak admin :param admin: Keycloak admin
@ -319,13 +325,13 @@ def realm(admin: KeycloakAdmin) -> str:
:rtype: str :rtype: str
""" """
realm_name = str(uuid.uuid4()) realm_name = str(uuid.uuid4())
admin.create_realm(payload={"realm": realm_name, "enabled": True})
await admin.create_realm(payload={"realm": realm_name, "enabled": True})
yield realm_name yield realm_name
admin.delete_realm(realm_name=realm_name)
await admin.delete_realm(realm_name=realm_name)
@pytest.fixture
def user(admin: KeycloakAdmin, realm: str) -> str:
@pytest_asyncio.fixture
async def user(admin: KeycloakAdmin, realm: str) -> str:
"""Fixture for a new random user. """Fixture for a new random user.
:param admin: Keycloak admin :param admin: Keycloak admin
@ -337,13 +343,13 @@ def user(admin: KeycloakAdmin, realm: str) -> str:
""" """
admin.realm_name = realm admin.realm_name = realm
username = str(uuid.uuid4()) username = str(uuid.uuid4())
user_id = admin.create_user(payload={"username": username, "email": f"{username}@test.test"})
user_id = await admin.create_user(payload={"username": username, "email": f"{username}@test.test"})
yield user_id yield user_id
admin.delete_user(user_id=user_id)
await admin.delete_user(user_id=user_id)
@pytest.fixture
def group(admin: KeycloakAdmin, realm: str) -> str:
@pytest_asyncio.fixture
async def group(admin: KeycloakAdmin, realm: str) -> str:
"""Fixture for a new random group. """Fixture for a new random group.
:param admin: Keycloak admin :param admin: Keycloak admin
@ -355,13 +361,13 @@ def group(admin: KeycloakAdmin, realm: str) -> str:
""" """
admin.realm_name = realm admin.realm_name = realm
group_name = str(uuid.uuid4()) group_name = str(uuid.uuid4())
group_id = admin.create_group(payload={"name": group_name})
group_id = await admin.create_group(payload={"name": group_name})
yield group_id yield group_id
admin.delete_group(group_id=group_id)
await admin.delete_group(group_id=group_id)
@pytest.fixture
def client(admin: KeycloakAdmin, realm: str) -> str:
@pytest_asyncio.fixture
async def client(admin: KeycloakAdmin, realm: str) -> str:
"""Fixture for a new random client. """Fixture for a new random client.
:param admin: Keycloak admin :param admin: Keycloak admin
@ -373,13 +379,13 @@ def client(admin: KeycloakAdmin, realm: str) -> str:
""" """
admin.realm_name = realm admin.realm_name = realm
client = str(uuid.uuid4()) client = str(uuid.uuid4())
client_id = admin.create_client(payload={"name": client, "clientId": client})
client_id = await admin.create_client(payload={"name": client, "clientId": client})
yield client_id yield client_id
admin.delete_client(client_id=client_id)
await admin.delete_client(client_id=client_id)
@pytest.fixture
def client_role(admin: KeycloakAdmin, realm: str, client: str) -> str:
@pytest_asyncio.fixture
async def client_role(admin: KeycloakAdmin, realm: str, client: str) -> str:
"""Fixture for a new random client role. """Fixture for a new random client role.
:param admin: Keycloak admin :param admin: Keycloak admin
@ -393,13 +399,13 @@ def client_role(admin: KeycloakAdmin, realm: str, client: str) -> str:
""" """
admin.realm_name = realm admin.realm_name = realm
role = str(uuid.uuid4()) role = str(uuid.uuid4())
admin.create_client_role(client, {"name": role, "composite": False})
await admin.create_client_role(client, {"name": role, "composite": False})
yield role yield role
admin.delete_client_role(client, role)
await admin.delete_client_role(client, role)
@pytest.fixture
def composite_client_role(admin: KeycloakAdmin, realm: str, client: str, client_role: str) -> str:
@pytest_asyncio.fixture
async def composite_client_role(admin: KeycloakAdmin, realm: str, client: str, client_role: str) -> str:
"""Fixture for a new random composite client role. """Fixture for a new random composite client role.
:param admin: Keycloak admin :param admin: Keycloak admin
@ -415,11 +421,11 @@ def composite_client_role(admin: KeycloakAdmin, realm: str, client: str, client_
""" """
admin.realm_name = realm admin.realm_name = realm
role = str(uuid.uuid4()) role = str(uuid.uuid4())
admin.create_client_role(client, {"name": role, "composite": True})
role_repr = admin.get_client_role(client, client_role)
admin.add_composite_client_roles_to_role(client, role, roles=[role_repr])
await admin.create_client_role(client, {"name": role, "composite": True})
role_repr = await admin.get_client_role(client, client_role)
await admin.add_composite_client_roles_to_role(client, role, roles=[role_repr])
yield role yield role
admin.delete_client_role(client, role)
await admin.delete_client_role(client, role)
@pytest.fixture @pytest.fixture

11
tests/test_connection.py

@ -28,14 +28,15 @@ def test_headers():
assert not cm.exist_param_headers(key="H") assert not cm.exist_param_headers(key="H")
def test_bad_connection():
@pytest.mark.asyncio
async def test_bad_connection():
"""Test bad connection.""" """Test bad connection."""
cm = ConnectionManager(base_url="http://not.real.domain") cm = ConnectionManager(base_url="http://not.real.domain")
with pytest.raises(KeycloakConnectionError): with pytest.raises(KeycloakConnectionError):
cm.raw_get(path="bad")
await cm.raw_get(path="bad")
with pytest.raises(KeycloakConnectionError): with pytest.raises(KeycloakConnectionError):
cm.raw_delete(path="bad")
await cm.raw_delete(path="bad")
with pytest.raises(KeycloakConnectionError): with pytest.raises(KeycloakConnectionError):
cm.raw_post(path="bad", data={})
await cm.raw_post(path="bad", data={})
with pytest.raises(KeycloakConnectionError): with pytest.raises(KeycloakConnectionError):
cm.raw_put(path="bad", data={})
await cm.raw_put(path="bad", data={})

1146
tests/test_keycloak_admin.py
File diff suppressed because it is too large
View File

276
tests/test_keycloak_openid.py

@ -40,13 +40,14 @@ def test_keycloak_openid_init(env):
assert isinstance(oid.authorization, Authorization) assert isinstance(oid.authorization, Authorization)
def test_well_known(oid: KeycloakOpenID):
@pytest.mark.asyncio
async def test_well_known(oid: KeycloakOpenID):
"""Test the well_known method. """Test the well_known method.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
""" """
res = oid.well_known()
res = await oid.well_known()
assert res is not None assert res is not None
assert res != dict() assert res != dict()
for key in [ for key in [
@ -107,7 +108,8 @@ def test_well_known(oid: KeycloakOpenID):
assert key in res assert key in res
def test_auth_url(env, oid: KeycloakOpenID):
@pytest.mark.asyncio
async def test_auth_url(env, oid: KeycloakOpenID):
"""Test the auth_url method. """Test the auth_url method.
:param env: Environment fixture :param env: Environment fixture
@ -115,7 +117,7 @@ def test_auth_url(env, oid: KeycloakOpenID):
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
""" """
res = oid.auth_url(redirect_uri="http://test.test/*")
res = await oid.auth_url(redirect_uri="http://test.test/*")
assert ( assert (
res res
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}" == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}"
@ -124,14 +126,15 @@ def test_auth_url(env, oid: KeycloakOpenID):
) )
def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio
async def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test the token method. """Test the token method.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
""" """
oid, username, password = oid_with_credentials oid, username, password = oid_with_credentials
token = oid.token(username=username, password=password)
token = await oid.token(username=username, password=password)
assert token == { assert token == {
"access_token": mock.ANY, "access_token": mock.ANY,
"expires_in": 300, "expires_in": 300,
@ -145,7 +148,7 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
} }
# Test with dummy totp # Test with dummy totp
token = oid.token(username=username, password=password, totp="123456")
token = await oid.token(username=username, password=password, totp="123456")
assert token == { assert token == {
"access_token": mock.ANY, "access_token": mock.ANY,
"expires_in": 300, "expires_in": 300,
@ -159,7 +162,7 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
} }
# Test with extra param # Test with extra param
token = oid.token(username=username, password=password, extra_param="foo")
token = await oid.token(username=username, password=password, extra_param="foo")
assert token == { assert token == {
"access_token": mock.ANY, "access_token": mock.ANY,
"expires_in": 300, "expires_in": 300,
@ -173,7 +176,8 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
} }
def test_exchange_token(
@pytest.mark.asyncio
async def test_exchange_token(
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
): ):
"""Test the exchange token method. """Test the exchange token method.
@ -188,19 +192,23 @@ def test_exchange_token(
# Allow impersonation # Allow impersonation
admin.realm_name = oid.realm_name admin.realm_name = oid.realm_name
admin.assign_client_role(
user_id=admin.get_user_id(username=username),
client_id=admin.get_client_id(client_id="realm-management"),
roles=[
admin.get_client_role(
client_id=admin.get_client_id(client_id="realm-management"),
role_name="impersonation",
)
],
user_id = await admin.get_user_id(username=username)
client_id = await admin.get_client_id(client_id="realm-management")
roles = [
await admin.get_client_role(
client_id=client_id,
role_name="impersonation",
)
]
print(roles)
await admin.assign_client_role(
user_id=user_id,
client_id=client_id,
roles=roles
) )
token = oid.token(username=username, password=password)
assert oid.userinfo(token=token["access_token"]) == {
token = await oid.token(username=username, password=password)
assert await oid.userinfo(token=token["access_token"]) == {
"email": f"{username}@test.test", "email": f"{username}@test.test",
"email_verified": False, "email_verified": False,
"preferred_username": username, "preferred_username": username,
@ -208,13 +216,13 @@ def test_exchange_token(
} }
# Exchange token with the new user # Exchange token with the new user
new_token = oid.exchange_token(
new_token = await oid.exchange_token(
token=token["access_token"], token=token["access_token"],
client_id=oid.client_id, client_id=oid.client_id,
audience=oid.client_id, audience=oid.client_id,
subject=username, subject=username,
) )
assert oid.userinfo(token=new_token["access_token"]) == {
assert await oid.userinfo(token=new_token["access_token"]) == {
"email": f"{username}@test.test", "email": f"{username}@test.test",
"email_verified": False, "email_verified": False,
"preferred_username": username, "preferred_username": username,
@ -223,7 +231,8 @@ def test_exchange_token(
assert token != new_token assert token != new_token
def test_logout(oid_with_credentials):
@pytest.mark.asyncio
async def test_logout(oid_with_credentials):
"""Test logout. """Test logout.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
@ -231,33 +240,37 @@ def test_logout(oid_with_credentials):
""" """
oid, username, password = oid_with_credentials oid, username, password = oid_with_credentials
token = oid.token(username=username, password=password)
assert oid.userinfo(token=token["access_token"]) != dict()
assert oid.logout(refresh_token=token["refresh_token"]) == dict()
token = await oid.token(username=username, password=password)
assert await oid.userinfo(token=token["access_token"]) != dict()
assert await oid.logout(refresh_token=token["refresh_token"]) == dict()
with pytest.raises(KeycloakAuthenticationError): with pytest.raises(KeycloakAuthenticationError):
oid.userinfo(token=token["access_token"])
await oid.userinfo(token=token["access_token"])
def test_certs(oid: KeycloakOpenID):
@pytest.mark.asyncio
async def test_certs(oid: KeycloakOpenID):
"""Test certificates. """Test certificates.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
""" """
assert len(oid.certs()["keys"]) == 2
certs = await oid.certs()
assert len(certs["keys"]) == 2
def test_public_key(oid: KeycloakOpenID):
@pytest.mark.asyncio
async def test_public_key(oid: KeycloakOpenID):
"""Test public key. """Test public key.
:param oid: Keycloak OpenID client :param oid: Keycloak OpenID client
:type oid: KeycloakOpenID :type oid: KeycloakOpenID
""" """
assert oid.public_key() is not None
assert await oid.public_key() is not None
def test_entitlement(
@pytest.mark.asyncio
async def test_entitlement(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
): ):
"""Test entitlement. """Test entitlement.
@ -269,53 +282,62 @@ def test_entitlement(
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
""" """
oid, username, password = oid_with_credentials_authz oid, username, password = oid_with_credentials_authz
token = oid.token(username=username, password=password)
resource_server_id = admin.get_client_authz_resources(
client_id=admin.get_client_id(oid.client_id)
)[0]["_id"]
token = await oid.token(username=username, password=password)
client_id = await admin.get_client_id(oid.client_id)
with pytest.raises(KeycloakDeprecationError): with pytest.raises(KeycloakDeprecationError):
oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
resource_servers = await admin.get_client_authz_resources(
client_id=client_id
)
resource_server_id = resource_servers[0]["_id"]
await oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id)
def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio
async def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test introspect. """Test introspect.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
""" """
oid, username, password = oid_with_credentials oid, username, password = oid_with_credentials
token = oid.token(username=username, password=password)
token = await oid.token(username=username, password=password)
assert oid.introspect(token=token["access_token"])["active"]
assert oid.introspect(
introspect = await oid.introspect(token=token["access_token"])
assert introspect["active"]
introspect = await oid.introspect(
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token" token=token["access_token"], rpt="some", token_type_hint="requesting_party_token"
) == {"active": False}
)
assert introspect == {"active": False}
with pytest.raises(KeycloakRPTNotFound): with pytest.raises(KeycloakRPTNotFound):
oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
await oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token")
def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio
async def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""Test decode token. """Test decode token.
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
""" """
oid, username, password = oid_with_credentials oid, username, password = oid_with_credentials
token = oid.token(username=username, password=password)
token = await oid.token(username=username, password=password)
public_key = await oid.public_key()
decoded_token = oid.decode_token(
token=token["access_token"],
key="-----BEGIN PUBLIC KEY-----\n" + public_key + "\n-----END PUBLIC KEY-----",
options={"verify_aud": False},
)
assert ( assert (
oid.decode_token(
token=token["access_token"],
key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----",
options={"verify_aud": False},
)["preferred_username"]
decoded_token["preferred_username"]
== username == username
) )
def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio
async def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test load authorization config. """Test load authorization config.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
@ -335,7 +357,8 @@ def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpe
) )
def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio
async def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies. """Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
@ -343,37 +366,38 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
""" """
oid, username, password = oid_with_credentials_authz oid, username, password = oid_with_credentials_authz
token = oid.token(username=username, password=password)
token = await oid.token(username=username, password=password)
with pytest.raises(KeycloakAuthorizationConfigError): with pytest.raises(KeycloakAuthorizationConfigError):
oid.get_policies(token=token["access_token"])
await oid.get_policies(token=token["access_token"])
oid.load_authorization_config(path="tests/data/authz_settings.json") oid.load_authorization_config(path="tests/data/authz_settings.json")
assert oid.get_policies(token=token["access_token"]) is None
assert await oid.get_policies(token=token["access_token"]) is None
key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
key = "-----BEGIN PUBLIC KEY-----\n" + await oid.public_key() + "\n-----END PUBLIC KEY-----"
orig_client_id = oid.client_id orig_client_id = oid.client_id
oid.client_id = "account" oid.client_id = "account"
assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
assert await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x) str(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
for x in await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
] == ["Policy: test (role)"] ] == ["Policy: test (role)"]
assert [ assert [
repr(x) repr(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
for x in await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
] == ["<Policy: test (role)>"] ] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id
oid.logout(refresh_token=token["refresh_token"])
await oid.logout(refresh_token=token["refresh_token"])
with pytest.raises(KeycloakInvalidTokenError): with pytest.raises(KeycloakInvalidTokenError):
oid.get_policies(token=token["access_token"])
await oid.get_policies(token=token["access_token"])
def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
@pytest.mark.asyncio
async def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test get policies. """Test get policies.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
@ -381,19 +405,19 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
""" """
oid, username, password = oid_with_credentials_authz oid, username, password = oid_with_credentials_authz
token = oid.token(username=username, password=password)
token = await oid.token(username=username, password=password)
with pytest.raises(KeycloakAuthorizationConfigError): with pytest.raises(KeycloakAuthorizationConfigError):
oid.get_permissions(token=token["access_token"])
await oid.get_permissions(token=token["access_token"])
oid.load_authorization_config(path="tests/data/authz_settings.json") oid.load_authorization_config(path="tests/data/authz_settings.json")
assert oid.get_permissions(token=token["access_token"]) is None
assert await oid.get_permissions(token=token["access_token"]) is None
key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
key = "-----BEGIN PUBLIC KEY-----\n" + await oid.public_key() + "\n-----END PUBLIC KEY-----"
orig_client_id = oid.client_id orig_client_id = oid.client_id
oid.client_id = "account" oid.client_id = "account"
assert ( assert (
oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
await oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
) )
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
@ -405,70 +429,74 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x) str(x)
for x in oid.get_permissions(
for x in await oid.get_permissions(
token=token["access_token"], method_token_info="decode", key=key token=token["access_token"], method_token_info="decode", key=key
) )
] == ["Permission: test-perm (resource)"] ] == ["Permission: test-perm (resource)"]
assert [ assert [
repr(x) repr(x)
for x in oid.get_permissions(
for x in await oid.get_permissions(
token=token["access_token"], method_token_info="decode", key=key token=token["access_token"], method_token_info="decode", key=key
) )
] == ["<Permission: test-perm (resource)>"] ] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id
oid.logout(refresh_token=token["refresh_token"])
await oid.logout(refresh_token=token["refresh_token"])
with pytest.raises(KeycloakInvalidTokenError): with pytest.raises(KeycloakInvalidTokenError):
oid.get_permissions(token=token["access_token"])
def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Test UMA permissions.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
"""
oid, username, password = oid_with_credentials_authz
token = oid.token(username=username, password=password)
assert len(oid.uma_permissions(token=token["access_token"])) == 1
assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource"
def test_has_uma_access(
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
):
"""Test has UMA access.
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
"""
oid, username, password = oid_with_credentials_authz
token = oid.token(username=username, password=password)
assert (
str(oid.has_uma_access(token=token["access_token"], permissions=""))
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
assert (
str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
)
with pytest.raises(KeycloakPostError):
oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
oid.logout(refresh_token=token["refresh_token"])
assert (
str(oid.has_uma_access(token=token["access_token"], permissions=""))
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
)
assert (
str(oid.has_uma_access(token=admin.token["access_token"], permissions="Default Resource"))
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
+ "{'Default Resource'})"
)
await oid.get_permissions(token=token["access_token"])
#@pytest.mark.asyncio
#async def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
# """Test UMA permissions.
#
# :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
# server with client credentials
# :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
# """
# oid, username, password = oid_with_credentials_authz
# token = await oid.token(username=username, password=password)
#
# assert len(await oid.uma_permissions(token=token["access_token"])) == 1
# uma_permissions = await oid.uma_permissions(token=token["access_token"])
# assert uma_permissions[0]["rsname"] == "Default Resource"
#
#
#@pytest.mark.asyncio
#async def test_has_uma_access(
# oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin
#):
# """Test has UMA access.
#
# :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization
# server with client credentials
# :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
# :param admin: Keycloak Admin client
# :type admin: KeycloakAdmin
# """
# oid, username, password = oid_with_credentials_authz
# token = await oid.token(username=username, password=password)
# print(token)
#
# assert (
# str(await oid.has_uma_access(token=token["access_token"], permissions=""))
# == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
# )
# assert (
# str(await oid.has_uma_access(token=token["access_token"], permissions="Default Resource"))
# == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())"
# )
#
# with pytest.raises(KeycloakPostError):
# await oid.has_uma_access(token=token["access_token"], permissions="Does not exist")
#
# await oid.logout(refresh_token=token["refresh_token"])
# assert (
# str(await oid.has_uma_access(token=token["access_token"], permissions=""))
# == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())"
# )
# assert (
# str(await oid.has_uma_access(token=admin.token["access_token"], permissions="Default Resource"))
# == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions="
# + "{'Default Resource'})"
# )
Loading…
Cancel
Save