Browse Source

feat: implement authz import request (#685)

* feat: implement authz import request

* chore: linting

* chore: fixing docs

---------

Co-authored-by: Richard Nemeth <ryshoooo@gmail.com>
master v5.11.0
Igor Pronin 5 days ago
committed by GitHub
parent
commit
05763ae4e6
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 54
      src/keycloak/keycloak_admin.py
  2. 1
      src/keycloak/urls_patterns.py
  3. 70
      tests/test_keycloak_admin.py

54
src/keycloak/keycloak_admin.py

@ -2216,6 +2216,33 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
def import_client_authz_config(self, client_id: str, payload: dict) -> dict:
"""
Import client authorization configuration.
ResourceServerRepresentation
https://www.keycloak.org/docs-api/latest/rest-api/index.html#ResourceServerRepresentation
:param client_id: id in ClientRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation
:type client_id: str
:param payload: ResourceServerRepresentation
:type payload: dict
:return: None
"""
params_path = {"realm-name": self.connection.realm_name, "id": client_id}
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLIENT_AUTHZ_IMPORT.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_NO_CONTENT],
)
def create_client_authz_resource(
self,
client_id: str,
@ -7527,6 +7554,33 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_import_client_authz_config(self, client_id: str, payload: dict) -> dict:
"""
Import client authorization configuration asynchronously.
ResourceServerRepresentation
https://www.keycloak.org/docs-api/latest/rest-api/index.html#ResourceServerRepresentation
:param client_id: id in ClientRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation
:type client_id: str
:param payload: ResourceServerRepresentation
:type payload: dict
:return: None
"""
params_path = {"realm-name": self.connection.realm_name, "id": client_id}
data_raw = await self.connection.a_raw_post(
urls_patterns.URL_ADMIN_CLIENT_AUTHZ_IMPORT.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw,
KeycloakPostError,
expected_codes=[HTTP_NO_CONTENT],
)
async def a_create_client_authz_resource(
self,
client_id: str,

1
src/keycloak/urls_patterns.py

@ -115,6 +115,7 @@ URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE = (
URL_ADMIN_CLIENT_AUTHZ = URL_ADMIN_CLIENT + "/authz/resource-server"
URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT_AUTHZ + "/settings"
URL_ADMIN_CLIENT_AUTHZ_IMPORT = URL_ADMIN_CLIENT_AUTHZ + "/import"
URL_ADMIN_CLIENT_AUTHZ_RESOURCE = URL_ADMIN_CLIENT_AUTHZ + "/resource/{resource-id}"
URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT_AUTHZ + "/resource"
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT_AUTHZ + "/scope"

70
tests/test_keycloak_admin.py

@ -1434,6 +1434,41 @@ def test_clients(admin: KeycloakAdmin, realm: str) -> None:
UNKOWN_ERROR_REGEX,
)
# Test import authz
authz_config = admin.get_client_authz_settings(client_id=auth_client_id)
authz_config["resources"] = [{"name": "test-import-resource"}]
authz_config["policies"] = [
{
"name": "test-import-policy",
"type": "time",
"config": {"hourEnd": "18", "hour": "9"},
}
]
admin.import_client_authz_config(client_id=auth_client_id, payload=authz_config)
exported = admin.get_client_authz_settings(client_id=auth_client_id)
assert (
len(
[
resource
for resource in exported["resources"]
if resource["name"] == "test-import-resource"
]
)
== 1
)
assert (
len(
[
resource
for resource in exported["policies"]
if resource["name"] == "test-import-policy"
]
)
== 1
)
# Test delete client
res = admin.delete_client(client_id=auth_client_id)
assert res == {}, res
@ -5042,6 +5077,41 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str) -> None:
UNKOWN_ERROR_REGEX,
)
# Test async import authz
authz_config = await admin.a_get_client_authz_settings(client_id=auth_client_id)
authz_config["resources"] = [{"name": "test-import-resource"}]
authz_config["policies"] = [
{
"name": "test-import-policy",
"type": "time",
"config": {"hourEnd": "18", "hour": "9"},
}
]
await admin.a_import_client_authz_config(client_id=auth_client_id, payload=authz_config)
exported = await admin.a_get_client_authz_settings(client_id=auth_client_id)
assert (
len(
[
resource
for resource in exported["resources"]
if resource["name"] == "test-import-resource"
]
)
== 1
)
assert (
len(
[
resource
for resource in exported["policies"]
if resource["name"] == "test-import-policy"
]
)
== 1
)
# Test delete client
res = await admin.a_delete_client(client_id=auth_client_id)
assert res == {}, res

Loading…
Cancel
Save