Browse Source
Merge pull request #354 from marcospereirampj/test/openid
Merge pull request #354 from marcospereirampj/test/openid
Test/openidpull/357/head v1.9.0
Richard Nemeth
3 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1415 additions and 616 deletions
-
3.github/workflows/lint.yaml
-
1.gitignore
-
4MANIFEST.in
-
3docs/source/conf.py
-
341poetry.lock
-
3pyproject.toml
-
2src/keycloak/__init__.py
-
10src/keycloak/authorization/__init__.py
-
26src/keycloak/authorization/permission.py
-
40src/keycloak/authorization/policy.py
-
9src/keycloak/authorization/role.py
-
15src/keycloak/connection.py
-
42src/keycloak/exceptions.py
-
621src/keycloak/keycloak_admin.py
-
111src/keycloak/keycloak_openid.py
-
42src/keycloak/uma_permissions.py
-
2src/keycloak/urls_patterns.py
-
5test_keycloak_init.sh
-
1tests/__init__.py
-
216tests/conftest.py
-
45tests/data/authz_settings.json
-
31tests/test_keycloak_admin.py
-
393tests/test_keycloak_openid.py
-
14tests/test_license.py
-
28tests/test_uma_permissions.py
-
3tests/test_urls_patterns.py
-
16tox.ini
@ -1,4 +0,0 @@ |
|||
include LICENSE |
|||
include requirements.txt |
|||
include dev-requirements.txt |
|||
include docs-requirements.txt |
621
src/keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1 @@ |
|||
"""Tests module.""" |
@ -0,0 +1,45 @@ |
|||
{ |
|||
"allowRemoteResourceManagement": true, |
|||
"policyEnforcementMode": "ENFORCING", |
|||
"policies": [ |
|||
{ |
|||
"name": "Default Policy", |
|||
"type": "js", |
|||
"logic": "POSITIVE", |
|||
"decisionStrategy": "AFFIRMATIVE", |
|||
"config": { |
|||
"code": "// by default, grants any permission associated with this policy\n$evaluation.grant();\n" |
|||
} |
|||
}, |
|||
{ |
|||
"name": "test-authz-rb-policy", |
|||
"type": "role", |
|||
"logic": "POSITIVE", |
|||
"decisionStrategy": "UNANIMOUS", |
|||
"config": { |
|||
"roles": "[{\"id\":\"offline_access\",\"required\":false}]" |
|||
} |
|||
}, |
|||
{ |
|||
"name": "Default Permission", |
|||
"type": "resource", |
|||
"logic": "POSITIVE", |
|||
"decisionStrategy": "UNANIMOUS", |
|||
"config": { |
|||
"applyPolicies": "[\"test-authz-rb-policy\"]" |
|||
} |
|||
}, |
|||
{ |
|||
"name": "Test scope", |
|||
"type": "scope", |
|||
"logic": "POSITIVE", |
|||
"decisionStrategy": "UNANIMOUS", |
|||
"config": { |
|||
"scopes": "[]", |
|||
"applyPolicies": "[\"test-authz-rb-policy\"]" |
|||
} |
|||
} |
|||
], |
|||
"scopes": [], |
|||
"decisionStrategy": "UNANIMOUS" |
|||
} |
@ -0,0 +1,393 @@ |
|||
"""Test module for KeycloakOpenID.""" |
|||
from typing import Tuple |
|||
from unittest import mock |
|||
|
|||
import pytest |
|||
|
|||
from keycloak.authorization import Authorization |
|||
from keycloak.authorization.permission import Permission |
|||
from keycloak.authorization.policy import Policy |
|||
from keycloak.authorization.role import Role |
|||
from keycloak.connection import ConnectionManager |
|||
from keycloak.exceptions import ( |
|||
KeycloakAuthenticationError, |
|||
KeycloakAuthorizationConfigError, |
|||
KeycloakDeprecationError, |
|||
KeycloakInvalidTokenError, |
|||
KeycloakPostError, |
|||
KeycloakRPTNotFound, |
|||
) |
|||
from keycloak.keycloak_admin import KeycloakAdmin |
|||
from keycloak.keycloak_openid import KeycloakOpenID |
|||
|
|||
|
|||
def test_keycloak_openid_init(env): |
|||
"""Test KeycloakOpenId's init method.""" |
|||
oid = KeycloakOpenID( |
|||
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", |
|||
realm_name="master", |
|||
client_id="admin-cli", |
|||
) |
|||
|
|||
assert oid.client_id == "admin-cli" |
|||
assert oid.client_secret_key is None |
|||
assert oid.realm_name == "master" |
|||
assert isinstance(oid.connection, ConnectionManager) |
|||
assert isinstance(oid.authorization, Authorization) |
|||
|
|||
|
|||
def test_well_known(oid: KeycloakOpenID): |
|||
"""Test the well_known method.""" |
|||
res = oid.well_known() |
|||
assert res is not None |
|||
assert res != dict() |
|||
for key in [ |
|||
"acr_values_supported", |
|||
"authorization_encryption_alg_values_supported", |
|||
"authorization_encryption_enc_values_supported", |
|||
"authorization_endpoint", |
|||
"authorization_signing_alg_values_supported", |
|||
"backchannel_authentication_endpoint", |
|||
"backchannel_authentication_request_signing_alg_values_supported", |
|||
"backchannel_logout_session_supported", |
|||
"backchannel_logout_supported", |
|||
"backchannel_token_delivery_modes_supported", |
|||
"check_session_iframe", |
|||
"claim_types_supported", |
|||
"claims_parameter_supported", |
|||
"claims_supported", |
|||
"code_challenge_methods_supported", |
|||
"device_authorization_endpoint", |
|||
"end_session_endpoint", |
|||
"frontchannel_logout_session_supported", |
|||
"frontchannel_logout_supported", |
|||
"grant_types_supported", |
|||
"id_token_encryption_alg_values_supported", |
|||
"id_token_encryption_enc_values_supported", |
|||
"id_token_signing_alg_values_supported", |
|||
"introspection_endpoint", |
|||
"introspection_endpoint_auth_methods_supported", |
|||
"introspection_endpoint_auth_signing_alg_values_supported", |
|||
"issuer", |
|||
"jwks_uri", |
|||
"mtls_endpoint_aliases", |
|||
"pushed_authorization_request_endpoint", |
|||
"registration_endpoint", |
|||
"request_object_encryption_alg_values_supported", |
|||
"request_object_encryption_enc_values_supported", |
|||
"request_object_signing_alg_values_supported", |
|||
"request_parameter_supported", |
|||
"request_uri_parameter_supported", |
|||
"require_pushed_authorization_requests", |
|||
"require_request_uri_registration", |
|||
"response_modes_supported", |
|||
"response_types_supported", |
|||
"revocation_endpoint", |
|||
"revocation_endpoint_auth_methods_supported", |
|||
"revocation_endpoint_auth_signing_alg_values_supported", |
|||
"scopes_supported", |
|||
"subject_types_supported", |
|||
"tls_client_certificate_bound_access_tokens", |
|||
"token_endpoint", |
|||
"token_endpoint_auth_methods_supported", |
|||
"token_endpoint_auth_signing_alg_values_supported", |
|||
"userinfo_encryption_alg_values_supported", |
|||
"userinfo_encryption_enc_values_supported", |
|||
"userinfo_endpoint", |
|||
"userinfo_signing_alg_values_supported", |
|||
]: |
|||
assert key in res |
|||
|
|||
|
|||
def test_auth_url(env, oid: KeycloakOpenID): |
|||
"""Test the auth_url method.""" |
|||
res = oid.auth_url(redirect_uri="http://test.test/*") |
|||
assert ( |
|||
res |
|||
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}" |
|||
+ f"/protocol/openid-connect/auth?client_id={oid.client_id}&response_type=code" |
|||
+ "&redirect_uri=http://test.test/*&scope=email&state= " |
|||
) |
|||
|
|||
|
|||
def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|||
"""Test the token method.""" |
|||
oid, username, password = oid_with_credentials |
|||
token = oid.token(username=username, password=password) |
|||
assert token == { |
|||
"access_token": mock.ANY, |
|||
"expires_in": 300, |
|||
"not-before-policy": 0, |
|||
"refresh_expires_in": 1800, |
|||
"refresh_token": mock.ANY, |
|||
"scope": mock.ANY, |
|||
"session_state": mock.ANY, |
|||
"token_type": "Bearer", |
|||
} |
|||
|
|||
# Test with dummy totp |
|||
token = oid.token(username=username, password=password, totp="123456") |
|||
assert token == { |
|||
"access_token": mock.ANY, |
|||
"expires_in": 300, |
|||
"not-before-policy": 0, |
|||
"refresh_expires_in": 1800, |
|||
"refresh_token": mock.ANY, |
|||
"scope": mock.ANY, |
|||
"session_state": mock.ANY, |
|||
"token_type": "Bearer", |
|||
} |
|||
|
|||
# Test with extra param |
|||
token = oid.token(username=username, password=password, extra_param="foo") |
|||
assert token == { |
|||
"access_token": mock.ANY, |
|||
"expires_in": 300, |
|||
"not-before-policy": 0, |
|||
"refresh_expires_in": 1800, |
|||
"refresh_token": mock.ANY, |
|||
"scope": mock.ANY, |
|||
"session_state": mock.ANY, |
|||
"token_type": "Bearer", |
|||
} |
|||
|
|||
|
|||
def test_exchange_token( |
|||
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin |
|||
): |
|||
"""Test the exchange token method.""" |
|||
# Verify existing user |
|||
oid, username, password = oid_with_credentials |
|||
|
|||
# Allow impersonation |
|||
admin.realm_name = oid.realm_name |
|||
admin.assign_client_role( |
|||
user_id=admin.get_user_id(username=username), |
|||
client_id=admin.get_client_id(client_name="realm-management"), |
|||
roles=[ |
|||
admin.get_client_role( |
|||
client_id=admin.get_client_id(client_name="realm-management"), |
|||
role_name="impersonation", |
|||
) |
|||
], |
|||
) |
|||
|
|||
token = oid.token(username=username, password=password) |
|||
assert oid.userinfo(token=token["access_token"]) == { |
|||
"email": f"{username}@test.test", |
|||
"email_verified": False, |
|||
"preferred_username": username, |
|||
"sub": mock.ANY, |
|||
} |
|||
|
|||
# Exchange token with the new user |
|||
new_token = oid.exchange_token( |
|||
token=token["access_token"], |
|||
client_id=oid.client_id, |
|||
audience=oid.client_id, |
|||
subject=username, |
|||
) |
|||
assert oid.userinfo(token=new_token["access_token"]) == { |
|||
"email": f"{username}@test.test", |
|||
"email_verified": False, |
|||
"preferred_username": username, |
|||
"sub": mock.ANY, |
|||
} |
|||
assert token != new_token |
|||
|
|||
|
|||
def test_logout(oid_with_credentials): |
|||
"""Test logout.""" |
|||
oid, username, password = oid_with_credentials |
|||
|
|||
token = oid.token(username=username, password=password) |
|||
assert oid.userinfo(token=token["access_token"]) != dict() |
|||
assert oid.logout(refresh_token=token["refresh_token"]) == dict() |
|||
|
|||
with pytest.raises(KeycloakAuthenticationError): |
|||
oid.userinfo(token=token["access_token"]) |
|||
|
|||
|
|||
def test_certs(oid: KeycloakOpenID): |
|||
"""Test certificates.""" |
|||
assert len(oid.certs()["keys"]) == 2 |
|||
|
|||
|
|||
def test_public_key(oid: KeycloakOpenID): |
|||
"""Test public key.""" |
|||
assert oid.public_key() is not None |
|||
|
|||
|
|||
def test_entitlement( |
|||
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin |
|||
): |
|||
"""Test entitlement.""" |
|||
oid, username, password = oid_with_credentials_authz |
|||
token = oid.token(username=username, password=password) |
|||
resource_server_id = admin.get_client_authz_resources( |
|||
client_id=admin.get_client_id(oid.client_id) |
|||
)[0]["_id"] |
|||
|
|||
with pytest.raises(KeycloakDeprecationError): |
|||
oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id) |
|||
|
|||
|
|||
def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|||
"""Test introspect.""" |
|||
oid, username, password = oid_with_credentials |
|||
token = oid.token(username=username, password=password) |
|||
|
|||
assert oid.introspect(token=token["access_token"])["active"] |
|||
assert oid.introspect( |
|||
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token" |
|||
) == {"active": False} |
|||
|
|||
with pytest.raises(KeycloakRPTNotFound): |
|||
oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token") |
|||
|
|||
|
|||
def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|||
"""Test decode token.""" |
|||
oid, username, password = oid_with_credentials |
|||
token = oid.token(username=username, password=password) |
|||
|
|||
assert ( |
|||
oid.decode_token( |
|||
token=token["access_token"], |
|||
key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----", |
|||
options={"verify_aud": False}, |
|||
)["preferred_username"] |
|||
== username |
|||
) |
|||
|
|||
|
|||
def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|||
"""Test load authorization config.""" |
|||
oid, username, password = oid_with_credentials_authz |
|||
|
|||
oid.load_authorization_config(path="tests/data/authz_settings.json") |
|||
assert "test-authz-rb-policy" in oid.authorization.policies |
|||
assert isinstance(oid.authorization.policies["test-authz-rb-policy"], Policy) |
|||
assert len(oid.authorization.policies["test-authz-rb-policy"].roles) == 1 |
|||
assert isinstance(oid.authorization.policies["test-authz-rb-policy"].roles[0], Role) |
|||
assert len(oid.authorization.policies["test-authz-rb-policy"].permissions) == 2 |
|||
assert isinstance( |
|||
oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission |
|||
) |
|||
|
|||
|
|||
def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|||
"""Test get policies.""" |
|||
oid, username, password = oid_with_credentials_authz |
|||
token = oid.token(username=username, password=password) |
|||
|
|||
with pytest.raises(KeycloakAuthorizationConfigError): |
|||
oid.get_policies(token=token["access_token"]) |
|||
|
|||
oid.load_authorization_config(path="tests/data/authz_settings.json") |
|||
assert oid.get_policies(token=token["access_token"]) is None |
|||
|
|||
key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----" |
|||
orig_client_id = oid.client_id |
|||
oid.client_id = "account" |
|||
assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == [] |
|||
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") |
|||
policy.add_role(role="account/view-profile") |
|||
oid.authorization.policies["test"] = policy |
|||
assert [ |
|||
str(x) |
|||
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) |
|||
] == ["Policy: test (role)"] |
|||
assert [ |
|||
repr(x) |
|||
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) |
|||
] == ["<Policy: test (role)>"] |
|||
oid.client_id = orig_client_id |
|||
|
|||
oid.logout(refresh_token=token["refresh_token"]) |
|||
with pytest.raises(KeycloakInvalidTokenError): |
|||
oid.get_policies(token=token["access_token"]) |
|||
|
|||
|
|||
def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|||
"""Test get policies.""" |
|||
oid, username, password = oid_with_credentials_authz |
|||
token = oid.token(username=username, password=password) |
|||
|
|||
with pytest.raises(KeycloakAuthorizationConfigError): |
|||
oid.get_permissions(token=token["access_token"]) |
|||
|
|||
oid.load_authorization_config(path="tests/data/authz_settings.json") |
|||
assert oid.get_permissions(token=token["access_token"]) is None |
|||
|
|||
key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----" |
|||
orig_client_id = oid.client_id |
|||
oid.client_id = "account" |
|||
assert ( |
|||
oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == [] |
|||
) |
|||
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") |
|||
policy.add_role(role="account/view-profile") |
|||
policy.add_permission( |
|||
permission=Permission( |
|||
name="test-perm", type="resource", logic="POSITIVE", decision_strategy="UNANIMOUS" |
|||
) |
|||
) |
|||
oid.authorization.policies["test"] = policy |
|||
assert [ |
|||
str(x) |
|||
for x in oid.get_permissions( |
|||
token=token["access_token"], method_token_info="decode", key=key |
|||
) |
|||
] == ["Permission: test-perm (resource)"] |
|||
assert [ |
|||
repr(x) |
|||
for x in oid.get_permissions( |
|||
token=token["access_token"], method_token_info="decode", key=key |
|||
) |
|||
] == ["<Permission: test-perm (resource)>"] |
|||
oid.client_id = orig_client_id |
|||
|
|||
oid.logout(refresh_token=token["refresh_token"]) |
|||
with pytest.raises(KeycloakInvalidTokenError): |
|||
oid.get_permissions(token=token["access_token"]) |
|||
|
|||
|
|||
def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|||
"""Test UMA permissions.""" |
|||
oid, username, password = oid_with_credentials_authz |
|||
token = oid.token(username=username, password=password) |
|||
|
|||
assert len(oid.uma_permissions(token=token["access_token"])) == 1 |
|||
assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource" |
|||
|
|||
|
|||
def test_has_uma_access( |
|||
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin |
|||
): |
|||
"""Test has UMA access.""" |
|||
oid, username, password = oid_with_credentials_authz |
|||
token = oid.token(username=username, password=password) |
|||
|
|||
assert ( |
|||
str(oid.has_uma_access(token=token["access_token"], permissions="")) |
|||
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" |
|||
) |
|||
assert ( |
|||
str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource")) |
|||
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" |
|||
) |
|||
|
|||
with pytest.raises(KeycloakPostError): |
|||
oid.has_uma_access(token=token["access_token"], permissions="Does not exist") |
|||
|
|||
oid.logout(refresh_token=token["refresh_token"]) |
|||
assert ( |
|||
str(oid.has_uma_access(token=token["access_token"], permissions="")) |
|||
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())" |
|||
) |
|||
assert ( |
|||
str(oid.has_uma_access(token=admin.token["access_token"], permissions="Default Resource")) |
|||
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" |
|||
+ "{'Default Resource'})" |
|||
) |
@ -0,0 +1,14 @@ |
|||
"""Tests for license.""" |
|||
import os |
|||
|
|||
|
|||
def test_license_present(): |
|||
"""Test that the MIT license is present in the header of each module file.""" |
|||
for path, _, files in os.walk("src/keycloak"): |
|||
for _file in files: |
|||
if _file.endswith(".py"): |
|||
with open(os.path.join(path, _file), "r") as fp: |
|||
content = fp.read() |
|||
assert content.startswith( |
|||
"# -*- coding: utf-8 -*-\n#\n# The MIT License (MIT)\n#\n#" |
|||
) |
Write
Preview
Loading…
Cancel
Save
Reference in new issue