|  |  | @ -5,6 +5,7 @@ import os | 
			
		
	
		
			
				
					|  |  |  | import uuid | 
			
		
	
		
			
				
					|  |  |  | from inspect import iscoroutinefunction, signature | 
			
		
	
		
			
				
					|  |  |  | from typing import Tuple | 
			
		
	
		
			
				
					|  |  |  | from unittest.mock import ANY, patch | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | import freezegun | 
			
		
	
		
			
				
					|  |  |  | import pytest | 
			
		
	
	
		
			
				
					|  |  | @ -12,7 +13,12 @@ from dateutil import parser as datetime_parser | 
			
		
	
		
			
				
					|  |  |  | from packaging.version import Version | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | import keycloak | 
			
		
	
		
			
				
					|  |  |  | from keycloak import KeycloakAdmin, KeycloakOpenID, KeycloakOpenIDConnection | 
			
		
	
		
			
				
					|  |  |  | from keycloak import ( | 
			
		
	
		
			
				
					|  |  |  |     KeycloakAdmin, | 
			
		
	
		
			
				
					|  |  |  |     KeycloakConnectionError, | 
			
		
	
		
			
				
					|  |  |  |     KeycloakOpenID, | 
			
		
	
		
			
				
					|  |  |  |     KeycloakOpenIDConnection, | 
			
		
	
		
			
				
					|  |  |  | ) | 
			
		
	
		
			
				
					|  |  |  | from keycloak.connection import ConnectionManager | 
			
		
	
		
			
				
					|  |  |  | from keycloak.exceptions import ( | 
			
		
	
		
			
				
					|  |  |  |     KeycloakAuthenticationError, | 
			
		
	
	
		
			
				
					|  |  | @ -5170,6 +5176,55 @@ async def test_a_email(admin: KeycloakAdmin, user: str): | 
			
		
	
		
			
				
					|  |  |  |     assert err.match('500: b\'{"errorMessage":"Failed to send .*"}\'') | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | @pytest.mark.asyncio | 
			
		
	
		
			
				
					|  |  |  | async def test_a_email_query_param_handling(admin: KeycloakAdmin, user: str): | 
			
		
	
		
			
				
					|  |  |  |     """Test that the optional parameters are correctly transformed into query params. | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     :param admin: Keycloak Admin client | 
			
		
	
		
			
				
					|  |  |  |     :type admin: KeycloakAdmin | 
			
		
	
		
			
				
					|  |  |  |     :param user: Keycloak user | 
			
		
	
		
			
				
					|  |  |  |     :type user: str | 
			
		
	
		
			
				
					|  |  |  |     """ | 
			
		
	
		
			
				
					|  |  |  |     with patch.object( | 
			
		
	
		
			
				
					|  |  |  |         admin.connection.async_s, "put", side_effect=Exception("An expected error") | 
			
		
	
		
			
				
					|  |  |  |     ) as mock_put, pytest.raises(KeycloakConnectionError): | 
			
		
	
		
			
				
					|  |  |  |         await admin.a_send_update_account( | 
			
		
	
		
			
				
					|  |  |  |             user_id=user, | 
			
		
	
		
			
				
					|  |  |  |             payload=["UPDATE_PASSWORD"], | 
			
		
	
		
			
				
					|  |  |  |             client_id="update-account-client-id", | 
			
		
	
		
			
				
					|  |  |  |             redirect_uri="https://example.com", | 
			
		
	
		
			
				
					|  |  |  |         ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     mock_put.assert_awaited_once_with( | 
			
		
	
		
			
				
					|  |  |  |         ANY, | 
			
		
	
		
			
				
					|  |  |  |         data='["UPDATE_PASSWORD"]', | 
			
		
	
		
			
				
					|  |  |  |         params={ | 
			
		
	
		
			
				
					|  |  |  |             "client_id": "update-account-client-id", | 
			
		
	
		
			
				
					|  |  |  |             "redirect_uri": "https://example.com", | 
			
		
	
		
			
				
					|  |  |  |         }, | 
			
		
	
		
			
				
					|  |  |  |         headers=ANY, | 
			
		
	
		
			
				
					|  |  |  |         timeout=60, | 
			
		
	
		
			
				
					|  |  |  |     ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     with patch.object( | 
			
		
	
		
			
				
					|  |  |  |         admin.connection.async_s, "put", side_effect=Exception("An expected error") | 
			
		
	
		
			
				
					|  |  |  |     ) as mock_put, pytest.raises(KeycloakConnectionError): | 
			
		
	
		
			
				
					|  |  |  |         await admin.a_send_verify_email( | 
			
		
	
		
			
				
					|  |  |  |             user_id=user, client_id="verify-client-id", redirect_uri="https://example.com" | 
			
		
	
		
			
				
					|  |  |  |         ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     mock_put.assert_awaited_once_with( | 
			
		
	
		
			
				
					|  |  |  |         ANY, | 
			
		
	
		
			
				
					|  |  |  |         data=ANY, | 
			
		
	
		
			
				
					|  |  |  |         params={ | 
			
		
	
		
			
				
					|  |  |  |             "client_id": "verify-client-id", | 
			
		
	
		
			
				
					|  |  |  |             "redirect_uri": "https://example.com", | 
			
		
	
		
			
				
					|  |  |  |         }, | 
			
		
	
		
			
				
					|  |  |  |         headers=ANY, | 
			
		
	
		
			
				
					|  |  |  |         timeout=60, | 
			
		
	
		
			
				
					|  |  |  |     ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | @pytest.mark.asyncio | 
			
		
	
		
			
				
					|  |  |  | async def test_a_get_sessions(admin: KeycloakAdmin): | 
			
		
	
		
			
				
					|  |  |  |     """Test get sessions. | 
			
		
	
	
		
			
				
					|  |  | 
 |