From 09cf503415761bd902f505bbeb4ee596c271e30b Mon Sep 17 00:00:00 2001 From: Armin Shoughi Date: Sat, 29 Apr 2023 11:42:15 +0330 Subject: [PATCH] fix: initializing KeycloakAdmin without server_url According to the project readme, we could initialize a KeycloakAdmin object with a KeycloakOpenIDConnection object without other arguments but, server_url is required. I made server_url optional and wrote a test for it. --- src/keycloak/keycloak_admin.py | 48 ++++++++++---------- tests/test_keycloak_admin.py | 83 ++++++++++++++++++++-------------- 2 files changed, 72 insertions(+), 59 deletions(-) diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index a812318..88f54a3 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -87,21 +87,21 @@ class KeycloakAdmin: _connection = None def __init__( - self, - server_url, - username=None, - password=None, - token=None, - totp=None, - realm_name="master", - client_id="admin-cli", - verify=True, - client_secret_key=None, - custom_headers=None, - user_realm_name=None, - auto_refresh_token=None, - timeout=60, - connection: Optional[KeycloakOpenIDConnection] = None, + self, + server_url=None, + username=None, + password=None, + token=None, + totp=None, + realm_name="master", + client_id="admin-cli", + verify=True, + client_secret_key=None, + custom_headers=None, + user_realm_name=None, + auto_refresh_token=None, + timeout=60, + connection: Optional[KeycloakOpenIDConnection] = None, ): """Init method. @@ -815,7 +815,7 @@ class KeycloakAdmin: ) raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) _last_slash_idx = data_raw.headers["Location"].rindex("/") - return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203 def users_count(self, query=None): """Count users. @@ -1118,7 +1118,7 @@ class KeycloakAdmin: return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) def send_update_account( - self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None + self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None ): """Send an update account email to the user. @@ -1339,7 +1339,7 @@ class KeycloakAdmin: ) try: _last_slash_idx = data_raw.headers["Location"].rindex("/") - return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203 except KeyError: return @@ -1965,7 +1965,7 @@ class KeycloakAdmin: data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) _last_slash_idx = data_raw.headers["Location"].rindex("/") - return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203 def update_client(self, client_id, payload): """Update a client. @@ -2218,7 +2218,7 @@ class KeycloakAdmin: data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) _last_slash_idx = data_raw.headers["Location"].rindex("/") - return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203 def add_composite_client_roles_to_role(self, client_role_id, role_name, roles): """Add composite roles to client role. @@ -2385,7 +2385,7 @@ class KeycloakAdmin: data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) _last_slash_idx = data_raw.headers["Location"].rindex("/") - return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203 def get_realm_role(self, role_name): """Get realm role by role name. @@ -2851,7 +2851,7 @@ class KeycloakAdmin: ) def _get_client_roles_of_user( - self, client_level_role_mapping_url, user_id, client_id, **params + self, client_level_role_mapping_url, user_id, client_id, **params ): """Get client roles of a single user helper. @@ -3281,7 +3281,7 @@ class KeycloakAdmin: data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) _last_slash_idx = data_raw.headers["Location"].rindex("/") - return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203 def update_client_scope(self, client_scope_id, payload): """Update a client scope. @@ -3650,7 +3650,7 @@ class KeycloakAdmin: ) raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) _last_slash_idx = data_raw.headers["Location"].rindex("/") - return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203 def get_component(self, component_id): """Get representation of the component. diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index e984023..8420bba 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -19,6 +19,8 @@ from keycloak.exceptions import ( KeycloakPutError, ) +from src.keycloak import KeycloakOpenIDConnection + def test_keycloak_version(): """Test version.""" @@ -101,15 +103,26 @@ def test_keycloak_admin_init(env): admin.delete_realm(realm_name="authz") assert ( - KeycloakAdmin( - server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", - username=None, - password=None, - client_secret_key=None, - custom_headers={"custom": "header"}, - ).token - is None + KeycloakAdmin( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", + username=None, + password=None, + client_secret_key=None, + custom_headers={"custom": "header"}, + ).token + is None + ) + + keycloak_connection = KeycloakOpenIDConnection( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", + username=env.KEYCLOAK_ADMIN, + password=env.KEYCLOAK_ADMIN_PASSWORD, + realm_name="master", + client_id="admin-cli", + verify=True ) + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + assert keycloak_admin.token def test_realms(admin: KeycloakAdmin): @@ -960,8 +973,8 @@ def test_clients(admin: KeycloakAdmin, realm: str): ) assert res assert ( - admin.get_client_secrets(client_id=admin.get_client_id(client_id="test-confidential")) - == res + admin.get_client_secrets(client_id=admin.get_client_id(client_id="test-confidential")) + == res ) @@ -1136,12 +1149,12 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str): ], ) def test_role_attributes( - admin: KeycloakAdmin, - realm: str, - client: str, - arg_brief_repr: dict, - includes_attributes: bool, - testcase: str, + admin: KeycloakAdmin, + realm: str, + client: str, + arg_brief_repr: dict, + includes_attributes: bool, + testcase: str, ): """Test getting role attributes for bulk calls. @@ -1489,8 +1502,8 @@ def test_client_roles(admin: KeycloakAdmin, client: str): ) assert res == dict() assert ( - len(admin.get_client_role_members(client_id=client, role_name="client-role-test-update")) - == 1 + len(admin.get_client_role_members(client_id=client, role_name="client-role-test-update")) + == 1 ) roles = admin.get_client_roles_of_user(user_id=user_id, client_id=client) @@ -1545,8 +1558,8 @@ def test_client_roles(admin: KeycloakAdmin, client: str): ) assert res == dict() assert ( - len(admin.get_client_role_groups(client_id=client, role_name="client-role-test-update")) - == 1 + len(admin.get_client_role_groups(client_id=client, role_name="client-role-test-update")) + == 1 ) assert len(admin.get_group_client_roles(group_id=group_id, client_id=client)) == 1 @@ -2025,8 +2038,8 @@ def test_client_scopes(admin: KeycloakAdmin, realm: str): ) assert res_update == dict() assert ( - admin.get_mappers_from_client_scope(client_scope_id=res)[0]["config"]["user.attribute"] - == "test" + admin.get_mappers_from_client_scope(client_scope_id=res)[0]["config"]["user.attribute"] + == "test" ) # Test delete mapper @@ -2119,7 +2132,7 @@ def test_components(admin: KeycloakAdmin, realm: str): "name": "Test Component", "providerId": "max-clients", "providerType": "org.keycloak.services.clientregistration." - + "policy.ClientRegistrationPolicy", + + "policy.ClientRegistrationPolicy", "config": {"max-clients": ["1000"]}, } ) @@ -2238,8 +2251,8 @@ def test_auto_refresh(admin_frozen: KeycloakAdmin, realm: str): assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00") admin.connection.token = None assert ( - admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) - == dict() + admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) + == dict() ) assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00") @@ -2310,7 +2323,7 @@ def test_update_required_action(admin: KeycloakAdmin, realm: str): def test_get_composite_client_roles_of_group( - admin: KeycloakAdmin, realm: str, client: str, group: str, composite_client_role: str + admin: KeycloakAdmin, realm: str, client: str, group: str, composite_client_role: str ): """Test get composite client roles of group. @@ -2333,7 +2346,7 @@ def test_get_composite_client_roles_of_group( def test_get_role_client_level_children( - admin: KeycloakAdmin, realm: str, client: str, composite_client_role: str, client_role: str + admin: KeycloakAdmin, realm: str, client: str, composite_client_role: str, client_role: str ): """Test get children of composite client role. @@ -2376,7 +2389,7 @@ def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfs def test_get_bruteforce_status_for_user( - admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str ): """Test users. @@ -2413,7 +2426,7 @@ def test_get_bruteforce_status_for_user( def test_clear_bruteforce_attempts_for_user( - admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str ): """Test users. @@ -2453,7 +2466,7 @@ def test_clear_bruteforce_attempts_for_user( def test_clear_bruteforce_attempts_for_all_users( - admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str ): """Test users. @@ -2503,8 +2516,8 @@ def test_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> None: admin.realm_name = realm assert f"default-roles-{realm}" in [x["name"] for x in admin.get_realm_roles()] assert ( - len([x["name"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"]) - == 1 + len([x["name"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"]) + == 1 ) @@ -2518,8 +2531,8 @@ def test_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> None: """ admin.realm_name = realm assert ( - admin.get_default_realm_role_id() - == [x["id"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"][0] + admin.get_default_realm_role_id() + == [x["id"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"][0] ) @@ -2605,7 +2618,7 @@ def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None: def test_initial_access_token( - admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str] + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str] ) -> None: """Test initial access token and client creation.