650 lines
22 KiB

"""Test module for KeycloakUMA."""
import re
from inspect import iscoroutinefunction, signature
import pytest
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA
from keycloak.exceptions import (
KeycloakDeleteError,
KeycloakGetError,
KeycloakPostError,
KeycloakPutError,
)
from keycloak.uma_permissions import UMAPermission
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
    """Check KeycloakUMA construction and the caching of its well-known configuration.

    :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
    :type oid_connection_with_authz: KeycloakOpenIDConnection
    """
    uma = KeycloakUMA(connection=oid_connection_with_authz)
    assert isinstance(uma.connection, KeycloakOpenIDConnection)

    # Before the first access the private cache slot is expected to be unset
    assert uma._well_known is None
    # Accessing the public attribute must yield a truthy configuration ...
    assert uma.uma_well_known
    # ... and afterwards the cache slot must be populated
    assert uma._well_known is not None
def test_uma_well_known(uma: KeycloakUMA):
    """Check that the UMA well-known configuration is non-empty and has the required keys.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    well_known = uma.uma_well_known
    assert well_known is not None
    assert well_known != {}
    # Keys the rest of the UMA client relies on being advertised
    for expected_key in ("resource_registration_endpoint",):
        assert expected_key in well_known
def test_uma_resource_sets(uma: KeycloakUMA):
    """Test resource sets.

    Covers listing and filtering resource sets, then the create / read / update / delete
    cycle of a single resource set together with the error raised by each failing call.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    # Check that only the default resource is present
    resource_set_list = list(uma.resource_set_list())
    assert len(resource_set_list) == 1, resource_set_list
    assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]

    # Test query for resource sets
    resource_set_list_ids = uma.resource_set_list_ids()
    assert len(resource_set_list_ids) == 1

    # The name filter is expected to match partial names unless exact_name is requested
    for name in ("Default", "Default Resource"):
        assert uma.resource_set_list_ids(name=name) == resource_set_list_ids, name
    assert len(uma.resource_set_list_ids(name="Default", exact_name=True)) == 0

    # Each of these filters is expected to exclude the single default resource
    no_match_queries = (
        {"first": 1},
        {"scope": "Invalid"},
        {"owner": "Invalid"},
        {"resource_type": "Invalid"},
        {"name": "Invalid"},
        {"uri": "Invalid"},
        {"maximum": 0},
    )
    for query in no_match_queries:
        assert len(uma.resource_set_list_ids(**query)) == 0, query

    # Test create resource set
    resource_to_create = {
        "name": "mytest",
        "scopes": ["test:read", "test:write"],
        "type": "urn:test",
        "uris": ["/some_resources/*"],
    }
    created_resource = uma.resource_set_create(resource_to_create)
    assert created_resource
    assert created_resource["_id"], created_resource
    assert set(resource_to_create).issubset(set(created_resource)), created_resource

    # Test getting resource with wildcard
    # Without matchingUri query option
    resource_set_list_ids = uma.resource_set_list_ids(uri="/some_resources/resource")
    assert len(resource_set_list_ids) == 0
    # With matchingUri query option
    resource_set_list_ids = uma.resource_set_list_ids(
        uri="/some_resources/resource", matchingUri=True
    )
    assert len(resource_set_list_ids) == 1

    # Test create the same resource set
    with pytest.raises(KeycloakPostError) as err:
        uma.resource_set_create(resource_to_create)
    assert err.match(
        re.escape(
            '409: b\'{"error":"invalid_request","error_description":'
            '"Resource with name [mytest] already exists."}\''
        )
    )

    # Test get resource set
    latest_resource = uma.resource_set_read(created_resource["_id"])
    assert latest_resource["name"] == created_resource["name"]

    # Test update resource set
    latest_resource["name"] = "New Resource Name"
    res = uma.resource_set_update(created_resource["_id"], latest_resource)
    assert res == {}, res
    updated_resource = uma.resource_set_read(created_resource["_id"])
    assert updated_resource["name"] == "New Resource Name"

    # Test update resource set fail
    with pytest.raises(KeycloakPutError) as err:
        uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
    assert err.match('400: b\'{"error":"Unrecognized field')

    # Test delete resource set
    res = uma.resource_set_delete(resource_id=created_resource["_id"])
    assert res == {}, res
    with pytest.raises(KeycloakGetError) as err:
        uma.resource_set_read(created_resource["_id"])
    assert err.match("404: b''")

    # Test delete fail
    with pytest.raises(KeycloakDeleteError) as err:
        uma.resource_set_delete(resource_id=created_resource["_id"])
    assert err.match("404: b''")
def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
    """Exercise creation, querying, updating and deletion of UMA policies on a resource.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    :param admin: Keycloak Admin client
    :type admin: KeycloakAdmin
    """
    # Supporting data: an owner-managed resource plus a group, a realm role and a second client
    created_resource = uma.resource_set_create(
        {
            "name": "mytest",
            "scopes": ["test:read", "test:write"],
            "type": "urn:test",
            "ownerManagedAccess": True,
        }
    )
    group_id = admin.create_group({"name": "UMAPolicyGroup"})
    role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
    other_client_id = admin.create_client({"name": "UMAOtherClient"})
    other_client = admin.get_client(other_client_id)
    resource_id = created_resource["_id"]

    # Role based policy
    role_policy = uma.policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyRole",
            "description": "Test resource policy description",
            "scopes": ["test:read", "test:write"],
            "roles": ["roleUMAPolicy"],
        },
    )
    assert role_policy

    # Client based policy
    client_policy = uma.policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyClient",
            "description": "Test resource policy description",
            "scopes": ["test:read"],
            "clients": [other_client["clientId"]],
        },
    )
    assert client_policy

    # Group based policy
    group_policy = uma.policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyGroup",
            "description": "Test resource policy description",
            "scopes": ["test:read"],
            "groups": ["/UMAPolicyGroup"],
        },
    )
    assert group_policy

    assert len(uma.policy_query()) == 3
    assert len(uma.policy_query(name="TestPolicyGroup")) == 1

    # Delete the group policy; a second delete of the same id must be rejected
    group_policy_id = group_policy["id"]
    uma.policy_delete(group_policy_id)
    with pytest.raises(KeycloakDeleteError) as err:
        uma.policy_delete(group_policy_id)
    assert err.match(
        '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
    )

    remaining = uma.policy_query()
    assert len(remaining) == 2

    # Writing a policy back unchanged must not alter the number of policies
    kept_policy = remaining[0]
    uma.policy_update(policy_id=kept_policy["id"], payload=kept_policy)
    assert len(uma.policy_query()) == 2

    # Filters that are expected to match nothing
    assert len(uma.policy_query(name="Invalid")) == 0
    assert len(uma.policy_query(scope="Invalid")) == 0
    assert len(uma.policy_query(resource="Invalid")) == 0
    assert len(uma.policy_query(first=3)) == 0
    assert len(uma.policy_query(maximum=0)) == 0

    # Filters built from the kept policy and the resource
    assert len(uma.policy_query(name=kept_policy["name"])) == 1
    assert len(uma.policy_query(scope=kept_policy["scopes"][0])) == 2
    assert len(uma.policy_query(resource=resource_id)) == 2

    # Remove everything this test created
    uma.resource_set_delete(resource_id)
    admin.delete_client(other_client_id)
    admin.delete_realm_role(role_id)
    admin.delete_group(group_id)
def test_uma_access(uma: KeycloakUMA):
    """Check permission evaluation for empty, valid and invalid permission lists.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    resource_payload = {
        "name": "mytest",
        "scopes": ["read", "write"],
        "type": "urn:test",
        "ownerManagedAccess": True,
    }
    created = uma.resource_set_create(resource_payload)

    # Policy covering the first scope of the resource for the client under test
    policy_payload = {
        "name": "TestPolicy",
        "description": "Test resource policy description",
        "scopes": [resource_payload["scopes"][0]],
        "clients": [uma.connection.client_id],
    }
    uma.policy_resource_create(resource_id=created["_id"], payload=policy_payload)

    access_token = uma.connection.token["access_token"]

    # An empty permission list is expected to pass
    requested = []
    assert uma.permissions_check(access_token, requested)

    # The created resource is expected to pass
    requested.append(UMAPermission(resource=resource_payload["name"]))
    assert uma.permissions_check(access_token, requested)

    # Adding an unknown resource is expected to make the whole check fail
    requested.append(UMAPermission(resource="not valid"))
    assert not uma.permissions_check(access_token, requested)

    uma.resource_set_delete(created["_id"])
def test_uma_permission_ticket(uma: KeycloakUMA):
    """Check that a permission ticket can be created and exchanged for a token.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    resource_payload = {
        "name": "mytest",
        "scopes": ["read", "write"],
        "type": "urn:test",
        "ownerManagedAccess": True,
    }
    created = uma.resource_set_create(resource_payload)
    first_scope = resource_payload["scopes"][0]

    policy_payload = {
        "name": "TestPolicy",
        "description": "Test resource policy description",
        "scopes": [first_scope],
        "clients": [uma.connection.client_id],
    }
    uma.policy_resource_create(resource_id=created["_id"], payload=policy_payload)

    # Request a ticket for the covered scope and trade it in via the uma-ticket grant
    valid_permissions = (UMAPermission(resource=resource_payload["name"], scope=first_scope),)
    ticket_response = uma.permission_ticket_create(valid_permissions)
    rpt = uma.connection.keycloak_openid.token(
        grant_type="urn:ietf:params:oauth:grant-type:uma-ticket",
        ticket=ticket_response["ticket"],
    )
    assert rpt
    assert "access_token" in rpt

    # A ticket for an unknown resource must be rejected
    invalid_permissions = (UMAPermission(resource="invalid"),)
    with pytest.raises(KeycloakPostError):
        uma.permission_ticket_create(invalid_permissions)

    uma.resource_set_delete(created["_id"])
# Async counterparts of the synchronous tests above
@pytest.mark.asyncio
async def test_a_uma_well_known(uma: KeycloakUMA):
    """Check that the UMA well-known configuration is non-empty and has the required keys.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    # NOTE(review): this reads the synchronous attribute even though the test is async;
    # confirm whether an awaitable counterpart should be exercised here instead.
    well_known = uma.uma_well_known
    assert well_known is not None
    assert well_known != {}
    for expected_key in ("resource_registration_endpoint",):
        assert expected_key in well_known
@pytest.mark.asyncio
async def test_a_uma_resource_sets(uma: KeycloakUMA):
    """Test resource sets through the async API.

    Covers listing and filtering resource sets, then the create / read / update / delete
    cycle of a single resource set together with the error raised by each failing call.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    # Check that only the default resource is present
    # NOTE(review): the listing goes through the synchronous API; confirm whether an async
    # variant should be exercised here.
    resource_set_list = list(uma.resource_set_list())
    assert len(resource_set_list) == 1, resource_set_list
    assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]

    # Test query for resource sets
    resource_set_list_ids = await uma.a_resource_set_list_ids()
    assert len(resource_set_list_ids) == 1

    # The name filter is expected to match partial names unless exact_name is requested
    for name in ("Default", "Default Resource"):
        assert await uma.a_resource_set_list_ids(name=name) == resource_set_list_ids, name
    assert len(await uma.a_resource_set_list_ids(name="Default", exact_name=True)) == 0

    # Each of these filters is expected to exclude the single default resource
    no_match_queries = (
        {"first": 1},
        {"scope": "Invalid"},
        {"owner": "Invalid"},
        {"resource_type": "Invalid"},
        {"name": "Invalid"},
        {"uri": "Invalid"},
        {"maximum": 0},
    )
    for query in no_match_queries:
        assert len(await uma.a_resource_set_list_ids(**query)) == 0, query

    # Test create resource set
    resource_to_create = {
        "name": "mytest",
        "scopes": ["test:read", "test:write"],
        "type": "urn:test",
        "uris": ["/some_resources/*"],
    }
    created_resource = await uma.a_resource_set_create(resource_to_create)
    assert created_resource
    assert created_resource["_id"], created_resource
    assert set(resource_to_create).issubset(set(created_resource)), created_resource

    # Test getting resource with wildcard
    # Without matchingUri query option
    resource_set_list_ids = await uma.a_resource_set_list_ids(uri="/some_resources/resource")
    assert len(resource_set_list_ids) == 0
    # With matchingUri query option
    resource_set_list_ids = await uma.a_resource_set_list_ids(
        uri="/some_resources/resource", matchingUri=True
    )
    assert len(resource_set_list_ids) == 1

    # Test create the same resource set
    with pytest.raises(KeycloakPostError) as err:
        await uma.a_resource_set_create(resource_to_create)
    assert err.match(
        re.escape(
            '409: b\'{"error":"invalid_request","error_description":'
            '"Resource with name [mytest] already exists."}\''
        )
    )

    # Test get resource set
    latest_resource = await uma.a_resource_set_read(created_resource["_id"])
    assert latest_resource["name"] == created_resource["name"]

    # Test update resource set
    latest_resource["name"] = "New Resource Name"
    res = await uma.a_resource_set_update(created_resource["_id"], latest_resource)
    assert res == {}, res
    updated_resource = await uma.a_resource_set_read(created_resource["_id"])
    assert updated_resource["name"] == "New Resource Name"

    # Test update resource set fail (must go through the async method under test)
    with pytest.raises(KeycloakPutError) as err:
        await uma.a_resource_set_update(
            resource_id=created_resource["_id"], payload={"wrong": "payload"}
        )
    assert err.match('400: b\'{"error":"Unrecognized field')

    # Test delete resource set
    res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
    assert res == {}, res
    with pytest.raises(KeycloakGetError) as err:
        await uma.a_resource_set_read(created_resource["_id"])
    assert err.match("404: b''")

    # Test delete fail
    with pytest.raises(KeycloakDeleteError) as err:
        await uma.a_resource_set_delete(resource_id=created_resource["_id"])
    assert err.match("404: b''")
@pytest.mark.asyncio
async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
    """Exercise async creation, querying, updating and deletion of UMA policies on a resource.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    :param admin: Keycloak Admin client
    :type admin: KeycloakAdmin
    """
    # Supporting data: an owner-managed resource plus a group, a realm role and a second client
    created_resource = await uma.a_resource_set_create(
        {
            "name": "mytest",
            "scopes": ["test:read", "test:write"],
            "type": "urn:test",
            "ownerManagedAccess": True,
        }
    )
    group_id = admin.create_group({"name": "UMAPolicyGroup"})
    role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
    other_client_id = admin.create_client({"name": "UMAOtherClient"})
    other_client = admin.get_client(other_client_id)
    resource_id = created_resource["_id"]

    # Role based policy
    role_policy = await uma.a_policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyRole",
            "description": "Test resource policy description",
            "scopes": ["test:read", "test:write"],
            "roles": ["roleUMAPolicy"],
        },
    )
    assert role_policy

    # Client based policy
    client_policy = await uma.a_policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyClient",
            "description": "Test resource policy description",
            "scopes": ["test:read"],
            "clients": [other_client["clientId"]],
        },
    )
    assert client_policy

    # Group based policy
    group_policy = await uma.a_policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyGroup",
            "description": "Test resource policy description",
            "scopes": ["test:read"],
            "groups": ["/UMAPolicyGroup"],
        },
    )
    assert group_policy

    assert len(await uma.a_policy_query()) == 3
    assert len(await uma.a_policy_query(name="TestPolicyGroup")) == 1

    # Delete the group policy; a second delete of the same id must be rejected
    group_policy_id = group_policy["id"]
    await uma.a_policy_delete(group_policy_id)
    with pytest.raises(KeycloakDeleteError) as err:
        await uma.a_policy_delete(group_policy_id)
    assert err.match(
        '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
    )

    remaining = await uma.a_policy_query()
    assert len(remaining) == 2

    # Writing a policy back unchanged must not alter the number of policies
    kept_policy = remaining[0]
    await uma.a_policy_update(policy_id=kept_policy["id"], payload=kept_policy)
    assert len(await uma.a_policy_query()) == 2

    # Filters that are expected to match nothing
    assert len(await uma.a_policy_query(name="Invalid")) == 0
    assert len(await uma.a_policy_query(scope="Invalid")) == 0
    assert len(await uma.a_policy_query(resource="Invalid")) == 0
    assert len(await uma.a_policy_query(first=3)) == 0
    assert len(await uma.a_policy_query(maximum=0)) == 0

    # Filters built from the kept policy and the resource
    assert len(await uma.a_policy_query(name=kept_policy["name"])) == 1
    assert len(await uma.a_policy_query(scope=kept_policy["scopes"][0])) == 2
    assert len(await uma.a_policy_query(resource=resource_id)) == 2

    # Remove everything this test created
    await uma.a_resource_set_delete(resource_id)
    await admin.a_delete_client(other_client_id)
    await admin.a_delete_realm_role(role_id)
    await admin.a_delete_group(group_id)
@pytest.mark.asyncio
async def test_a_uma_access(uma: KeycloakUMA):
    """Test permission access checks through the async API.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    resource_to_create = {
        "name": "mytest",
        "scopes": ["read", "write"],
        "type": "urn:test",
        "ownerManagedAccess": True,
    }
    resource = await uma.a_resource_set_create(resource_to_create)

    # Policy covering the first scope of the resource for the client under test
    policy_to_create = {
        "name": "TestPolicy",
        "description": "Test resource policy description",
        "scopes": [resource_to_create["scopes"][0]],
        "clients": [uma.connection.client_id],
    }
    await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)

    token = uma.connection.token

    # An empty permission list is expected to pass
    permissions = []
    assert await uma.a_permissions_check(token["access_token"], permissions)

    # The created resource is expected to pass
    permissions.append(UMAPermission(resource=resource_to_create["name"]))
    assert await uma.a_permissions_check(token["access_token"], permissions)

    # Adding an unknown resource is expected to make the whole check fail
    permissions.append(UMAPermission(resource="not valid"))
    assert not await uma.a_permissions_check(token["access_token"], permissions)

    # Clean up via the async API, consistent with the rest of this test
    await uma.a_resource_set_delete(resource["_id"])
@pytest.mark.asyncio
async def test_a_uma_permission_ticket(uma: KeycloakUMA):
    """Test permission ticket generation through the async API.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    resource_to_create = {
        "name": "mytest",
        "scopes": ["read", "write"],
        "type": "urn:test",
        "ownerManagedAccess": True,
    }
    resource = await uma.a_resource_set_create(resource_to_create)

    policy_to_create = {
        "name": "TestPolicy",
        "description": "Test resource policy description",
        "scopes": [resource_to_create["scopes"][0]],
        "clients": [uma.connection.client_id],
    }
    await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)

    # Request a ticket for the covered scope and trade it in via the uma-ticket grant
    permissions = (
        UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
    )
    response = await uma.a_permission_ticket_create(permissions)
    rpt = await uma.connection.keycloak_openid.a_token(
        grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
    )
    assert rpt
    assert "access_token" in rpt

    # A ticket for an unknown resource must be rejected by the async method as well
    permissions = (UMAPermission(resource="invalid"),)
    with pytest.raises(KeycloakPostError):
        await uma.a_permission_ticket_create(permissions)

    await uma.a_resource_set_delete(resource["_id"])
def test_counter_part():
    """Check that sync and async public methods of KeycloakUMA come in matching pairs.

    Every public callable without the ``a_`` prefix must have an ``a_``-prefixed sibling with
    identical parameters, and every coroutine method must map back to a public sync method.
    """
    callable_names = [name for name in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, name))]
    sync_names = [name for name in callable_names if not name.startswith(("a_", "_"))]
    coroutine_names = [
        name for name in callable_names if iscoroutinefunction(getattr(KeycloakUMA, name))
    ]

    for sync_name in sync_names:
        async_name = f"a_{sync_name}"
        assert async_name in callable_names

        sync_params = signature(getattr(KeycloakUMA, sync_name)).parameters
        async_params = signature(getattr(KeycloakUMA, async_name)).parameters
        assert sync_params == async_params

    for coroutine_name in coroutine_names:
        # Drop the two-character "a_" prefix to obtain the expected sync name
        base_name = coroutine_name[2:]
        # Async wrappers of private helpers have no public sync counterpart to check
        if not base_name.startswith("_"):
            assert base_name in sync_names