diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index fba28c2..d754cd4 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -2343,6 +2343,23 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + def get_mappers_from_client(self, client_id): + """ + List of all client mappers. + + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_protocolmapperrepresentation + + :param client_id: Client id + :returns: KeycloakServerResponse (list of ProtocolMapperRepresentation) + """ + params_path = {"realm-name": self.realm_name, "id": client_id} + + data_raw = self.raw_get( + urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path) + ) + + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[200]) + def add_mapper_to_client(self, client_id, payload): """ Add a mapper to a client @@ -2373,7 +2390,7 @@ class KeycloakAdmin: params_path = { "realm-name": self.realm_name, - "id": self.client_id, + "id": client_id, "protocol-mapper-id": mapper_id, } @@ -2444,6 +2461,7 @@ class KeycloakAdmin: :param query: Query parameters (optional) :return: components list """ + query = query or dict() params_path = {"realm-name": self.realm_name} data_raw = self.raw_get( urls_patterns.URL_ADMIN_COMPONENTS.format(**params_path), data=None, **query @@ -2458,15 +2476,15 @@ class KeycloakAdmin: https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_componentrepresentation :param payload: ComponentRepresentation - - :return: UserRepresentation + :return: Component id """ params_path = {"realm-name": self.realm_name} - data_raw = self.raw_post( urls_patterns.URL_ADMIN_COMPONENTS.format(**params_path), data=json.dumps(payload) ) - return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 def get_component(self, component_id): """ @@ -2533,6 +2551,7 @@ class KeycloakAdmin: :return: events list """ + query = query or dict() params_path = {"realm-name": self.realm_name} data_raw = self.raw_get( urls_patterns.URL_ADMIN_EVENTS.format(**params_path), data=None, **query @@ -2550,7 +2569,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name} data_raw = self.raw_put( - urls_patterns.URL_ADMIN_EVENTS.format(**params_path), data=json.dumps(payload) + urls_patterns.URL_ADMIN_EVENTS_CONFIG.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) diff --git a/src/keycloak/urls_patterns.py b/src/keycloak/urls_patterns.py index 071c733..d836ed4 100644 --- a/src/keycloak/urls_patterns.py +++ b/src/keycloak/urls_patterns.py @@ -166,4 +166,5 @@ URL_ADMIN_USER_FEDERATED_IDENTITY = ( ) URL_ADMIN_EVENTS = "admin/realms/{realm-name}/events" +URL_ADMIN_EVENTS_CONFIG = URL_ADMIN_EVENTS + "/config" URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats" diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 58d298f..d927aae 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -4,6 +4,7 @@ import keycloak from keycloak import KeycloakAdmin from keycloak.connection import ConnectionManager from keycloak.exceptions import ( + KeycloakAuthenticationError, KeycloakDeleteError, KeycloakGetError, KeycloakPostError, @@ -54,6 +55,59 @@ def test_keycloak_admin_init(env): assert admin.auto_refresh_token == list(), admin.auto_refresh_token assert admin.user_realm_name is None, admin.user_realm_name assert admin.custom_headers is None, admin.custom_headers + assert admin.token + + admin = KeycloakAdmin( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", + username=env.KEYCLOAK_ADMIN, + password=env.KEYCLOAK_ADMIN_PASSWORD, + realm_name=None, + user_realm_name="master", + ) + assert admin.token + admin = KeycloakAdmin( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", + username=env.KEYCLOAK_ADMIN, + password=env.KEYCLOAK_ADMIN_PASSWORD, + realm_name=None, + user_realm_name=None, + ) + assert admin.token + + admin.create_realm(payload={"realm": "authz", "enabled": True}) + admin.realm_name = "authz" + admin.create_client( + payload={ + "name": "authz-client", + "clientId": "authz-client", + "authorizationServicesEnabled": True, + "serviceAccountsEnabled": True, + "clientAuthenticatorType": "client-secret", + "directAccessGrantsEnabled": False, + "enabled": True, + "implicitFlowEnabled": False, + "publicClient": False, + } + ) + secret = admin.generate_client_secrets(client_id=admin.get_client_id("authz-client")) + assert KeycloakAdmin( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", + user_realm_name="authz", + client_id="authz-client", + client_secret_key=secret["value"], + ).token + admin.delete_realm(realm_name="authz") + + assert ( + KeycloakAdmin( + server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", + username=None, + password=None, + client_secret_key=None, + custom_headers={"custom": "header"}, + ).token + is None + ) def test_realms(admin: KeycloakAdmin): @@ -573,6 +627,47 @@ def test_clients(admin: KeycloakAdmin, realm: str): admin.update_client(client_id="does-not-exist", payload={"name": "test-client-change"}) assert err.match('404: b\'{"error":"Could not find client"}\'') + # Test client mappers + res = admin.get_mappers_from_client(client_id=client_id) + assert len(res) == 0 + + with pytest.raises(KeycloakPostError) as err: + admin.add_mapper_to_client(client_id="does-not-exist", payload=dict()) + assert err.match('404: b\'{"error":"Could not find client"}\'') + + res = admin.add_mapper_to_client( + client_id=client_id, + payload={ + "name": "test-mapper", + "protocol": "openid-connect", + "protocolMapper": "oidc-usermodel-attribute-mapper", + }, + ) + assert res == b"" + assert len(admin.get_mappers_from_client(client_id=client_id)) == 1 + + mapper = admin.get_mappers_from_client(client_id=client_id)[0] + with pytest.raises(KeycloakPutError) as err: + admin.update_client_mapper(client_id=client_id, mapper_id="does-not-exist", payload=dict()) + assert err.match('404: b\'{"error":"Model not found"}\'') + mapper["config"]["user.attribute"] = "test" + res = admin.update_client_mapper(client_id=client_id, mapper_id=mapper["id"], payload=mapper) + assert res == dict() + + res = admin.remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"]) + assert res == dict() + with pytest.raises(KeycloakDeleteError) as err: + admin.remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"]) + assert err.match('404: b\'{"error":"Model not found"}\'') + + # Test client sessions + with pytest.raises(KeycloakGetError) as err: + admin.get_client_all_sessions(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client"}\'') + + assert admin.get_client_all_sessions(client_id=client_id) == list() + assert admin.get_client_sessions_stats() == list() + # Test authz auth_client_id = admin.create_client( payload={ @@ -703,6 +798,42 @@ def test_clients(admin: KeycloakAdmin, realm: str): admin.delete_client(client_id=auth_client_id) assert err.match('404: b\'{"error":"Could not find client"}\'') + # Test client credentials + admin.create_client( + payload={ + "name": "test-confidential", + "enabled": True, + "protocol": "openid-connect", + "publicClient": False, + "redirectUris": ["http://localhost/*"], + "webOrigins": ["+"], + "clientId": "test-confidential", + "secret": "test-secret", + "clientAuthenticatorType": "client-secret", + } + ) + with pytest.raises(KeycloakGetError) as err: + admin.get_client_secrets(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client"}\'') + + secrets = admin.get_client_secrets( + client_id=admin.get_client_id(client_name="test-confidential") + ) + assert secrets == {"type": "secret", "value": "test-secret"} + + with pytest.raises(KeycloakPostError) as err: + admin.generate_client_secrets(client_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find client"}\'') + + res = admin.generate_client_secrets( + client_id=admin.get_client_id(client_name="test-confidential") + ) + assert res + assert ( + admin.get_client_secrets(client_id=admin.get_client_id(client_name="test-confidential")) + == res + ) + def test_realm_roles(admin: KeycloakAdmin, realm: str): admin.realm_name = realm @@ -1382,3 +1513,166 @@ def test_client_scopes(admin: KeycloakAdmin, realm: str): with pytest.raises(KeycloakDeleteError) as err: admin.delete_client_scope(client_scope_id=res) assert err.match('404: b\'{"error":"Could not find client scope"}\'') + + +def test_components(admin: KeycloakAdmin, realm: str): + admin.realm_name = realm + + # Test get components + res = admin.get_components() + assert len(res) == 12 + + with pytest.raises(KeycloakGetError) as err: + admin.get_component(component_id="does-not-exist") + assert err.match('404: b\'{"error":"Could not find component"}\'') + + res_get = admin.get_component(component_id=res[0]["id"]) + assert res_get == res[0] + + # Test create component + with pytest.raises(KeycloakPostError) as err: + admin.create_component(payload={"bad": "dict"}) + assert err.match('400: b\'{"error":"Unrecognized field') + + res = admin.create_component( + payload={ + "name": "Test Component", + "providerId": "max-clients", + "providerType": "org.keycloak.services.clientregistration." + + "policy.ClientRegistrationPolicy", + "config": {"max-clients": ["1000"]}, + } + ) + assert res + assert admin.get_component(component_id=res)["name"] == "Test Component" + + # Test update component + component = admin.get_component(component_id=res) + component["name"] = "Test Component Update" + + with pytest.raises(KeycloakPutError) as err: + admin.update_component(component_id="does-not-exist", payload=dict()) + assert err.match('404: b\'{"error":"Could not find component"}\'') + res_upd = admin.update_component(component_id=res, payload=component) + assert res_upd == dict() + assert admin.get_component(component_id=res)["name"] == "Test Component Update" + + # Test delete component + res_del = admin.delete_component(component_id=res) + assert res_del == dict() + with pytest.raises(KeycloakDeleteError) as err: + admin.delete_component(component_id=res) + assert err.match('404: b\'{"error":"Could not find component"}\'') + + +def test_keys(admin: KeycloakAdmin, realm: str): + admin.realm_name = realm + assert set(admin.get_keys()["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} + assert {k["algorithm"] for k in admin.get_keys()["keys"]} == { + "HS256", + "RSA-OAEP", + "AES", + "RS256", + } + + +def test_events(admin: KeycloakAdmin, realm: str): + admin.realm_name = realm + + events = admin.get_events() + assert events == list() + + with pytest.raises(KeycloakPutError) as err: + admin.set_events(payload={"bad": "conf"}) + assert err.match('400: b\'{"error":"Unrecognized field') + + res = admin.set_events(payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True}) + assert res == dict() + + admin.create_client(payload={"name": "test", "clientId": "test"}) + + events = admin.get_events() + assert events == list() + + +def test_auto_refresh(admin: KeycloakAdmin, realm: str): + # Test get refresh + admin.auto_refresh_token = list() + admin.connection = ConnectionManager( + base_url=admin.server_url, + headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, + timeout=60, + verify=admin.verify, + ) + + with pytest.raises(KeycloakAuthenticationError) as err: + admin.get_realm(realm_name=realm) + assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') + + admin.auto_refresh_token = ["get"] + del admin.token["refresh_token"] + assert admin.get_realm(realm_name=realm) + + # Test bad refresh token + admin.connection = ConnectionManager( + base_url=admin.server_url, + headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, + timeout=60, + verify=admin.verify, + ) + admin.token["refresh_token"] = "bad" + with pytest.raises(KeycloakGetError) as err: + admin.get_realm(realm_name="test-refresh") + assert err.match( + '400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\'' + ) + admin.realm_name = "master" + admin.get_token() + admin.realm_name = realm + + # Test post refresh + admin.connection = ConnectionManager( + base_url=admin.server_url, + headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, + timeout=60, + verify=admin.verify, + ) + with pytest.raises(KeycloakAuthenticationError) as err: + admin.create_realm(payload={"realm": "test-refresh"}) + assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') + + admin.auto_refresh_token = ["get", "post"] + admin.realm_name = "master" + admin.user_logout(user_id=admin.get_user_id(username=admin.username)) + assert admin.create_realm(payload={"realm": "test-refresh"}) == b"" + admin.realm_name = realm + + # Test update refresh + admin.connection = ConnectionManager( + base_url=admin.server_url, + headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, + timeout=60, + verify=admin.verify, + ) + with pytest.raises(KeycloakAuthenticationError) as err: + admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) + assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') + + admin.auto_refresh_token = ["get", "post", "put"] + assert ( + admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict() + ) + + # Test delete refresh + admin.connection = ConnectionManager( + base_url=admin.server_url, + headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, + timeout=60, + verify=admin.verify, + ) + with pytest.raises(KeycloakAuthenticationError) as err: + admin.delete_realm(realm_name="test-refresh") + assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') + + admin.auto_refresh_token = ["get", "post", "put", "delete"] + assert admin.delete_realm(realm_name="test-refresh") == dict()