Browse Source

feat: more authentication flows and executions methods

* feat: add more request

* feat: add more request

* feat: add new endpoints

* fix: revert CHNGELOG

* feat: add tests

* fix: async methods, docs, deps update, ruff formatting

* chore: test py

---------

Co-authored-by: Aleksey Kuznetsov <alekkuznetsov@ptsecurity.com>
Co-authored-by: Richard Nemeth <ryshoooo@gmail.com>
pull/637/head v5.3.0
Aleksey Kuznetsov 6 days ago
committed by GitHub
parent
commit
3868652841
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 2
      .gitignore
  2. 64
      poetry.lock
  3. 1
      pyproject.toml
  4. 178
      src/keycloak/keycloak_admin.py
  5. 9
      src/keycloak/urls_patterns.py
  6. 123
      tests/test_keycloak_admin.py
  7. 2
      tox.ini

2
.gitignore

@ -110,5 +110,3 @@ s3air-authz-config.json
_build
.ruff_cache
.DS_Store
test.py

64
poetry.lock

@ -110,18 +110,18 @@ files = [
[[package]]
name = "babel"
version = "2.16.0"
version = "2.17.0"
description = "Internationalization utilities"
optional = false
python-versions = ">=3.8"
groups = ["docs"]
files = [
{file = "babel-2.16.0-py3-none-any.whl", hash = "sha256:368b5b98b37c06b7daf6696391c3240c938b37767d4584413e8438c5c435fa8b"},
{file = "babel-2.16.0.tar.gz", hash = "sha256:d1f3554ca26605fe173f3de0c65f750f5a42f924499bf134de6423582298e316"},
{file = "babel-2.17.0-py3-none-any.whl", hash = "sha256:4d0b53093fdfb4b21c92b5213dba5a1b23885afa8383709427046b21c366e5f2"},
{file = "babel-2.17.0.tar.gz", hash = "sha256:0c54cffb19f690cdcc52a3b50bcbf71e07a808d1c80d549f2459b9d2cf0afb9d"},
]
[package.extras]
dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"]
dev = ["backports.zoneinfo", "freezegun (>=1.0,<2.0)", "jinja2 (>=3.0)", "pytest (>=6.0)", "pytest-cov", "pytz", "setuptools", "tzdata"]
[[package]]
name = "backports-tarfile"
@ -176,14 +176,14 @@ files = [
[[package]]
name = "certifi"
version = "2024.12.14"
version = "2025.1.31"
description = "Python package for providing Mozilla's CA Bundle."
optional = false
python-versions = ">=3.6"
groups = ["main", "dev", "docs"]
files = [
{file = "certifi-2024.12.14-py3-none-any.whl", hash = "sha256:1275f7a45be9464efc1173084eaa30f866fe2e47d389406136d332ed4967ec56"},
{file = "certifi-2024.12.14.tar.gz", hash = "sha256:b650d30f370c2b724812bee08008be0c4163b163ddaec3f2546c1caf65f191db"},
{file = "certifi-2025.1.31-py3-none-any.whl", hash = "sha256:ca78db4565a652026a4db2bcdf68f2fb589ea80d0be70e03929ed730746b84fe"},
{file = "certifi-2025.1.31.tar.gz", hash = "sha256:3d5da6925056f6f18f119200434a4780a94263f10d1c21d032a6f6b2baa20651"},
]
[[package]]
@ -395,14 +395,14 @@ files = [
[[package]]
name = "codespell"
version = "2.4.0"
version = "2.4.1"
description = "Fix common misspellings in text files"
optional = false
python-versions = ">=3.8"
groups = ["dev"]
files = [
{file = "codespell-2.4.0-py3-none-any.whl", hash = "sha256:b4c5b779f747dd481587aeecb5773301183f52b94b96ed51a28126d0482eec1d"},
{file = "codespell-2.4.0.tar.gz", hash = "sha256:587d45b14707fb8ce51339ba4cce50ae0e98ce228ef61f3c5e160e34f681be58"},
{file = "codespell-2.4.1-py3-none-any.whl", hash = "sha256:3dadafa67df7e4a3dbf51e0d7315061b80d265f9552ebd699b3dd6834b47e425"},
{file = "codespell-2.4.1.tar.gz", hash = "sha256:299fcdcb09d23e81e35a671bbe746d5ad7e8385972e65dbb833a2eaac33c01e5"},
]
[package.extras]
@ -1383,14 +1383,14 @@ dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments
[[package]]
name = "pytest-asyncio"
version = "0.25.2"
version = "0.25.3"
description = "Pytest support for asyncio"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "pytest_asyncio-0.25.2-py3-none-any.whl", hash = "sha256:0d0bb693f7b99da304a0634afc0a4b19e49d5e0de2d670f38dc4bfa5727c5075"},
{file = "pytest_asyncio-0.25.2.tar.gz", hash = "sha256:3f8ef9a98f45948ea91a0ed3dc4268b5326c0e7bce73892acc654df4262ad45f"},
{file = "pytest_asyncio-0.25.3-py3-none-any.whl", hash = "sha256:9e89518e0f9bd08928f97a3482fdc4e244df17529460bc038291ccaf8f85c7c3"},
{file = "pytest_asyncio-0.25.3.tar.gz", hash = "sha256:fc1da2cf9f125ada7e710b4ddad05518d4cee187ae9412e9ac9271003497f07a"},
]
[package.dependencies]
@ -1636,30 +1636,30 @@ jupyter = ["ipywidgets (>=7.5.1,<9)"]
[[package]]
name = "ruff"
version = "0.9.3"
version = "0.9.4"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
{file = "ruff-0.9.3-py3-none-linux_armv6l.whl", hash = "sha256:7f39b879064c7d9670197d91124a75d118d00b0990586549949aae80cdc16624"},
{file = "ruff-0.9.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:a187171e7c09efa4b4cc30ee5d0d55a8d6c5311b3e1b74ac5cb96cc89bafc43c"},
{file = "ruff-0.9.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:c59ab92f8e92d6725b7ded9d4a31be3ef42688a115c6d3da9457a5bda140e2b4"},
{file = "ruff-0.9.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2dc153c25e715be41bb228bc651c1e9b1a88d5c6e5ed0194fa0dfea02b026439"},
{file = "ruff-0.9.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:646909a1e25e0dc28fbc529eab8eb7bb583079628e8cbe738192853dbbe43af5"},
{file = "ruff-0.9.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5a5a46e09355695fbdbb30ed9889d6cf1c61b77b700a9fafc21b41f097bfbba4"},
{file = "ruff-0.9.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:c4bb09d2bbb394e3730d0918c00276e79b2de70ec2a5231cd4ebb51a57df9ba1"},
{file = "ruff-0.9.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:96a87ec31dc1044d8c2da2ebbed1c456d9b561e7d087734336518181b26b3aa5"},
{file = "ruff-0.9.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9bb7554aca6f842645022fe2d301c264e6925baa708b392867b7a62645304df4"},
{file = "ruff-0.9.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cabc332b7075a914ecea912cd1f3d4370489c8018f2c945a30bcc934e3bc06a6"},
{file = "ruff-0.9.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:33866c3cc2a575cbd546f2cd02bdd466fed65118e4365ee538a3deffd6fcb730"},
{file = "ruff-0.9.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:006e5de2621304c8810bcd2ee101587712fa93b4f955ed0985907a36c427e0c2"},
{file = "ruff-0.9.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:ba6eea4459dbd6b1be4e6bfc766079fb9b8dd2e5a35aff6baee4d9b1514ea519"},
{file = "ruff-0.9.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:90230a6b8055ad47d3325e9ee8f8a9ae7e273078a66401ac66df68943ced029b"},
{file = "ruff-0.9.3-py3-none-win32.whl", hash = "sha256:eabe5eb2c19a42f4808c03b82bd313fc84d4e395133fb3fc1b1516170a31213c"},
{file = "ruff-0.9.3-py3-none-win_amd64.whl", hash = "sha256:040ceb7f20791dfa0e78b4230ee9dce23da3b64dd5848e40e3bf3ab76468dcf4"},
{file = "ruff-0.9.3-py3-none-win_arm64.whl", hash = "sha256:800d773f6d4d33b0a3c60e2c6ae8f4c202ea2de056365acfa519aa48acf28e0b"},
{file = "ruff-0.9.3.tar.gz", hash = "sha256:8293f89985a090ebc3ed1064df31f3b4b56320cdfcec8b60d3295bddb955c22a"},
{file = "ruff-0.9.4-py3-none-linux_armv6l.whl", hash = "sha256:64e73d25b954f71ff100bb70f39f1ee09e880728efb4250c632ceed4e4cdf706"},
{file = "ruff-0.9.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6ce6743ed64d9afab4fafeaea70d3631b4d4b28b592db21a5c2d1f0ef52934bf"},
{file = "ruff-0.9.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:54499fb08408e32b57360f6f9de7157a5fec24ad79cb3f42ef2c3f3f728dfe2b"},
{file = "ruff-0.9.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:37c892540108314a6f01f105040b5106aeb829fa5fb0561d2dcaf71485021137"},
{file = "ruff-0.9.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:de9edf2ce4b9ddf43fd93e20ef635a900e25f622f87ed6e3047a664d0e8f810e"},
{file = "ruff-0.9.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:87c90c32357c74f11deb7fbb065126d91771b207bf9bfaaee01277ca59b574ec"},
{file = "ruff-0.9.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:56acd6c694da3695a7461cc55775f3a409c3815ac467279dfa126061d84b314b"},
{file = "ruff-0.9.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e0c93e7d47ed951b9394cf352d6695b31498e68fd5782d6cbc282425655f687a"},
{file = "ruff-0.9.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1d4c8772670aecf037d1bf7a07c39106574d143b26cfe5ed1787d2f31e800214"},
{file = "ruff-0.9.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bfc5f1d7afeda8d5d37660eeca6d389b142d7f2b5a1ab659d9214ebd0e025231"},
{file = "ruff-0.9.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:faa935fc00ae854d8b638c16a5f1ce881bc3f67446957dd6f2af440a5fc8526b"},
{file = "ruff-0.9.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:a6c634fc6f5a0ceae1ab3e13c58183978185d131a29c425e4eaa9f40afe1e6d6"},
{file = "ruff-0.9.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:433dedf6ddfdec7f1ac7575ec1eb9844fa60c4c8c2f8887a070672b8d353d34c"},
{file = "ruff-0.9.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:d612dbd0f3a919a8cc1d12037168bfa536862066808960e0cc901404b77968f0"},
{file = "ruff-0.9.4-py3-none-win32.whl", hash = "sha256:db1192ddda2200671f9ef61d9597fcef89d934f5d1705e571a93a67fb13a4402"},
{file = "ruff-0.9.4-py3-none-win_amd64.whl", hash = "sha256:05bebf4cdbe3ef75430d26c375773978950bbf4ee3c95ccb5448940dc092408e"},
{file = "ruff-0.9.4-py3-none-win_arm64.whl", hash = "sha256:585792f1e81509e38ac5123492f8875fbc36f3ede8185af0a26df348e5154f41"},
{file = "ruff-0.9.4.tar.gz", hash = "sha256:6907ee3529244bb0ed066683e075f09285b38dd5b4039370df6ff06041ca19e7"},
]
[[package]]

1
pyproject.toml

@ -81,6 +81,7 @@ select = ["ALL"]
ignore = [
"BLE001",
"C901",
"COM812",
"D203",
"D212",
"FBT001",

178
src/keycloak/keycloak_admin.py

@ -3631,6 +3631,31 @@ class KeycloakAdmin:
skip_exists=skip_exists,
)
def update_authentication_flow(self, flow_id: str, payload: dict) -> bytes:
"""
Update an authentication flow.
AuthenticationFlowRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationflowrepresentation
:param flow_id: The id of the flow
:type flow_id: str
:param payload: AuthenticationFlowRepresentation
:type payload: dict
:return: Keycloak server response
:rtype: bytes
"""
params_path = {"id": flow_id, "realm-name": self.connection.realm_name}
data_raw = self.connection.raw_put(
urls_patterns.URL_ADMIN_FLOW.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw,
KeycloakPutError,
expected_codes=[HTTP_ACCEPTED, HTTP_NO_CONTENT],
)
def copy_authentication_flow(self, payload: dict, flow_alias: str) -> bytes:
"""
Copy existing authentication flow under a new name.
@ -3782,6 +3807,45 @@ class KeycloakAdmin:
expected_codes=[HTTP_NO_CONTENT],
)
def change_execution_priority(self, execution_id: str, diff: int) -> None:
"""
Raise or lower execution priority of diff time.
:param execution_id: The ID of the execution
:type execution_id: str
:param diff: The difference in priority, positive to raise, negative to lower, the value
is the number of times
:type diff: int
:raises KeycloakPostError: when post requests are failed
"""
params_path = {"id": execution_id, "realm-name": self.connection.realm_name}
if diff > 0:
for _ in range(diff):
data_raw = self.connection.raw_post(
urls_patterns.URL_AUTHENTICATION_EXECUTION_RAISE_PRIORITY.format(
**params_path,
),
data="{}",
)
raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_NO_CONTENT],
)
elif diff < 0:
for _ in range(-diff):
data_raw = self.connection.raw_post(
urls_patterns.URL_AUTHENTICATION_EXECUTION_LOWER_PRIORITY.format(
**params_path,
),
data="{}",
)
raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_NO_CONTENT],
)
def create_authentication_flow_subflow(
self,
payload: dict,
@ -3863,6 +3927,31 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
def create_execution_config(self, execution_id: str, payload: dict) -> bytes:
"""
Update execution with new configuration.
AuthenticatorConfigRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticatorconfigrepresentation
:param execution_id: The ID of the execution
:type execution_id: str
:param payload: Configuration to add to the execution
:type payload: dir
:return: Response(json)
:rtype: dict
"""
params_path = {"id": execution_id, "realm-name": self.connection.realm_name}
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_FLOWS_EXECUTION_CONFIG.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_CREATED],
)
def update_authenticator_config(self, payload: dict, config_id: str) -> bytes:
"""
Update an authenticator configuration.
@ -10492,3 +10581,92 @@ class KeycloakAdmin:
KeycloakPostError,
expected_codes=[HTTP_NO_CONTENT],
)
async def a_change_execution_priority(self, execution_id: str, diff: int) -> None:
"""
Raise or lower execution priority of diff time.
:param execution_id: The ID of the execution
:type execution_id: str
:param diff: The difference in priority, positive to raise, negative to lower, the value
is the number of times
:type diff: int
:raises KeycloakPostError: when post requests are failed
"""
params_path = {"id": execution_id, "realm-name": self.connection.realm_name}
if diff > 0:
for _ in range(diff):
data_raw = await self.connection.a_raw_post(
urls_patterns.URL_AUTHENTICATION_EXECUTION_RAISE_PRIORITY.format(
**params_path,
),
data="{}",
)
raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_NO_CONTENT],
)
elif diff < 0:
for _ in range(-diff):
data_raw = await self.connection.a_raw_post(
urls_patterns.URL_AUTHENTICATION_EXECUTION_LOWER_PRIORITY.format(
**params_path,
),
data="{}",
)
raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_NO_CONTENT],
)
async def a_create_execution_config(self, execution_id: str, payload: dict) -> bytes:
"""
Update execution with new configuration.
AuthenticatorConfigRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticatorconfigrepresentation
:param execution_id: The ID of the execution
:type execution_id: str
:param payload: Configuration to add to the execution
:type payload: dir
:return: Response(json)
:rtype: dict
"""
params_path = {"id": execution_id, "realm-name": self.connection.realm_name}
data_raw = await self.connection.a_raw_post(
urls_patterns.URL_ADMIN_FLOWS_EXECUTION_CONFIG.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_CREATED],
)
async def a_update_authentication_flow(self, flow_id: str, payload: dict) -> bytes:
"""
Update an authentication flow.
AuthenticationFlowRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationflowrepresentation
:param flow_id: The id of the flow
:type flow_id: str
:param payload: AuthenticationFlowRepresentation
:type payload: dict
:return: Keycloak server response
:rtype: bytes
"""
params_path = {"id": flow_id, "realm-name": self.connection.realm_name}
data_raw = await self.connection.a_raw_put(
urls_patterns.URL_ADMIN_FLOW.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw,
KeycloakPutError,
expected_codes=[HTTP_ACCEPTED, HTTP_NO_CONTENT],
)

9
src/keycloak/urls_patterns.py

@ -229,3 +229,12 @@ URL_ADMIN_CLEAR_USER_CACHE = URL_ADMIN_REALM + "/clear-user-cache"
# UMA URLS
URL_UMA_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/uma2-configuration"
URL_AUTHENTICATION_EXECUTION_RAISE_PRIORITY = (
"admin/realms/{realm-name}/authentication/executions/{id}/raise-priority"
)
URL_AUTHENTICATION_EXECUTION_LOWER_PRIORITY = (
"admin/realms/{realm-name}/authentication/executions/{id}/lower-priority"
)
URL_ADMIN_FLOWS_EXECUTION_CONFIG = URL_ADMIN_FLOWS_EXECUTION + "/config"

123
tests/test_keycloak_admin.py

@ -204,7 +204,6 @@ def test_realms(admin: KeycloakAdmin) -> None:
assert "master" in realm_names, realm_names
assert "test" in realm_names, realm_names
if os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"] == "latest" or Version(
os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"],
) >= Version("24"):
@ -2449,6 +2448,17 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str) -> None:
skip_exists=True,
) == {"msg": "Already exists"}
# Update
res = admin.get_authentication_flows()
browser_flow_id = next(x for x in res if x["alias"] == "browser")["id"]
flow = admin.get_authentication_flow_for_id(flow_id=browser_flow_id)
del flow["authenticationExecutions"]
del flow["id"]
flow["description"] = "test description"
res = admin.update_authentication_flow(flow_id=browser_flow_id, payload=flow)
res = admin.get_authentication_flow_for_id(flow_id=browser_flow_id)
assert res["description"] == "test description"
# Test flow executions
res = admin.get_authentication_flow_executions(flow_alias="browser")
assert len(res) in [8, 12], res
@ -2550,6 +2560,37 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str) -> None:
)
def test_auth_flow_execution_priority(admin: KeycloakAdmin, realm: str) -> None:
"""
Test execution priority.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin.change_current_realm(realm)
_ = admin.create_authentication_flow(
payload={"alias": "test-create", "providerId": "basic-flow"},
)
_ = admin.create_authentication_flow_execution(
payload={"provider": "auth-cookie"},
flow_alias="test-create",
)
_ = admin.create_authentication_flow_execution(
payload={"provider": "auth-cookie"},
flow_alias="test-create",
)
executions = admin.get_authentication_flow_executions(flow_alias="test-create")
priority_list = [ex["id"] for ex in executions]
_ = admin.change_execution_priority(priority_list[1], 1)
new_executions = admin.get_authentication_flow_executions(flow_alias="test-create")
assert executions != new_executions
_ = admin.change_execution_priority(priority_list[1], -1)
new_executions = admin.get_authentication_flow_executions(flow_alias="test-create")
assert executions == new_executions
def test_authentication_configs(admin: KeycloakAdmin, realm: str) -> None:
"""
Test authentication configs.
@ -2572,10 +2613,23 @@ def test_authentication_configs(admin: KeycloakAdmin, realm: str) -> None:
"properties": [],
"providerId": "auth-cookie",
}
# Test authenticator config
# Currently unable to find a sustainable way to fetch the config id,
# therefore testing only failures
executions = admin.get_authentication_flow_executions(flow_alias="browser")
execution = next(ex for ex in executions if ex["configurable"])
_ = admin.create_execution_config(
execution["id"],
{
"alias": "test.provisioning.property",
"config": {"test.provisioning.property": "value2"},
},
)
executions = admin.get_authentication_flow_executions(flow_alias="browser")
execution_config_id = next(ex for ex in executions if ex.get("id") == execution["id"])[
"authenticationConfig"
]
res = admin.get_authenticator_config(config_id=execution_config_id)
assert res["config"]["test.provisioning.property"] == "value2"
with pytest.raises(KeycloakGetError) as err:
admin.get_authenticator_config(config_id="bad")
assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'')
@ -3453,7 +3507,6 @@ async def test_a_realms(admin: KeycloakAdmin) -> None:
user_profile = await admin.a_get_realm_users_profile()
assert "attributes" in user_profile
if os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"] == "latest" or Version(
os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"],
) >= Version("24"):
@ -5889,6 +5942,17 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str) -> None:
skip_exists=True,
) == {"msg": "Already exists"}
# Update
res = await admin.a_get_authentication_flows()
browser_flow_id = next(x for x in res if x["alias"] == "browser")["id"]
flow = await admin.a_get_authentication_flow_for_id(flow_id=browser_flow_id)
del flow["authenticationExecutions"]
del flow["id"]
flow["description"] = "test description"
res = await admin.a_update_authentication_flow(flow_id=browser_flow_id, payload=flow)
res = await admin.a_get_authentication_flow_for_id(flow_id=browser_flow_id)
assert res["description"] == "test description"
# Test flow executions
res = await admin.a_get_authentication_flow_executions(flow_alias="browser")
assert len(res) in [8, 12], res
@ -5993,6 +6057,38 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str) -> None:
)
@pytest.mark.asyncio
async def test_a_auth_flow_execution_priority(admin: KeycloakAdmin, realm: str) -> None:
"""
Test execution priority.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
await admin.a_change_current_realm(realm)
_ = await admin.a_create_authentication_flow(
payload={"alias": "test-create", "providerId": "basic-flow"},
)
_ = await admin.a_create_authentication_flow_execution(
payload={"provider": "auth-cookie"},
flow_alias="test-create",
)
_ = await admin.a_create_authentication_flow_execution(
payload={"provider": "auth-cookie"},
flow_alias="test-create",
)
executions = await admin.a_get_authentication_flow_executions(flow_alias="test-create")
priority_list = [ex["id"] for ex in executions]
_ = await admin.a_change_execution_priority(priority_list[1], 1)
new_executions = await admin.a_get_authentication_flow_executions(flow_alias="test-create")
assert executions != new_executions
_ = await admin.a_change_execution_priority(priority_list[1], -1)
new_executions = await admin.a_get_authentication_flow_executions(flow_alias="test-create")
assert executions == new_executions
@pytest.mark.asyncio
async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str) -> None:
"""
@ -6017,6 +6113,23 @@ async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str) -> Non
"providerId": "auth-cookie",
}
# Test authenticator config
executions = await admin.a_get_authentication_flow_executions(flow_alias="browser")
execution = next(ex for ex in executions if ex["configurable"])
_ = await admin.a_create_execution_config(
execution["id"],
{
"alias": "test.provisioning.property",
"config": {"test.provisioning.property": "value2"},
},
)
executions = await admin.a_get_authentication_flow_executions(flow_alias="browser")
execution_config_id = next(ex for ex in executions if ex.get("id") == execution["id"])[
"authenticationConfig"
]
res = await admin.a_get_authenticator_config(config_id=execution_config_id)
assert res["config"]["test.provisioning.property"] == "value2"
# Test authenticator config
# Currently unable to find a sustainable way to fetch the config id,
# therefore testing only failures

2
tox.ini

@ -11,11 +11,13 @@ commands_pre =
[testenv:check]
commands =
ruff check src/keycloak tests docs
ruff format --check src/keycloak tests docs
codespell src tests docs
[testenv:apply-check]
commands =
ruff check --fix src/keycloak tests docs
ruff format src/keycloak tests docs
[testenv:docs]
commands =

Loading…
Cancel
Save