Browse Source

Merge e0f6479828 into c5848e806d

pull/706/merge
Michael Mair 3 weeks ago
committed by GitHub
parent
commit
e843c1e043
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 51
      src/keycloak/keycloak_uma.py
  2. 34
      tests/test_keycloak_uma.py

51
src/keycloak/keycloak_uma.py

@ -442,17 +442,23 @@ class KeycloakUMA:
:returns: Keycloak decision :returns: Keycloak decision
:rtype: boolean :rtype: boolean
""" """
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": ",".join(str(permission) for permission in permissions),
"response_mode": "decision",
"audience": self.connection.client_id,
**extra_payload,
}
payload = [
("grant_type", "urn:ietf:params:oauth:grant-type:uma-ticket"),
("response_mode", "decision"),
("audience", self.connection.client_id),
]
payload.extend([("permission", str(p)) for p in permissions])
if extra_payload and isinstance(extra_payload, dict):
for k, v in extra_payload.items():
payload.append((k, v))
elif extra_payload:
msg = "Attribute extra_payload needs to be of type dict."
raise AttributeError(msg)
# Everyone always has the null set of permissions # Everyone always has the null set of permissions
# However keycloak cannot evaluate the null set # However keycloak cannot evaluate the null set
if len(payload["permission"]) == 0:
if len([k for k, _ in payload if k == "permission"]) == 0:
return True return True
if self.connection.base_url is None: if self.connection.base_url is None:
@ -472,7 +478,10 @@ class KeycloakUMA:
) )
connection.add_param_headers("Authorization", "Bearer " + token) connection.add_param_headers("Authorization", "Bearer " + token)
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = connection.raw_post(self.uma_well_known["token_endpoint"], data=payload)
data_raw = connection.raw_post(
self.uma_well_known["token_endpoint"],
data="&".join([f"{k}={v}" for k, v in payload]),
)
try: try:
data = raise_error_from_response(data_raw, KeycloakPostError) data = raise_error_from_response(data_raw, KeycloakPostError)
except KeycloakPostError: except KeycloakPostError:
@ -935,17 +944,23 @@ class KeycloakUMA:
:returns: Keycloak decision :returns: Keycloak decision
:rtype: boolean :rtype: boolean
""" """
payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
"permission": ",".join(str(permission) for permission in permissions),
"response_mode": "decision",
"audience": self.connection.client_id,
**extra_payload,
}
payload = [
("grant_type", "urn:ietf:params:oauth:grant-type:uma-ticket"),
("response_mode", "decision"),
("audience", self.connection.client_id),
]
payload.extend([("permission", str(p)) for p in permissions])
if extra_payload and isinstance(extra_payload, dict):
for k, v in extra_payload.items():
payload.append((k, v))
elif extra_payload:
msg = "Attribute extra_payload needs to be of type dict."
raise AttributeError(msg)
# Everyone always has the null set of permissions # Everyone always has the null set of permissions
# However keycloak cannot evaluate the null set # However keycloak cannot evaluate the null set
if len(payload["permission"]) == 0:
if len([k for k, _ in payload if k == "permission"]) == 0:
return True return True
if self.connection.base_url is None: if self.connection.base_url is None:
@ -967,7 +982,7 @@ class KeycloakUMA:
connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded") connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
data_raw = await connection.a_raw_post( data_raw = await connection.a_raw_post(
(await self.a_uma_well_known)["token_endpoint"], (await self.a_uma_well_known)["token_endpoint"],
data=payload,
data="&".join([f"{k}={v}" for k, v in payload]),
) )
try: try:
data = raise_error_from_response(data_raw, KeycloakPostError) data = raise_error_from_response(data_raw, KeycloakPostError)

34
tests/test_keycloak_uma.py

@ -292,8 +292,24 @@ def test_uma_access(uma: KeycloakUMA) -> None:
assert uma.permissions_check(token["access_token"], permissions) assert uma.permissions_check(token["access_token"], permissions)
permissions.append(UMAPermission(resource="not valid")) permissions.append(UMAPermission(resource="not valid"))
assert uma.permissions_check(token["access_token"], permissions)
permissions = [UMAPermission(resource="not valid")]
assert not uma.permissions_check(token["access_token"], permissions)
resource_without_a_policy = {
"name": "test_without_policy",
"scopes": ["read", "write"],
"type": "urn:test-no-policy",
"ownerManagedAccess": True,
}
resource_no_policy = uma.resource_set_create(resource_without_a_policy)
permissions = [UMAPermission(resource=resource_without_a_policy["name"])]
assert not uma.permissions_check(token["access_token"], permissions) assert not uma.permissions_check(token["access_token"], permissions)
uma.resource_set_delete(resource["_id"]) uma.resource_set_delete(resource["_id"])
uma.resource_set_delete(resource_no_policy["_id"])
def test_uma_permission_ticket(uma: KeycloakUMA) -> None: def test_uma_permission_ticket(uma: KeycloakUMA) -> None:
@ -603,8 +619,24 @@ async def test_a_uma_access(uma: KeycloakUMA) -> None:
assert await uma.a_permissions_check(token["access_token"], permissions) assert await uma.a_permissions_check(token["access_token"], permissions)
permissions.append(UMAPermission(resource="not valid")) permissions.append(UMAPermission(resource="not valid"))
assert await uma.a_permissions_check(token["access_token"], permissions)
permissions = [UMAPermission(resource="not valid")]
assert not await uma.a_permissions_check(token["access_token"], permissions) assert not await uma.a_permissions_check(token["access_token"], permissions)
uma.resource_set_delete(resource["_id"])
resource_without_a_policy = {
"name": "test_without_policy",
"scopes": ["read", "write"],
"type": "urn:test-no-policy",
"ownerManagedAccess": True,
}
resource_no_policy = await uma.a_resource_set_create(resource_without_a_policy)
permissions = [UMAPermission(resource=resource_without_a_policy["name"])]
assert not await uma.a_permissions_check(token["access_token"], permissions)
await uma.a_resource_set_delete(resource["_id"])
await uma.a_resource_set_delete(resource_no_policy["_id"])
@pytest.mark.asyncio @pytest.mark.asyncio

Loading…
Cancel
Save