Browse Source

fix: delete request fix and test cases fix

pull/566/head
David 11 months ago
parent
commit
412662a760
  1. 11
      src/keycloak/connection.py
  2. 2
      src/keycloak/keycloak_openid.py
  3. 48
      tests/test_keycloak_admin.py
  4. 12
      tests/test_keycloak_openid.py

11
src/keycloak/connection.py

@ -328,8 +328,9 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return await self.async_s.post(
urljoin(self.base_url, path),
return await self.async_s.request(
method="POST",
url=urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
@ -376,8 +377,10 @@ class ConnectionManager(object):
:raises KeycloakConnectionError: HttpError Can't connect to server.
"""
try:
return await self.async_s.delete(
urljoin(self.base_url, path),
return await self.async_s.request(
method="DELETE",
url=urljoin(self.base_url, path),
data=data or dict(),
params=kwargs,
headers=self.headers,
timeout=self.timeout,

2
src/keycloak/keycloak_openid.py

@ -1230,7 +1230,7 @@ class KeycloakOpenID:
}
self.connection.add_param_headers("Authorization", "Bearer " + token)
data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_has_uma_access(self, token, permissions):

48
tests/test_keycloak_admin.py

@ -3769,11 +3769,11 @@ async def test_a_groups(admin: KeycloakAdmin, user: str):
await admin.a_get_group_children(group_id=group_id, full_hierarchy=True, query={"max": 10})
# Test that query params are passed
# if os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"] == "latest" or Version(
# os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"]
# ) >= Version("23"):
res = await admin.a_get_group_children(group_id=group_id, query={"max": 1})
assert len(res) == 1
if os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"] == "latest" or Version(
os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"]
) >= Version("23"):
res = await admin.a_get_group_children(group_id=group_id, query={"max": 1})
assert len(res) == 1
assert err.match("Cannot use both query and full_hierarchy parameters")
@ -4324,7 +4324,7 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str):
assert "test-realm-role-update" in [x["name"] for x in roles]
with pytest.raises(KeycloakDeleteError) as err:
await admin.a_delete_realm_roles_of_user(user_id=user_id, roles=["bad"])
admin.delete_realm_roles_of_user(user_id=user_id, roles=["bad"])
assert err.match(UNKOWN_ERROR_REGEX), err
res = await admin.a_delete_realm_roles_of_user(
user_id=user_id, roles=[await admin.a_get_realm_role(role_name="offline_access")]
@ -4901,7 +4901,7 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
roles=[await admin.a_get_realm_role(role_name="offline_access")],
)
assert res == dict()
assert await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")[
assert (await admin.a_get_client_role(client_id=client, role_name="client-role-test-update"))[
"composite"
]
@ -4921,7 +4921,7 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str):
client_role_id=client, payload={"name": "client-role-by-id-test"}, skip_exists=True
)
role = await admin.a_get_client_role(client_id=client, role_name="client-role-by-id-test")
res = admin.a_get_role_by_id(role_id=role["id"])
res = await admin.a_get_role_by_id(role_id=role["id"])
assert res["name"] == "client-role-by-id-test"
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_role_by_id(role_id="bad")
@ -5005,15 +5005,17 @@ async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str):
# Create a client policy for source client
policy_name = "Exchange source client token with target client token"
client_policy_id = await admin.a_create_client_authz_client_policy(
payload={
"type": "client",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"name": policy_name,
"clients": [source_client_id],
},
client_id=realm_management_id,
client_policy_id = (
await admin.a_create_client_authz_client_policy(
payload={
"type": "client",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"name": policy_name,
"clients": [source_client_id],
},
client_id=realm_management_id,
)
)["id"]
policies = await admin.a_get_client_authz_client_policies(client_id=realm_management_id)
for policy in policies:
@ -5024,8 +5026,10 @@ async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str):
raise AssertionError("Missing client policy")
# Update permissions on the target client to reference this policy
permission_name = await admin.a_get_client_authz_scope_permission(
client_id=realm_management_id, scope_id=token_exchange_permission_id
permission_name = (
await admin.a_get_client_authz_scope_permission(
client_id=realm_management_id, scope_id=token_exchange_permission_id
)
)["name"]
await admin.a_update_client_authz_scope_permission(
payload={
@ -5056,8 +5060,10 @@ async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str):
},
client_id=realm_management_id,
)
permission_name = await admin.a_get_client_authz_scope_permission(
client_id=realm_management_id, scope_id=token_exchange_permission_id
permission_name = (
await admin.a_get_client_authz_scope_permission(
client_id=realm_management_id, scope_id=token_exchange_permission_id
)
)["name"]
assert permission_name.startswith("token-exchange.permission.client.")
with pytest.raises(KeycloakPostError) as err:

12
tests/test_keycloak_openid.py

@ -643,12 +643,12 @@ async def test_a_exchange_token(
oid, username, password = oid_with_credentials
# Allow impersonation
admin.change_current_realm(oid.realm_name)
admin.assign_client_role(
user_id=admin.get_user_id(username=username),
client_id=admin.get_client_id(client_id="realm-management"),
await admin.a_change_current_realm(oid.realm_name)
await admin.a_assign_client_role(
user_id=await admin.a_get_user_id(username=username),
client_id=await admin.a_get_client_id(client_id="realm-management"),
roles=[
admin.get_client_role(
await admin.a_get_client_role(
client_id=admin.get_client_id(client_id="realm-management"),
role_name="impersonation",
)
@ -667,7 +667,7 @@ async def test_a_exchange_token(
}
# Exchange token with the new user
new_token = await oid.a_exchange_token(
new_token = oid.exchange_token(
token=token["access_token"], audience=oid.client_id, subject=username
)
assert await oid.a_userinfo(token=new_token["access_token"]) == {

Loading…
Cancel
Save