Browse Source

Merge branch 'master' into feat/create_permission_for_scopes

pull/400/head
Richard Nemeth 3 years ago
committed by GitHub
parent
commit
613ff22092
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      .github/workflows/daily.yaml
  2. 3
      .github/workflows/lint.yaml
  3. 5
      .pre-commit-config.yaml
  4. 78
      CHANGELOG.md
  5. 59
      README.md
  6. 1837
      poetry.lock
  7. 12
      pyproject.toml
  8. 4
      src/keycloak/__init__.py
  9. 5
      src/keycloak/authorization/__init__.py
  10. 1117
      src/keycloak/keycloak_admin.py
  11. 22
      src/keycloak/keycloak_openid.py
  12. 200
      src/keycloak/keycloak_uma.py
  13. 408
      src/keycloak/openid_connection.py
  14. 6
      src/keycloak/uma_permissions.py
  15. 67
      src/keycloak/urls_patterns.py
  16. 6
      test_keycloak_init.sh
  17. 55
      tests/conftest.py
  18. BIN
      tests/providers/asm-7.3.1.jar
  19. BIN
      tests/providers/asm-commons-7.3.1.jar
  20. BIN
      tests/providers/asm-tree-7.3.1.jar
  21. BIN
      tests/providers/asm-util-7.3.1.jar
  22. BIN
      tests/providers/nashorn-core-15.4.jar
  23. 433
      tests/test_keycloak_admin.py
  24. 3
      tests/test_keycloak_openid.py
  25. 104
      tests/test_keycloak_uma.py
  26. 16
      tox.ini

3
.github/workflows/daily.yaml

@ -11,6 +11,9 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
keycloak-version: ["20.0", "21.0", "latest"]
env:
KEYCLOAK_DOCKER_IMAGE_TAG: ${{ matrix.keycloak-version }}
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}

3
.github/workflows/lint.yaml

@ -56,9 +56,12 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
keycloak-version: ["20.0", "21.0", "latest"]
needs:
- check-commits
- check-linting
env:
KEYCLOAK_DOCKER_IMAGE_TAG: ${{ matrix.keycloak-version }}
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}

5
.pre-commit-config.yaml

@ -8,9 +8,10 @@ repos:
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
args: ["--maxkb=10000"]
- repo: https://github.com/compilerla/conventional-pre-commit
rev: v1.2.0
hooks:
- id: conventional-pre-commit
stages: [ commit-msg ]
args: [ ] # optional: list of Conventional Commits types to allow
stages: [commit-msg]
args: [] # optional: list of Conventional Commits types to allow

78
CHANGELOG.md

@ -1,3 +1,81 @@
## v2.14.0 (2023-03-17)
### Feat
- add initial access token support and policy delete method
## v2.13.2 (2023-03-06)
### Fix
- Refactor auto refresh (#415)
## v2.13.1 (2023-03-05)
### Fix
- Check if applyPolicies exists in the config (#367)
## v2.13.0 (2023-03-05)
### Feat
- implement cache clearing API (#414)
## v2.12.2 (2023-03-05)
### Fix
- get_group_by_path uses Keycloak API to load (#417)
## v2.12.1 (2023-03-05)
### Fix
- tests and upgraded deps (#419)
## v2.12.0 (2023-02-10)
### Feat
- add Keycloak UMA client (#403)
## v2.11.1 (2023-02-08)
### Fix
- do not include CODEOWNERS (#407)
## v2.11.0 (2023-02-08)
## v2.10.0 (2023-02-08)
### Feat
- update header if token is given
- init KeycloakAdmin with token
- Add Client Scopes of Client
## v2.9.0 (2023-01-11)
### Feat
- added default realm roles handlers
## v2.8.0 (2022-12-29)
### Feat
- **api**: add tests for create_authz_scopes
### Fix
- fix testing create_client_authz_scopes parameters
- fix linting
- add testcase for invalid client id
- create authz clients test case
- create authz clients test case
## v2.7.0 (2022-12-24)
### Refactor

59
README.md

@ -69,7 +69,7 @@ keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/auth/",
realm_name="example_realm",
client_secret_key="secret")
# Get WellKnow
# Get WellKnown
config_well_known = keycloak_openid.well_known()
# Get Code With Oauth Authorization Request
@ -142,14 +142,19 @@ auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Sc
# KEYCLOAK ADMIN
from keycloak import KeycloakAdmin
from keycloak import KeycloakOpenIDConnection
keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/",
username='example-admin',
password='secret',
realm_name="master",
user_realm_name="only_if_other_realm_than_master",
client_secret_key="client-secret",
verify=True)
keycloak_connection = KeycloakOpenIDConnection(
server_url="http://localhost:8080/",
username='example-admin',
password='secret',
realm_name="master",
user_realm_name="only_if_other_realm_than_master",
client_id="my_client",
client_secret_key="client-secret",
verify=True)
keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
# Add user
new_user = keycloak_admin.create_user({"email": "example@example.com",
@ -273,6 +278,9 @@ keycloak_admin.get_composite_client_roles_of_user(user_id="user_id", client_id="
keycloak_admin.delete_client_roles_of_user(client_id="client_id", user_id="user_id", roles={"id": "role-id"})
keycloak_admin.delete_client_roles_of_user(client_id="client_id", user_id="user_id", roles=[{"id": "role-id_1"}, {"id": "role-id_2"}])
# Get the client authorization settings
client_authz_settings = get_client_authz_settings(client_id="client_id")
# Get all client authorization resources
client_resources = get_client_authz_resources(client_id="client_id")
@ -303,9 +311,6 @@ sync_users(storage_id="storage_di", action="action")
# Get client role id from name
role_id = keycloak_admin.get_client_role_id(client_id=client_id, role_name="test")
# Get all roles for the realm or client
realm_roles = keycloak_admin.get_roles()
# Assign client role to user. Note that BOTH role_name and role_id appear to be required.
keycloak_admin.assign_client_role(client_id=client_id, user_id=user_id, role_id=role_id, role_name="test")
@ -344,4 +349,36 @@ keycloak_admin.get_users() # Get user in main realm
keycloak_admin.realm_name = "demo" # Change realm to 'demo'
keycloak_admin.get_users() # Get users in realm 'demo'
keycloak_admin.create_user(...) # Creates a new user in 'demo'
# KEYCLOAK UMA
from keycloak import KeycloakOpenIDConnection
from keycloak import KeycloakUMA
keycloak_connection = KeycloakOpenIDConnection(
server_url="http://localhost:8080/",
realm_name="master",
client_id="my_client",
client_secret_key="client-secret")
keycloak_uma = KeycloakUMA(connection=keycloak_connection)
# Create a resource set
resource_set = keycloak_uma.resource_set_create({
"name": "example_resource",
"scopes": ["example:read", "example:write"],
"type": "urn:example"})
# List resource sets
resource_sets = uma.resource_set_list()
# get resource set
latest_resource = uma.resource_set_read(resource_set["_id"])
# update resource set
latest_resource["name"] = "New Resource Name"
uma.resource_set_update(resource_set["_id"], latest_resource)
# delete resource set
uma.resource_set_delete(resource_id=resource_set["_id"])
```

1837
poetry.lock
File diff suppressed because it is too large
View File

12
pyproject.toml

@ -22,7 +22,7 @@ packages = [
{ include = "keycloak", from = "src/" },
{ include = "keycloak/**/*.py", from = "src/" },
]
include = ["LICENSE", "CHANGELOG.md", "CODEOWNERS", "CONTRIBUTING.md"]
include = ["LICENSE", "CHANGELOG.md", "CONTRIBUTING.md"]
[tool.poetry.urls]
Documentation = "https://python-keycloak.readthedocs.io/en/latest/"
@ -30,9 +30,9 @@ Documentation = "https://python-keycloak.readthedocs.io/en/latest/"
[tool.poetry.dependencies]
python = "^3.7"
requests = "^2.20.0"
requests = "^2.28.2"
python-jose = "^3.3.0"
urllib3 = "^1.26.0"
urllib3 = "^1.26.14"
mock = {version = "^4.0.3", optional = true}
alabaster = {version = "^0.7.12", optional = true}
commonmark = {version = "^0.9.1", optional = true}
@ -42,7 +42,8 @@ sphinx-rtd-theme = {version = "^1.0.0", optional = true}
readthedocs-sphinx-ext = {version = "^2.1.9", optional = true}
m2r2 = {version = "^0.3.2", optional = true}
sphinx-autoapi = {version = "^2.0.0", optional = true}
requests-toolbelt = "^0.9.1"
requests-toolbelt = "^0.10.1"
deprecation = "^2.1.0"
[tool.poetry.extras]
docs = [
@ -61,7 +62,7 @@ docs = [
tox = "^3.25.0"
pytest = "^7.1.2"
pytest-cov = "^3.0.0"
wheel = "^0.37.1"
wheel = "^0.38.4"
pre-commit = "^2.19.0"
isort = "^5.10.1"
black = "^22.3.0"
@ -72,6 +73,7 @@ cryptography = "^37.0.4"
codespell = "^2.1.0"
darglint = "^1.8.1"
twine = "^4.0.2"
freezegun = "^1.2.2"
[build-system]
requires = ["poetry-core>=1.0.0"]

4
src/keycloak/__init__.py

@ -42,6 +42,8 @@ from .exceptions import (
)
from .keycloak_admin import KeycloakAdmin
from .keycloak_openid import KeycloakOpenID
from .keycloak_uma import KeycloakUMA
from .openid_connection import KeycloakOpenIDConnection
__all__ = [
"__version__",
@ -61,4 +63,6 @@ __all__ = [
"KeycloakSecretNotFound",
"KeycloakAdmin",
"KeycloakOpenID",
"KeycloakOpenIDConnection",
"KeycloakUMA",
]

5
src/keycloak/authorization/__init__.py

@ -86,8 +86,9 @@ class Authorization:
permission.scopes = ast.literal_eval(pol["config"]["scopes"])
for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]):
self.policies[policy_name].add_permission(permission)
if "applyPolicies" in pol["config"]:
for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]):
self.policies[policy_name].add_permission(permission)
if pol["type"] == "resource":
permission = Permission(

1117
src/keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

22
src/keycloak/keycloak_openid.py

@ -47,6 +47,7 @@ from .uma_permissions import AuthStatus, build_permission_param
from .urls_patterns import (
URL_AUTH,
URL_CERTS,
URL_CLIENT_REGISTRATION,
URL_ENTITLEMENT,
URL_INTROSPECT,
URL_LOGOUT,
@ -679,3 +680,24 @@ class KeycloakOpenID:
return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
)
def register_client(self, token: str, payload: dict):
"""Create a client.
ClientRepresentation:
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:param token: Initial access token
:type token: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
data_raw = self.connection.raw_post(
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError)

200
src/keycloak/keycloak_uma.py

@ -0,0 +1,200 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak UMA module.
The module contains a UMA compatible client for keycloak:
https://docs.kantarainitiative.org/uma/wg/rec-oauth-uma-federated-authz-2.0.html
"""
import json
from urllib.parse import quote_plus
from .exceptions import (
KeycloakDeleteError,
KeycloakGetError,
KeycloakPostError,
KeycloakPutError,
raise_error_from_response,
)
from .openid_connection import KeycloakOpenIDConnection
from .urls_patterns import URL_UMA_WELL_KNOWN
class KeycloakUMA:
"""Keycloak UMA client.
:param connection: OpenID connection manager
"""
def __init__(self, connection: KeycloakOpenIDConnection):
"""Init method.
:param connection: OpenID connection manager
:type connection: KeycloakOpenIDConnection
"""
self.connection = connection
custom_headers = self.connection.custom_headers or {}
custom_headers.update({"Content-Type": "application/json"})
self.connection.custom_headers = custom_headers
self._well_known = None
def _fetch_well_known(self):
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_get(URL_UMA_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@staticmethod
def format_url(url, **kwargs):
"""Substitute url path parameters.
Given a parameterized url string, returns the string after url encoding and substituting
the given params. For example,
`format_url("https://myserver/{my_resource}/{id}", my_resource="hello world", id="myid")`
would produce `https://myserver/hello+world/myid`.
:param url: url string to format
:type url: str
:param kwargs: dict containing kwargs to substitute
:type kwargs: dict
:return: formatted string
:rtype: str
"""
return url.format(**{k: quote_plus(v) for k, v in kwargs.items()})
@property
def uma_well_known(self):
"""Get the well_known UMA2 config.
:returns: It lists endpoints and other configuration options relevant
:rtype: dict
"""
# per instance cache
if not self._well_known:
self._well_known = self._fetch_well_known()
return self._well_known
def resource_set_create(self, payload):
"""Create a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#rfc.section.2.2.1
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param payload: ResourceRepresentation
:type payload: dict
:return: ResourceRepresentation with the _id property assigned
:rtype: dict
"""
data_raw = self.connection.raw_post(
self.uma_well_known["resource_registration_endpoint"], data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def resource_set_update(self, resource_id, payload):
"""Update a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#update-resource-set
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param resource_id: id of the resource
:type resource_id: str
:param payload: ResourceRepresentation
:type payload: dict
:return: Response dict (empty)
:rtype: dict
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_put(url, data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
def resource_set_read(self, resource_id):
"""Read a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#read-resource-set
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:param resource_id: id of the resource
:type resource_id: str
:return: ResourceRepresentation
:rtype: dict
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_get(url)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_delete(self, resource_id):
"""Delete a resource set.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#delete-resource-set
:param resource_id: id of the resource
:type resource_id: str
:return: Response dict (empty)
:rtype: dict
"""
url = self.format_url(
self.uma_well_known["resource_registration_endpoint"] + "/{id}", id=resource_id
)
data_raw = self.connection.raw_delete(url)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def resource_set_list_ids(self):
"""List all resource set ids.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
:return: List of ids
:rtype: List[str]
"""
data_raw = self.connection.raw_get(self.uma_well_known["resource_registration_endpoint"])
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def resource_set_list(self):
"""List all resource sets.
Spec
https://docs.kantarainitiative.org/uma/rec-oauth-resource-reg-v1_0_1.html#list-resource-sets
ResourceRepresentation
https://www.keycloak.org/docs-api/20.0.0/rest-api/index.html#_resourcerepresentation
:yields: Iterator over a list of ResourceRepresentations
:rtype: Iterator[dict]
"""
for resource_id in self.resource_set_list_ids():
resource = self.resource_set_read(resource_id)
yield resource

408
src/keycloak/openid_connection.py

@ -0,0 +1,408 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak OpenID Connection Manager module.
The module contains mainly the implementation of KeycloakOpenIDConnection class.
This is an extension of the ConnectionManager class, and handles the automatic refresh
of openid tokens when required.
"""
from datetime import datetime, timedelta
from .connection import ConnectionManager
from .exceptions import KeycloakPostError
from .keycloak_openid import KeycloakOpenID
class KeycloakOpenIDConnection(ConnectionManager):
"""A class to help with OpenID connections which can auto refresh tokens.
:param object: _description_
:type object: _type_
"""
_server_url = None
_username = None
_password = None
_totp = None
_realm_name = None
_client_id = None
_verify = None
_client_secret_key = None
_connection = None
_custom_headers = None
_user_realm_name = None
_expires_at = None
def __init__(
self,
server_url,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
verify=True,
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
timeout=60,
):
"""Init method.
:param server_url: Keycloak server url
:type server_url: str
:param username: admin username
:type username: str
:param password: admin password
:type password: str
:param token: access and refresh tokens
:type token: dict
:param totp: Time based OTP
:type totp: str
:param realm_name: realm name
:type realm_name: str
:param client_id: client id
:type client_id: str
:param verify: True if want check connection SSL
:type verify: bool
:param client_secret_key: client secret key
(optional, required only for access type confidential)
:type client_secret_key: str
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param user_realm_name: The realm name of the user, if different from realm_name
:type user_realm_name: str
:param timeout: connection timeout in seconds
:type timeout: int
"""
# token is renewed when it hits 90% of its lifetime. This is to account for any possible
# clock skew.
self.token_lifetime_fraction = 0.9
self.server_url = server_url
self.username = username
self.password = password
self.token = token
self.totp = totp
self.realm_name = realm_name
self.client_id = client_id
self.verify = verify
self.client_secret_key = client_secret_key
self.user_realm_name = user_realm_name
self.timeout = timeout
if self.token is None:
self.get_token()
self.headers = (
{
"Authorization": "Bearer " + self.token.get("access_token"),
"Content-Type": "application/json",
}
if self.token is not None
else {}
)
self.custom_headers = custom_headers
super().__init__(
base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify
)
@property
def server_url(self):
"""Get server url.
:returns: Keycloak server url
:rtype: str
"""
return self.base_url
@server_url.setter
def server_url(self, value):
self.base_url = value
@property
def realm_name(self):
"""Get realm name.
:returns: Realm name
:rtype: str
"""
return self._realm_name
@realm_name.setter
def realm_name(self, value):
self._realm_name = value
@property
def client_id(self):
"""Get client id.
:returns: Client id
:rtype: str
"""
return self._client_id
@client_id.setter
def client_id(self, value):
self._client_id = value
@property
def client_secret_key(self):
"""Get client secret key.
:returns: Client secret key
:rtype: str
"""
return self._client_secret_key
@client_secret_key.setter
def client_secret_key(self, value):
self._client_secret_key = value
@property
def username(self):
"""Get username.
:returns: Admin username
:rtype: str
"""
return self._username
@username.setter
def username(self, value):
self._username = value
@property
def password(self):
"""Get password.
:returns: Admin password
:rtype: str
"""
return self._password
@password.setter
def password(self, value):
self._password = value
@property
def totp(self):
"""Get totp.
:returns: TOTP
:rtype: str
"""
return self._totp
@totp.setter
def totp(self, value):
self._totp = value
@property
def token(self):
"""Get token.
:returns: Access and refresh token
:rtype: dict
"""
return self._token
@token.setter
def token(self, value):
self._token = value
self._expires_at = datetime.now() + timedelta(
seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
)
@property
def expires_at(self):
"""Get token expiry time.
:returns: Datetime at which the current token will expire
:rtype: datetime
"""
return self._expires_at
@property
def user_realm_name(self):
"""Get user realm name.
:returns: User realm name
:rtype: str
"""
return self._user_realm_name
@user_realm_name.setter
def user_realm_name(self, value):
self._user_realm_name = value
@property
def custom_headers(self):
"""Get custom headers.
:returns: Custom headers
:rtype: dict
"""
return self._custom_headers
@custom_headers.setter
def custom_headers(self, value):
self._custom_headers = value
if self.custom_headers is not None:
# merge custom headers to main headers
self.headers.update(self.custom_headers)
def get_token(self):
"""Get admin token.
The admin token is then set in the `token` attribute.
"""
if self.user_realm_name:
token_realm_name = self.user_realm_name
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
self.keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=token_realm_name,
verify=self.verify,
client_secret_key=self.client_secret_key,
timeout=self.timeout,
)
grant_type = []
if self.client_secret_key:
if self.user_realm_name:
self.realm_name = self.user_realm_name
grant_type.append("client_credentials")
elif self.username and self.password:
grant_type.append("password")
if grant_type:
self.token = self.keycloak_openid.token(
self.username, self.password, grant_type=grant_type, totp=self.totp
)
else:
self.token = None
def refresh_token(self):
"""Refresh the token.
:raises KeycloakPostError: In case the refresh token request failed.
"""
refresh_token = self.token.get("refresh_token", None) if self.token else None
if refresh_token is None:
self.get_token()
else:
try:
self.token = self.keycloak_openid.refresh_token(refresh_token)
except KeycloakPostError as e:
list_errors = [
b"Refresh token expired",
b"Token is not active",
b"Session not active",
]
if e.response_code == 400 and any(err in e.response_body for err in list_errors):
self.get_token()
else:
raise
self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token"))
def _refresh_if_required(self):
if datetime.now() >= self.expires_at:
self.refresh_token()
def raw_get(self, *args, **kwargs):
"""Call connection.raw_get.
If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
and try *get* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_get(*args, **kwargs)
return r
def raw_post(self, *args, **kwargs):
"""Call connection.raw_post.
If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
and try *post* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_post(*args, **kwargs)
return r
def raw_put(self, *args, **kwargs):
"""Call connection.raw_put.
If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
and try *put* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_put(*args, **kwargs)
return r
def raw_delete(self, *args, **kwargs):
"""Call connection.raw_delete.
If auto_refresh is set for *delete* and *access_token* is expired,
it will refresh the token and try *delete* once more.
:param args: Additional arguments
:type args: tuple
:param kwargs: Additional keyword arguments
:type kwargs: dict
:returns: Response
:rtype: Response
"""
self._refresh_if_required()
r = super().raw_delete(*args, **kwargs)
return r

6
src/keycloak/uma_permissions.py

@ -27,7 +27,7 @@ from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinit
class UMAPermission:
"""A class to conveniently assembly permissions.
"""A class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.
@ -143,7 +143,7 @@ class UMAPermission:
class Resource(UMAPermission):
"""An UMAPermission Resource class to conveniently assembly permissions.
"""A UMAPermission Resource class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.
@ -161,7 +161,7 @@ class Resource(UMAPermission):
class Scope(UMAPermission):
"""An UMAPermission Scope class to conveniently assembly permissions.
"""A UMAPermission Scope class to conveniently assemble permissions.
The class itself is callable, and will return the assembled permission.

67
src/keycloak/urls_patterns.py

@ -25,7 +25,8 @@
# OPENID URLS
URL_REALM = "realms/{realm-name}"
URL_WELL_KNOWN = "realms/{realm-name}/.well-known/openid-configuration"
URL_WELL_KNOWN_BASE = "realms/{realm-name}/.well-known"
URL_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/openid-configuration"
URL_TOKEN = "realms/{realm-name}/protocol/openid-connect/token"
URL_USERINFO = "realms/{realm-name}/protocol/openid-connect/userinfo"
URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout"
@ -37,6 +38,9 @@ URL_AUTH = (
"&scope={scope}&state={state}"
)
URL_CLIENT_REGISTRATION = URL_REALM + "/clients-registrations/default"
URL_CLIENT_UPDATE = URL_CLIENT_REGISTRATION + "/{client-id}"
# ADMIN URLS
URL_ADMIN_USERS = "admin/realms/{realm-name}/users"
URL_ADMIN_USERS_COUNT = "admin/realms/{realm-name}/users/count"
@ -77,10 +81,12 @@ URL_ADMIN_SERVER_INFO = "admin/serverinfo"
URL_ADMIN_GROUPS = "admin/realms/{realm-name}/groups"
URL_ADMIN_GROUP = "admin/realms/{realm-name}/groups/{id}"
URL_ADMIN_GROUP_BY_PATH = "admin/realms/{realm-name}/group-by-path/{path}"
URL_ADMIN_GROUP_CHILD = "admin/realms/{realm-name}/groups/{id}/children"
URL_ADMIN_GROUP_PERMISSIONS = "admin/realms/{realm-name}/groups/{id}/management/permissions"
URL_ADMIN_GROUP_MEMBERS = "admin/realms/{realm-name}/groups/{id}/members"
URL_ADMIN_CLIENT_INITIAL_ACCESS = "admin/realms/{realm-name}/clients-initial-access"
URL_ADMIN_CLIENTS = "admin/realms/{realm-name}/clients"
URL_ADMIN_CLIENT = URL_ADMIN_CLIENTS + "/{id}"
URL_ADMIN_CLIENT_ALL_SESSIONS = URL_ADMIN_CLIENT + "/user-sessions"
@ -95,34 +101,31 @@ URL_ADMIN_CLIENT_SCOPE_MAPPINGS_REALM_ROLES = URL_ADMIN_CLIENT + "/scope-mapping
URL_ADMIN_CLIENT_SCOPE_MAPPINGS_CLIENT_ROLES = (
URL_ADMIN_CLIENT + "/scope-mappings/clients/{client}"
)
URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT + "/authz/resource-server/settings"
URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT + "/authz/resource-server/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT + "/authz/resource-server/permission?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1&permission=false"
URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPES = URL_ADMIN_CLIENT + "/optional-client-scopes"
URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPE = (
URL_ADMIN_CLIENT_OPTIONAL_CLIENT_SCOPES + "/{client_scope_id}"
)
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_CLIENT + "/default-client-scopes"
URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE = (
URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPES + "/{client_scope_id}"
)
URL_ADMIN_CLIENT_AUTHZ = URL_ADMIN_CLIENT + "/authz/resource-server"
URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT_AUTHZ + "/settings"
URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT_AUTHZ + "/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT_AUTHZ + "/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT_AUTHZ + "/permission?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = URL_ADMIN_CLIENT_AUTHZ + "/policy?max=-1&permission=false"
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/role?max=-1"
URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = (
URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1"
)
URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/{policy-id}/scopes"
)
URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/{policy-id}/resources"
)
URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = (
URL_ADMIN_CLIENT + "/authz/resource-server/permission/scope/{scope-id}"
URL_ADMIN_CLIENT_AUTHZ + "/permission/resource?max=-1"
)
URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION = (
URL_ADMIN_CLIENT + "/authz/resource-server/permission/scope?max=-1"
)
URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT + "/authz/resource-server/policy/client"
URL_ADMIN_CLIENT_AUTHZ_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/{policy-id}"
URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/scopes"
URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/resources"
URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope/{scope-id}"
URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/client"
URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user"
URL_ADMIN_CLIENT_CERTS = URL_ADMIN_CLIENT + "/certificates/{attr}"
@ -193,9 +196,9 @@ URL_ADMIN_EVENTS_CONFIG = URL_ADMIN_EVENTS + "/config"
URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats"
URL_ADMIN_GROUPS_CLIENT_ROLES_COMPOSITE = URL_ADMIN_GROUPS_CLIENT_ROLES + "/composite"
URL_ADMIN_CLIENT_ROLE_CHILDREN = (
"admin/realms/{realm-name}/roles-by-id/{role-id}/composites/clients/{client-id}"
)
URL_ADMIN_REALM_ROLE_COMPOSITES = "admin/realms/{realm-name}/roles-by-id/{role-id}/composites"
URL_ADMIN_REALM_ROLE_COMPOSITES_REALM = URL_ADMIN_REALM_ROLE_COMPOSITES + "/realm"
URL_ADMIN_CLIENT_ROLE_CHILDREN = URL_ADMIN_REALM_ROLE_COMPOSITES + "/clients/{client-id}"
URL_ADMIN_CLIENT_CERT_UPLOAD = URL_ADMIN_CLIENT_CERTS + "/upload-certificate"
URL_ADMIN_REQUIRED_ACTIONS = URL_ADMIN_REALM + "/authentication/required-actions"
URL_ADMIN_REQUIRED_ACTIONS_ALIAS = URL_ADMIN_REQUIRED_ACTIONS + "/{action-alias}"
@ -204,3 +207,11 @@ URL_ADMIN_ATTACK_DETECTION = "admin/realms/{realm-name}/attack-detection/brute-f
URL_ADMIN_ATTACK_DETECTION_USER = (
"admin/realms/{realm-name}/attack-detection/brute-force/users/{id}"
)
URL_ADMIN_CLEAR_KEYS_CACHE = URL_ADMIN_REALM + "/clear-keys-cache"
URL_ADMIN_CLEAR_REALM_CACHE = URL_ADMIN_REALM + "/clear-realm-cache"
URL_ADMIN_CLEAR_USER_CACHE = URL_ADMIN_REALM + "/clear-user-cache"
# UMA URLS
URL_UMA_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/uma2-configuration"

6
test_keycloak_init.sh

@ -1,7 +1,8 @@
#!/usr/bin/env bash
CMD_ARGS=$1
KEYCLOAK_DOCKER_IMAGE="quay.io/keycloak/keycloak:latest"
KEYCLOAK_DOCKER_IMAGE_TAG="${KEYCLOAK_DOCKER_IMAGE_TAG:-latest}"
KEYCLOAK_DOCKER_IMAGE="quay.io/keycloak/keycloak:$KEYCLOAK_DOCKER_IMAGE_TAG"
function keycloak_stop() {
if [ "$(docker ps -q -f name=unittest_keycloak)" ]; then
@ -13,7 +14,8 @@ function keycloak_stop() {
function keycloak_start() {
echo "Starting keycloak docker container"
docker run -d --name unittest_keycloak -e KEYCLOAK_ADMIN="${KEYCLOAK_ADMIN}" -e KEYCLOAK_ADMIN_PASSWORD="${KEYCLOAK_ADMIN_PASSWORD}" -e KC_FEATURES="token-exchange,admin-fine-grained-authz" -p "${KEYCLOAK_PORT}:8080" "${KEYCLOAK_DOCKER_IMAGE}" start-dev
PWD=$(pwd)
docker run -d --name unittest_keycloak -e KEYCLOAK_ADMIN="${KEYCLOAK_ADMIN}" -e KEYCLOAK_ADMIN_PASSWORD="${KEYCLOAK_ADMIN_PASSWORD}" -e KC_FEATURES="token-exchange,admin-fine-grained-authz" -p "${KEYCLOAK_PORT}:8080" -v $PWD/tests/providers:/opt/keycloak/providers "${KEYCLOAK_DOCKER_IMAGE}" start-dev
SECONDS=0
until curl --silent --output /dev/null localhost:$KEYCLOAK_PORT; do
sleep 5;

55
tests/conftest.py

@ -4,7 +4,9 @@ import ipaddress
import os
import uuid
from datetime import datetime, timedelta
from typing import Tuple
import freezegun
import pytest
from cryptography import x509
from cryptography.hazmat.backends import default_backend
@ -12,7 +14,7 @@ from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from keycloak import KeycloakAdmin, KeycloakOpenID
from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakOpenIDConnection, KeycloakUMA
class KeycloakTestEnv(object):
@ -150,6 +152,23 @@ def admin(env: KeycloakTestEnv):
)
@pytest.fixture
@freezegun.freeze_time("2023-02-25 10:00:00")
def admin_frozen(env: KeycloakTestEnv):
"""Fixture for initialized KeycloakAdmin class, with time frozen.
:param env: Keycloak test environment
:type env: KeycloakTestEnv
:returns: Keycloak admin
:rtype: KeycloakAdmin
"""
return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
)
@pytest.fixture
def oid(env: KeycloakTestEnv, realm: str, admin: KeycloakAdmin):
"""Fixture for initialized KeycloakOpenID class.
@ -475,3 +494,37 @@ def selfsigned_cert():
)
return cert_pem, key_pem
@pytest.fixture
def oid_connection_with_authz(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
"""Fixture for initialized KeycloakUMA class.
:param oid_with_credentials_authz: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
:yields: Keycloak OpenID connection manager
:rtype: KeycloakOpenIDConnection
"""
oid, _, _ = oid_with_credentials_authz
connection = KeycloakOpenIDConnection(
server_url=oid.connection.base_url,
realm_name=oid.realm_name,
client_id=oid.client_id,
client_secret_key=oid.client_secret_key,
timeout=60,
)
yield connection
@pytest.fixture
def uma(oid_connection_with_authz: KeycloakOpenIDConnection):
"""Fixture for initialized KeycloakUMA class.
:param oid_connection_with_authz: Keycloak open id connection with pre-configured authz client
:type oid_connection_with_authz: KeycloakOpenIDConnection
:yields: Keycloak OpenID client
:rtype: KeycloakOpenID
"""
connection = oid_connection_with_authz
# Return UMA
yield KeycloakUMA(connection=connection)

BIN
tests/providers/asm-7.3.1.jar

BIN
tests/providers/asm-commons-7.3.1.jar

BIN
tests/providers/asm-tree-7.3.1.jar

BIN
tests/providers/asm-util-7.3.1.jar

BIN
tests/providers/nashorn-core-15.4.jar

433
tests/test_keycloak_admin.py

@ -1,9 +1,12 @@
"""Test the keycloak admin object."""
import copy
import uuid
from typing import Tuple
import freezegun
import pytest
from dateutil import parser as datetime_parser
import keycloak
from keycloak import KeycloakAdmin, KeycloakOpenID
@ -22,31 +25,6 @@ def test_keycloak_version():
assert keycloak.__version__, keycloak.__version__
def test_keycloak_admin_bad_init(env):
"""Test keycloak admin bad init.
:param env: Environment fixture
:type env: KeycloakTestEnv
"""
with pytest.raises(TypeError) as err:
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
auto_refresh_token=1,
)
assert err.match("Expected a list of strings")
with pytest.raises(TypeError) as err:
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
auto_refresh_token=["patch"],
)
assert err.match("Unexpected method in auto_refresh_token")
def test_keycloak_admin_init(env):
"""Test keycloak admin init.
@ -90,6 +68,15 @@ def test_keycloak_admin_init(env):
)
assert admin.token
token = admin.token
admin = KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
token=token,
realm_name=None,
user_realm_name=None,
)
assert admin.token == token
admin.create_realm(payload={"realm": "authz", "enabled": True})
admin.realm_name = "authz"
admin.create_client(
@ -597,20 +584,14 @@ def test_groups(admin: KeycloakAdmin, user: str):
# Test get group by path
res = admin.get_group_by_path(path="/main-group/subgroup-1")
assert res is None, res
res = admin.get_group_by_path(path="/main-group/subgroup-1", search_in_subgroups=True)
assert res is not None, res
assert res["id"] == subgroup_id_1, res
res = admin.get_group_by_path(
path="/main-group/subgroup-2/subsubgroup-1/test", search_in_subgroups=True
)
assert res is None, res
with pytest.raises(KeycloakGetError) as err:
admin.get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1/test")
assert err.match('404: b\'{"error":"Group path does not exist"}\'')
res = admin.get_group_by_path(
path="/main-group/subgroup-2/subsubgroup-1", search_in_subgroups=True
)
res = admin.get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1")
assert res is not None, res
assert res["id"] == subsubgroup_id_1
@ -852,6 +833,17 @@ def test_clients(admin: KeycloakAdmin, realm: str):
) == {"msg": "Already exists"}
assert len(admin.get_client_authz_policies(client_id=auth_client_id)) == 2
res = admin.create_client_authz_role_based_policy(
client_id=auth_client_id,
payload={"name": "test-authz-rb-policy-delete", "roles": [{"id": role_id}]},
)
res2 = admin.get_client_authz_policy(client_id=auth_client_id, policy_id=res["id"])
assert res["id"] == res2["id"]
admin.delete_client_authz_policy(client_id=auth_client_id, policy_id=res["id"])
with pytest.raises(KeycloakGetError) as err:
admin.get_client_authz_policy(client_id=auth_client_id, policy_id=res["id"])
assert err.match("404: b''")
# Test authz permissions
res = admin.get_client_authz_permissions(client_id=auth_client_id)
assert len(res) == 1, res
@ -890,6 +882,24 @@ def test_clients(admin: KeycloakAdmin, realm: str):
admin.get_client_authz_scopes(client_id=client_id)
assert err.match('404: b\'{"error":"HTTP 404 Not Found"}\'')
res = admin.create_client_authz_scopes(
client_id=auth_client_id, payload={"name": "test-authz-scope"}
)
assert res["name"] == "test-authz-scope", res
with pytest.raises(KeycloakPostError) as err:
admin.create_client_authz_scopes(
client_id="invalid_client_id", payload={"name": "test-authz-scope"}
)
assert err.match('404: b\'{"error":"Could not find client"')
assert admin.create_client_authz_scopes(
client_id=auth_client_id, payload={"name": "test-authz-scope"}
)
res = admin.get_client_authz_scopes(client_id=auth_client_id)
assert len(res) == 1
assert {x["name"] for x in res} == {"test-authz-scope"}
# Test service account user
res = admin.get_client_service_account_user(client_id=auth_client_id)
assert res["username"] == "service-account-authz-client", res
@ -1300,6 +1310,98 @@ def test_client_scope_client_roles(admin: KeycloakAdmin, realm: str, client: str
assert len(roles) == 0
def test_client_default_client_scopes(admin: KeycloakAdmin, realm: str, client: str):
"""Test client assignment of default client scopes.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
:param client: Keycloak client
:type client: str
"""
admin.realm_name = realm
client_id = admin.create_client(
payload={"name": "role-testing-client", "clientId": "role-testing-client"}
)
# Test get client default scopes
# keycloak default roles: web-origins, acr, profile, roles, email
default_client_scopes = admin.get_client_default_client_scopes(client_id)
assert len(default_client_scopes) == 5, default_client_scopes
# Test add a client scope to client default scopes
default_client_scope = "test-client-default-scope"
new_client_scope = {
"name": default_client_scope,
"description": f"Test Client Scope: {default_client_scope}",
"protocol": "openid-connect",
"attributes": {},
}
new_client_scope_id = admin.create_client_scope(new_client_scope, skip_exists=False)
new_default_client_scope_data = {
"realm": realm,
"client": client_id,
"clientScopeId": new_client_scope_id,
}
admin.add_client_default_client_scope(
client_id, new_client_scope_id, new_default_client_scope_data
)
default_client_scopes = admin.get_client_default_client_scopes(client_id)
assert len(default_client_scopes) == 6, default_client_scopes
# Test remove a client default scope
admin.delete_client_default_client_scope(client_id, new_client_scope_id)
default_client_scopes = admin.get_client_default_client_scopes(client_id)
assert len(default_client_scopes) == 5, default_client_scopes
def test_client_optional_client_scopes(admin: KeycloakAdmin, realm: str, client: str):
"""Test client assignment of optional client scopes.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
:param client: Keycloak client
:type client: str
"""
admin.realm_name = realm
client_id = admin.create_client(
payload={"name": "role-testing-client", "clientId": "role-testing-client"}
)
# Test get client optional scopes
# keycloak optional roles: microprofile-jwt, offline_access, address, phone
optional_client_scopes = admin.get_client_optional_client_scopes(client_id)
assert len(optional_client_scopes) == 4, optional_client_scopes
# Test add a client scope to client optional scopes
optional_client_scope = "test-client-optional-scope"
new_client_scope = {
"name": optional_client_scope,
"description": f"Test Client Scope: {optional_client_scope}",
"protocol": "openid-connect",
"attributes": {},
}
new_client_scope_id = admin.create_client_scope(new_client_scope, skip_exists=False)
new_optional_client_scope_data = {
"realm": realm,
"client": client_id,
"clientScopeId": new_client_scope_id,
}
admin.add_client_optional_client_scope(
client_id, new_client_scope_id, new_optional_client_scope_data
)
optional_client_scopes = admin.get_client_optional_client_scopes(client_id)
assert len(optional_client_scopes) == 5, optional_client_scopes
# Test remove a client optional scope
admin.delete_client_optional_client_scope(client_id, new_client_scope_id)
optional_client_scopes = admin.get_client_optional_client_scopes(client_id)
assert len(optional_client_scopes) == 4, optional_client_scopes
def test_client_roles(admin: KeycloakAdmin, client: str):
"""Test client roles.
@ -2098,94 +2200,66 @@ def test_events(admin: KeycloakAdmin, realm: str):
assert events == list()
def test_auto_refresh(admin: KeycloakAdmin, realm: str):
@freezegun.freeze_time("2023-02-25 10:00:00")
def test_auto_refresh(admin_frozen: KeycloakAdmin, realm: str):
"""Test auto refresh token.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param admin_frozen: Keycloak Admin client with time frozen in place
:type admin_frozen: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin = admin_frozen
# Test get refresh
admin.auto_refresh_token = list()
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
admin.connection.custom_headers = {
"Authorization": "Bearer bad",
"Content-Type": "application/json",
}
with pytest.raises(KeycloakAuthenticationError) as err:
admin.get_realm(realm_name=realm)
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get"]
del admin.token["refresh_token"]
assert admin.get_realm(realm_name=realm)
# Test bad refresh token
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
admin.token["refresh_token"] = "bad"
with pytest.raises(KeycloakPostError) as err:
admin.get_realm(realm_name="test-refresh")
assert err.match(
'400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\''
)
admin.realm_name = "master"
admin.get_token()
admin.realm_name = realm
# Freeze time to simulate the access token expiring
with freezegun.freeze_time("2023-02-25 10:05:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:05:00")
assert admin.get_realm(realm_name=realm)
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:05:00")
# Test bad refresh token, but first make sure access token has expired again
with freezegun.freeze_time("2023-02-25 10:10:00"):
admin.connection.custom_headers = {"Content-Type": "application/json"}
admin.connection.token["refresh_token"] = "bad"
with pytest.raises(KeycloakPostError) as err:
admin.get_realm(realm_name="test-refresh")
assert err.match(
'400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\''
)
admin.connection.get_token()
# Test post refresh
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
with pytest.raises(KeycloakAuthenticationError) as err:
admin.create_realm(payload={"realm": "test-refresh"})
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post"]
admin.realm_name = "master"
admin.user_logout(user_id=admin.get_user_id(username=admin.username))
assert admin.create_realm(payload={"realm": "test-refresh"}) == b""
admin.realm_name = realm
with freezegun.freeze_time("2023-02-25 10:15:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:15:00")
admin.connection.token = None
assert admin.create_realm(payload={"realm": "test-refresh"}) == b""
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:15:00")
# Test update refresh
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
with pytest.raises(KeycloakAuthenticationError) as err:
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post", "put"]
assert (
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict()
)
with freezegun.freeze_time("2023-02-25 10:25:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00")
admin.connection.token = None
assert (
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
== dict()
)
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00")
# Test delete refresh
admin.connection = ConnectionManager(
base_url=admin.server_url,
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"},
timeout=60,
verify=admin.verify,
)
with pytest.raises(KeycloakAuthenticationError) as err:
admin.delete_realm(realm_name="test-refresh")
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'')
admin.auto_refresh_token = ["get", "post", "put", "delete"]
assert admin.delete_realm(realm_name="test-refresh") == dict()
with freezegun.freeze_time("2023-02-25 10:35:00"):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:35:00")
admin.connection.token = None
assert admin.delete_realm(realm_name="test-refresh") == dict()
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00")
def test_get_required_actions(admin: KeycloakAdmin, realm: str):
@ -2427,3 +2501,150 @@ def test_clear_bruteforce_attempts_for_all_users(
res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False})
res = admin.get_realm(realm_name=realm)
assert res["bruteForceProtected"] is False
def test_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> None:
"""Test that the default realm role is present in a brand new realm.
:param realm: Realm name
:type realm: str
:param admin: Keycloak admin
:type admin: KeycloakAdmin
"""
admin.realm_name = realm
assert f"default-roles-{realm}" in [x["name"] for x in admin.get_realm_roles()]
assert (
len([x["name"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"])
== 1
)
def test_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> None:
"""Test getter for the ID of the default realm role.
:param realm: Realm name
:type realm: str
:param admin: Keycloak admin
:type admin: KeycloakAdmin
"""
admin.realm_name = realm
assert (
admin.get_default_realm_role_id()
== [x["id"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"][0]
)
def test_realm_default_roles(admin: KeycloakAdmin, realm: str) -> None:
"""Test getting, adding and deleting default realm roles.
:param realm: Realm name
:type realm: str
:param admin: Keycloak admin
:type admin: KeycloakAdmin
"""
admin.realm_name = realm
# Test listing all default realm roles
roles = admin.get_realm_default_roles()
assert len(roles) == 2
assert {x["name"] for x in roles} == {"offline_access", "uma_authorization"}
with pytest.raises(KeycloakGetError) as err:
admin.realm_name = "doesnotexist"
admin.get_realm_default_roles()
assert err.match('404: b\'{"error":"Realm not found."}\'')
admin.realm_name = realm
# Test removing a default realm role
res = admin.remove_realm_default_roles(payload=[roles[0]])
assert res == {}
assert roles[0] not in admin.get_realm_default_roles()
assert len(admin.get_realm_default_roles()) == 1
with pytest.raises(KeycloakDeleteError) as err:
admin.remove_realm_default_roles(payload=[{"id": "bad id"}])
assert err.match('404: b\'{"error":"Could not find composite role"}\'')
# Test adding a default realm role
res = admin.add_realm_default_roles(payload=[roles[0]])
assert res == {}
assert roles[0] in admin.get_realm_default_roles()
assert len(admin.get_realm_default_roles()) == 2
with pytest.raises(KeycloakPostError) as err:
admin.add_realm_default_roles(payload=[{"id": "bad id"}])
assert err.match('404: b\'{"error":"Could not find composite role"}\'')
def test_clear_keys_cache(realm: str, admin: KeycloakAdmin) -> None:
"""Test clearing the keys cache.
:param realm: Realm name
:type realm: str
:param admin: Keycloak admin
:type admin: KeycloakAdmin
"""
admin.realm_name = realm
res = admin.clear_keys_cache()
assert res == {}
def test_clear_realm_cache(realm: str, admin: KeycloakAdmin) -> None:
"""Test clearing the realm cache.
:param realm: Realm name
:type realm: str
:param admin: Keycloak admin
:type admin: KeycloakAdmin
"""
admin.realm_name = realm
res = admin.clear_realm_cache()
assert res == {}
def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None:
"""Test clearing the user cache.
:param realm: Realm name
:type realm: str
:param admin: Keycloak admin
:type admin: KeycloakAdmin
"""
admin.realm_name = realm
res = admin.clear_user_cache()
assert res == {}
def test_initial_access_token(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str]
) -> None:
"""Test initial access token and client creation.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
res = admin.create_initial_access_token(2, 3)
assert "token" in res
assert res["count"] == 2
assert res["expiration"] == 3
oid, username, password = oid_with_credentials
client = str(uuid.uuid4())
secret = str(uuid.uuid4())
res = oid.register_client(
token=res["token"],
payload={
"name": client,
"clientId": client,
"enabled": True,
"publicClient": False,
"protocol": "openid-connect",
"secret": secret,
"clientAuthenticatorType": "client-secret",
},
)
assert res["clientId"] == client

3
tests/test_keycloak_openid.py

@ -4,6 +4,7 @@ from unittest import mock
import pytest
from keycloak import KeycloakAdmin, KeycloakOpenID
from keycloak.authorization import Authorization
from keycloak.authorization.permission import Permission
from keycloak.authorization.policy import Policy
@ -17,8 +18,6 @@ from keycloak.exceptions import (
KeycloakPostError,
KeycloakRPTNotFound,
)
from keycloak.keycloak_admin import KeycloakAdmin
from keycloak.keycloak_openid import KeycloakOpenID
def test_keycloak_openid_init(env):

104
tests/test_keycloak_uma.py

@ -0,0 +1,104 @@
"""Test module for KeycloakUMA."""
import re
import pytest
from keycloak import KeycloakOpenIDConnection, KeycloakUMA
from keycloak.exceptions import (
KeycloakDeleteError,
KeycloakGetError,
KeycloakPostError,
KeycloakPutError,
)
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
"""Test KeycloakUMA's init method.
:param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
:type oid_connection_with_authz: KeycloakOpenIDConnection
"""
connection = oid_connection_with_authz
uma = KeycloakUMA(connection=connection)
assert isinstance(uma.connection, KeycloakOpenIDConnection)
# should initially be empty
assert uma._well_known is None
assert uma.uma_well_known
# should be cached after first reference
assert uma._well_known is not None
def test_uma_well_known(uma: KeycloakUMA):
"""Test the well_known method.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
res = uma.uma_well_known
assert res is not None
assert res != dict()
for key in ["resource_registration_endpoint"]:
assert key in res
def test_uma_resource_sets(uma: KeycloakUMA):
"""Test resource sets.
:param uma: Keycloak UMA client
:type uma: KeycloakUMA
"""
# Check that only the default resource is present
resource_sets = uma.resource_set_list()
resource_set_list = list(resource_sets)
assert len(resource_set_list) == 1, resource_set_list
assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
# Test create resource set
resource_to_create = {
"name": "mytest",
"scopes": ["test:read", "test:write"],
"type": "urn:test",
}
created_resource = uma.resource_set_create(resource_to_create)
assert created_resource
assert created_resource["_id"], created_resource
assert set(resource_to_create).issubset(set(created_resource)), created_resource
# Test create the same resource set
with pytest.raises(KeycloakPostError) as err:
uma.resource_set_create(resource_to_create)
assert err.match(
re.escape(
'409: b\'{"error":"invalid_request","error_description":'
'"Resource with name [mytest] already exists."}\''
)
)
# Test get resource set
latest_resource = uma.resource_set_read(created_resource["_id"])
assert latest_resource["name"] == created_resource["name"]
# Test update resource set
latest_resource["name"] = "New Resource Name"
res = uma.resource_set_update(created_resource["_id"], latest_resource)
assert res == dict(), res
updated_resource = uma.resource_set_read(created_resource["_id"])
assert updated_resource["name"] == "New Resource Name"
# Test update resource set fail
with pytest.raises(KeycloakPutError) as err:
uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
assert err.match('400: b\'{"error":"Unrecognized field')
# Test delete resource set
res = uma.resource_set_delete(resource_id=created_resource["_id"])
assert res == dict(), res
with pytest.raises(KeycloakGetError) as err:
uma.resource_set_read(created_resource["_id"])
err.match("404: b''")
# Test delete fail
with pytest.raises(KeycloakDeleteError) as err:
uma.resource_set_delete(resource_id=created_resource["_id"])
assert err.match("404: b''")

16
tox.ini

@ -1,13 +1,12 @@
[tox]
requires =
tox-poetry
poetry
tox<4.0.0
isolated_build = true
envlist = check, apply-check, docs, tests, build, changelog
[testenv]
whitelist_externals =
bash
commands_pre =
poetry install --no-root --sync
[testenv:check]
commands =
@ -23,21 +22,18 @@ commands =
isort src/keycloak tests docs
[testenv:docs]
extras = docs
commands_pre =
poetry install --no-root --sync -E docs
commands =
sphinx-build -T -E -W -b html -d _build/doctrees -D language=en ./docs/source _build/html
[testenv:tests]
setenv = file|tox.env
passenv = CONTAINER_HOST
passenv = CONTAINER_HOST KEYCLOAK_DOCKER_IMAGE_TAG
commands =
./test_keycloak_init.sh "pytest -vv --cov=keycloak --cov-report term-missing {posargs}"
[testenv:build]
deps =
poetry
setenv =
POETRY_VIRTUALENVS_CREATE = false
commands =
poetry build --format sdist
poetry build --format wheel

Loading…
Cancel
Save