|
|
@ -3,13 +3,14 @@ import re |
|
|
|
|
|
|
|
import pytest |
|
|
|
|
|
|
|
from keycloak import KeycloakOpenIDConnection, KeycloakUMA |
|
|
|
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA |
|
|
|
from keycloak.exceptions import ( |
|
|
|
KeycloakDeleteError, |
|
|
|
KeycloakGetError, |
|
|
|
KeycloakPostError, |
|
|
|
KeycloakPutError, |
|
|
|
) |
|
|
|
from keycloak.uma_permissions import UMAPermission |
|
|
|
|
|
|
|
|
|
|
|
def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection): |
|
|
@ -54,6 +55,40 @@ def test_uma_resource_sets(uma: KeycloakUMA): |
|
|
|
assert len(resource_set_list) == 1, resource_set_list |
|
|
|
assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] |
|
|
|
|
|
|
|
# Test query for resource sets |
|
|
|
resource_set_list_ids = uma.resource_set_list_ids() |
|
|
|
assert len(resource_set_list_ids) == 1 |
|
|
|
|
|
|
|
resource_set_list_ids2 = uma.resource_set_list_ids(name="Default") |
|
|
|
assert resource_set_list_ids2 == resource_set_list_ids |
|
|
|
|
|
|
|
resource_set_list_ids2 = uma.resource_set_list_ids(name="Default Resource") |
|
|
|
assert resource_set_list_ids2 == resource_set_list_ids |
|
|
|
|
|
|
|
resource_set_list_ids = uma.resource_set_list_ids(name="Default", exact_name=True) |
|
|
|
assert len(resource_set_list_ids) == 0 |
|
|
|
|
|
|
|
resource_set_list_ids = uma.resource_set_list_ids(first=1) |
|
|
|
assert len(resource_set_list_ids) == 0 |
|
|
|
|
|
|
|
resource_set_list_ids = uma.resource_set_list_ids(scope="Invalid") |
|
|
|
assert len(resource_set_list_ids) == 0 |
|
|
|
|
|
|
|
resource_set_list_ids = uma.resource_set_list_ids(owner="Invalid") |
|
|
|
assert len(resource_set_list_ids) == 0 |
|
|
|
|
|
|
|
resource_set_list_ids = uma.resource_set_list_ids(resource_type="Invalid") |
|
|
|
assert len(resource_set_list_ids) == 0 |
|
|
|
|
|
|
|
resource_set_list_ids = uma.resource_set_list_ids(name="Invalid") |
|
|
|
assert len(resource_set_list_ids) == 0 |
|
|
|
|
|
|
|
resource_set_list_ids = uma.resource_set_list_ids(uri="Invalid") |
|
|
|
assert len(resource_set_list_ids) == 0 |
|
|
|
|
|
|
|
resource_set_list_ids = uma.resource_set_list_ids(maximum=0) |
|
|
|
assert len(resource_set_list_ids) == 0 |
|
|
|
|
|
|
|
# Test create resource set |
|
|
|
resource_to_create = { |
|
|
|
"name": "mytest", |
|
|
@ -102,3 +137,175 @@ def test_uma_resource_sets(uma: KeycloakUMA): |
|
|
|
with pytest.raises(KeycloakDeleteError) as err: |
|
|
|
uma.resource_set_delete(resource_id=created_resource["_id"]) |
|
|
|
assert err.match("404: b''") |
|
|
|
|
|
|
|
|
|
|
|
def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin): |
|
|
|
"""Test policies. |
|
|
|
|
|
|
|
:param uma: Keycloak UMA client |
|
|
|
:type uma: KeycloakUMA |
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
# Create some required test data |
|
|
|
resource_to_create = { |
|
|
|
"name": "mytest", |
|
|
|
"scopes": ["test:read", "test:write"], |
|
|
|
"type": "urn:test", |
|
|
|
"ownerManagedAccess": True, |
|
|
|
} |
|
|
|
created_resource = uma.resource_set_create(resource_to_create) |
|
|
|
group_id = admin.create_group({"name": "UMAPolicyGroup"}) |
|
|
|
role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"}) |
|
|
|
other_client_id = admin.create_client({"name": "UMAOtherClient"}) |
|
|
|
client = admin.get_client(other_client_id) |
|
|
|
|
|
|
|
resource_id = created_resource["_id"] |
|
|
|
|
|
|
|
# Create a role policy |
|
|
|
policy_to_create = { |
|
|
|
"name": "TestPolicyRole", |
|
|
|
"description": "Test resource policy description", |
|
|
|
"scopes": ["test:read", "test:write"], |
|
|
|
"roles": ["roleUMAPolicy"], |
|
|
|
} |
|
|
|
policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create) |
|
|
|
assert policy |
|
|
|
|
|
|
|
# Create a client policy |
|
|
|
policy_to_create = { |
|
|
|
"name": "TestPolicyClient", |
|
|
|
"description": "Test resource policy description", |
|
|
|
"scopes": ["test:read"], |
|
|
|
"clients": [client["clientId"]], |
|
|
|
} |
|
|
|
policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create) |
|
|
|
assert policy |
|
|
|
|
|
|
|
policy_to_create = { |
|
|
|
"name": "TestPolicyGroup", |
|
|
|
"description": "Test resource policy description", |
|
|
|
"scopes": ["test:read"], |
|
|
|
"groups": ["/UMAPolicyGroup"], |
|
|
|
} |
|
|
|
policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create) |
|
|
|
assert policy |
|
|
|
|
|
|
|
policies = uma.policy_query() |
|
|
|
assert len(policies) == 3 |
|
|
|
|
|
|
|
policies = uma.policy_query(name="TestPolicyGroup") |
|
|
|
assert len(policies) == 1 |
|
|
|
|
|
|
|
policy_id = policy["id"] |
|
|
|
uma.policy_delete(policy_id) |
|
|
|
with pytest.raises(KeycloakDeleteError) as err: |
|
|
|
uma.policy_delete(policy_id) |
|
|
|
assert err.match( |
|
|
|
'404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\'' |
|
|
|
) |
|
|
|
|
|
|
|
policies = uma.policy_query() |
|
|
|
assert len(policies) == 2 |
|
|
|
|
|
|
|
policy = policies[0] |
|
|
|
uma.policy_update(policy_id=policy["id"], payload=policy) |
|
|
|
|
|
|
|
policies = uma.policy_query() |
|
|
|
assert len(policies) == 2 |
|
|
|
|
|
|
|
policies = uma.policy_query(name="Invalid") |
|
|
|
assert len(policies) == 0 |
|
|
|
policies = uma.policy_query(scope="Invalid") |
|
|
|
assert len(policies) == 0 |
|
|
|
policies = uma.policy_query(resource="Invalid") |
|
|
|
assert len(policies) == 0 |
|
|
|
policies = uma.policy_query(first=3) |
|
|
|
assert len(policies) == 0 |
|
|
|
policies = uma.policy_query(maximum=0) |
|
|
|
assert len(policies) == 0 |
|
|
|
|
|
|
|
policies = uma.policy_query(name=policy["name"]) |
|
|
|
assert len(policies) == 1 |
|
|
|
policies = uma.policy_query(scope=policy["scopes"][0]) |
|
|
|
assert len(policies) == 2 |
|
|
|
policies = uma.policy_query(resource=resource_id) |
|
|
|
assert len(policies) == 2 |
|
|
|
|
|
|
|
uma.resource_set_delete(resource_id) |
|
|
|
admin.delete_client(other_client_id) |
|
|
|
admin.delete_realm_role(role_id) |
|
|
|
admin.delete_group(group_id) |
|
|
|
|
|
|
|
|
|
|
|
def test_uma_access(uma: KeycloakUMA): |
|
|
|
"""Test permission access checks. |
|
|
|
|
|
|
|
:param uma: Keycloak UMA client |
|
|
|
:type uma: KeycloakUMA |
|
|
|
""" |
|
|
|
resource_to_create = { |
|
|
|
"name": "mytest", |
|
|
|
"scopes": ["read", "write"], |
|
|
|
"type": "urn:test", |
|
|
|
"ownerManagedAccess": True, |
|
|
|
} |
|
|
|
resource = uma.resource_set_create(resource_to_create) |
|
|
|
|
|
|
|
policy_to_create = { |
|
|
|
"name": "TestPolicy", |
|
|
|
"description": "Test resource policy description", |
|
|
|
"scopes": [resource_to_create["scopes"][0]], |
|
|
|
"clients": [uma.connection.client_id], |
|
|
|
} |
|
|
|
uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) |
|
|
|
|
|
|
|
token = uma.connection.token |
|
|
|
permissions = list() |
|
|
|
assert uma.permissions_check(token["access_token"], permissions) |
|
|
|
|
|
|
|
permissions.append(UMAPermission(resource=resource_to_create["name"])) |
|
|
|
assert uma.permissions_check(token["access_token"], permissions) |
|
|
|
|
|
|
|
permissions.append(UMAPermission(resource="not valid")) |
|
|
|
assert not uma.permissions_check(token["access_token"], permissions) |
|
|
|
uma.resource_set_delete(resource["_id"]) |
|
|
|
|
|
|
|
|
|
|
|
def test_uma_permission_ticket(uma: KeycloakUMA): |
|
|
|
"""Test permission ticket generation. |
|
|
|
|
|
|
|
:param uma: Keycloak UMA client |
|
|
|
:type uma: KeycloakUMA |
|
|
|
""" |
|
|
|
resource_to_create = { |
|
|
|
"name": "mytest", |
|
|
|
"scopes": ["read", "write"], |
|
|
|
"type": "urn:test", |
|
|
|
"ownerManagedAccess": True, |
|
|
|
} |
|
|
|
resource = uma.resource_set_create(resource_to_create) |
|
|
|
|
|
|
|
policy_to_create = { |
|
|
|
"name": "TestPolicy", |
|
|
|
"description": "Test resource policy description", |
|
|
|
"scopes": [resource_to_create["scopes"][0]], |
|
|
|
"clients": [uma.connection.client_id], |
|
|
|
} |
|
|
|
uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) |
|
|
|
permissions = ( |
|
|
|
UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]), |
|
|
|
) |
|
|
|
response = uma.permission_ticket_create(permissions) |
|
|
|
|
|
|
|
rpt = uma.connection.keycloak_openid.token( |
|
|
|
grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"] |
|
|
|
) |
|
|
|
assert rpt |
|
|
|
assert "access_token" in rpt |
|
|
|
|
|
|
|
permissions = (UMAPermission(resource="invalid"),) |
|
|
|
with pytest.raises(KeycloakPostError): |
|
|
|
uma.permission_ticket_create(permissions) |
|
|
|
|
|
|
|
uma.resource_set_delete(resource["_id"]) |