diff --git a/tests/test_connection.py b/tests/test_connection.py index 85730cd..6b1f8ff 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -9,9 +9,9 @@ from keycloak.exceptions import KeycloakConnectionError def test_connection_proxy(): """Test proxies of connection manager.""" cm = ConnectionManager( - base_url="http://test.test", proxies={"http://test.test": "localhost:8080"} + base_url="http://test.test", proxies={"http://test.test": "http://localhost:8080"} ) - assert cm._s.proxies == {"http://test.test": "localhost:8080"} + assert cm._s.proxies == {"http://test.test": "http://localhost:8080"} def test_headers(): diff --git a/tests/test_keycloak_uma.py b/tests/test_keycloak_uma.py index 76682a5..6591e4b 100644 --- a/tests/test_keycloak_uma.py +++ b/tests/test_keycloak_uma.py @@ -310,3 +310,286 @@ def test_uma_permission_ticket(uma: KeycloakUMA): uma.permission_ticket_create(permissions) uma.resource_set_delete(resource["_id"]) + +#async function start + +@pytest.mark.asyncio +async def test_a_uma_well_known(uma: KeycloakUMA): + """Test the well_known method. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + res = uma.uma_well_known + assert res is not None + assert res != dict() + for key in ["resource_registration_endpoint"]: + assert key in res + +@pytest.mark.asyncio +async def test_a_uma_resource_sets(uma: KeycloakUMA): + """Test resource sets. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + # Check that only the default resource is present + resource_sets = await uma.a_resource_set_list() + resource_set_list = list(resource_sets) + assert len(resource_set_list) == 1, resource_set_list + assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] + + # Test query for resource sets + resource_set_list_ids = await uma.a_resource_set_list_ids() + assert len(resource_set_list_ids) == 1 + + resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default") + assert resource_set_list_ids2 == resource_set_list_ids + + resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default Resource") + assert resource_set_list_ids2 == resource_set_list_ids + + resource_set_list_ids = await uma.a_resource_set_list_ids(name="Default", exact_name=True) + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(first=1) + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(scope="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(owner="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(resource_type="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(name="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(uri="Invalid") + assert len(resource_set_list_ids) == 0 + + resource_set_list_ids = await uma.a_resource_set_list_ids(maximum=0) + assert len(resource_set_list_ids) == 0 + + # Test create resource set + resource_to_create = { + "name": "mytest", + "scopes": ["test:read", "test:write"], + "type": "urn:test", + } + created_resource = await uma.a_resource_set_create(resource_to_create) + assert created_resource + assert created_resource["_id"], created_resource + assert set(resource_to_create).issubset(set(created_resource)), created_resource + + # Test create the same resource set + with pytest.raises(KeycloakPostError) as err: + await uma.a_resource_set_create(resource_to_create) + assert err.match( + re.escape( + '409: b\'{"error":"invalid_request","error_description":' + '"Resource with name [mytest] already exists."}\'' + ) + ) + + # Test get resource set + latest_resource = await uma.a_resource_set_read(created_resource["_id"]) + assert latest_resource["name"] == created_resource["name"] + + # Test update resource set + latest_resource["name"] = "New Resource Name" + res = await uma.a_resource_set_update(created_resource["_id"], latest_resource) + assert res == dict(), res + updated_resource = await uma.a_resource_set_read(created_resource["_id"]) + assert updated_resource["name"] == "New Resource Name" + + # Test update resource set fail + with pytest.raises(KeycloakPutError) as err: + uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"}) + assert err.match('400: b\'{"error":"Unrecognized field') + + # Test delete resource set + res = await uma.a_resource_set_delete(resource_id=created_resource["_id"]) + assert res == dict(), res + with pytest.raises(KeycloakGetError) as err: + await uma.a_resource_set_read(created_resource["_id"]) + err.match("404: b''") + + # Test delete fail + with pytest.raises(KeycloakDeleteError) as err: + await uma.a_resource_set_delete(resource_id=created_resource["_id"]) + assert err.match("404: b''") + +@pytest.mark.asyncio +async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin): + """Test policies. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + # Create some required test data + resource_to_create = { + "name": "mytest", + "scopes": ["test:read", "test:write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + created_resource = await uma.a_resource_set_create(resource_to_create) + group_id = admin.create_group({"name": "UMAPolicyGroup"}) + role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"}) + other_client_id = admin.create_client({"name": "UMAOtherClient"}) + client = admin.get_client(other_client_id) + + resource_id = created_resource["_id"] + + # Create a role policy + policy_to_create = { + "name": "TestPolicyRole", + "description": "Test resource policy description", + "scopes": ["test:read", "test:write"], + "roles": ["roleUMAPolicy"], + } + policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + # Create a client policy + policy_to_create = { + "name": "TestPolicyClient", + "description": "Test resource policy description", + "scopes": ["test:read"], + "clients": [client["clientId"]], + } + policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + policy_to_create = { + "name": "TestPolicyGroup", + "description": "Test resource policy description", + "scopes": ["test:read"], + "groups": ["/UMAPolicyGroup"], + } + policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create) + assert policy + + policies = await uma.a_policy_query() + assert len(policies) == 3 + + policies = await uma.a_policy_query(name="TestPolicyGroup") + assert len(policies) == 1 + + policy_id = policy["id"] + await uma.a_policy_delete(policy_id) + with pytest.raises(KeycloakDeleteError) as err: + await uma.a_policy_delete(policy_id) + assert err.match( + '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\'' + ) + + policies = await uma.a_policy_query() + assert len(policies) == 2 + + policy = policies[0] + await uma.a_policy_update(policy_id=policy["id"], payload=policy) + + policies = await uma.a_policy_query() + assert len(policies) == 2 + + policies = await uma.a_policy_query(name="Invalid") + assert len(policies) == 0 + policies = await uma.a_policy_query(scope="Invalid") + assert len(policies) == 0 + policies = await uma.a_policy_query(resource="Invalid") + assert len(policies) == 0 + policies = await uma.a_policy_query(first=3) + assert len(policies) == 0 + policies = await uma.a_policy_query(maximum=0) + assert len(policies) == 0 + + policies = await uma.a_policy_query(name=policy["name"]) + assert len(policies) == 1 + policies = await uma.a_policy_query(scope=policy["scopes"][0]) + assert len(policies) == 2 + policies = await uma.a_policy_query(resource=resource_id) + assert len(policies) == 2 + + uma.a_resource_set_delete(resource_id) + admin.delete_client(other_client_id) + admin.delete_realm_role(role_id) + admin.delete_group(group_id) + +@pytest.mark.asyncio +async def test_a_uma_access(uma: KeycloakUMA): + """Test permission access checks. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + resource_to_create = { + "name": "mytest", + "scopes": ["read", "write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + resource = await uma.a_resource_set_create(resource_to_create) + + policy_to_create = { + "name": "TestPolicy", + "description": "Test resource policy description", + "scopes": [resource_to_create["scopes"][0]], + "clients": [uma.connection.client_id], + } + await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) + + token = uma.connection.token + permissions = list() + assert await uma.a_permissions_check(token["access_token"], permissions) + + permissions.append(UMAPermission(resource=resource_to_create["name"])) + assert await uma.a_permissions_check(token["access_token"], permissions) + + permissions.append(UMAPermission(resource="not valid")) + assert not await uma.a_permissions_check(token["access_token"], permissions) + uma.resource_set_delete(resource["_id"]) + +@pytest.mark.asyncio +async def test_a_uma_permission_ticket(uma: KeycloakUMA): + """Test permission ticket generation. + + :param uma: Keycloak UMA client + :type uma: KeycloakUMA + """ + resource_to_create = { + "name": "mytest", + "scopes": ["read", "write"], + "type": "urn:test", + "ownerManagedAccess": True, + } + resource = await uma.a_resource_set_create(resource_to_create) + + policy_to_create = { + "name": "TestPolicy", + "description": "Test resource policy description", + "scopes": [resource_to_create["scopes"][0]], + "clients": [uma.connection.client_id], + } + await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create) + permissions = ( + UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]), + ) + response = await uma.a_permission_ticket_create(permissions) + + rpt = await uma.connection.keycloak_openid.a_token( + grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"] + ) + assert rpt + assert "access_token" in rpt + + permissions = (UMAPermission(resource="invalid"),) + with pytest.raises(KeycloakPostError): + uma.permission_ticket_create(permissions) + + await uma.a_resource_set_delete(resource["_id"])