From 3409d63eee50bd8197dbbabd2819c86dba1160ae Mon Sep 17 00:00:00 2001 From: David Date: Fri, 17 May 2024 15:58:15 -0500 Subject: [PATCH] write tests for keycloak admin async functions --- tests/test_keycloak_admin.py | 590 +++++++++++++++++++++++++++++++++++ 1 file changed, 590 insertions(+) diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 990ad9b..56268b7 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -4619,3 +4619,593 @@ async def test_a_client_default_client_scopes(admin: KeycloakAdmin, realm: str, await admin.a_delete_client_default_client_scope(client_id, new_client_scope_id) default_client_scopes = await admin.a_get_client_default_client_scopes(client_id) assert len(default_client_scopes) == 5, default_client_scopes + + +@pytest.mark.asyncio +async def test_a_client_optional_client_scopes(admin: KeycloakAdmin, realm: str, client: str): + """Test client assignment of optional client scopes. + + :param admin: Keycloak admin + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + :param client: Keycloak client + :type client: str + """ + await admin.a_change_current_realm(realm) + + client_id = await admin.a_create_client( + payload={"name": "role-testing-client", "clientId": "role-testing-client"} + ) + # Test get client optional scopes + # keycloak optional roles: microprofile-jwt, offline_access, address, phone + optional_client_scopes = await admin.a_get_client_optional_client_scopes(client_id) + assert len(optional_client_scopes) == 4, optional_client_scopes + + # Test add a client scope to client optional scopes + optional_client_scope = "test-client-optional-scope" + new_client_scope = { + "name": optional_client_scope, + "description": f"Test Client Scope: {optional_client_scope}", + "protocol": "openid-connect", + "attributes": {}, + } + new_client_scope_id = await admin.a_create_client_scope(new_client_scope, skip_exists=False) + new_optional_client_scope_data = { + "realm": realm, + "client": client_id, + "clientScopeId": new_client_scope_id, + } + await admin.a_add_client_optional_client_scope( + client_id, new_client_scope_id, new_optional_client_scope_data + ) + optional_client_scopes = await admin.a_get_client_optional_client_scopes(client_id) + assert len(optional_client_scopes) == 5, optional_client_scopes + + # Test remove a client optional scope + await admin.a_delete_client_optional_client_scope(client_id, new_client_scope_id) + optional_client_scopes = await admin.a_get_client_optional_client_scopes(client_id) + assert len(optional_client_scopes) == 4, optional_client_scopes + +@pytest.mark.asyncio +async def test_a_client_roles(admin: KeycloakAdmin, client: str): + """Test client roles. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param client: Keycloak client + :type client: str + """ + # Test get client roles + res = await admin.a_get_client_roles(client_id=client) + assert len(res) == 0 + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_roles(client_id="bad") + assert err.match('404: b\'{"error":"Could not find client".*}\'') + + # Test create client role + client_role_id = await admin.a_create_client_role( + client_role_id=client, payload={"name": "client-role-test"}, skip_exists=True + ) + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client_role(client_role_id=client, payload={"name": "client-role-test"}) + assert err.match('409: b\'{"errorMessage":"Role with name client-role-test already exists"}\'') + client_role_id_2 = await admin.a_create_client_role( + client_role_id=client, payload={"name": "client-role-test"}, skip_exists=True + ) + assert client_role_id == client_role_id_2 + + # Test get client role + res = await admin.a_get_client_role(client_id=client, role_name="client-role-test") + assert res["name"] == client_role_id + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_role(client_id=client, role_name="bad") + assert err.match(COULD_NOT_FIND_ROLE_REGEX) + + res_ = admin.a_get_client_role_id(client_id=client, role_name="client-role-test") + assert res_ == res["id"] + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_role_id(client_id=client, role_name="bad") + assert err.match(COULD_NOT_FIND_ROLE_REGEX) + assert len(await admin.a_get_client_roles(client_id=client)) == 1 + + # Test update client role + res = await admin.a_update_client_role( + client_id=client, role_name="client-role-test", payload={"name": "client-role-test-update"} + ) + assert res == dict() + with pytest.raises(KeycloakPutError) as err: + res = await admin.a_update_client_role( + client_id=client, + role_name="client-role-test", + payload={"name": "client-role-test-update"}, + ) + assert err.match(COULD_NOT_FIND_ROLE_REGEX) + + # Test user with client role + res = await admin.a_get_client_role_members(client_id=client, role_name="client-role-test-update") + assert len(res) == 0 + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_role_members(client_id=client, role_name="bad") + assert err.match(COULD_NOT_FIND_ROLE_REGEX) + + user_id = await admin.a_create_user(payload={"username": "test", "email": "test@test.test"}) + with pytest.raises(KeycloakPostError) as err: + await admin.a_assign_client_role(user_id=user_id, client_id=client, roles=["bad"]) + assert err.match(UNKOWN_ERROR_REGEX), err + res = await admin.a_assign_client_role( + user_id=user_id, + client_id=client, + roles=[admin.get_client_role(client_id=client, role_name="client-role-test-update")], + ) + assert res == dict() + assert ( + len(admin.get_client_role_members(client_id=client, role_name="client-role-test-update")) + == 1 + ) + + roles = await admin.a_get_client_roles_of_user(user_id=user_id, client_id=client) + assert len(roles) == 1, roles + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_roles_of_user(user_id=user_id, client_id="bad") + assert err.match(CLIENT_NOT_FOUND_REGEX) + + roles = await admin.a_get_composite_client_roles_of_user(user_id=user_id, client_id=client) + assert len(roles) == 1, roles + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_composite_client_roles_of_user(user_id=user_id, client_id="bad") + assert err.match(CLIENT_NOT_FOUND_REGEX) + + roles = await admin.a_get_available_client_roles_of_user(user_id=user_id, client_id=client) + assert len(roles) == 0, roles + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_composite_client_roles_of_user(user_id=user_id, client_id="bad") + assert err.match(CLIENT_NOT_FOUND_REGEX) + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_client_roles_of_user(user_id=user_id, client_id=client, roles=["bad"]) + assert err.match(UNKOWN_ERROR_REGEX), err + await admin.a_delete_client_roles_of_user( + user_id=user_id, + client_id=client, + roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")], + ) + assert len(await admin.a_get_client_roles_of_user(user_id=user_id, client_id=client)) == 0 + + # Test groups and client roles + res = await admin.a_get_client_role_groups(client_id=client, role_name="client-role-test-update") + assert len(res) == 0 + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_role_groups(client_id=client, role_name="bad") + assert err.match(COULD_NOT_FIND_ROLE_REGEX) + + group_id = await admin.a_create_group(payload={"name": "test-group"}) + res = await admin.a_get_group_client_roles(group_id=group_id, client_id=client) + assert len(res) == 0 + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_group_client_roles(group_id=group_id, client_id="bad") + assert err.match(CLIENT_NOT_FOUND_REGEX) + + with pytest.raises(KeycloakPostError) as err: + await admin.a_assign_group_client_roles(group_id=group_id, client_id=client, roles=["bad"]) + assert err.match(UNKOWN_ERROR_REGEX), err + res = await admin.a_assign_group_client_roles( + group_id=group_id, + client_id=client, + roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")], + ) + assert res == dict() + assert ( + len(await admin.a_get_client_role_groups(client_id=client, role_name="client-role-test-update")) + == 1 + ) + assert len(await admin.a_get_group_client_roles(group_id=group_id, client_id=client)) == 1 + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_group_client_roles(group_id=group_id, client_id=client, roles=["bad"]) + assert err.match(UNKOWN_ERROR_REGEX), err + res = await admin.a_delete_group_client_roles( + group_id=group_id, + client_id=client, + roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")], + ) + assert res == dict() + + # Test composite client roles + with pytest.raises(KeycloakPostError) as err: + await admin.a_add_composite_client_roles_to_role( + client_role_id=client, role_name="client-role-test-update", roles=["bad"] + ) + assert err.match(UNKOWN_ERROR_REGEX), err + res = await admin.a_add_composite_client_roles_to_role( + client_role_id=client, + role_name="client-role-test-update", + roles=[await admin.a_get_realm_role(role_name="offline_access")], + ) + assert res == dict() + assert await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")[ + "composite" + ] + + # Test delete of client role + res = await admin.a_delete_client_role(client_role_id=client, role_name="client-role-test-update") + assert res == dict() + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_client_role(client_role_id=client, role_name="client-role-test-update") + assert err.match(COULD_NOT_FIND_ROLE_REGEX) + + # Test of roles by id - Get role + await admin.a_create_client_role( + client_role_id=client, payload={"name": "client-role-by-id-test"}, skip_exists=True + ) + role = await admin.a_get_client_role(client_id=client, role_name="client-role-by-id-test") + res = admin.a_get_role_by_id(role_id=role["id"]) + assert res["name"] == "client-role-by-id-test" + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_role_by_id(role_id="bad") + assert err.match(COULD_NOT_FIND_ROLE_WITH_ID_REGEX) + + # Test of roles by id - Update role + res = await admin.a_update_role_by_id( + role_id=role["id"], payload={"name": "client-role-by-id-test-update"} + ) + assert res == dict() + with pytest.raises(KeycloakPutError) as err: + res = await admin.a_update_role_by_id( + role_id="bad", payload={"name": "client-role-by-id-test-update"} + ) + assert err.match(COULD_NOT_FIND_ROLE_WITH_ID_REGEX) + + # Test of roles by id - Delete role + res = await admin.a_delete_role_by_id(role_id=role["id"]) + assert res == dict() + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_role_by_id(role_id="bad") + assert err.match(COULD_NOT_FIND_ROLE_WITH_ID_REGEX) + +@pytest.mark.asyncio +async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str): + """Test enable token exchange. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + :raises AssertionError: In case of bad configuration + """ + # Test enabling token exchange between two confidential clients + await admin.a_change_current_realm(realm) + + # Create test clients + source_client_id = await admin.a_create_client( + payload={"name": "Source Client", "clientId": "source-client"} + ) + target_client_id = await admin.a_create_client( + payload={"name": "Target Client", "clientId": "target-client"} + ) + for c in await admin.a_get_clients(): + if c["clientId"] == "realm-management": + realm_management_id = c["id"] + break + else: + raise AssertionError("Missing realm management client") + + # Enable permissions on the Superset client + await admin.a_update_client_management_permissions( + payload={"enabled": True}, client_id=target_client_id + ) + + # Fetch various IDs and strings needed when creating the permission + token_exchange_permission_id = await admin.a_get_client_management_permissions( + client_id=target_client_id + )["scopePermissions"]["token-exchange"] + scopes = await admin.a_get_client_authz_policy_scopes( + client_id=realm_management_id, policy_id=token_exchange_permission_id + ) + + for s in scopes: + if s["name"] == "token-exchange": + token_exchange_scope_id = s["id"] + break + else: + raise AssertionError("Missing token-exchange scope") + + resources = await admin.a_get_client_authz_policy_resources( + client_id=realm_management_id, policy_id=token_exchange_permission_id + ) + for r in resources: + if r["name"] == f"client.resource.{target_client_id}": + token_exchange_resource_id = r["_id"] + break + else: + raise AssertionError("Missing client resource") + + # Create a client policy for source client + policy_name = "Exchange source client token with target client token" + client_policy_id = await admin.a_create_client_authz_client_policy( + payload={ + "type": "client", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "name": policy_name, + "clients": [source_client_id], + }, + client_id=realm_management_id, + )["id"] + policies = await admin.a_get_client_authz_client_policies(client_id=realm_management_id) + for policy in policies: + if policy["name"] == policy_name: + assert policy["clients"] == [source_client_id] + break + else: + raise AssertionError("Missing client policy") + + # Update permissions on the target client to reference this policy + permission_name = await admin.a_get_client_authz_scope_permission( + client_id=realm_management_id, scope_id=token_exchange_permission_id + )["name"] + await admin.a_update_client_authz_scope_permission( + payload={ + "id": token_exchange_permission_id, + "name": permission_name, + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "resources": [token_exchange_resource_id], + "scopes": [token_exchange_scope_id], + "policies": [client_policy_id], + }, + client_id=realm_management_id, + scope_id=token_exchange_permission_id, + ) + + # Create permissions on the target client to reference this policy + await admin.a_create_client_authz_scope_permission( + payload={ + "id": "some-id", + "name": "test-permission", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "resources": [token_exchange_resource_id], + "scopes": [token_exchange_scope_id], + "policies": [client_policy_id], + }, + client_id=realm_management_id, + ) + permission_name = await admin.a_get_client_authz_scope_permission( + client_id=realm_management_id, scope_id=token_exchange_permission_id + )["name"] + assert permission_name.startswith("token-exchange.permission.client.") + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_client_authz_scope_permission( + payload={"name": "test-permission", "scopes": [token_exchange_scope_id]}, + client_id="realm_management_id", + ) + assert err.match('404: b\'{"error":"Could not find client".*}\'') + +@pytest.mark.asyncio +async def test_a_email(admin: KeycloakAdmin, user: str): + """Test email. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param user: Keycloak user + :type user: str + """ + # Emails will fail as we don't have SMTP test setup + with pytest.raises(KeycloakPutError) as err: + await admin.a_send_update_account(user_id=user, payload=dict()) + assert err.match(UNKOWN_ERROR_REGEX), err + + admin.update_user(user_id=user, payload={"enabled": True}) + with pytest.raises(KeycloakPutError) as err: + await admin.a_send_verify_email(user_id=user) + assert err.match('500: b\'{"errorMessage":"Failed to send .*"}\'') + +@pytest.mark.asyncio +async def test_a_get_sessions(admin: KeycloakAdmin): + """Test get sessions. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + sessions = await admin.a_get_sessions(user_id=admin.get_user_id(username=admin.connection.username)) + assert len(sessions) >= 1 + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_sessions(user_id="bad") + assert err.match(USER_NOT_FOUND_REGEX) + +@pytest.mark.asyncio +async def test_a_get_client_installation_provider(admin: KeycloakAdmin, client: str): + """Test get client installation provider. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param client: Keycloak client + :type client: str + """ + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_client_installation_provider(client_id=client, provider_id="bad") + assert err.match('404: b\'{"error":"Unknown Provider".*}\'') + + installation = await admin.a_get_client_installation_provider( + client_id=client, provider_id="keycloak-oidc-keycloak-json" + ) + assert set(installation.keys()) == { + "auth-server-url", + "confidential-port", + "credentials", + "realm", + "resource", + "ssl-required", + } + +@pytest.mark.asyncio +async def test_a_auth_flows(admin: KeycloakAdmin, realm: str): + """Test auth flows. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + await admin.a_change_current_realm(realm) + + res = await admin.a_get_authentication_flows() + assert len(res) <= 8, res + default_flows = len(res) + assert {x["alias"] for x in res}.issubset( + { + "reset credentials", + "browser", + "registration", + "http challenge", + "docker auth", + "direct grant", + "first broker login", + "clients", + } + ) + assert set(res[0].keys()) == { + "alias", + "authenticationExecutions", + "builtIn", + "description", + "id", + "providerId", + "topLevel", + } + assert {x["alias"] for x in res}.issubset( + { + "reset credentials", + "browser", + "registration", + "docker auth", + "direct grant", + "first broker login", + "clients", + "http challenge", + } + ) + + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_authentication_flow_for_id(flow_id="bad") + assert err.match('404: b\'{"error":"Could not find flow with id".*}\'') + browser_flow_id = [x for x in res if x["alias"] == "browser"][0]["id"] + res = await admin.a_get_authentication_flow_for_id(flow_id=browser_flow_id) + assert res["alias"] == "browser" + + # Test copying + with pytest.raises(KeycloakPostError) as err: + await admin.a_copy_authentication_flow(payload=dict(), flow_alias="bad") + assert err.match("404: b''") + + res = await admin.a_copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser") + assert res == b"", res + assert len(await admin.a_get_authentication_flows()) == (default_flows + 1) + + # Test create + res = await admin.a_create_authentication_flow( + payload={"alias": "test-create", "providerId": "basic-flow"} + ) + assert res == b"" + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_authentication_flow(payload={"alias": "test-create", "builtIn": False}) + assert err.match('409: b\'{"errorMessage":"Flow test-create already exists"}\'') + assert await admin.a_create_authentication_flow( + payload={"alias": "test-create"}, skip_exists=True + ) == {"msg": "Already exists"} + + # Test flow executions + res = await admin.a_get_authentication_flow_executions(flow_alias="browser") + assert len(res) == 8, res + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_authentication_flow_executions(flow_alias="bad") + assert err.match("404: b''") + exec_id = res[0]["id"] + + res = await admin.a_get_authentication_flow_execution(execution_id=exec_id) + assert set(res.keys()) == { + "alternative", + "authenticator", + "authenticatorFlow", + "conditional", + "disabled", + "enabled", + "id", + "parentFlow", + "priority", + "required", + "requirement", + }, res + with pytest.raises(KeycloakGetError) as err: + await admin.a_get_authentication_flow_execution(execution_id="bad") + assert err.match(ILLEGAL_EXECUTION_REGEX) + + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_authentication_flow_execution(payload=dict(), flow_alias="browser") + assert err.match('400: b\'{"error":"It is illegal to add execution to a built in flow".*}\'') + + res = await admin.a_create_authentication_flow_execution( + payload={"provider": "auth-cookie"}, flow_alias="test-create" + ) + assert res == b"" + assert len(await admin.a_get_authentication_flow_executions(flow_alias="test-create")) == 1 + + with pytest.raises(KeycloakPutError) as err: + await admin.a_update_authentication_flow_executions( + payload={"required": "yes"}, flow_alias="test-create" + ) + assert err.match('400: b\'{"error":"Unrecognized field') + payload = await admin.a_get_authentication_flow_executions(flow_alias="test-create")[0] + payload["displayName"] = "test" + res = await admin.a_update_authentication_flow_executions(payload=payload, flow_alias="test-create") + assert res + + exec_id = await admin.a_get_authentication_flow_executions(flow_alias="test-create")[0]["id"] + res = await admin.a_delete_authentication_flow_execution(execution_id=exec_id) + assert res == dict() + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_authentication_flow_execution(execution_id=exec_id) + assert err.match(ILLEGAL_EXECUTION_REGEX) + + # Test subflows + res = await admin.a_create_authentication_flow_subflow( + payload={ + "alias": "test-subflow", + "provider": "basic-flow", + "type": "something", + "description": "something", + }, + flow_alias="test-browser", + ) + assert res == b"" + with pytest.raises(KeycloakPostError) as err: + await admin.a_create_authentication_flow_subflow( + payload={"alias": "test-subflow", "providerId": "basic-flow"}, + flow_alias="test-browser", + ) + assert err.match('409: b\'{"errorMessage":"New flow alias name already exists"}\'') + res = await admin.a_create_authentication_flow_subflow( + payload={ + "alias": "test-subflow", + "provider": "basic-flow", + "type": "something", + "description": "something", + }, + flow_alias="test-create", + skip_exists=True, + ) + assert res == {"msg": "Already exists"} + + # Test delete auth flow + flow_id = [x for x in await admin.a_get_authentication_flows() if x["alias"] == "test-browser"][0][ + "id" + ] + res = await admin.a_delete_authentication_flow(flow_id=flow_id) + assert res == dict() + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_delete_authentication_flow(flow_id=flow_id) + assert err.match('404: b\'{"error":"Could not find flow with id".*}\'') +