diff --git a/CHANGELOG.md b/CHANGELOG.md index f8f38c2..3cb72e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,22 @@ -## v3.3.0 (2023-06-28) +## v3.6.0 (2023-11-13) + +### Feat + +- add KeycloakAdmin.get_idp() (#478) + +## v3.5.0 (2023-11-13) + +### Feat + +- Update dynamic client using registration access token (#491) + +## v3.4.0 (2023-11-13) + +### Feat + +- add an optional search criteria to the get_realm_roles function (#504) + +## v3.3.0 (2023-06-27) ### Feat diff --git a/README.md b/README.md index 7addd26..c0f30f8 100644 --- a/README.md +++ b/README.md @@ -249,6 +249,9 @@ client = keycloak_admin.get_client(client_id="client_id") # Get all roles for the realm or client realm_roles = keycloak_admin.get_realm_roles() +# Get all roles for the realm or client that their names includes the searched text +realm_roles = keycloak_admin.get_realm_roles(search_text="CompanyA_") + # Get all roles for the client client_roles = keycloak_admin.get_client_roles(client_id="client_id") @@ -303,7 +306,7 @@ groups = keycloak_admin.get_groups() group = keycloak_admin.get_group(group_id='group_id') # Get group by name -group = keycloak_admin.get_group_by_path(path='/group/subgroup', search_in_subgroups=True) +group = keycloak_admin.get_group_by_path(path='/group/subgroup') # Function to trigger user sync from provider sync_users(storage_id="storage_di", action="action") @@ -337,9 +340,12 @@ keycloak_admin.get_client_roles_of_client_scope(client_id=another_client_id, cli # Remove client roles assigned to client's scope keycloak_admin.delete_client_roles_of_client_scope(client_id=another_client_id, client_roles_owner_id=client_id, roles=client_roles) -# Get all ID Providers +# Get all IDP Providers idps = keycloak_admin.get_idps() +# Get a specific IDP Provider, using its alias +idp = keycloak_admin.get_idp("idp-alias") + # Create a new Realm keycloak_admin.create_realm(payload={"realm": "demo"}, skip_exists=False) diff --git a/pyproject.toml b/pyproject.toml index 5ee5ba7..cf9823a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ Documentation = "https://python-keycloak.readthedocs.io/en/latest/" [tool.poetry.dependencies] python = ">=3.7,<4.0" +setuptools = "*" requests = ">=2.20.0" python-jose = ">=3.3.0" mock = {version = "^4.0.3", optional = true} @@ -41,7 +42,7 @@ sphinx-rtd-theme = {version = "^1.0.0", optional = true} readthedocs-sphinx-ext = {version = "^2.1.9", optional = true} m2r2 = {version = "^0.3.2", optional = true} sphinx-autoapi = {version = "^2.0.0", optional = true} -requests-toolbelt = ">=1.0.0" +requests-toolbelt = "*" deprecation = ">=2.1.0" [tool.poetry.extras] diff --git a/src/keycloak/connection.py b/src/keycloak/connection.py index 3134676..63918fe 100644 --- a/src/keycloak/connection.py +++ b/src/keycloak/connection.py @@ -43,8 +43,8 @@ class ConnectionManager(object): :type headers: dict :param timeout: Timeout to use for requests to the server. :type timeout: int - :param verify: Verify server SSL. - :type verify: bool + :param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use + :type verify: Union[bool,str] :param proxies: The proxies servers requests is sent by. :type proxies: dict """ @@ -58,8 +58,8 @@ class ConnectionManager(object): :type headers: dict :param timeout: Timeout to use for requests to the server. :type timeout: int - :param verify: Verify server SSL. - :type verify: bool + :param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use + :type verify: Union[bool,str] :param proxies: The proxies servers requests is sent by. :type proxies: dict """ diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 6c27112..d9f65c0 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -63,8 +63,8 @@ class KeycloakAdmin: :type realm_name: str :param client_id: client id :type client_id: str - :param verify: True if want check connection SSL - :type verify: bool + :param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use + :type verify: Union[bool,str] :param client_secret_key: client secret key (optional, required only for access type confidential) :type client_secret_key: str @@ -84,7 +84,7 @@ class KeycloakAdmin: PAGE_SIZE = 100 _auto_refresh_token = None - _connection = None + _connection: Optional[KeycloakOpenIDConnection] = None def __init__( self, @@ -119,8 +119,8 @@ class KeycloakAdmin: :type realm_name: str :param client_id: client id :type client_id: str - :param verify: True if want check connection SSL - :type verify: bool + :param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use + :type verify: Union[bool,str] :param client_secret_key: client secret key (optional, required only for access type confidential) :type client_secret_key: str @@ -204,7 +204,7 @@ class KeycloakAdmin: self.connection.realm_name = value @property - def connection(self): + def connection(self) -> KeycloakOpenIDConnection: """Get connection. :returns: Connection manager @@ -213,7 +213,7 @@ class KeycloakAdmin: return self._connection @connection.setter - def connection(self, value): + def connection(self, value: KeycloakOpenIDConnection) -> None: self._connection = value @property @@ -532,6 +532,29 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + def partial_import_realm(self, realm_name, payload): + """Partial import realm configuration from PartialImportRepresentation. + + Realm partialImport is used for modifying configuration of existing realm. + + PartialImportRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/#_partialimportrepresentation + + :param realm_name: Realm name (not the realm id) + :type realm_name: str + :param payload: PartialImportRepresentation + :type payload: dict + + :return: PartialImportResponse + :rtype: dict + """ + params_path = {"realm-name": realm_name} + data_raw = self.connection.raw_post( + urls_patterns.URL_ADMIN_REALM_PARTIAL_IMPORT.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[200]) + def export_realm(self, export_clients=False, export_groups_and_role=False): """Export the realm configurations in the json format. @@ -773,6 +796,23 @@ class KeycloakAdmin: data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) + def get_idp(self, idp_alias): + """Get IDP provider. + + Get the representation of a specific IDP Provider. + + IdentityProviderRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_identityproviderrepresentation + + :param: idp_alias: alias for IdP to get + :type idp_alias: str + :return: IdentityProviderRepresentation + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias} + data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDP.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + def delete_idp(self, idp_alias): """Delete an ID Provider. @@ -1314,7 +1354,9 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name, "path": path} - data_raw = self.raw_get(urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path)) + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path) + ) return raise_error_from_response(data_raw, KeycloakGetError) def create_group(self, payload, parent=None, skip_exists=False): @@ -1775,7 +1817,7 @@ class KeycloakAdmin: """ params_path = {"realm-name": self.realm_name, "id": client_id} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_BASED_PERMISSION.format(**params_path), data=json.dumps(payload), ) @@ -2147,7 +2189,7 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) - def get_realm_roles(self, brief_representation=True): + def get_realm_roles(self, brief_representation=True, search_text=""): """Get all roles for the realm or client. RoleRepresentation @@ -2155,16 +2197,54 @@ class KeycloakAdmin: :param brief_representation: whether to omit role attributes in the response :type brief_representation: bool + :param search_text: optional search text to limit the returned result. + :type search_text: str :return: Keycloak server response (RoleRepresentation) :rtype: list """ + url = urls_patterns.URL_ADMIN_REALM_ROLES params_path = {"realm-name": self.connection.realm_name} params = {"briefRepresentation": brief_representation} data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), **params ) + + # set the search_text path param, if it is a valid string + if search_text is not None and search_text.strip() != "": + params_path["search-text"] = search_text + url = urls_patterns.URL_ADMIN_REALM_ROLES_SEARCH + + data_raw = self.connection.raw_get(url.format(**params_path), **params) return raise_error_from_response(data_raw, KeycloakGetError) + def get_realm_role_groups(self, role_name, query=None, brief_representation=True): + """Get role groups of realm by role name. + + :param role_name: Name of the role. + :type role_name: str + :param query: Additional Query parameters + (see https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_parameters_226) + :type query: dict + :param brief_representation: whether to omit role attributes in the response + :type brief_representation: bool + :return: Keycloak Server Response (GroupRepresentation) + :rtype: list + """ + query = query or {} + + params = {"briefRepresentation": brief_representation} + + query.update(params) + + params_path = {"realm-name": self.connection.realm_name, "role-name": role_name} + + url = urls_patterns.URL_ADMIN_REALM_ROLES_GROUPS.format(**params_path) + + if "first" in query or "max" in query: + return self.__fetch_paginated(url, query) + + return self.__fetch_all(url, query) + def get_realm_role_members(self, role_name, query=None): """Get role members of realm by role name. @@ -2526,6 +2606,23 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakGetError) + def get_realm_role_by_id(self, role_id: str): + """Get realm role by role id. + + RoleRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_rolerepresentation + + :param role_id: role's id, not name! + :type role_id: str + :return: role + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "role-id": role_id} + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_ID.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + def update_realm_role(self, role_name, payload): """Update a role for the realm by name. @@ -4147,6 +4244,36 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakGetError) + def create_client_authz_scope_permission(self, payload, client_id): + """Create permissions for a authz scope. + + Payload example:: + + payload={ + "name": "My Permission Name", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "resources": [some_resource_id], + "scopes": [some_scope_id], + "policies": [some_policy_id], + } + + :param payload: No Document + :type payload: dict + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.realm_name, "id": client_id} + data_raw = self.raw_post( + urls_patterns.URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[201]) + def update_client_authz_scope_permission(self, payload, client_id, scope_id): """Update permissions for a given scope. @@ -4388,7 +4515,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLEAR_KEYS_CACHE.format(**params_path), data="" ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) @@ -4400,7 +4527,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLEAR_REALM_CACHE.format(**params_path), data="" ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) @@ -4412,7 +4539,7 @@ class KeycloakAdmin: :rtype: dict """ params_path = {"realm-name": self.connection.realm_name} - data_raw = self.raw_post( + data_raw = self.connection.raw_post( urls_patterns.URL_ADMIN_CLEAR_USER_CACHE.format(**params_path), data="" ) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index f689c37..c359980 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -41,6 +41,7 @@ from .exceptions import ( KeycloakGetError, KeycloakInvalidTokenError, KeycloakPostError, + KeycloakPutError, KeycloakRPTNotFound, raise_error_from_response, ) @@ -49,6 +50,7 @@ from .urls_patterns import ( URL_AUTH, URL_CERTS, URL_CLIENT_REGISTRATION, + URL_CLIENT_UPDATE, URL_ENTITLEMENT, URL_INTROSPECT, URL_LOGOUT, @@ -66,7 +68,7 @@ class KeycloakOpenID: :param client_id: client id :param realm_name: realm name :param client_secret_key: client secret key - :param verify: True if want check connection SSL + :param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use :param custom_headers: dict of custom header to pass to each HTML request :param proxies: dict of proxies to sent the request by. :param timeout: connection timeout in seconds @@ -93,8 +95,8 @@ class KeycloakOpenID: :type realm_name: str :param client_secret_key: client secret key :type client_secret_key: str - :param verify: True if want check connection SSL - :type verify: bool + :param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use + :type verify: Union[bool,str] :param custom_headers: dict of custom header to pass to each HTML request :type custom_headers: dict :param proxies: dict of proxies to sent the request by. @@ -343,7 +345,7 @@ class KeycloakOpenID: def exchange_token( self, token: str, - audience: str, + audience: Optional[str] = None, subject: Optional[str] = None, subject_token_type: Optional[str] = None, subject_issuer: Optional[str] = None, @@ -711,3 +713,31 @@ class KeycloakOpenID: URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPostError) + + def update_client(self, token: str, client_id: str, payload: dict): + """Update a client. + + ClientRepresentation: + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + + :param token: registration access token + :type token: str + :param client_id: Keycloak client id + :type client_id: str + :param payload: ClientRepresentation + :type payload: dict + :return: Client Representation + :rtype: dict + """ + params_path = {"realm-name": self.realm_name, "client-id": client_id} + self.connection.add_param_headers("Authorization", "Bearer " + token) + self.connection.add_param_headers("Content-Type", "application/json") + + # Keycloak complains if the clientId is not set in the payload + if "clientId" not in payload: + payload["clientId"] = client_id + + data_raw = self.connection.raw_put( + URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError) diff --git a/src/keycloak/openid_connection.py b/src/keycloak/openid_connection.py index 4b9a532..c7141fe 100644 --- a/src/keycloak/openid_connection.py +++ b/src/keycloak/openid_connection.py @@ -54,6 +54,7 @@ class KeycloakOpenIDConnection(ConnectionManager): _custom_headers = None _user_realm_name = None _expires_at = None + _keycloak_openid = None def __init__( self, @@ -86,8 +87,8 @@ class KeycloakOpenIDConnection(ConnectionManager): :type realm_name: str :param client_id: client id :type client_id: str - :param verify: True if want check connection SSL - :type verify: bool + :param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use + :type verify: Union[bool,str] :param client_secret_key: client secret key (optional, required only for access type confidential) :type client_secret_key: str @@ -275,27 +276,39 @@ class KeycloakOpenIDConnection(ConnectionManager): # merge custom headers to main headers self.headers.update(self.custom_headers) + @property + def keycloak_openid(self) -> KeycloakOpenID: + """Get the KeycloakOpenID object. + + The KeycloakOpenID is used to refresh tokens + + :returns: KeycloakOpenID + :rtype: KeycloakOpenID + """ + if self._keycloak_openid is None: + if self.user_realm_name: + token_realm_name = self.user_realm_name + elif self.realm_name: + token_realm_name = self.realm_name + else: + token_realm_name = "master" + + self._keycloak_openid = KeycloakOpenID( + server_url=self.server_url, + client_id=self.client_id, + realm_name=token_realm_name, + verify=self.verify, + client_secret_key=self.client_secret_key, + timeout=self.timeout, + ) + + return self._keycloak_openid + def get_token(self): """Get admin token. The admin token is then set in the `token` attribute. """ - if self.user_realm_name: - token_realm_name = self.user_realm_name - elif self.realm_name: - token_realm_name = self.realm_name - else: - token_realm_name = "master" - - self.keycloak_openid = KeycloakOpenID( - server_url=self.server_url, - client_id=self.client_id, - realm_name=token_realm_name, - verify=self.verify, - client_secret_key=self.client_secret_key, - timeout=self.timeout, - ) - grant_type = [] if self.client_secret_key: grant_type.append("client_credentials") diff --git a/src/keycloak/urls_patterns.py b/src/keycloak/urls_patterns.py index a433444..e6cbfa0 100644 --- a/src/keycloak/urls_patterns.py +++ b/src/keycloak/urls_patterns.py @@ -126,6 +126,7 @@ URL_ADMIN_CLIENT_AUTHZ_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/{policy-id}" URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/scopes" URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/resources" URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope/{scope-id}" +URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope?max=-1" URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/client" URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user" @@ -140,13 +141,16 @@ URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER = URL_ADMIN_CLIENT_SCOPE + "/protocol-mappers URL_ADMIN_CLIENT_SCOPES_MAPPERS = URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER + "/{protocol-mapper-id}" URL_ADMIN_REALM_ROLES = "admin/realms/{realm-name}/roles" +URL_ADMIN_REALM_ROLES_SEARCH = URL_ADMIN_REALM_ROLES + "?search={search-text}" URL_ADMIN_REALM_ROLES_MEMBERS = URL_ADMIN_REALM_ROLES + "/{role-name}/users" +URL_ADMIN_REALM_ROLES_GROUPS = URL_ADMIN_REALM_ROLES + "/{role-name}/groups" URL_ADMIN_REALMS = "admin/realms" URL_ADMIN_REALM = "admin/realms/{realm-name}" URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances" URL_ADMIN_IDP_MAPPERS = "admin/realms/{realm-name}/identity-provider/instances/{idp-alias}/mappers" URL_ADMIN_IDP_MAPPER_UPDATE = URL_ADMIN_IDP_MAPPERS + "/{mapper-id}" URL_ADMIN_IDP = "admin/realms/{realm-name}/identity-provider/instances/{alias}" +URL_ADMIN_REALM_ROLES_ROLE_BY_ID = URL_ADMIN_REALM + "/roles-by-id/{role-id}" URL_ADMIN_REALM_ROLES_ROLE_BY_NAME = "admin/realms/{realm-name}/roles/{role-name}" URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = ( "admin/realms/{realm-name}/roles/{role-name}/composites" @@ -156,6 +160,8 @@ URL_ADMIN_REALM_EXPORT = ( + "exportGroupsAndRoles={export-groups-and-roles}" ) +URL_ADMIN_REALM_PARTIAL_IMPORT = "admin/realms/{realm-name}/partialImport" + URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-default-client-scopes" URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE = URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES + "/{id}" URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-optional-client-scopes" diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index e78f51b..505f01d 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -216,6 +216,53 @@ def test_import_export_realms(admin: KeycloakAdmin, realm: str): assert err.match('500: b\'{"error":"unknown_error"}\'') +def test_partial_import_realm(admin: KeycloakAdmin, realm: str): + """Test partial import of realm configuration. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param realm: Keycloak realm + :type realm: str + """ + test_realm_role = str(uuid.uuid4()) + test_user = str(uuid.uuid4()) + test_client = str(uuid.uuid4()) + + admin.realm_name = realm + client_id = admin.create_client(payload={"name": test_client, "clientId": test_client}) + + realm_export = admin.export_realm(export_clients=True, export_groups_and_role=False) + + client_config = [ + client_entry for client_entry in realm_export["clients"] if client_entry["id"] == client_id + ][0] + + # delete before partial import + admin.delete_client(client_id) + + payload = { + "ifResourceExists": "SKIP", + "id": realm_export["id"], + "realm": realm, + "clients": [client_config], + "roles": {"realm": [{"name": test_realm_role}]}, + "users": [{"username": test_user, "email": f"{test_user}@test.test"}], + } + + # check add + res = admin.partial_import_realm(realm_name=realm, payload=payload) + assert res["added"] == 3 + + # check skip + res = admin.partial_import_realm(realm_name=realm, payload=payload) + assert res["skipped"] == 3 + + # check overwrite + payload["ifResourceExists"] = "OVERWRITE" + res = admin.partial_import_realm(realm_name=realm, payload=payload) + assert res["overwritten"] == 3 + + def test_users(admin: KeycloakAdmin, realm: str): """Test users. @@ -395,6 +442,18 @@ def test_idps(admin: KeycloakAdmin, realm: str): assert len(idps) == 1 assert "github" == idps[0]["alias"] + # Test get idp + idp = admin.get_idp("github") + assert "github" == idp["alias"] + assert idp.get("config") + assert "test" == idp["config"]["clientId"] + assert "**********" == idp["config"]["clientSecret"] + + # Test get idp fail + with pytest.raises(KeycloakGetError) as err: + admin.get_idp("does-not-exist") + assert err.match('404: b\'{"error":"HTTP 404 Not Found"}\'') + # Test IdP update res = admin.update_idp(idp_alias="github", payload=idps[0]) @@ -548,6 +607,7 @@ def test_server_info(admin: KeycloakAdmin): "passwordPolicies", "enums", "cryptoInfo", + "features", } ), info.keys() @@ -1103,6 +1163,12 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str): assert "uma_authorization" in role_names, role_names assert "offline_access" in role_names, role_names + # Test get realm roles with search text + searched_roles = admin.get_realm_roles(search_text="uma_a") + searched_role_names = [x["name"] for x in searched_roles] + assert "uma_authorization" in searched_role_names, searched_role_names + assert "offline_access" not in searched_role_names, searched_role_names + # Test empty members with pytest.raises(KeycloakGetError) as err: admin.get_realm_role_members(role_name="does-not-exist") @@ -1119,6 +1185,11 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str): role_id_2 = admin.create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True) assert role_id == role_id_2 + # Test get realm role by its id + role_id = admin.get_realm_role(role_name="test-realm-role")["id"] + res = admin.get_realm_role_by_id(role_id) + assert res["name"] == "test-realm-role" + # Test update realm role res = admin.update_realm_role( role_name="test-realm-role", payload={"name": "test-realm-role-update"} @@ -1240,6 +1311,14 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str): res = admin.get_composite_realm_roles_of_role(role_name=composite_role) assert len(res) == 0 + # Test realm role group list + res = admin.get_realm_role_groups(role_name="test-realm-role-update") + assert len(res) == 1 + assert res[0]["id"] == group_id + with pytest.raises(KeycloakGetError) as err: + admin.get_realm_role_groups(role_name="non-existent-role") + assert err.match('404: b\'{"error":"Could not find role"}\'') + # Test delete realm role res = admin.delete_realm_role(role_name=composite_role) assert res == dict(), res @@ -1800,6 +1879,31 @@ def test_enable_token_exchange(admin: KeycloakAdmin, realm: str): scope_id=token_exchange_permission_id, ) + # Create permissions on the target client to reference this policy + admin.create_client_authz_scope_permission( + payload={ + "id": token_exchange_permission_id, + "name": "test-permission", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "resources": [token_exchange_resource_id], + "scopes": [token_exchange_scope_id], + "policies": [client_policy_id], + }, + client_id=realm_management_id, + ) + permission_name = admin.get_client_authz_scope_permission( + client_id=realm_management_id, scope_id=token_exchange_permission_id + )["name"] + assert permission_name == "test-permission" + with pytest.raises(KeycloakPostError) as err: + admin.create_client_authz_scope_permission( + payload={"name": "test-permission", "scopes": [token_exchange_scope_id]}, + client_id="realm_management_id", + ) + assert err.match('404: b\'{"errorMessage":"Could not find client"}\'') + def test_email(admin: KeycloakAdmin, user: str): """Test email. @@ -1869,7 +1973,19 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str): admin.realm_name = realm res = admin.get_authentication_flows() - assert len(res) == 8 or len(res) == 7, res + default_flows = len(res) + assert {x["alias"] for x in res}.issubset( + { + "reset credentials", + "browser", + "registration", + "http challenge", + "docker auth", + "direct grant", + "first broker login", + "clients", + } + ) assert set(res[0].keys()) == { "alias", "authenticationExecutions", @@ -1879,24 +1995,6 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str): "providerId", "topLevel", } - assert {x["alias"] for x in res} == { - "reset credentials", - "browser", - "http challenge", - "registration", - "docker auth", - "direct grant", - "first broker login", - "clients", - } or {x["alias"] for x in res} == { - "reset credentials", - "browser", - "registration", - "docker auth", - "direct grant", - "first broker login", - "clients", - } with pytest.raises(KeycloakGetError) as err: admin.get_authentication_flow_for_id(flow_id="bad") @@ -1912,7 +2010,7 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str): res = admin.copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser") assert res == b"", res - assert len(admin.get_authentication_flows()) == 9 or len(admin.get_authentication_flows()) == 8 + assert len(admin.get_authentication_flows()) == (default_flows + 1) # Test create res = admin.create_authentication_flow( @@ -2031,7 +2129,7 @@ def test_authentication_configs(admin: KeycloakAdmin, realm: str): # Test list of auth providers res = admin.get_authenticator_providers() - assert len(res) == 38 or len(res) == 35 + assert len(res) > 1 res = admin.get_authenticator_provider_config_description(provider_id="auth-cookie") assert res == { @@ -2772,7 +2870,7 @@ def test_initial_access_token( res = oid.register_client( token=res["token"], payload={ - "name": client, + "name": "DynamicRegisteredClient", "clientId": client, "enabled": True, "publicClient": False, @@ -2782,3 +2880,7 @@ def test_initial_access_token( }, ) assert res["clientId"] == client + + new_secret = str(uuid.uuid4()) + res = oid.update_client(res["registrationAccessToken"], client, payload={"secret": new_secret}) + assert res["secret"] == new_secret