diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index dd3067a..51274db 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -487,3 +487,454 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]): "expires_in": 600, "interval": 5, } + +#async function start + +@pytest.mark.asyncio +async def test_a_well_known(oid: KeycloakOpenID): + """Test the well_known method. + + :param oid: Keycloak OpenID client + :type oid: KeycloakOpenID + """ + res = await oid.a_well_known() + assert res is not None + assert res != dict() + for key in [ + "acr_values_supported", + "authorization_encryption_alg_values_supported", + "authorization_encryption_enc_values_supported", + "authorization_endpoint", + "authorization_signing_alg_values_supported", + "backchannel_authentication_endpoint", + "backchannel_authentication_request_signing_alg_values_supported", + "backchannel_logout_session_supported", + "backchannel_logout_supported", + "backchannel_token_delivery_modes_supported", + "check_session_iframe", + "claim_types_supported", + "claims_parameter_supported", + "claims_supported", + "code_challenge_methods_supported", + "device_authorization_endpoint", + "end_session_endpoint", + "frontchannel_logout_session_supported", + "frontchannel_logout_supported", + "grant_types_supported", + "id_token_encryption_alg_values_supported", + "id_token_encryption_enc_values_supported", + "id_token_signing_alg_values_supported", + "introspection_endpoint", + "introspection_endpoint_auth_methods_supported", + "introspection_endpoint_auth_signing_alg_values_supported", + "issuer", + "jwks_uri", + "mtls_endpoint_aliases", + "pushed_authorization_request_endpoint", + "registration_endpoint", + "request_object_encryption_alg_values_supported", + "request_object_encryption_enc_values_supported", + "request_object_signing_alg_values_supported", + "request_parameter_supported", + "request_uri_parameter_supported", + "require_pushed_authorization_requests", + "require_request_uri_registration", + "response_modes_supported", + "response_types_supported", + "revocation_endpoint", + "revocation_endpoint_auth_methods_supported", + "revocation_endpoint_auth_signing_alg_values_supported", + "scopes_supported", + "subject_types_supported", + "tls_client_certificate_bound_access_tokens", + "token_endpoint", + "token_endpoint_auth_methods_supported", + "token_endpoint_auth_signing_alg_values_supported", + "userinfo_encryption_alg_values_supported", + "userinfo_encryption_enc_values_supported", + "userinfo_endpoint", + "userinfo_signing_alg_values_supported", + ]: + assert key in res + +@pytest.mark.asyncio +async def test_a_auth_url(env, oid: KeycloakOpenID): + """Test the auth_url method. + + :param env: Environment fixture + :type env: KeycloakTestEnv + :param oid: Keycloak OpenID client + :type oid: KeycloakOpenID + """ + res = await oid.a_auth_url(redirect_uri="http://test.test/*") + assert ( + res + == f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}" + + f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code" + + "&redirect_uri=http://test.test/*&scope=email&state=" + ) + +@pytest.mark.asyncio +async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test the token method. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + token = await oid.a_token(username=username, password=password) + assert token == { + "access_token": mock.ANY, + "expires_in": mock.ANY, + "id_token": mock.ANY, + "not-before-policy": 0, + "refresh_expires_in": mock.ANY, + "refresh_token": mock.ANY, + "scope": mock.ANY, + "session_state": mock.ANY, + "token_type": "Bearer", + } + + # Test with dummy totp + token = await oid.a_token(username=username, password=password, totp="123456") + assert token == { + "access_token": mock.ANY, + "expires_in": mock.ANY, + "id_token": mock.ANY, + "not-before-policy": 0, + "refresh_expires_in": mock.ANY, + "refresh_token": mock.ANY, + "scope": mock.ANY, + "session_state": mock.ANY, + "token_type": "Bearer", + } + + # Test with extra param + token = await oid.a_token(username=username, password=password, extra_param="foo") + assert token == { + "access_token": mock.ANY, + "expires_in": mock.ANY, + "id_token": mock.ANY, + "not-before-policy": 0, + "refresh_expires_in": mock.ANY, + "refresh_token": mock.ANY, + "scope": mock.ANY, + "session_state": mock.ANY, + "token_type": "Bearer", + } + +@pytest.mark.asyncio +async def test_a_exchange_token( + oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin +): + """Test the exchange token method. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + # Verify existing user + oid, username, password = oid_with_credentials + + # Allow impersonation + admin.change_current_realm(oid.realm_name) + admin.assign_client_role( + user_id=admin.get_user_id(username=username), + client_id=admin.get_client_id(client_id="realm-management"), + roles=[ + admin.get_client_role( + client_id=admin.get_client_id(client_id="realm-management"), + role_name="impersonation", + ) + ], + ) + + token = await oid.a_token(username=username, password=password) + assert await oid.a_userinfo(token=token["access_token"]) == { + "email": f"{username}@test.test", + "email_verified": True, + "family_name": "last", + "given_name": "first", + "name": "first last", + "preferred_username": username, + "sub": mock.ANY, + } + + # Exchange token with the new user + new_token = await oid.a_exchange_token( + token=token["access_token"], audience=oid.client_id, subject=username + ) + assert await oid.a_userinfo(token=new_token["access_token"]) == { + "email": f"{username}@test.test", + "email_verified": True, + "family_name": "last", + "given_name": "first", + "name": "first last", + "preferred_username": username, + "sub": mock.ANY, + } + assert token != new_token + +@pytest.mark.asyncio +async def test_a_logout(oid_with_credentials): + """Test logout. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + + token = await oid.a_token(username=username, password=password) + assert await oid.a_userinfo(token=token["access_token"]) != dict() + assert await oid.a_logout(refresh_token=token["refresh_token"]) == dict() + + with pytest.raises(KeycloakAuthenticationError): + await oid.a_userinfo(token=token["access_token"]) + +@pytest.mark.asyncio +async def test_a_certs(oid: KeycloakOpenID): + """Test certificates. + + :param oid: Keycloak OpenID client + :type oid: KeycloakOpenID + """ + assert len((await oid.a_certs())["keys"]) == 2 + +@pytest.mark.asyncio +async def test_a_public_key(oid: KeycloakOpenID): + """Test public key. + + :param oid: Keycloak OpenID client + :type oid: KeycloakOpenID + """ + assert await oid.a_public_key() is not None + +@pytest.mark.asyncio +async def test_a_entitlement( + oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin +): + """Test entitlement. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + resource_server_id = admin.get_client_authz_resources( + client_id=admin.get_client_id(oid.client_id) + )[0]["_id"] + + with pytest.raises(KeycloakDeprecationError): + await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id) + +@pytest.mark.asyncio +async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test introspect. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + token = await oid.a_token(username=username, password=password) + + assert (await oid.a_introspect(token=token["access_token"]))["active"] + assert await oid.a_introspect( + token=token["access_token"], rpt="some", token_type_hint="requesting_party_token" + ) == {"active": False} + + with pytest.raises(KeycloakRPTNotFound): + await oid.a_introspect(token=token["access_token"], token_type_hint="requesting_party_token") + +@pytest.mark.asyncio +async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test decode token. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + token = await oid.a_token(username=username, password=password) + decoded_access_token = await oid.a_decode_token(token=token["access_token"]) + decoded_access_token_2 = await oid.a_decode_token(token=token["access_token"], validate=False) + decoded_refresh_token = await oid.a_decode_token(token=token["refresh_token"], validate=False) + + assert decoded_access_token == decoded_access_token_2 + assert decoded_access_token["preferred_username"] == username, decoded_access_token + assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token + +@pytest.mark.asyncio +async def test_a_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): + """Test load authorization config. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials_authz + + await oid.a_load_authorization_config(path="tests/data/authz_settings.json") + assert "test-authz-rb-policy" in oid.authorization.policies + assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy) + assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1 + assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role) + assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2 + assert isinstance( + oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission + ) + +@pytest.mark.asyncio +async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): + """Test get policies. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + with pytest.raises(KeycloakAuthorizationConfigError): + await oid.a_get_policies(token=token["access_token"]) + + await oid.a_load_authorization_config(path="tests/data/authz_settings.json") + assert await oid.a_get_policies(token=token["access_token"]) is None + + orig_client_id = oid.client_id + oid.client_id = "account" + assert await oid.a_get_policies(token=token["access_token"], method_token_info="decode") == [] + policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") + policy.add_role(role="account/view-profile") + oid.authorization.policies["test"] = policy + assert [ + str(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") + ] == ["Policy: test (role)"] + assert [ + repr(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") + ] == [""] + oid.client_id = orig_client_id + + await oid.a_logout(refresh_token=token["refresh_token"]) + with pytest.raises(KeycloakInvalidTokenError): + await oid.a_get_policies(token=token["access_token"]) + +@pytest.mark.asyncio +async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): + """Test get policies. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + with pytest.raises(KeycloakAuthorizationConfigError): + await oid.a_get_permissions(token=token["access_token"]) + + await oid.a_load_authorization_config(path="tests/data/authz_settings.json") + assert await oid.a_get_permissions(token=token["access_token"]) is None + + orig_client_id = oid.client_id + oid.client_id = "account" + assert await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == [] + policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") + policy.add_role(role="account/view-profile") + policy.add_permission( + permission=Permission( + name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS" + ) + ) + oid.authorization.policies["test"] = policy + assert [ + str(x) + for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") + ] == ["Permission: test-perm (resource)"] + assert [ + repr(x) + for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") + ] == [""] + oid.client_id = orig_client_id + + await oid.a_logout(refresh_token=token["refresh_token"]) + with pytest.raises(KeycloakInvalidTokenError): + await oid.a_get_permissions(token=token["access_token"]) + +@pytest.mark.asyncio +async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): + """Test UMA permissions. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + assert len(await oid.a_uma_permissions(token=token["access_token"])) == 1 + assert (await oid.a_uma_permissions(token=token["access_token"]))[0]["rsname"] == "Default Resource" + +@pytest.mark.asyncio +async def test_a_has_uma_access( + oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin +): + """Test has UMA access. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + assert ( + str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) + == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" + ) + assert ( + str(await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource")) + == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" + ) + + with pytest.raises(KeycloakPostError): + await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist") + + await oid.a_logout(refresh_token=token["refresh_token"]) + assert ( + str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) + == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())" + ) + assert ( + str( + await oid.a_has_uma_access( + token=admin.connection.token["access_token"], permissions="Default Resource" + ) + ) + == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" + + "{'Default Resource'})" + ) + +@pytest.mark.asyncio +async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]): + """Test device authorization flow. + + :param oid_with_credentials_device: Keycloak OpenID client with pre-configured user + credentials and device authorization flow enabled + :type oid_with_credentials_device: Tuple[KeycloakOpenID, str, str] + """ + oid, _, _ = oid_with_credentials_device + res = await oid.a_device() + assert res == { + "device_code": mock.ANY, + "user_code": mock.ANY, + "verification_uri": f"http://localhost:8081/realms/{oid.realm_name}/device", + "verification_uri_complete": f"http://localhost:8081/realms/{oid.realm_name}/" + + f"device?user_code={res['user_code']}", + "expires_in": 600, + "interval": 5, + } \ No newline at end of file