|
|
@ -40,13 +40,14 @@ def test_keycloak_openid_init(env): |
|
|
|
assert isinstance(oid.authorization, Authorization) |
|
|
|
|
|
|
|
|
|
|
|
def test_well_known(oid: KeycloakOpenID): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_well_known(oid: KeycloakOpenID): |
|
|
|
"""Test the well_known method. |
|
|
|
|
|
|
|
:param oid: Keycloak OpenID client |
|
|
|
:type oid: KeycloakOpenID |
|
|
|
""" |
|
|
|
res = oid.well_known() |
|
|
|
res = await oid.well_known() |
|
|
|
assert res is not None |
|
|
|
assert res != dict() |
|
|
|
for key in [ |
|
|
@ -107,7 +108,8 @@ def test_well_known(oid: KeycloakOpenID): |
|
|
|
assert key in res |
|
|
|
|
|
|
|
|
|
|
|
def test_auth_url(env, oid: KeycloakOpenID): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_auth_url(env, oid: KeycloakOpenID): |
|
|
|
"""Test the auth_url method. |
|
|
|
|
|
|
|
:param env: Environment fixture |
|
|
@ -115,7 +117,7 @@ def test_auth_url(env, oid: KeycloakOpenID): |
|
|
|
:param oid: Keycloak OpenID client |
|
|
|
:type oid: KeycloakOpenID |
|
|
|
""" |
|
|
|
res = oid.auth_url(redirect_uri="http://test.test/*") |
|
|
|
res = await oid.auth_url(redirect_uri="http://test.test/*") |
|
|
|
assert ( |
|
|
|
res |
|
|
|
== f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}/realms/{oid.realm_name}" |
|
|
@ -124,14 +126,15 @@ def test_auth_url(env, oid: KeycloakOpenID): |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test the token method. |
|
|
|
|
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
assert token == { |
|
|
|
"access_token": mock.ANY, |
|
|
|
"expires_in": 300, |
|
|
@ -145,7 +148,7 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
} |
|
|
|
|
|
|
|
# Test with dummy totp |
|
|
|
token = oid.token(username=username, password=password, totp="123456") |
|
|
|
token = await oid.token(username=username, password=password, totp="123456") |
|
|
|
assert token == { |
|
|
|
"access_token": mock.ANY, |
|
|
|
"expires_in": 300, |
|
|
@ -159,7 +162,7 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
} |
|
|
|
|
|
|
|
# Test with extra param |
|
|
|
token = oid.token(username=username, password=password, extra_param="foo") |
|
|
|
token = await oid.token(username=username, password=password, extra_param="foo") |
|
|
|
assert token == { |
|
|
|
"access_token": mock.ANY, |
|
|
|
"expires_in": 300, |
|
|
@ -173,7 +176,8 @@ def test_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
def test_exchange_token( |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_exchange_token( |
|
|
|
oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin |
|
|
|
): |
|
|
|
"""Test the exchange token method. |
|
|
@ -188,19 +192,23 @@ def test_exchange_token( |
|
|
|
|
|
|
|
# Allow impersonation |
|
|
|
admin.realm_name = oid.realm_name |
|
|
|
admin.assign_client_role( |
|
|
|
user_id=admin.get_user_id(username=username), |
|
|
|
client_id=admin.get_client_id(client_id="realm-management"), |
|
|
|
roles=[ |
|
|
|
admin.get_client_role( |
|
|
|
client_id=admin.get_client_id(client_id="realm-management"), |
|
|
|
role_name="impersonation", |
|
|
|
) |
|
|
|
], |
|
|
|
user_id = await admin.get_user_id(username=username) |
|
|
|
client_id = await admin.get_client_id(client_id="realm-management") |
|
|
|
roles = [ |
|
|
|
await admin.get_client_role( |
|
|
|
client_id=client_id, |
|
|
|
role_name="impersonation", |
|
|
|
) |
|
|
|
] |
|
|
|
print(roles) |
|
|
|
await admin.assign_client_role( |
|
|
|
user_id=user_id, |
|
|
|
client_id=client_id, |
|
|
|
roles=roles |
|
|
|
) |
|
|
|
|
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
assert oid.userinfo(token=token["access_token"]) == { |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
assert await oid.userinfo(token=token["access_token"]) == { |
|
|
|
"email": f"{username}@test.test", |
|
|
|
"email_verified": False, |
|
|
|
"preferred_username": username, |
|
|
@ -208,13 +216,13 @@ def test_exchange_token( |
|
|
|
} |
|
|
|
|
|
|
|
# Exchange token with the new user |
|
|
|
new_token = oid.exchange_token( |
|
|
|
new_token = await oid.exchange_token( |
|
|
|
token=token["access_token"], |
|
|
|
client_id=oid.client_id, |
|
|
|
audience=oid.client_id, |
|
|
|
subject=username, |
|
|
|
) |
|
|
|
assert oid.userinfo(token=new_token["access_token"]) == { |
|
|
|
assert await oid.userinfo(token=new_token["access_token"]) == { |
|
|
|
"email": f"{username}@test.test", |
|
|
|
"email_verified": False, |
|
|
|
"preferred_username": username, |
|
|
@ -223,7 +231,8 @@ def test_exchange_token( |
|
|
|
assert token != new_token |
|
|
|
|
|
|
|
|
|
|
|
def test_logout(oid_with_credentials): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_logout(oid_with_credentials): |
|
|
|
"""Test logout. |
|
|
|
|
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
@ -231,33 +240,37 @@ def test_logout(oid_with_credentials): |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
|
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
assert oid.userinfo(token=token["access_token"]) != dict() |
|
|
|
assert oid.logout(refresh_token=token["refresh_token"]) == dict() |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
assert await oid.userinfo(token=token["access_token"]) != dict() |
|
|
|
assert await oid.logout(refresh_token=token["refresh_token"]) == dict() |
|
|
|
|
|
|
|
with pytest.raises(KeycloakAuthenticationError): |
|
|
|
oid.userinfo(token=token["access_token"]) |
|
|
|
await oid.userinfo(token=token["access_token"]) |
|
|
|
|
|
|
|
|
|
|
|
def test_certs(oid: KeycloakOpenID): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_certs(oid: KeycloakOpenID): |
|
|
|
"""Test certificates. |
|
|
|
|
|
|
|
:param oid: Keycloak OpenID client |
|
|
|
:type oid: KeycloakOpenID |
|
|
|
""" |
|
|
|
assert len(oid.certs()["keys"]) == 2 |
|
|
|
certs = await oid.certs() |
|
|
|
assert len(certs["keys"]) == 2 |
|
|
|
|
|
|
|
|
|
|
|
def test_public_key(oid: KeycloakOpenID): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_public_key(oid: KeycloakOpenID): |
|
|
|
"""Test public key. |
|
|
|
|
|
|
|
:param oid: Keycloak OpenID client |
|
|
|
:type oid: KeycloakOpenID |
|
|
|
""" |
|
|
|
assert oid.public_key() is not None |
|
|
|
assert await oid.public_key() is not None |
|
|
|
|
|
|
|
|
|
|
|
def test_entitlement( |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_entitlement( |
|
|
|
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin |
|
|
|
): |
|
|
|
"""Test entitlement. |
|
|
@ -269,53 +282,61 @@ def test_entitlement( |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials_authz |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
resource_server_id = admin.get_client_authz_resources( |
|
|
|
client_id=admin.get_client_id(oid.client_id) |
|
|
|
)[0]["_id"] |
|
|
|
|
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
client_id = await admin.get_client_id(oid.client_id) |
|
|
|
with pytest.raises(KeycloakDeprecationError): |
|
|
|
resource_servers = await admin.get_client_authz_resources( |
|
|
|
client_id=client_id |
|
|
|
) |
|
|
|
resource_server_id = resource_servers[0]["_id"] |
|
|
|
oid.entitlement(token=token["access_token"], resource_server_id=resource_server_id) |
|
|
|
|
|
|
|
|
|
|
|
def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test introspect. |
|
|
|
|
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
|
|
|
|
assert oid.introspect(token=token["access_token"])["active"] |
|
|
|
assert oid.introspect( |
|
|
|
introspect = await oid.introspect(token=token["access_token"]) |
|
|
|
assert introspect["active"] |
|
|
|
|
|
|
|
introspect = await oid.introspect( |
|
|
|
token=token["access_token"], rpt="some", token_type_hint="requesting_party_token" |
|
|
|
) == {"active": False} |
|
|
|
) |
|
|
|
assert introspect == {"active": False} |
|
|
|
|
|
|
|
with pytest.raises(KeycloakRPTNotFound): |
|
|
|
oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token") |
|
|
|
await oid.introspect(token=token["access_token"], token_type_hint="requesting_party_token") |
|
|
|
|
|
|
|
|
|
|
|
def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test decode token. |
|
|
|
|
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
|
|
|
|
decoded_token = await oid.decode_token( |
|
|
|
token=token["access_token"], |
|
|
|
key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----", |
|
|
|
options={"verify_aud": False}, |
|
|
|
) |
|
|
|
assert ( |
|
|
|
oid.decode_token( |
|
|
|
token=token["access_token"], |
|
|
|
key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----", |
|
|
|
options={"verify_aud": False}, |
|
|
|
)["preferred_username"] |
|
|
|
decoded_token["preferred_username"] |
|
|
|
== username |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test load authorization config. |
|
|
|
|
|
|
|
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization |
|
|
@ -335,7 +356,8 @@ def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpe |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test get policies. |
|
|
|
|
|
|
|
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization |
|
|
@ -343,37 +365,38 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str |
|
|
|
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials_authz |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
|
|
|
|
with pytest.raises(KeycloakAuthorizationConfigError): |
|
|
|
oid.get_policies(token=token["access_token"]) |
|
|
|
await oid.get_policies(token=token["access_token"]) |
|
|
|
|
|
|
|
oid.load_authorization_config(path="tests/data/authz_settings.json") |
|
|
|
assert oid.get_policies(token=token["access_token"]) is None |
|
|
|
assert await oid.get_policies(token=token["access_token"]) is None |
|
|
|
|
|
|
|
key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----" |
|
|
|
orig_client_id = oid.client_id |
|
|
|
oid.client_id = "account" |
|
|
|
assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == [] |
|
|
|
assert await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == [] |
|
|
|
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") |
|
|
|
policy.add_role(role="account/view-profile") |
|
|
|
oid.authorization.policies["test"] = policy |
|
|
|
assert [ |
|
|
|
str(x) |
|
|
|
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) |
|
|
|
for x in await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) |
|
|
|
] == ["Policy: test (role)"] |
|
|
|
assert [ |
|
|
|
repr(x) |
|
|
|
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) |
|
|
|
for x in await oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) |
|
|
|
] == ["<Policy: test (role)>"] |
|
|
|
oid.client_id = orig_client_id |
|
|
|
|
|
|
|
oid.logout(refresh_token=token["refresh_token"]) |
|
|
|
with pytest.raises(KeycloakInvalidTokenError): |
|
|
|
oid.get_policies(token=token["access_token"]) |
|
|
|
await oid.get_policies(token=token["access_token"]) |
|
|
|
|
|
|
|
|
|
|
|
def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test get policies. |
|
|
|
|
|
|
|
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization |
|
|
@ -381,19 +404,19 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, |
|
|
|
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials_authz |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
|
|
|
|
with pytest.raises(KeycloakAuthorizationConfigError): |
|
|
|
oid.get_permissions(token=token["access_token"]) |
|
|
|
await oid.get_permissions(token=token["access_token"]) |
|
|
|
|
|
|
|
oid.load_authorization_config(path="tests/data/authz_settings.json") |
|
|
|
assert oid.get_permissions(token=token["access_token"]) is None |
|
|
|
assert await oid.get_permissions(token=token["access_token"]) is None |
|
|
|
|
|
|
|
key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----" |
|
|
|
orig_client_id = oid.client_id |
|
|
|
oid.client_id = "account" |
|
|
|
assert ( |
|
|
|
oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == [] |
|
|
|
await oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == [] |
|
|
|
) |
|
|
|
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") |
|
|
|
policy.add_role(role="account/view-profile") |
|
|
@ -405,24 +428,25 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, |
|
|
|
oid.authorization.policies["test"] = policy |
|
|
|
assert [ |
|
|
|
str(x) |
|
|
|
for x in oid.get_permissions( |
|
|
|
for x in await oid.get_permissions( |
|
|
|
token=token["access_token"], method_token_info="decode", key=key |
|
|
|
) |
|
|
|
] == ["Permission: test-perm (resource)"] |
|
|
|
assert [ |
|
|
|
repr(x) |
|
|
|
for x in oid.get_permissions( |
|
|
|
for x in await oid.get_permissions( |
|
|
|
token=token["access_token"], method_token_info="decode", key=key |
|
|
|
) |
|
|
|
] == ["<Permission: test-perm (resource)>"] |
|
|
|
oid.client_id = orig_client_id |
|
|
|
|
|
|
|
oid.logout(refresh_token=token["refresh_token"]) |
|
|
|
await oid.logout(refresh_token=token["refresh_token"]) |
|
|
|
with pytest.raises(KeycloakInvalidTokenError): |
|
|
|
oid.get_permissions(token=token["access_token"]) |
|
|
|
await oid.get_permissions(token=token["access_token"]) |
|
|
|
|
|
|
|
|
|
|
|
def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test UMA permissions. |
|
|
|
|
|
|
|
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization |
|
|
@ -430,13 +454,15 @@ def test_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, |
|
|
|
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials_authz |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
|
|
|
|
assert len(oid.uma_permissions(token=token["access_token"])) == 1 |
|
|
|
assert oid.uma_permissions(token=token["access_token"])[0]["rsname"] == "Default Resource" |
|
|
|
assert len(await oid.uma_permissions(token=token["access_token"])) == 1 |
|
|
|
uma_permissions = await oid.uma_permissions(token=token["access_token"]) |
|
|
|
assert uma_permissions[0]["rsname"] == "Default Resource" |
|
|
|
|
|
|
|
|
|
|
|
def test_has_uma_access( |
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_has_uma_access( |
|
|
|
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin |
|
|
|
): |
|
|
|
"""Test has UMA access. |
|
|
@ -448,27 +474,27 @@ def test_has_uma_access( |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials_authz |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
token = await oid.token(username=username, password=password) |
|
|
|
|
|
|
|
assert ( |
|
|
|
str(oid.has_uma_access(token=token["access_token"], permissions="")) |
|
|
|
str(await oid.has_uma_access(token=token["access_token"], permissions="")) |
|
|
|
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" |
|
|
|
) |
|
|
|
assert ( |
|
|
|
str(oid.has_uma_access(token=token["access_token"], permissions="Default Resource")) |
|
|
|
str(await oid.has_uma_access(token=token["access_token"], permissions="Default Resource")) |
|
|
|
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" |
|
|
|
) |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPostError): |
|
|
|
oid.has_uma_access(token=token["access_token"], permissions="Does not exist") |
|
|
|
await oid.has_uma_access(token=token["access_token"], permissions="Does not exist") |
|
|
|
|
|
|
|
oid.logout(refresh_token=token["refresh_token"]) |
|
|
|
await oid.logout(refresh_token=token["refresh_token"]) |
|
|
|
assert ( |
|
|
|
str(oid.has_uma_access(token=token["access_token"], permissions="")) |
|
|
|
str(await oid.has_uma_access(token=token["access_token"], permissions="")) |
|
|
|
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())" |
|
|
|
) |
|
|
|
assert ( |
|
|
|
str(oid.has_uma_access(token=admin.token["access_token"], permissions="Default Resource")) |
|
|
|
str(await oid.has_uma_access(token=admin.token["access_token"], permissions="Default Resource")) |
|
|
|
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" |
|
|
|
+ "{'Default Resource'})" |
|
|
|
) |