You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
650 lines
22 KiB
650 lines
22 KiB
"""Test module for KeycloakUMA."""
|
|
|
|
import re
|
|
from inspect import iscoroutinefunction, signature
|
|
|
|
import pytest
|
|
|
|
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA
|
|
from keycloak.exceptions import (
|
|
KeycloakDeleteError,
|
|
KeycloakGetError,
|
|
KeycloakPostError,
|
|
KeycloakPutError,
|
|
)
|
|
from keycloak.uma_permissions import UMAPermission
|
|
|
|
|
|
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
    """Test KeycloakUMA construction and caching of the well-known configuration.

    :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
    :type oid_connection_with_authz: KeycloakOpenIDConnection
    """
    uma_client = KeycloakUMA(connection=oid_connection_with_authz)
    assert isinstance(uma_client.connection, KeycloakOpenIDConnection)

    # Before the property is read, the private cache attribute is unset
    assert uma_client._well_known is None
    # Reading the property must produce a truthy configuration ...
    assert uma_client.uma_well_known
    # ... and leave the private cache attribute populated afterwards
    assert uma_client._well_known is not None
|
|
|
|
|
|
def test_uma_well_known(uma: KeycloakUMA):
    """Test that the UMA well-known configuration is available and non-empty.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    well_known = uma.uma_well_known
    assert well_known is not None
    assert well_known != {}

    # Keys this test expects in the configuration
    required_keys = ("resource_registration_endpoint",)
    for required_key in required_keys:
        assert required_key in well_known
|
|
|
|
|
|
def test_uma_resource_sets(uma: KeycloakUMA):
    """Test the resource set lifecycle: list, query, create, read, update and delete.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    # Check that only the default resource is present
    resource_set_list = list(uma.resource_set_list())
    assert len(resource_set_list) == 1, resource_set_list
    assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]

    # Test query for resource sets
    resource_set_list_ids = uma.resource_set_list_ids()
    assert len(resource_set_list_ids) == 1

    # A partial name and the full name both match when exact_name is not requested
    for name in ("Default", "Default Resource"):
        assert uma.resource_set_list_ids(name=name) == resource_set_list_ids

    # Each of these queries is expected to return no resource at all
    non_matching_queries = (
        {"name": "Default", "exact_name": True},
        {"first": 1},
        {"scope": "Invalid"},
        {"owner": "Invalid"},
        {"resource_type": "Invalid"},
        {"name": "Invalid"},
        {"uri": "Invalid"},
        {"maximum": 0},
    )
    for query in non_matching_queries:
        assert len(uma.resource_set_list_ids(**query)) == 0, query

    # Test create resource set
    resource_to_create = {
        "name": "mytest",
        "scopes": ["test:read", "test:write"],
        "type": "urn:test",
        "uris": ["/some_resources/*"],
    }
    created_resource = uma.resource_set_create(resource_to_create)
    assert created_resource
    assert created_resource["_id"], created_resource
    assert set(resource_to_create).issubset(set(created_resource)), created_resource

    # Test getting resource with wildcard
    # Without matchingUri query option
    resource_set_list_ids = uma.resource_set_list_ids(uri="/some_resources/resource")
    assert len(resource_set_list_ids) == 0
    # With matchingUri query option
    resource_set_list_ids = uma.resource_set_list_ids(
        uri="/some_resources/resource", matchingUri=True
    )
    assert len(resource_set_list_ids) == 1

    # Test create the same resource set
    with pytest.raises(KeycloakPostError) as err:
        uma.resource_set_create(resource_to_create)
    assert err.match(
        re.escape(
            '409: b\'{"error":"invalid_request","error_description":'
            '"Resource with name [mytest] already exists."}\''
        )
    )

    # Test get resource set
    latest_resource = uma.resource_set_read(created_resource["_id"])
    assert latest_resource["name"] == created_resource["name"]

    # Test update resource set
    latest_resource["name"] = "New Resource Name"
    res = uma.resource_set_update(created_resource["_id"], latest_resource)
    assert res == {}, res
    updated_resource = uma.resource_set_read(created_resource["_id"])
    assert updated_resource["name"] == "New Resource Name"

    # Test update resource set fail
    with pytest.raises(KeycloakPutError) as err:
        uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
    assert err.match('400: b\'{"error":"Unrecognized field')

    # Test delete resource set
    res = uma.resource_set_delete(resource_id=created_resource["_id"])
    assert res == {}, res
    with pytest.raises(KeycloakGetError) as err:
        uma.resource_set_read(created_resource["_id"])
    # Asserted explicitly, in line with the other error checks in this test
    assert err.match("404: b''")

    # Test delete fail
    with pytest.raises(KeycloakDeleteError) as err:
        uma.resource_set_delete(resource_id=created_resource["_id"])
    assert err.match("404: b''")
|
|
|
|
|
|
def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
    """Test creating, querying, updating and deleting UMA policies on a resource.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    :param admin: Keycloak Admin client
    :type admin: KeycloakAdmin
    """
    # Set up the resource, group, role and client that the policies refer to
    created_resource = uma.resource_set_create(
        {
            "name": "mytest",
            "scopes": ["test:read", "test:write"],
            "type": "urn:test",
            "ownerManagedAccess": True,
        }
    )
    group_id = admin.create_group({"name": "UMAPolicyGroup"})
    role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
    other_client_id = admin.create_client({"name": "UMAOtherClient"})
    other_client = admin.get_client(other_client_id)

    resource_id = created_resource["_id"]

    # Role based policy
    role_policy = uma.policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyRole",
            "description": "Test resource policy description",
            "scopes": ["test:read", "test:write"],
            "roles": ["roleUMAPolicy"],
        },
    )
    assert role_policy

    # Client based policy
    client_policy = uma.policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyClient",
            "description": "Test resource policy description",
            "scopes": ["test:read"],
            "clients": [other_client["clientId"]],
        },
    )
    assert client_policy

    # Group based policy
    group_policy = uma.policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyGroup",
            "description": "Test resource policy description",
            "scopes": ["test:read"],
            "groups": ["/UMAPolicyGroup"],
        },
    )
    assert group_policy

    # All three policies are visible, and a name filter narrows them down to one
    assert len(uma.policy_query()) == 3
    assert len(uma.policy_query(name="TestPolicyGroup")) == 1

    # Deleting the group policy works once; the second attempt must fail
    policy_id = group_policy["id"]
    uma.policy_delete(policy_id)
    with pytest.raises(KeycloakDeleteError) as err:
        uma.policy_delete(policy_id)
    assert err.match(
        '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
    )

    remaining = uma.policy_query()
    assert len(remaining) == 2

    # Writing a policy back unchanged must not alter the number of policies
    policy = remaining[0]
    uma.policy_update(policy_id=policy["id"], payload=policy)
    assert len(uma.policy_query()) == 2

    # Queries that are expected to return nothing
    for empty_query in (
        {"name": "Invalid"},
        {"scope": "Invalid"},
        {"resource": "Invalid"},
        {"first": 3},
        {"maximum": 0},
    ):
        assert len(uma.policy_query(**empty_query)) == 0

    # Queries built from the attributes of an existing policy
    assert len(uma.policy_query(name=policy["name"])) == 1
    assert len(uma.policy_query(scope=policy["scopes"][0])) == 2
    assert len(uma.policy_query(resource=resource_id)) == 2

    # Remove everything this test created
    uma.resource_set_delete(resource_id)
    admin.delete_client(other_client_id)
    admin.delete_realm_role(role_id)
    admin.delete_group(group_id)
|
|
|
|
|
|
def test_uma_access(uma: KeycloakUMA):
    """Test permission checks against a resource protected by a client policy.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    resource_payload = {
        "name": "mytest",
        "scopes": ["read", "write"],
        "type": "urn:test",
        "ownerManagedAccess": True,
    }
    created = uma.resource_set_create(resource_payload)

    # The policy covers the first scope of the resource for the UMA client itself
    policy_payload = {
        "name": "TestPolicy",
        "description": "Test resource policy description",
        "scopes": [resource_payload["scopes"][0]],
        "clients": [uma.connection.client_id],
    }
    uma.policy_resource_create(resource_id=created["_id"], payload=policy_payload)

    current_token = uma.connection.token
    requested = []

    # An empty permission list is accepted
    assert uma.permissions_check(current_token["access_token"], requested)

    # A permission on the created resource is accepted
    requested.append(UMAPermission(resource=resource_payload["name"]))
    assert uma.permissions_check(current_token["access_token"], requested)

    # Adding a permission on an unknown resource makes the check fail
    requested.append(UMAPermission(resource="not valid"))
    assert not uma.permissions_check(current_token["access_token"], requested)

    uma.resource_set_delete(created["_id"])
|
|
|
|
|
|
def test_uma_permission_ticket(uma: KeycloakUMA):
    """Test creating a permission ticket and exchanging it for a token.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    resource_payload = {
        "name": "mytest",
        "scopes": ["read", "write"],
        "type": "urn:test",
        "ownerManagedAccess": True,
    }
    created = uma.resource_set_create(resource_payload)

    policy_payload = {
        "name": "TestPolicy",
        "description": "Test resource policy description",
        "scopes": [resource_payload["scopes"][0]],
        "clients": [uma.connection.client_id],
    }
    uma.policy_resource_create(resource_id=created["_id"], payload=policy_payload)

    # Request a ticket for the first scope of the created resource
    valid_request = (
        UMAPermission(resource=resource_payload["name"], scope=resource_payload["scopes"][0]),
    )
    ticket_response = uma.permission_ticket_create(valid_request)

    # Exchange the ticket through the UMA ticket grant type
    rpt = uma.connection.keycloak_openid.token(
        grant_type="urn:ietf:params:oauth:grant-type:uma-ticket",
        ticket=ticket_response["ticket"],
    )
    assert rpt
    assert "access_token" in rpt

    # A ticket for an unknown resource must be rejected
    invalid_request = (UMAPermission(resource="invalid"),)
    with pytest.raises(KeycloakPostError):
        uma.permission_ticket_create(invalid_request)

    uma.resource_set_delete(created["_id"])
|
|
|
|
|
|
# Asynchronous variants of the tests above start here
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_a_uma_well_known(uma: KeycloakUMA):
    """Test that the UMA well-known configuration is available and non-empty.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    well_known = uma.uma_well_known
    assert well_known is not None
    assert well_known != {}

    # Keys this test expects in the configuration
    required_keys = ("resource_registration_endpoint",)
    for required_key in required_keys:
        assert required_key in well_known
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_a_uma_resource_sets(uma: KeycloakUMA):
    """Test the resource set lifecycle through the asynchronous API.

    Covers listing, querying, creating, reading, updating and deleting resource sets.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    # Check that only the default resource is present
    # NOTE(review): this still uses the synchronous listing; presumably an async
    # variant exists, but how it is consumed is not visible here - confirm.
    resource_set_list = list(uma.resource_set_list())
    assert len(resource_set_list) == 1, resource_set_list
    assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]

    # Test query for resource sets
    resource_set_list_ids = await uma.a_resource_set_list_ids()
    assert len(resource_set_list_ids) == 1

    # A partial name and the full name both match when exact_name is not requested
    for name in ("Default", "Default Resource"):
        assert await uma.a_resource_set_list_ids(name=name) == resource_set_list_ids

    # Each of these queries is expected to return no resource at all
    non_matching_queries = (
        {"name": "Default", "exact_name": True},
        {"first": 1},
        {"scope": "Invalid"},
        {"owner": "Invalid"},
        {"resource_type": "Invalid"},
        {"name": "Invalid"},
        {"uri": "Invalid"},
        {"maximum": 0},
    )
    for query in non_matching_queries:
        assert len(await uma.a_resource_set_list_ids(**query)) == 0, query

    # Test create resource set
    resource_to_create = {
        "name": "mytest",
        "scopes": ["test:read", "test:write"],
        "type": "urn:test",
        "uris": ["/some_resources/*"],
    }
    created_resource = await uma.a_resource_set_create(resource_to_create)
    assert created_resource
    assert created_resource["_id"], created_resource
    assert set(resource_to_create).issubset(set(created_resource)), created_resource

    # Test getting resource with wildcard
    # Without matchingUri query option
    resource_set_list_ids = await uma.a_resource_set_list_ids(uri="/some_resources/resource")
    assert len(resource_set_list_ids) == 0
    # With matchingUri query option
    resource_set_list_ids = await uma.a_resource_set_list_ids(
        uri="/some_resources/resource", matchingUri=True
    )
    assert len(resource_set_list_ids) == 1

    # Test create the same resource set
    with pytest.raises(KeycloakPostError) as err:
        await uma.a_resource_set_create(resource_to_create)
    assert err.match(
        re.escape(
            '409: b\'{"error":"invalid_request","error_description":'
            '"Resource with name [mytest] already exists."}\''
        )
    )

    # Test get resource set
    latest_resource = await uma.a_resource_set_read(created_resource["_id"])
    assert latest_resource["name"] == created_resource["name"]

    # Test update resource set
    latest_resource["name"] = "New Resource Name"
    res = await uma.a_resource_set_update(created_resource["_id"], latest_resource)
    assert res == {}, res
    updated_resource = await uma.a_resource_set_read(created_resource["_id"])
    assert updated_resource["name"] == "New Resource Name"

    # Test update resource set fail
    # Uses the async method so the failure path of the async API is the one under test
    with pytest.raises(KeycloakPutError) as err:
        await uma.a_resource_set_update(
            resource_id=created_resource["_id"], payload={"wrong": "payload"}
        )
    assert err.match('400: b\'{"error":"Unrecognized field')

    # Test delete resource set
    res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
    assert res == {}, res
    with pytest.raises(KeycloakGetError) as err:
        await uma.a_resource_set_read(created_resource["_id"])
    # Asserted explicitly, in line with the other error checks in this test
    assert err.match("404: b''")

    # Test delete fail
    with pytest.raises(KeycloakDeleteError) as err:
        await uma.a_resource_set_delete(resource_id=created_resource["_id"])
    assert err.match("404: b''")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
    """Test creating, querying, updating and deleting UMA policies asynchronously.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    :param admin: Keycloak Admin client
    :type admin: KeycloakAdmin
    """
    # Set up the resource, group, role and client that the policies refer to
    created_resource = await uma.a_resource_set_create(
        {
            "name": "mytest",
            "scopes": ["test:read", "test:write"],
            "type": "urn:test",
            "ownerManagedAccess": True,
        }
    )
    group_id = admin.create_group({"name": "UMAPolicyGroup"})
    role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
    other_client_id = admin.create_client({"name": "UMAOtherClient"})
    other_client = admin.get_client(other_client_id)

    resource_id = created_resource["_id"]

    # Role based policy
    role_policy = await uma.a_policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyRole",
            "description": "Test resource policy description",
            "scopes": ["test:read", "test:write"],
            "roles": ["roleUMAPolicy"],
        },
    )
    assert role_policy

    # Client based policy
    client_policy = await uma.a_policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyClient",
            "description": "Test resource policy description",
            "scopes": ["test:read"],
            "clients": [other_client["clientId"]],
        },
    )
    assert client_policy

    # Group based policy
    group_policy = await uma.a_policy_resource_create(
        resource_id=resource_id,
        payload={
            "name": "TestPolicyGroup",
            "description": "Test resource policy description",
            "scopes": ["test:read"],
            "groups": ["/UMAPolicyGroup"],
        },
    )
    assert group_policy

    # All three policies are visible, and a name filter narrows them down to one
    assert len(await uma.a_policy_query()) == 3
    assert len(await uma.a_policy_query(name="TestPolicyGroup")) == 1

    # Deleting the group policy works once; the second attempt must fail
    policy_id = group_policy["id"]
    await uma.a_policy_delete(policy_id)
    with pytest.raises(KeycloakDeleteError) as err:
        await uma.a_policy_delete(policy_id)
    assert err.match(
        '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
    )

    remaining = await uma.a_policy_query()
    assert len(remaining) == 2

    # Writing a policy back unchanged must not alter the number of policies
    policy = remaining[0]
    await uma.a_policy_update(policy_id=policy["id"], payload=policy)
    assert len(await uma.a_policy_query()) == 2

    # Queries that are expected to return nothing
    for empty_query in (
        {"name": "Invalid"},
        {"scope": "Invalid"},
        {"resource": "Invalid"},
        {"first": 3},
        {"maximum": 0},
    ):
        assert len(await uma.a_policy_query(**empty_query)) == 0

    # Queries built from the attributes of an existing policy
    assert len(await uma.a_policy_query(name=policy["name"])) == 1
    assert len(await uma.a_policy_query(scope=policy["scopes"][0])) == 2
    assert len(await uma.a_policy_query(resource=resource_id)) == 2

    # Remove everything this test created
    await uma.a_resource_set_delete(resource_id)
    await admin.a_delete_client(other_client_id)
    await admin.a_delete_realm_role(role_id)
    await admin.a_delete_group(group_id)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_a_uma_access(uma: KeycloakUMA):
    """Test asynchronous permission checks against a resource with a client policy.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    resource_to_create = {
        "name": "mytest",
        "scopes": ["read", "write"],
        "type": "urn:test",
        "ownerManagedAccess": True,
    }
    resource = await uma.a_resource_set_create(resource_to_create)

    # The policy covers the first scope of the resource for the UMA client itself
    policy_to_create = {
        "name": "TestPolicy",
        "description": "Test resource policy description",
        "scopes": [resource_to_create["scopes"][0]],
        "clients": [uma.connection.client_id],
    }
    await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)

    token = uma.connection.token
    permissions = []

    # An empty permission list is accepted
    assert await uma.a_permissions_check(token["access_token"], permissions)

    # A permission on the created resource is accepted
    permissions.append(UMAPermission(resource=resource_to_create["name"]))
    assert await uma.a_permissions_check(token["access_token"], permissions)

    # Adding a permission on an unknown resource makes the check fail
    permissions.append(UMAPermission(resource="not valid"))
    assert not await uma.a_permissions_check(token["access_token"], permissions)

    # Clean up through the async API, as the other async tests do
    await uma.a_resource_set_delete(resource["_id"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_a_uma_permission_ticket(uma: KeycloakUMA):
    """Test creating a permission ticket asynchronously and exchanging it for a token.

    :param uma: Keycloak UMA client
    :type uma: KeycloakUMA
    """
    resource_to_create = {
        "name": "mytest",
        "scopes": ["read", "write"],
        "type": "urn:test",
        "ownerManagedAccess": True,
    }
    resource = await uma.a_resource_set_create(resource_to_create)

    policy_to_create = {
        "name": "TestPolicy",
        "description": "Test resource policy description",
        "scopes": [resource_to_create["scopes"][0]],
        "clients": [uma.connection.client_id],
    }
    await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)

    # Request a ticket for the first scope of the created resource
    permissions = (
        UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
    )
    response = await uma.a_permission_ticket_create(permissions)

    # Exchange the ticket through the UMA ticket grant type
    rpt = await uma.connection.keycloak_openid.a_token(
        grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
    )
    assert rpt
    assert "access_token" in rpt

    # A ticket for an unknown resource must be rejected; the async method is used
    # so that the failure path of the async API is the one under test
    permissions = (UMAPermission(resource="invalid"),)
    with pytest.raises(KeycloakPostError):
        await uma.a_permission_ticket_create(permissions)

    await uma.a_resource_set_delete(resource["_id"])
|
|
|
|
|
|
def test_counter_part():
    """Test that sync and async methods of KeycloakUMA come in matching pairs."""
    callable_names = [name for name in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, name))]
    # Public sync methods: neither private nor carrying the "a_" prefix
    sync_names = [
        name for name in callable_names if not (name.startswith("a_") or name.startswith("_"))
    ]
    coroutine_names = [
        name for name in callable_names if iscoroutinefunction(getattr(KeycloakUMA, name))
    ]

    # Every public sync method needs an "a_" twin with identical parameters
    for sync_name in sync_names:
        twin_name = f"a_{sync_name}"
        assert twin_name in callable_names
        sync_params = signature(getattr(KeycloakUMA, sync_name)).parameters
        async_params = signature(getattr(KeycloakUMA, twin_name)).parameters
        assert sync_params == async_params

    # Every coroutine method needs a sync twin, unless the name left after
    # dropping the two-character prefix is private
    for coroutine_name in coroutine_names:
        base_name = coroutine_name[2:]
        if base_name.startswith("_"):
            continue

        assert base_name in sync_names
|