|  |  | @ -1,8 +1,12 @@ | 
			
		
	
		
			
				
					|  |  |  | """Test module for KeycloakOpenID.""" | 
			
		
	
		
			
				
					|  |  |  | from unittest import mock | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | import pytest | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | from keycloak.authorization import Authorization | 
			
		
	
		
			
				
					|  |  |  | from keycloak.connection import ConnectionManager | 
			
		
	
		
			
				
					|  |  |  | from keycloak.exceptions import KeycloakDeprecationError, KeycloakRPTNotFound | 
			
		
	
		
			
				
					|  |  |  | from keycloak.keycloak_admin import KeycloakAdmin | 
			
		
	
		
			
				
					|  |  |  | from keycloak.keycloak_openid import KeycloakOpenID | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  | @ -105,7 +109,7 @@ def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]): | 
			
		
	
		
			
				
					|  |  |  |         "not-before-policy": 0, | 
			
		
	
		
			
				
					|  |  |  |         "refresh_expires_in": 1800, | 
			
		
	
		
			
				
					|  |  |  |         "refresh_token": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "scope": "profile email", | 
			
		
	
		
			
				
					|  |  |  |         "scope": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "session_state": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "token_type": "Bearer", | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
	
		
			
				
					|  |  | @ -118,7 +122,7 @@ def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]): | 
			
		
	
		
			
				
					|  |  |  |         "not-before-policy": 0, | 
			
		
	
		
			
				
					|  |  |  |         "refresh_expires_in": 1800, | 
			
		
	
		
			
				
					|  |  |  |         "refresh_token": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "scope": "profile email", | 
			
		
	
		
			
				
					|  |  |  |         "scope": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "session_state": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "token_type": "Bearer", | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
	
		
			
				
					|  |  | @ -131,7 +135,104 @@ def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]): | 
			
		
	
		
			
				
					|  |  |  |         "not-before-policy": 0, | 
			
		
	
		
			
				
					|  |  |  |         "refresh_expires_in": 1800, | 
			
		
	
		
			
				
					|  |  |  |         "refresh_token": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "scope": "profile email", | 
			
		
	
		
			
				
					|  |  |  |         "scope": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "session_state": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |         "token_type": "Bearer", | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | def test_exchange_token( | 
			
		
	
		
			
				
					|  |  |  |     oid_with_credentials: tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin | 
			
		
	
		
			
				
					|  |  |  | ): | 
			
		
	
		
			
				
					|  |  |  |     """Test the exchange token method.""" | 
			
		
	
		
			
				
					|  |  |  |     # Verify existing user | 
			
		
	
		
			
				
					|  |  |  |     oid, username, password = oid_with_credentials | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     # Allow impersonation | 
			
		
	
		
			
				
					|  |  |  |     admin.realm_name = oid.realm_name | 
			
		
	
		
			
				
					|  |  |  |     admin.assign_client_role( | 
			
		
	
		
			
				
					|  |  |  |         user_id=admin.get_user_id(username=username), | 
			
		
	
		
			
				
					|  |  |  |         client_id=admin.get_client_id(client_name="realm-management"), | 
			
		
	
		
			
				
					|  |  |  |         roles=[ | 
			
		
	
		
			
				
					|  |  |  |             admin.get_client_role( | 
			
		
	
		
			
				
					|  |  |  |                 client_id=admin.get_client_id(client_name="realm-management"), | 
			
		
	
		
			
				
					|  |  |  |                 role_name="impersonation", | 
			
		
	
		
			
				
					|  |  |  |             ) | 
			
		
	
		
			
				
					|  |  |  |         ], | 
			
		
	
		
			
				
					|  |  |  |     ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     token = oid.token(username=username, password=password) | 
			
		
	
		
			
				
					|  |  |  |     assert oid.userinfo(token=token["access_token"]) == { | 
			
		
	
		
			
				
					|  |  |  |         "email": f"{username}@test.test", | 
			
		
	
		
			
				
					|  |  |  |         "email_verified": False, | 
			
		
	
		
			
				
					|  |  |  |         "preferred_username": username, | 
			
		
	
		
			
				
					|  |  |  |         "sub": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     # Exchange token with the new user | 
			
		
	
		
			
				
					|  |  |  |     new_token = oid.exchange_token( | 
			
		
	
		
			
				
					|  |  |  |         token=token["access_token"], | 
			
		
	
		
			
				
					|  |  |  |         client_id=oid.client_id, | 
			
		
	
		
			
				
					|  |  |  |         audience=oid.client_id, | 
			
		
	
		
			
				
					|  |  |  |         subject=username, | 
			
		
	
		
			
				
					|  |  |  |     ) | 
			
		
	
		
			
				
					|  |  |  |     assert oid.userinfo(token=new_token["access_token"]) == { | 
			
		
	
		
			
				
					|  |  |  |         "email": f"{username}@test.test", | 
			
		
	
		
			
				
					|  |  |  |         "email_verified": False, | 
			
		
	
		
			
				
					|  |  |  |         "preferred_username": username, | 
			
		
	
		
			
				
					|  |  |  |         "sub": mock.ANY, | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |     assert token != new_token | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | def test_certs(oid: KeycloakOpenID): | 
			
		
	
		
			
				
					|  |  |  |     """Test certificates.""" | 
			
		
	
		
			
				
					|  |  |  |     assert len(oid.certs()["keys"]) == 2 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | def test_public_key(oid: KeycloakOpenID): | 
			
		
	
		
			
				
					|  |  |  |     """Test public key.""" | 
			
		
	
		
			
				
					|  |  |  |     assert oid.public_key() is not None | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | def test_entitlement( | 
			
		
	
		
			
				
					|  |  |  |     oid_with_credentials_authz: tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin | 
			
		
	
		
			
				
					|  |  |  | ): | 
			
		
	
		
			
				
					|  |  |  |     """Test entitlement.""" | 
			
		
	
		
			
				
					|  |  |  |     oid, username, password = oid_with_credentials_authz | 
			
		
	
		
			
				
					|  |  |  |     token = oid.token(username=username, password=password) | 
			
		
	
		
			
				
					|  |  |  |     resource_server_id = admin.get_client_authz_resources( | 
			
		
	
		
			
				
					|  |  |  |         client_id=admin.get_client_id(oid.client_id) | 
			
		
	
		
			
				
					|  |  |  |     )[0]["_id"] | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     with pytest.raises(KeycloakDeprecationError): | 
			
		
	
		
			
				
					|  |  |  |         oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | def test_introspect(oid_with_credentials: tuple[KeycloakOpenID, str, str]): | 
			
		
	
		
			
				
					|  |  |  |     """Test introspect.""" | 
			
		
	
		
			
				
					|  |  |  |     oid, username, password = oid_with_credentials | 
			
		
	
		
			
				
					|  |  |  |     token = oid.token(username=username, password=password) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     assert oid.introspect(token=token["access_token"])["active"] | 
			
		
	
		
			
				
					|  |  |  |     assert oid.introspect( | 
			
		
	
		
			
				
					|  |  |  |         token=token["access_token"], rpt="some", token_type_hint="requesting_party_token" | 
			
		
	
		
			
				
					|  |  |  |     ) == {"active": False} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     with pytest.raises(KeycloakRPTNotFound): | 
			
		
	
		
			
				
					|  |  |  |         oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token") | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | def test_decode_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]): | 
			
		
	
		
			
				
					|  |  |  |     """Test decode token.""" | 
			
		
	
		
			
				
					|  |  |  |     oid, username, password = oid_with_credentials | 
			
		
	
		
			
				
					|  |  |  |     token = oid.token(username=username, password=password) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     assert ( | 
			
		
	
		
			
				
					|  |  |  |         oid.decode_token( | 
			
		
	
		
			
				
					|  |  |  |             token=token["access_token"], | 
			
		
	
		
			
				
					|  |  |  |             key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----", | 
			
		
	
		
			
				
					|  |  |  |             options={"verify_aud": False}, | 
			
		
	
		
			
				
					|  |  |  |         )["preferred_username"] | 
			
		
	
		
			
				
					|  |  |  |         == username | 
			
		
	
		
			
				
					|  |  |  |     ) |