|
|
@ -803,6 +803,51 @@ async def test_a_load_authorization_config( |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_has_uma_access( |
|
|
|
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin |
|
|
|
): |
|
|
|
"""Test has UMA access. |
|
|
|
|
|
|
|
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization |
|
|
|
server with client credentials |
|
|
|
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] |
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials_authz |
|
|
|
token = await oid.a_token(username=username, password=password) |
|
|
|
|
|
|
|
assert ( |
|
|
|
str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) |
|
|
|
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" |
|
|
|
) |
|
|
|
assert ( |
|
|
|
str( |
|
|
|
await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource") |
|
|
|
) |
|
|
|
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" |
|
|
|
) |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPostError): |
|
|
|
await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist") |
|
|
|
|
|
|
|
await oid.a_logout(refresh_token=token["refresh_token"]) |
|
|
|
assert ( |
|
|
|
str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) |
|
|
|
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())" |
|
|
|
) |
|
|
|
assert ( |
|
|
|
str( |
|
|
|
await oid.a_has_uma_access( |
|
|
|
token=admin.connection.token["access_token"], permissions="Default Resource" |
|
|
|
) |
|
|
|
) |
|
|
|
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" |
|
|
|
+ "{'Default Resource'})" |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test get policies. |
|
|
@ -907,51 +952,6 @@ async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI |
|
|
|
] == "Default Resource" |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_has_uma_access( |
|
|
|
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin |
|
|
|
): |
|
|
|
"""Test has UMA access. |
|
|
|
|
|
|
|
:param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization |
|
|
|
server with client credentials |
|
|
|
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] |
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials_authz |
|
|
|
token = await oid.a_token(username=username, password=password) |
|
|
|
|
|
|
|
assert ( |
|
|
|
str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) |
|
|
|
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" |
|
|
|
) |
|
|
|
assert ( |
|
|
|
str( |
|
|
|
await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource") |
|
|
|
) |
|
|
|
== "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" |
|
|
|
) |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPostError): |
|
|
|
await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist") |
|
|
|
|
|
|
|
await oid.a_logout(refresh_token=token["refresh_token"]) |
|
|
|
assert ( |
|
|
|
str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) |
|
|
|
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())" |
|
|
|
) |
|
|
|
assert ( |
|
|
|
str( |
|
|
|
await oid.a_has_uma_access( |
|
|
|
token=admin.connection.token["access_token"], permissions="Default Resource" |
|
|
|
) |
|
|
|
) |
|
|
|
== "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" |
|
|
|
+ "{'Default Resource'})" |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test device authorization flow. |
|
|
@ -971,3 +971,19 @@ async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, |
|
|
|
"expires_in": 600, |
|
|
|
"interval": 5, |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
def test_counter_part(): |
|
|
|
"""Test that each function has its async counter part.""" |
|
|
|
openid_methods = [ |
|
|
|
func for func in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, func)) |
|
|
|
] |
|
|
|
sync_methods = [ |
|
|
|
method |
|
|
|
for method in openid_methods |
|
|
|
if not method.startswith("a_") and not method.startswith("_") |
|
|
|
] |
|
|
|
|
|
|
|
for method in sync_methods: |
|
|
|
async_method = f"a_{method}" |
|
|
|
assert (async_method in openid_methods) is True |