diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 5916016..f694db8 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -5650,3 +5650,349 @@ async def test_a_update_required_action(admin: KeycloakAdmin, realm: str): newra = await admin.a_get_required_action_by_alias("UPDATE_PASSWORD") assert old != newra assert newra["enabled"] is False + +@pytest.mark.asyncio +async def test_a_get_composite_client_roles_of_group( + admin: KeycloakAdmin, realm: str, client: str, group: str, composite_client_role: str +): + """Test get composite client roles of group. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + :param client: Keycloak client + :type client: str + :param group: Keycloak group + :type group: str + :param composite_client_role: Composite client role + :type composite_client_role: str + """ + await admin.a_change_current_realm(realm) + role = await admin.a_get_client_role(client, composite_client_role) + await admin.a_assign_group_client_roles(group_id=group, client_id=client, roles=[role]) + result = await admin.a_get_composite_client_roles_of_group(client, group) + assert role["id"] in [x["id"] for x in result] + +@pytest.mark.asyncio +async def test_a_get_role_client_level_children( + admin: KeycloakAdmin, realm: str, client: str, composite_client_role: str, client_role: str +): + """Test get children of composite client role. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + :param client: Keycloak client + :type client: str + :param composite_client_role: Composite client role + :type composite_client_role: str + :param client_role: Client role + :type client_role: str + """ + await admin.a_change_current_realm(realm) + child = await admin.a_get_client_role(client, client_role) + parent = await admin.a_get_client_role(client, composite_client_role) + res = await admin.a_get_role_client_level_children(client, parent["id"]) + assert child["id"] in [x["id"] for x in res] + +@pytest.mark.asyncio +async def test_a_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple): + """Test upload certificate. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + :param client: Keycloak client + :type client: str + :param selfsigned_cert: Selfsigned certificates + :type selfsigned_cert: tuple + """ + await admin.a_change_current_realm(realm) + cert, _ = selfsigned_cert + cert = cert.decode("utf-8").strip() + admin.upload_certificate(client, cert) + cl = await admin.a_get_client(client) + assert cl["attributes"]["jwt.credential.certificate"] == "".join(cert.splitlines()[1:-1]) + +@pytest.mark.asyncio +async def test_a_get_bruteforce_status_for_user( + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str +): + """Test users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + :param realm: Keycloak realm + :type realm: str + """ + oid, username, password = oid_with_credentials + await admin.a_change_current_realm(realm) + + # Turn on bruteforce protection + res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": True}) + res = await admin.a_get_realm(realm_name=realm) + assert res["bruteForceProtected"] is True + + # Test login user with wrong credentials + try: + oid.token(username=username, password="wrongpassword") + except KeycloakAuthenticationError: + pass + + user_id = await admin.a_get_user_id(username) + bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) + + assert bruteforce_status["numFailures"] == 1 + + # Cleanup + res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": False}) + res = await admin.a_get_realm(realm_name=realm) + assert res["bruteForceProtected"] is False + +@pytest.mark.asyncio +async def test_a_clear_bruteforce_attempts_for_user( + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str +): + """Test users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + :param realm: Keycloak realm + :type realm: str + """ + oid, username, password = oid_with_credentials + await admin.a_change_current_realm(realm) + + # Turn on bruteforce protection + res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": True}) + res = await admin.a_get_realm(realm_name=realm) + assert res["bruteForceProtected"] is True + + # Test login user with wrong credentials + try: + oid.token(username=username, password="wrongpassword") + except KeycloakAuthenticationError: + pass + + user_id = await admin.a_get_user_id(username) + bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) + assert bruteforce_status["numFailures"] == 1 + + res = await admin.a_clear_bruteforce_attempts_for_user(user_id) + bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) + assert bruteforce_status["numFailures"] == 0 + + # Cleanup + res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": False}) + res = await admin.a_get_realm(realm_name=realm) + assert res["bruteForceProtected"] is False + + +@pytest.mark.asyncio +async def test_a_clear_bruteforce_attempts_for_all_users( + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str +): + """Test users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + :param realm: Keycloak realm + :type realm: str + """ + oid, username, password = oid_with_credentials + await admin.a_change_current_realm(realm) + + # Turn on bruteforce protection + res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": True}) + res = await admin.a_get_realm(realm_name=realm) + assert res["bruteForceProtected"] is True + + # Test login user with wrong credentials + try: + oid.token(username=username, password="wrongpassword") + except KeycloakAuthenticationError: + pass + + user_id = await admin.a_get_user_id(username) + bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) + assert bruteforce_status["numFailures"] == 1 + + res = await admin.a_clear_all_bruteforce_attempts() + bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) + assert bruteforce_status["numFailures"] == 0 + + # Cleanup + res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": False}) + res = await admin.a_get_realm(realm_name=realm) + assert res["bruteForceProtected"] is False + +@pytest.mark.asyncio +async def test_a_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> None: + """Test that the default realm role is present in a brand new realm. + + :param realm: Realm name + :type realm: str + :param admin: Keycloak admin + :type admin: KeycloakAdmin + """ + await admin.a_change_current_realm(realm) + assert f"default-roles-{realm}" in [x["name"] for x in admin.get_realm_roles()] + assert ( + len([x["name"] for x in await admin.a_get_realm_roles() if x["name"] == f"default-roles-{realm}"]) + == 1 + ) + +@pytest.mark.asyncio +async def test_a_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> None: + """Test getter for the ID of the default realm role. + + :param realm: Realm name + :type realm: str + :param admin: Keycloak admin + :type admin: KeycloakAdmin + """ + await admin.a_change_current_realm(realm) + assert ( + await admin.a_get_default_realm_role_id() + == [x["id"] for x in await admin.a_get_realm_roles() if x["name"] == f"default-roles-{realm}"][0] + ) + +@pytest.mark.asyncio +async def test_a_realm_default_roles(admin: KeycloakAdmin, realm: str) -> None: + """Test getting, adding and deleting default realm roles. + + :param realm: Realm name + :type realm: str + :param admin: Keycloak admin + :type admin: KeycloakAdmin + """ + await admin.a_change_current_realm(realm) + + # Test listing all default realm roles + roles = await admin.a_get_realm_default_roles() + assert len(roles) == 2 + assert {x["name"] for x in roles} == {"offline_access", "uma_authorization"} + + with pytest.raises(KeycloakGetError) as err: + await admin.a_change_current_realm("doesnotexist") + await admin.a_get_realm_default_roles() + assert err.match('404: b\'{"error":"Realm not found.".*}\'') + await admin.a_change_current_realm(realm) + + # Test removing a default realm role + res = await admin.a_remove_realm_default_roles(payload=[roles[0]]) + assert res == {} + assert roles[0] not in await admin.a_get_realm_default_roles() + assert len(await admin.a_get_realm_default_roles()) == 1 + + with pytest.raises(KeycloakDeleteError) as err: + await admin.a_remove_realm_default_roles(payload=[{"id": "bad id"}]) + assert err.match('404: b\'{"error":"Could not find composite role".*}\'') + + # Test adding a default realm role + res = await admin.a_add_realm_default_roles(payload=[roles[0]]) + assert res == {} + assert roles[0] in await admin.a_get_realm_default_roles() + assert len(await admin.a_get_realm_default_roles()) == 2 + + with pytest.raises(KeycloakPostError) as err: + await admin.a_add_realm_default_roles(payload=[{"id": "bad id"}]) + assert err.match('404: b\'{"error":"Could not find composite role".*}\'') + +@pytest.mark.asyncio +async def test_a_clear_keys_cache(realm: str, admin: KeycloakAdmin) -> None: + """Test clearing the keys cache. + + :param realm: Realm name + :type realm: str + :param admin: Keycloak admin + :type admin: KeycloakAdmin + """ + await admin.a_change_current_realm(realm) + res = await admin.a_clear_keys_cache() + assert res == {} + +@pytest.mark.asyncio +async def test_a_clear_realm_cache(realm: str, admin: KeycloakAdmin) -> None: + """Test clearing the realm cache. + + :param realm: Realm name + :type realm: str + :param admin: Keycloak admin + :type admin: KeycloakAdmin + """ + await admin.a_change_current_realm(realm) + res = await admin.a_clear_realm_cache() + assert res == {} + +@pytest.mark.asyncio +def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None: + """Test clearing the user cache. + + :param realm: Realm name + :type realm: str + :param admin: Keycloak admin + :type admin: KeycloakAdmin + """ + await admin.a_change_current_realm(realm) + res = await admin.a_clear_user_cache() + assert res == {} + +@pytest.mark.asyncio +async def test_a_initial_access_token( + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str] +) -> None: + """Test initial access token and client creation. + + :param admin: Keycloak admin + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + res = await admin.a_create_initial_access_token(2, 3) + assert "token" in res + assert res["count"] == 2 + assert res["expiration"] == 3 + + oid, username, password = oid_with_credentials + + client = str(uuid.uuid4()) + secret = str(uuid.uuid4()) + + res = oid.register_client( + token=res["token"], + payload={ + "name": "DynamicRegisteredClient", + "clientId": client, + "enabled": True, + "publicClient": False, + "protocol": "openid-connect", + "secret": secret, + "clientAuthenticatorType": "client-secret", + }, + ) + assert res["clientId"] == client + + new_secret = str(uuid.uuid4()) + res = await oid.a_update_client(res["registrationAccessToken"], client, payload={"secret": new_secret}) + assert res["secret"] == new_secret + +@pytest.mark.asyncio +async def test_a_refresh_token(admin: KeycloakAdmin): + """Test refresh token on connection even if it is expired. + + :param admin: Keycloak admin + :type admin: KeycloakAdmin + """ + assert admin.connection.token is not None + await admin.a_user_logout(await admin.a_get_user_id(admin.connection.username)) + admin.connection.refresh_token()