diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 8a61faa..0f4eb57 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -6146,3 +6146,17 @@ async def test_a_refresh_token(admin: KeycloakAdmin): assert admin.connection.token is not None await admin.a_user_logout(await admin.a_get_user_id(admin.connection.username)) admin.connection.refresh_token() + + +def test_counter_part(): + """Test that each function has its async counter part.""" + admin_methods = [func for func in dir(KeycloakAdmin) if callable(getattr(KeycloakAdmin, func))] + sync_methods = [ + method + for method in admin_methods + if not method.startswith("a_") and not method.startswith("_") + ] + + for method in sync_methods: + async_method = f"a_{method}" + assert (async_method in admin_methods) is True diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index 25df3f1..f05fbd7 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -803,6 +803,51 @@ async def test_a_load_authorization_config( ) +@pytest.mark.asyncio +async def test_a_has_uma_access( + oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin +): + """Test has UMA access. + + :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization + server with client credentials + :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + """ + oid, username, password = oid_with_credentials_authz + token = await oid.a_token(username=username, password=password) + + assert ( + str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) + == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" + ) + assert ( + str( + await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource") + ) + == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" + ) + + with pytest.raises(KeycloakPostError): + await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist") + + await oid.a_logout(refresh_token=token["refresh_token"]) + assert ( + str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) + == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())" + ) + assert ( + str( + await oid.a_has_uma_access( + token=admin.connection.token["access_token"], permissions="Default Resource" + ) + ) + == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" + + "{'Default Resource'})" + ) + + @pytest.mark.asyncio async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): """Test get policies. @@ -907,51 +952,6 @@ async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI ] == "Default Resource" -@pytest.mark.asyncio -async def test_a_has_uma_access( - oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin -): - """Test has UMA access. - - :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization - server with client credentials - :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] - :param admin: Keycloak Admin client - :type admin: KeycloakAdmin - """ - oid, username, password = oid_with_credentials_authz - token = await oid.a_token(username=username, password=password) - - assert ( - str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) - == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" - ) - assert ( - str( - await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource") - ) - == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" - ) - - with pytest.raises(KeycloakPostError): - await oid.a_has_uma_access(token=token["access_token"], permissions="Does not exist") - - await oid.a_logout(refresh_token=token["refresh_token"]) - assert ( - str(await oid.a_has_uma_access(token=token["access_token"], permissions="")) - == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=set())" - ) - assert ( - str( - await oid.a_has_uma_access( - token=admin.connection.token["access_token"], permissions="Default Resource" - ) - ) - == "AuthStatus(is_authorized=False, is_logged_in=False, missing_permissions=" - + "{'Default Resource'})" - ) - - @pytest.mark.asyncio async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]): """Test device authorization flow. @@ -971,3 +971,19 @@ async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, "expires_in": 600, "interval": 5, } + + +def test_counter_part(): + """Test that each function has its async counter part.""" + openid_methods = [ + func for func in dir(KeycloakOpenID) if callable(getattr(KeycloakOpenID, func)) + ] + sync_methods = [ + method + for method in openid_methods + if not method.startswith("a_") and not method.startswith("_") + ] + + for method in sync_methods: + async_method = f"a_{method}" + assert (async_method in openid_methods) is True