diff --git a/src/keycloak/connection.py b/src/keycloak/connection.py index 9f1f010..acb4ce3 100644 --- a/src/keycloak/connection.py +++ b/src/keycloak/connection.py @@ -308,7 +308,7 @@ class ConnectionManager(object): urljoin(self.base_url, path), params=kwargs, headers=self.headers, - timeout=self.timeout + timeout=self.timeout, ) except Exception as e: raise KeycloakConnectionError("Can't connect to server (%s)" % e) @@ -332,7 +332,7 @@ class ConnectionManager(object): params=kwargs, data=data, headers=self.headers, - timeout=self.timeout + timeout=self.timeout, ) except Exception as e: raise KeycloakConnectionError("Can't connect to server (%s)" % e) diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 6103aa2..68355cd 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -382,7 +382,7 @@ class KeycloakAdmin: if "first" in query or "max" in query: return self.__fetch_paginated(url, query) - + return self.__fetch_all(url, query) def create_idp(self, payload): @@ -4250,7 +4250,7 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) - #async functions start + # async functions start async def a___fetch_all(self, url, query=None): """Paginate asynchronously over get requests . @@ -4297,7 +4297,9 @@ class KeycloakAdmin: :rtype: dict """ query = query or {} - return raise_error_from_response(await self.connection.a_raw_get(url, **query), KeycloakGetError) + return raise_error_from_response( + await self.connection.a_raw_get(url, **query), KeycloakGetError + ) async def a_get_current_realm(self) -> str: """Return the currently configured realm asynchronously. @@ -4401,7 +4403,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": realm_name} - data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_REALM.format(**params_path)) + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_REALM.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) async def a_create_realm(self, payload, skip_exists=False): @@ -4455,7 +4459,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": realm_name} - data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_REALM.format(**params_path)) + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_REALM.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) async def a_get_users(self, query=None): @@ -4477,7 +4483,7 @@ class KeycloakAdmin: if "first" in query or "max" in query: return await self.a___fetch_paginated(url, query) - + return await self.a___fetch_all(url, query) async def a_create_idp(self, payload): @@ -4594,7 +4600,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.connection.realm_name} - data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path)) + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_IDPS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) async def a_get_idp(self, idp_alias): @@ -4611,7 +4619,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias} - data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_IDP.format(**params_path)) + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_IDP.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) async def a_delete_idp(self, idp_alias): @@ -4623,7 +4633,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias} - data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_IDP.format(**params_path)) + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_IDP.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) async def a_create_user(self, payload, exist_ok=False): @@ -4691,7 +4703,9 @@ class KeycloakAdmin: :rtype: str """ lower_user_name = username.lower() - users = await self.a_get_users(query={"username": lower_user_name, "max": 1, "exact": True}) + users = await self.a_get_users( + query={"username": lower_user_name, "max": 1, "exact": True} + ) return users[0]["id"] if len(users) == 1 else None async def a_get_user(self, user_id): @@ -4799,7 +4813,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.connection.realm_name, "id": user_id} - data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_USER.format(**params_path)) + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_USER.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) async def a_set_user_password(self, user_id, password, temporary=True): @@ -4918,7 +4934,9 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakGetError) - async def a_add_user_social_login(self, user_id, provider_id, provider_userid, provider_username): + async def a_add_user_social_login( + self, user_id, provider_id, provider_userid, provider_username + ): """Add a federated identity / social login provider asynchronously to the user. :param user_id: User id @@ -5106,7 +5124,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name, "id": group_id} - response = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) + response = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_GROUP.format(**params_path) + ) if response.status_code >= 400: return raise_error_from_response(response, KeycloakGetError) @@ -5368,7 +5388,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.connection.realm_name, "id": group_id} - data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_GROUP.format(**params_path)) + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_GROUP.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) async def a_get_clients(self): @@ -5383,7 +5405,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.connection.realm_name} - data_raw =await self.connection.a_raw_get(urls_patterns.URL_ADMIN_CLIENTS.format(**params_path)) + data_raw =await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENTS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) async def a_get_client(self, client_id): @@ -5398,7 +5422,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name, "id": client_id} - data_raw =await self.connection.a_raw_get(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) + data_raw =await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) async def a_get_client_id(self, client_id): @@ -5633,7 +5659,9 @@ class KeycloakAdmin: data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) - async def a_create_client_authz_resource_based_permission(self, client_id, payload, skip_exists=False): + async def a_create_client_authz_resource_based_permission( + self, client_id, payload, skip_exists=False + ): """Create resource-based permission of client asynchronously. Payload example:: @@ -6007,7 +6035,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.connection.realm_name, "id": client_id} - data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_CLIENT.format(**params_path)) + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) async def a_get_client_installation_provider(self, client_id, provider_id): @@ -6254,7 +6284,9 @@ class KeycloakAdmin: """ if skip_exists: try: - res = await self.a_get_client_role(client_id=client_role_id, role_name=payload["name"]) + res = await self.a_get_client_role( + client_id=client_role_id, role_name=payload["name"] + ) return res["name"] except KeycloakGetError: pass @@ -6336,7 +6368,7 @@ class KeycloakAdmin: "id": client_role_id, "role-name": role_name, } - data_raw =await self.connection.a_raw_delete( + data_raw = await self.connection.a_raw_delete( urls_patterns.URL_ADMIN_CLIENT_ROLE.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) @@ -6967,7 +6999,9 @@ class KeycloakAdmin: urls_patterns.URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE, user_id, client_id ) - async def a_get_composite_client_roles_of_user(self, user_id, client_id, brief_representation=False): + async def a_get_composite_client_roles_of_user( + self, user_id, client_id, brief_representation=False + ): """Get composite client role-mappings for a user asynchronously. :param user_id: id of user @@ -7046,7 +7080,9 @@ class KeycloakAdmin: :rtype: list """ params_path = {"realm-name": self.connection.realm_name} - data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_FLOWS.format(**params_path)) + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_FLOWS.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) async def a_get_authentication_flow_for_id(self, flow_id): @@ -7119,7 +7155,9 @@ class KeycloakAdmin: :rtype: bytes """ params_path = {"realm-name": self.connection.realm_name, "id": flow_id} - data_raw = await self.connection.a_raw_delete(urls_patterns.URL_ADMIN_FLOW.format(**params_path)) + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_FLOW.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) async def a_get_authentication_flow_executions(self, flow_alias): @@ -7800,7 +7838,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name, "component-id": component_id} - data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)) + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_COMPONENT.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) async def a_update_component(self, component_id, payload): @@ -8155,7 +8195,9 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) - async def a_get_composite_client_roles_of_group(self, client_id, group_id, brief_representation=True): + async def a_get_composite_client_roles_of_group( + self, client_id, group_id, brief_representation=True + ): """Get the composite client roles of the given group for the given client asynchronously. :param client_id: id of the client. diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 733265f..4f09216 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -874,7 +874,7 @@ class KeycloakOpenID: if totp: payload["totp"] = totp - payload = self._add_secret_key(payload) + payload = self._add_secret_key(payload) data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakPostError) @@ -901,7 +901,7 @@ class KeycloakOpenID: "grant_type": grant_type, "refresh_token": refresh_token, } - payload = self._add_secret_key(payload) + payload = self._add_secret_key(payload) data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakPostError) @@ -953,7 +953,7 @@ class KeycloakOpenID: "requested_issuer": requested_issuer, "scope": scope, } - payload = self._add_secret_key(payload) + payload = self._add_secret_key(payload) data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakPostError) @@ -985,7 +985,7 @@ class KeycloakOpenID: """ params_path = {"realm-name": self.realm_name} payload = {"client_id": self.client_id, "refresh_token": refresh_token} - payload = self._add_secret_key(payload) + payload = self._add_secret_key(payload) data_raw = await self.connection.a_raw_post(URL_LOGOUT.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) @@ -1071,9 +1071,11 @@ class KeycloakOpenID: else: raise KeycloakRPTNotFound("Can't found RPT.") - payload = self._add_secret_key(payload) + payload = self._add_secret_key(payload) - data_raw = await self.connection.a_raw_post(URL_INTROSPECT.format(**params_path), data=payload) + data_raw = await self.connection.a_raw_post( + URL_INTROSPECT.format(**params_path), data=payload + ) return raise_error_from_response(data_raw, KeycloakPostError) async def a_decode_token(self, token, validate: bool = True, **kwargs): diff --git a/src/keycloak/openid_connection.py b/src/keycloak/openid_connection.py index 635586f..b484e44 100644 --- a/src/keycloak/openid_connection.py +++ b/src/keycloak/openid_connection.py @@ -117,7 +117,6 @@ class KeycloakOpenIDConnection(ConnectionManager): self.headers = {} self.custom_headers = custom_headers - if self.token is None: self.get_token() diff --git a/tests/conftest.py b/tests/conftest.py index af1f9af..7856f9a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,10 +32,10 @@ class KeycloakTestEnv(object): def __init__( self, - host: str = os.environ["KEYCLOAK_HOST"], - port: str = os.environ["KEYCLOAK_PORT"], - username: str = os.environ["KEYCLOAK_ADMIN"], - password: str = os.environ["KEYCLOAK_ADMIN_PASSWORD"], + host: str = 'localhost',#os.environ["KEYCLOAK_HOST"], + port: str = '8080',#os.environ["KEYCLOAK_PORT"], + username: str = 'admin',#os.environ["KEYCLOAK_ADMIN"], + password: str = 'admin',#os.environ["KEYCLOAK_ADMIN_PASSWORD"], ): """Init method. diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index f56e7f0..4c573b2 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -3064,7 +3064,7 @@ def test_refresh_token(admin: KeycloakAdmin): admin.connection.refresh_token() -#async function start +# async function start @pytest.mark.asyncio async def test_a_realms(admin: KeycloakAdmin): @@ -3135,6 +3135,7 @@ async def test_a_realms(admin: KeycloakAdmin): await admin.a_delete_realm(realm_name="non-existent") assert err.match('404: b\'{"error":"Realm not found.".*}\'') + @pytest.mark.asyncio async def test_a_changing_of_realms(admin: KeycloakAdmin, realm: str): """Test changing of realms. @@ -3148,6 +3149,7 @@ async def test_a_changing_of_realms(admin: KeycloakAdmin, realm: str): await admin.a_change_current_realm(realm) assert await admin.a_get_current_realm() == realm + @pytest.mark.asyncio async def test_a_import_export_realms(admin: KeycloakAdmin, realm: str): """Test import and export of realms. @@ -3174,6 +3176,7 @@ async def test_a_import_export_realms(admin: KeycloakAdmin, realm: str): '500: b\'{"error":"unknown_error"}\'|400: b\'{"errorMessage":"Realm name cannot be empty"}\'' # noqa: E501 ) + @pytest.mark.asyncio async def test_a_partial_import_realm(admin: KeycloakAdmin, realm: str): """Test partial import of realm configuration. @@ -3221,6 +3224,7 @@ async def test_a_partial_import_realm(admin: KeycloakAdmin, realm: str): res = await admin.a_partial_import_realm(realm_name=realm, payload=payload) assert res["overwritten"] == 3 + @pytest.mark.asyncio async def test_a_users(admin: KeycloakAdmin, realm: str): """Test users. @@ -3329,6 +3333,7 @@ async def test_a_users(admin: KeycloakAdmin, realm: str): await admin.a_delete_user(user_id="non-existent-id") assert err.match(USER_NOT_FOUND_REGEX) + @pytest.mark.asyncio async def test_a_enable_disable_all_users(admin: KeycloakAdmin, realm: str): """Test enable and disable all users. @@ -3366,6 +3371,7 @@ async def test_a_enable_disable_all_users(admin: KeycloakAdmin, realm: str): assert (await admin.a_get_user(user_id_2))["enabled"] assert (await admin.a_get_user(user_id_3))["enabled"] + @pytest.mark.asyncio async def test_a_users_roles(admin: KeycloakAdmin, realm: str): """Test users roles. @@ -3378,7 +3384,9 @@ async def test_a_users_roles(admin: KeycloakAdmin, realm: str): user_id = await admin.a_create_user(payload={"username": "test", "email": "test@test.test"}) # Test all level user roles - client_id = await admin.a_create_client(payload={"name": "test-client", "clientId": "test-client"}) + client_id = await admin.a_create_client( + payload={"name": "test-client", "clientId": "test-client"} + ) await admin.a_create_client_role(client_role_id=client_id, payload={"name": "test-role"}) await admin.a_assign_client_role( client_id=client_id, @@ -3399,6 +3407,7 @@ async def test_a_users_roles(admin: KeycloakAdmin, realm: str): await admin.a_delete_user(user_id) await admin.a_delete_client(client_id) + @pytest.mark.asyncio async def test_a_users_pagination(admin: KeycloakAdmin, realm: str): """Test user pagination. @@ -3423,6 +3432,7 @@ async def test_a_users_pagination(admin: KeycloakAdmin, realm: str): users = await admin.a_get_users(query={"max": 20}) assert len(users) == 20, len(users) + @pytest.mark.asyncio async def test_a_user_groups_pagination(admin: KeycloakAdmin, realm: str): """Test user groups pagination. @@ -3449,9 +3459,12 @@ async def test_a_user_groups_pagination(admin: KeycloakAdmin, realm: str): groups = await admin.a_get_user_groups(user_id=user_id, query={"first": 100, "max": -1, "search": ""}) assert len(groups) == 50, len(groups) - groups = await admin.a_get_user_groups(user_id=user_id, query={"max": 20, "first": -1, "search": ""}) + groups = await admin.a_get_user_groups( + user_id=user_id, query={"max": 20, "first": -1, "search": ""} + ) assert len(groups) == 20, len(groups) + @pytest.mark.asyncio async def test_a_idps(admin: KeycloakAdmin, realm: str): """Test IDPs. @@ -3542,6 +3555,7 @@ async def test_a_idps(admin: KeycloakAdmin, realm: str): await admin.a_delete_idp(idp_alias="does-not-exist") assert err.match(HTTP_404_REGEX) + @pytest.mark.asyncio async def test_a_user_credentials(admin: KeycloakAdmin, user: str): """Test user credentials. @@ -3576,6 +3590,7 @@ async def test_a_user_credentials(admin: KeycloakAdmin, user: str): await admin.a_delete_credential(user_id=user, credential_id="does-not-exist") assert err.match('404: b\'{"error":"Credential not found".*}\'') + @pytest.mark.asyncio async def test_a_social_logins(admin: KeycloakAdmin, user: str): """Test social logins. @@ -3593,7 +3608,7 @@ async def test_a_social_logins(admin: KeycloakAdmin, user: str): user_id=user, provider_id="github", provider_userid="test", provider_username="test" ) assert res == dict(), res - + # Test add social login fail with pytest.raises(KeycloakPostError) as err: await admin.a_add_user_social_login( @@ -3622,6 +3637,7 @@ async def test_a_social_logins(admin: KeycloakAdmin, user: str): await admin.a_delete_user_social_login(user_id=user, provider_id="instagram") assert err.match('404: b\'{"error":"Link not found".*}\''), err + @pytest.mark.asyncio async def test_a_server_info(admin: KeycloakAdmin): """Test server info. @@ -3651,6 +3667,7 @@ async def test_a_server_info(admin: KeycloakAdmin): } ), info.keys() + @pytest.mark.asyncio async def test_a_groups(admin: KeycloakAdmin, user: str): """Test groups. @@ -3717,18 +3734,22 @@ async def test_a_groups(admin: KeycloakAdmin, user: str): assert err.match('404: b\'{"error":"Could not find group by id".*}\''), err # Create 1 more subgroup - subsubgroup_id_1 = await admin.a_create_group(payload={"name": "subsubgroup-1"}, parent=subgroup_id_2) + subsubgroup_id_1 = await admin.a_create_group( + payload={"name": "subsubgroup-1"}, parent=subgroup_id_2 + ) main_group = await admin.a_get_group(group_id=group_id) # Test nested searches subgroup_2 = await admin.a_get_group(group_id=subgroup_id_2) - res = await admin.a_get_subgroups(group=subgroup_2, path="/main-group/subgroup-2/subsubgroup-1") + res = await admin.a_get_subgroups( + group=subgroup_2, path="/main-group/subgroup-2/subsubgroup-1" + ) assert res is not None, res assert res["id"] == subsubgroup_id_1 # Test nested search from main group res = await admin.a_get_subgroups( - group= await admin.a_get_group(group_id=group_id, full_hierarchy=True), + group = await admin.a_get_group(group_id=group_id, full_hierarchy=True), path="/main-group/subgroup-2/subsubgroup-1", ) assert res["id"] == subsubgroup_id_1 @@ -3836,6 +3857,7 @@ async def test_a_groups(admin: KeycloakAdmin, user: str): await admin.a_delete_group(group_id="does-not-exist") assert err.match('404: b\'{"error":"Could not find group by id".*}\''), err + @pytest.mark.asyncio async def test_a_clients(admin: KeycloakAdmin, realm: str): """Test clients. @@ -3862,7 +3884,9 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): ), clients # Test create client - client_id = await admin.a_create_client(payload={"name": "test-client", "clientId": "test-client"}) + client_id = await admin.a_create_client( + payload={"name": "test-client", "clientId": "test-client"} + ) assert client_id, client_id with pytest.raises(KeycloakPostError) as err: @@ -3894,7 +3918,9 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): assert res == dict(), res with pytest.raises(KeycloakPutError) as err: - await admin.a_update_client(client_id="does-not-exist", payload={"name": "test-client-change"}) + await admin.a_update_client( + client_id="does-not-exist", payload={"name": "test-client-change"} + ) assert err.match('404: b\'{"error":"Could not find client".*}\'') # Test client mappers @@ -3918,10 +3944,14 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): mapper = (await admin.a_get_mappers_from_client(client_id=client_id))[0] with pytest.raises(KeycloakPutError) as err: - await admin.a_update_client_mapper(client_id=client_id, mapper_id="does-not-exist", payload=dict()) + await admin.a_update_client_mapper( + client_id=client_id, mapper_id="does-not-exist", payload=dict() + ) assert err.match('404: b\'{"error":"Model not found".*}\'') mapper["config"]["user.attribute"] = "test" - res = await admin.a_update_client_mapper(client_id=client_id, mapper_id=mapper["id"], payload=mapper) + res = await admin.a_update_client_mapper( + client_id=client_id, mapper_id=mapper["id"], payload=mapper + ) assert res == dict() res = await admin.a_remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"]) @@ -3971,7 +4001,9 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): assert res["name"] == "test-resource", res test_resource_id = res["_id"] - res = await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=test_resource_id) + res = await admin.a_get_client_authz_resource( + client_id=auth_client_id, resource_id=test_resource_id + ) assert res["_id"] == test_resource_id, res assert res["name"] == "test-resource", res @@ -3999,7 +4031,9 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): resource_id=temp_resource_id, payload={"name": "temp-updated-resource"}, ) - res = await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id) + res = await admin.a_get_client_authz_resource( + client_id=auth_client_id, resource_id=temp_resource_id + ) assert res["name"] == "temp-updated-resource", res with pytest.raises(KeycloakPutError) as err: await admin.a_update_client_authz_resource( @@ -4008,9 +4042,13 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): payload={"name": "temp-updated-resource"}, ) assert err.match("404: b''"), err - await admin.a_delete_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id) + await admin.a_delete_client_authz_resource( + client_id=auth_client_id, resource_id=temp_resource_id + ) with pytest.raises(KeycloakGetError) as err: - await admin.a_get_client_authz_resource(client_id=auth_client_id, resource_id=temp_resource_id) + await admin.a_get_client_authz_resource( + client_id=auth_client_id, resource_id=temp_resource_id + ) assert err.match("404: b''") # Authz policies @@ -4174,7 +4212,7 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): assert err.match('404: b\'{"error":"Could not find client".*}\'') secrets = await admin.a_get_client_secrets( - client_id = await admin.a_get_client_id(client_id="test-confidential") + client_id=await admin.a_get_client_id(client_id="test-confidential") ) assert secrets == {"type": "secret", "value": "test-secret"} @@ -4187,10 +4225,13 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str): ) assert res assert ( - await admin.a_get_client_secrets(client_id=await admin.a_get_client_id(client_id="test-confidential")) + await admin.a_get_client_secrets( + client_id=await admin.a_get_client_id(client_id="test-confidential") + ) == res ) + @pytest.mark.asyncio async def test_a_realm_roles(admin: KeycloakAdmin, realm: str): """Test realm roles. @@ -4223,12 +4264,16 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str): assert members == list(), members # Test create realm role - role_id = await admin.a_create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True) + role_id = await admin.a_create_realm_role( + payload={"name": "test-realm-role"}, skip_exists=True + ) assert role_id, role_id with pytest.raises(KeycloakPostError) as err: await admin.a_create_realm_role(payload={"name": "test-realm-role"}) assert err.match('409: b\'{"errorMessage":"Role with name test-realm-role already exists"}\'') - role_id_2 = await admin.a_create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True) + role_id_2 = await admin.a_create_realm_role( + payload={"name": "test-realm-role"}, skip_exists=True + ) assert role_id == role_id_2 # Test get realm role by its id @@ -4248,7 +4293,9 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str): assert err.match(COULD_NOT_FIND_ROLE_REGEX) # Test realm role user assignment - user_id = await admin.a_create_user(payload={"username": "role-testing", "email": "test@test.test"}) + user_id = await admin.a_create_user( + payload={"username": "role-testing", "email": "test@test.test"} + ) with pytest.raises(KeycloakPostError) as err: await admin.a_assign_realm_roles(user_id=user_id, roles=["bad"]) assert err.match(UNKOWN_ERROR_REGEX), err @@ -4264,7 +4311,8 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str): x["username"] for x in await admin.a_get_realm_role_members(role_name="offline_access") ] assert admin.get_user(user_id=user_id)["username"] in [ - x["username"] for x in await admin.a_get_realm_role_members(role_name="test-realm-role-update") + x["username"] + for x in await admin.a_get_realm_role_members(role_name="test-realm-role-update") ] roles = await admin.a_get_realm_roles_of_user(user_id=user_id) @@ -4445,6 +4493,7 @@ async def test_a_role_attributes( res = await admin.a_delete_client_role(client, role_name=attribute_role) assert res == dict(), res + @pytest.mark.asyncio async def test_a_client_scope_realm_roles(admin: KeycloakAdmin, realm: str): """Test client realm roles. @@ -4464,7 +4513,9 @@ async def test_a_client_scope_realm_roles(admin: KeycloakAdmin, realm: str): assert "offline_access" in role_names, role_names # create realm role for test - role_id = await admin.a_create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True) + role_id = await admin.a_create_realm_role( + payload={"name": "test-realm-role"}, skip_exists=True + ) assert role_id, role_id # Test realm role client assignment @@ -4509,6 +4560,7 @@ async def test_a_client_scope_realm_roles(admin: KeycloakAdmin, realm: str): roles = await admin.a_get_realm_roles_of_client_scope(client_id=client_id) assert len(roles) == 0 + @pytest.mark.asyncio async def test_a_client_scope_client_roles(admin: KeycloakAdmin, realm: str, client: str): """Test client assignment of other client roles. @@ -4667,6 +4719,7 @@ async def test_a_client_optional_client_scopes(admin: KeycloakAdmin, realm: str, optional_client_scopes = await admin.a_get_client_optional_client_scopes(client_id) assert len(optional_client_scopes) == 4, optional_client_scopes + @pytest.mark.asyncio async def test_a_client_roles(admin: KeycloakAdmin, client: str): """Test client roles. @@ -4688,7 +4741,9 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): client_role_id=client, payload={"name": "client-role-test"}, skip_exists=True ) with pytest.raises(KeycloakPostError) as err: - await admin.a_create_client_role(client_role_id=client, payload={"name": "client-role-test"}) + await admin.a_create_client_role( + client_role_id=client, payload={"name": "client-role-test"} + ) assert err.match('409: b\'{"errorMessage":"Role with name client-role-test already exists"}\'') client_role_id_2 = await admin.a_create_client_role( client_role_id=client, payload={"name": "client-role-test"}, skip_exists=True @@ -4723,7 +4778,9 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): assert err.match(COULD_NOT_FIND_ROLE_REGEX) # Test user with client role - res = await admin.a_get_client_role_members(client_id=client, role_name="client-role-test-update") + res = await admin.a_get_client_role_members( + client_id=client, role_name="client-role-test-update" + ) assert len(res) == 0 with pytest.raises(KeycloakGetError) as err: await admin.a_get_client_role_members(client_id=client, role_name="bad") @@ -4736,11 +4793,17 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): res = await admin.a_assign_client_role( user_id=user_id, client_id=client, - roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")], + roles=[ + await admin.a_get_client_role(client_id=client, role_name="client-role-test-update") + ], ) assert res == dict() assert ( - len(await admin.a_get_client_role_members(client_id=client, role_name="client-role-test-update")) + len( + await admin.a_get_client_role_members( + client_id=client, role_name="client-role-test-update" + ) + ) == 1 ) @@ -4768,12 +4831,16 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): await admin.a_delete_client_roles_of_user( user_id=user_id, client_id=client, - roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")], + roles=[ + await admin.a_get_client_role(client_id=client, role_name="client-role-test-update") + ], ) assert len(await admin.a_get_client_roles_of_user(user_id=user_id, client_id=client)) == 0 # Test groups and client roles - res = await admin.a_get_client_role_groups(client_id=client, role_name="client-role-test-update") + res = await admin.a_get_client_role_groups( + client_id=client, role_name="client-role-test-update" + ) assert len(res) == 0 with pytest.raises(KeycloakGetError) as err: await admin.a_get_client_role_groups(client_id=client, role_name="bad") @@ -4792,11 +4859,17 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): res = await admin.a_assign_group_client_roles( group_id=group_id, client_id=client, - roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")], + roles=[ + await admin.a_get_client_role(client_id=client, role_name="client-role-test-update") + ], ) assert res == dict() assert ( - len(await admin.a_get_client_role_groups(client_id=client, role_name="client-role-test-update")) + len( + await admin.a_get_client_role_groups( + client_id=client, role_name="client-role-test-update" + ) + ) == 1 ) assert len(await admin.a_get_group_client_roles(group_id=group_id, client_id=client)) == 1 @@ -4807,7 +4880,9 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): res = await admin.a_delete_group_client_roles( group_id=group_id, client_id=client, - roles=[await admin.a_get_client_role(client_id=client, role_name="client-role-test-update")], + roles=[ + await admin.a_get_client_role(client_id=client, role_name="client-role-test-update") + ], ) assert res == dict() @@ -4828,10 +4903,14 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): ] # Test delete of client role - res = await admin.a_delete_client_role(client_role_id=client, role_name="client-role-test-update") + res = await admin.a_delete_client_role( + client_role_id=client, role_name="client-role-test-update" + ) assert res == dict() with pytest.raises(KeycloakDeleteError) as err: - await admin.a_delete_client_role(client_role_id=client, role_name="client-role-test-update") + await admin.a_delete_client_role( + client_role_id=client, role_name="client-role-test-update" + ) assert err.match(COULD_NOT_FIND_ROLE_REGEX) # Test of roles by id - Get role @@ -4863,6 +4942,7 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str): await admin.a_delete_role_by_id(role_id="bad") assert err.match(COULD_NOT_FIND_ROLE_WITH_ID_REGEX) + @pytest.mark.asyncio async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str): """Test enable token exchange. @@ -4984,6 +5064,7 @@ async def test_a_enable_token_exchange(admin: KeycloakAdmin, realm: str): ) assert err.match('404: b\'{"error":"Could not find client".*}\'') + @pytest.mark.asyncio async def test_a_email(admin: KeycloakAdmin, user: str): """Test email. @@ -5003,6 +5084,7 @@ async def test_a_email(admin: KeycloakAdmin, user: str): await admin.a_send_verify_email(user_id=user) assert err.match('500: b\'{"errorMessage":"Failed to send .*"}\'') + @pytest.mark.asyncio async def test_a_get_sessions(admin: KeycloakAdmin): """Test get sessions. @@ -5010,12 +5092,15 @@ async def test_a_get_sessions(admin: KeycloakAdmin): :param admin: Keycloak Admin client :type admin: KeycloakAdmin """ - sessions = await admin.a_get_sessions(user_id=admin.get_user_id(username=admin.connection.username)) + sessions = await admin.a_get_sessions( + user_id=admin.get_user_id(username=admin.connection.username) + ) assert len(sessions) >= 1 with pytest.raises(KeycloakGetError) as err: await admin.a_get_sessions(user_id="bad") assert err.match(USER_NOT_FOUND_REGEX) + @pytest.mark.asyncio async def test_a_get_client_installation_provider(admin: KeycloakAdmin, client: str): """Test get client installation provider. @@ -5041,6 +5126,7 @@ async def test_a_get_client_installation_provider(admin: KeycloakAdmin, client: "ssl-required", } + @pytest.mark.asyncio async def test_a_auth_flows(admin: KeycloakAdmin, realm: str): """Test auth flows. @@ -5101,7 +5187,9 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str): await admin.a_copy_authentication_flow(payload=dict(), flow_alias="bad") assert err.match("404: b''") - res = await admin.a_copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser") + res = await admin.a_copy_authentication_flow( + payload={"newName": "test-browser"}, flow_alias="browser" + ) assert res == b"", res assert len(await admin.a_get_authentication_flows()) == (default_flows + 1) @@ -5111,7 +5199,9 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str): ) assert res == b"" with pytest.raises(KeycloakPostError) as err: - await admin.a_create_authentication_flow(payload={"alias": "test-create", "builtIn": False}) + await admin.a_create_authentication_flow( + payload={"alias": "test-create", "builtIn": False} + ) assert err.match('409: b\'{"errorMessage":"Flow test-create already exists"}\'') assert await admin.a_create_authentication_flow( payload={"alias": "test-create"}, skip_exists=True @@ -5160,7 +5250,9 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str): assert err.match('400: b\'{"error":"Unrecognized field') payload = (await admin.a_get_authentication_flow_executions(flow_alias="test-create"))[0] payload["displayName"] = "test" - res = await admin.a_update_authentication_flow_executions(payload=payload, flow_alias="test-create") + res = await admin.a_update_authentication_flow_executions( + payload=payload, flow_alias="test-create" + ) assert res exec_id = (await admin.a_get_authentication_flow_executions(flow_alias="test-create"))[0]["id"] @@ -5200,15 +5292,16 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str): assert res == {"msg": "Already exists"} # Test delete auth flow - flow_id = [x for x in await admin.a_get_authentication_flows() if x["alias"] == "test-browser"][0][ - "id" - ] + flow_id = [ + x for x in await admin.a_get_authentication_flows() if x["alias"] == "test-browser" + ][0]["id"] res = await admin.a_delete_authentication_flow(flow_id=flow_id) assert res == dict() with pytest.raises(KeycloakDeleteError) as err: await admin.a_delete_authentication_flow(flow_id=flow_id) assert err.match('404: b\'{"error":"Could not find flow with id".*}\'') + @pytest.mark.asyncio async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str): """Test authentication configs. @@ -5247,6 +5340,7 @@ async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str): await admin.a_delete_authenticator_config(config_id="bad") assert err.match('404: b\'{"error":"Could not find authenticator config".*}\'') + @pytest.mark.asyncio async def test_a_sync_users(admin: KeycloakAdmin, realm: str): """Test sync users. @@ -5263,6 +5357,7 @@ async def test_a_sync_users(admin: KeycloakAdmin, realm: str): await admin.a_sync_users(storage_id="does-not-exist", action="triggerFullSync") assert err.match('404: b\'{"error":"Could not find component".*}\'') + @pytest.mark.asyncio async def test_a_client_scopes(admin: KeycloakAdmin, realm: str): """Test client scopes. @@ -5344,9 +5439,9 @@ async def test_a_client_scopes(admin: KeycloakAdmin, realm: str): client_scope_id=res, protocol_mapper_id=test_mapper["id"], payload=test_mapper ) assert res_update == dict() - assert ( - (await admin.a_get_mappers_from_client_scope(client_scope_id=res))[0]["config"]["user.attribute"] - == "test" + assert ((await admin.a_get_mappers_from_client_scope(client_scope_id=res))[0]["config"][ + "user.attribute" + ] == "test" ) # Test delete mapper @@ -5407,6 +5502,7 @@ async def test_a_client_scopes(admin: KeycloakAdmin, realm: str): await admin.a_delete_client_scope(client_scope_id=res) assert err.match(NO_CLIENT_SCOPE_REGEX) + @pytest.mark.asyncio async def test_a_components(admin: KeycloakAdmin, realm: str): """Test components. @@ -5464,6 +5560,7 @@ async def test_a_components(admin: KeycloakAdmin, realm: str): await admin.a_delete_component(component_id=res) assert err.match('404: b\'{"error":"Could not find component".*}\'') + @pytest.mark.asyncio async def test_a_keys(admin: KeycloakAdmin, realm: str): """Test keys. @@ -5474,9 +5571,12 @@ async def test_a_keys(admin: KeycloakAdmin, realm: str): :type realm: str """ await admin.a_change_current_realm(realm) - assert set((await admin.a_get_keys())["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} or set( - (await admin.a_get_keys())["active"].keys() - ) == {"RSA-OAEP", "RS256", "HS512", "AES"} + assert set((await admin.a_get_keys())["active"].keys()) == { + "AES", + "HS256", + "RS256", + "RSA-OAEP" + } or set((await admin.a_get_keys())["active"].keys()) == {"RSA-OAEP", "RS256", "HS512", "AES"} assert {k["algorithm"] for k in (await admin.a_get_keys())["keys"]} == { "HS256", "RSA-OAEP", @@ -5489,6 +5589,7 @@ async def test_a_keys(admin: KeycloakAdmin, realm: str): "RS256", } + @pytest.mark.asyncio async def test_a_admin_events(admin: KeycloakAdmin, realm: str): """Test events. @@ -5505,6 +5606,7 @@ async def test_a_admin_events(admin: KeycloakAdmin, realm: str): events = await admin.a_get_admin_events() assert events == list() + @pytest.mark.asyncio async def test_a_user_events(admin: KeycloakAdmin, realm: str): """Test events. @@ -5523,7 +5625,9 @@ async def test_a_user_events(admin: KeycloakAdmin, realm: str): await admin.a_set_events(payload={"bad": "conf"}) assert err.match('400: b\'{"error":"Unrecognized field') - res = await admin.a_set_events(payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True}) + res = await admin.a_set_events( + payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True} + ) assert res == dict() await admin.a_create_client(payload={"name": "test", "clientId": "test"}) @@ -5531,6 +5635,7 @@ async def test_a_user_events(admin: KeycloakAdmin, realm: str): events = await admin.a_get_events() assert events == list() + @pytest.mark.asyncio @freezegun.freeze_time("2023-02-25 10:00:00") async def test_a_auto_refresh(admin_frozen: KeycloakAdmin, realm: str): @@ -5593,6 +5698,7 @@ async def test_a_auto_refresh(admin_frozen: KeycloakAdmin, realm: str): assert await admin.a_delete_realm(realm_name="test-refresh") == dict() assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:35:00") + @pytest.mark.asyncio async def test_a_get_required_actions(admin: KeycloakAdmin, realm: str): """Test required actions. @@ -5617,6 +5723,7 @@ async def test_a_get_required_actions(admin: KeycloakAdmin, realm: str): ]: assert key in ra + @pytest.mark.asyncio async def test_a_get_required_action_by_alias(admin: KeycloakAdmin, realm: str): """Test get required action by alias. @@ -5633,6 +5740,7 @@ async def test_a_get_required_action_by_alias(admin: KeycloakAdmin, realm: str): assert ra["alias"] == "UPDATE_PASSWORD" assert await admin.a_get_required_action_by_alias("does-not-exist") is None + @pytest.mark.asyncio async def test_a_update_required_action(admin: KeycloakAdmin, realm: str): """Test update required action. @@ -5651,6 +5759,7 @@ async def test_a_update_required_action(admin: KeycloakAdmin, realm: str): assert old != newra assert newra["enabled"] is False + @pytest.mark.asyncio async def test_a_get_composite_client_roles_of_group( admin: KeycloakAdmin, realm: str, client: str, group: str, composite_client_role: str @@ -5674,6 +5783,7 @@ async def test_a_get_composite_client_roles_of_group( result = await admin.a_get_composite_client_roles_of_group(client, group) assert role["id"] in [x["id"] for x in result] + @pytest.mark.asyncio async def test_a_get_role_client_level_children( admin: KeycloakAdmin, realm: str, client: str, composite_client_role: str, client_role: str @@ -5697,8 +5807,11 @@ async def test_a_get_role_client_level_children( res = await admin.a_get_role_client_level_children(client, parent["id"]) assert child["id"] in [x["id"] for x in res] + @pytest.mark.asyncio -async def test_a_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple): +async def test_a_upload_certificate( + admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple + ): """Test upload certificate. :param admin: Keycloak Admin client @@ -5717,6 +5830,7 @@ async def test_a_upload_certificate(admin: KeycloakAdmin, realm: str, client: st cl = await admin.a_get_client(client) assert cl["attributes"]["jwt.credential.certificate"] == "".join(cert.splitlines()[1:-1]) + @pytest.mark.asyncio async def test_a_get_bruteforce_status_for_user( admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str @@ -5754,6 +5868,7 @@ async def test_a_get_bruteforce_status_for_user( res = await admin.a_get_realm(realm_name=realm) assert res["bruteForceProtected"] is False + @pytest.mark.asyncio async def test_a_clear_bruteforce_attempts_for_user( admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str @@ -5835,6 +5950,7 @@ async def test_a_clear_bruteforce_attempts_for_all_users( res = await admin.a_get_realm(realm_name=realm) assert res["bruteForceProtected"] is False + @pytest.mark.asyncio async def test_a_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> None: """Test that the default realm role is present in a brand new realm. @@ -5847,10 +5963,17 @@ async def test_a_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> await admin.a_change_current_realm(realm) assert f"default-roles-{realm}" in [x["name"] for x in admin.get_realm_roles()] assert ( - len([x["name"] for x in await admin.a_get_realm_roles() if x["name"] == f"default-roles-{realm}"]) + len( + [ + x["name"] + for x in await admin.a_get_realm_roles() + if x["name"] == f"default-roles-{realm}" + ] + ) == 1 ) + @pytest.mark.asyncio async def test_a_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> None: """Test getter for the ID of the default realm role. @@ -5863,9 +5986,14 @@ async def test_a_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> await admin.a_change_current_realm(realm) assert ( await admin.a_get_default_realm_role_id() - == [x["id"] for x in await admin.a_get_realm_roles() if x["name"] == f"default-roles-{realm}"][0] + == [ + x["id"] + for x in await admin.a_get_realm_roles() + if x["name"] == f"default-roles-{realm}" + ][0] ) + @pytest.mark.asyncio async def test_a_realm_default_roles(admin: KeycloakAdmin, realm: str) -> None: """Test getting, adding and deleting default realm roles. @@ -5908,6 +6036,7 @@ async def test_a_realm_default_roles(admin: KeycloakAdmin, realm: str) -> None: await admin.a_add_realm_default_roles(payload=[{"id": "bad id"}]) assert err.match('404: b\'{"error":"Could not find composite role".*}\'') + @pytest.mark.asyncio async def test_a_clear_keys_cache(realm: str, admin: KeycloakAdmin) -> None: """Test clearing the keys cache. @@ -5921,6 +6050,7 @@ async def test_a_clear_keys_cache(realm: str, admin: KeycloakAdmin) -> None: res = await admin.a_clear_keys_cache() assert res == {} + @pytest.mark.asyncio async def test_a_clear_realm_cache(realm: str, admin: KeycloakAdmin) -> None: """Test clearing the realm cache. @@ -5934,6 +6064,7 @@ async def test_a_clear_realm_cache(realm: str, admin: KeycloakAdmin) -> None: res = await admin.a_clear_realm_cache() assert res == {} + @pytest.mark.asyncio async def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None: """Test clearing the user cache. @@ -5947,6 +6078,7 @@ async def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None: res = await admin.a_clear_user_cache() assert res == {} + @pytest.mark.asyncio async def test_a_initial_access_token( admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str] @@ -5983,9 +6115,12 @@ async def test_a_initial_access_token( assert res["clientId"] == client new_secret = str(uuid.uuid4()) - res = await oid.a_update_client(res["registrationAccessToken"], client, payload={"secret": new_secret}) + res = await oid.a_update_client( + res["registrationAccessToken"], client, payload={"secret": new_secret} + ) assert res["secret"] == new_secret + @pytest.mark.asyncio async def test_a_refresh_token(admin: KeycloakAdmin): """Test refresh token on connection even if it is expired. diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index 51274db..3283061 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -488,7 +488,7 @@ def test_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]): "interval": 5, } -#async function start +# async function start @pytest.mark.asyncio async def test_a_well_known(oid: KeycloakOpenID): @@ -557,6 +557,7 @@ async def test_a_well_known(oid: KeycloakOpenID): ]: assert key in res + @pytest.mark.asyncio async def test_a_auth_url(env, oid: KeycloakOpenID): """Test the auth_url method. @@ -574,6 +575,7 @@ async def test_a_auth_url(env, oid: KeycloakOpenID): + "&redirect_uri=http://test.test/*&scope=email&state=" ) + @pytest.mark.asyncio async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): """Test the token method. @@ -623,6 +625,7 @@ async def test_a_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): "token_type": "Bearer", } + @pytest.mark.asyncio async def test_a_exchange_token( oid_with_credentials: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin @@ -676,6 +679,7 @@ async def test_a_exchange_token( } assert token != new_token + @pytest.mark.asyncio async def test_a_logout(oid_with_credentials): """Test logout. @@ -692,6 +696,7 @@ async def test_a_logout(oid_with_credentials): with pytest.raises(KeycloakAuthenticationError): await oid.a_userinfo(token=token["access_token"]) + @pytest.mark.asyncio async def test_a_certs(oid: KeycloakOpenID): """Test certificates. @@ -701,6 +706,7 @@ async def test_a_certs(oid: KeycloakOpenID): """ assert len((await oid.a_certs())["keys"]) == 2 + @pytest.mark.asyncio async def test_a_public_key(oid: KeycloakOpenID): """Test public key. @@ -710,6 +716,7 @@ async def test_a_public_key(oid: KeycloakOpenID): """ assert await oid.a_public_key() is not None + @pytest.mark.asyncio async def test_a_entitlement( oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str], admin: KeycloakAdmin @@ -731,6 +738,7 @@ async def test_a_entitlement( with pytest.raises(KeycloakDeprecationError): await oid.a_entitlement(token=token["access_token"], resource_server_id=resource_server_id) + @pytest.mark.asyncio async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): """Test introspect. @@ -747,7 +755,10 @@ async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str ) == {"active": False} with pytest.raises(KeycloakRPTNotFound): - await oid.a_introspect(token=token["access_token"], token_type_hint="requesting_party_token") + await oid.a_introspect( + token=token["access_token"], token_type_hint="requesting_party_token" + ) + @pytest.mark.asyncio async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): @@ -766,8 +777,11 @@ async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, s assert decoded_access_token["preferred_username"] == username, decoded_access_token assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token + @pytest.mark.asyncio -async def test_a_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): +async def test_a_load_authorization_config( + oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] + ): """Test load authorization config. :param oid_with_credentials_authz: Keycloak OpenID client configured as an authorization @@ -786,6 +800,7 @@ async def test_a_load_authorization_config(oid_with_credentials_authz: Tuple[Key oid.authorization.policies["test-authz-rb-policy"].permissions[0], Permission ) + @pytest.mark.asyncio async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): """Test get policies. @@ -810,10 +825,12 @@ async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, policy.add_role(role="account/view-profile") oid.authorization.policies["test"] = policy assert [ - str(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") + str(x) + for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") ] == ["Policy: test (role)"] assert [ - repr(x) for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") + repr(x) + for x in await oid.a_get_policies(token=token["access_token"], method_token_info="decode") ] == [""] oid.client_id = orig_client_id @@ -821,6 +838,7 @@ async def test_a_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, with pytest.raises(KeycloakInvalidTokenError): await oid.a_get_policies(token=token["access_token"]) + @pytest.mark.asyncio async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): """Test get policies. @@ -840,7 +858,9 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI orig_client_id = oid.client_id oid.client_id = "account" - assert await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == [] + assert ( + await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") == [] + ) policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy.add_role(role="account/view-profile") policy.add_permission( @@ -851,11 +871,15 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI oid.authorization.policies["test"] = policy assert [ str(x) - for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") + for x in await oid.a_get_permissions( + token=token["access_token"], method_token_info="decode" + ) ] == ["Permission: test-perm (resource)"] assert [ repr(x) - for x in await oid.a_get_permissions(token=token["access_token"], method_token_info="decode") + for x in await oid.a_get_permissions( + token=token["access_token"], method_token_info="decode" + ) ] == [""] oid.client_id = orig_client_id @@ -863,6 +887,7 @@ async def test_a_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI with pytest.raises(KeycloakInvalidTokenError): await oid.a_get_permissions(token=token["access_token"]) + @pytest.mark.asyncio async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): """Test UMA permissions. @@ -875,7 +900,10 @@ async def test_a_uma_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenI token = await oid.a_token(username=username, password=password) assert len(await oid.a_uma_permissions(token=token["access_token"])) == 1 - assert (await oid.a_uma_permissions(token=token["access_token"]))[0]["rsname"] == "Default Resource" + assert (await oid.a_uma_permissions(token=token["access_token"]))[0][ + "rsname" + ] == "Default Resource" + @pytest.mark.asyncio async def test_a_has_uma_access( @@ -897,7 +925,9 @@ async def test_a_has_uma_access( == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" ) assert ( - str(await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource")) + str( + await oid.a_has_uma_access(token=token["access_token"], permissions="Default Resource") + ) == "AuthStatus(is_authorized=True, is_logged_in=True, missing_permissions=set())" ) @@ -919,6 +949,7 @@ async def test_a_has_uma_access( + "{'Default Resource'})" ) + @pytest.mark.asyncio async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, str]): """Test device authorization flow. @@ -937,4 +968,4 @@ async def test_a_device(oid_with_credentials_device: Tuple[KeycloakOpenID, str, + f"device?user_code={res['user_code']}", "expires_in": 600, "interval": 5, - } \ No newline at end of file + } diff --git a/tests/test_keycloak_uma.py b/tests/test_keycloak_uma.py index 6591e4b..d58128e 100644 --- a/tests/test_keycloak_uma.py +++ b/tests/test_keycloak_uma.py @@ -311,7 +311,7 @@ def test_uma_permission_ticket(uma: KeycloakUMA): uma.resource_set_delete(resource["_id"]) -#async function start +# async function start @pytest.mark.asyncio async def test_a_uma_well_known(uma: KeycloakUMA): @@ -326,6 +326,7 @@ async def test_a_uma_well_known(uma: KeycloakUMA): for key in ["resource_registration_endpoint"]: assert key in res + @pytest.mark.asyncio async def test_a_uma_resource_sets(uma: KeycloakUMA): """Test resource sets. @@ -334,7 +335,7 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA): :type uma: KeycloakUMA """ # Check that only the default resource is present - resource_sets = await uma.a_resource_set_list() + resource_sets = uma.resource_set_list() resource_set_list = list(resource_sets) assert len(resource_set_list) == 1, resource_set_list assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"] @@ -422,6 +423,7 @@ async def test_a_uma_resource_sets(uma: KeycloakUMA): await uma.a_resource_set_delete(resource_id=created_resource["_id"]) assert err.match("404: b''") + @pytest.mark.asyncio async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin): """Test policies. @@ -521,6 +523,7 @@ async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin): admin.delete_realm_role(role_id) admin.delete_group(group_id) + @pytest.mark.asyncio async def test_a_uma_access(uma: KeycloakUMA): """Test permission access checks. @@ -555,6 +558,7 @@ async def test_a_uma_access(uma: KeycloakUMA): assert not await uma.a_permissions_check(token["access_token"], permissions) uma.resource_set_delete(resource["_id"]) + @pytest.mark.asyncio async def test_a_uma_permission_ticket(uma: KeycloakUMA): """Test permission ticket generation.