Browse Source

Merge branch 'master' into feat/support-admin-events

pull/474/head
Richard Nemeth 12 months ago
committed by GitHub
parent
commit
616b644f6b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      CHANGELOG.md
  2. 10
      README.md
  3. 3
      pyproject.toml
  4. 8
      src/keycloak/connection.py
  5. 153
      src/keycloak/keycloak_admin.py
  6. 38
      src/keycloak/keycloak_openid.py
  7. 25
      src/keycloak/openid_connection.py
  8. 6
      src/keycloak/urls_patterns.py
  9. 146
      tests/test_keycloak_admin.py

20
CHANGELOG.md

@ -1,4 +1,22 @@
## v3.3.0 (2023-06-28)
## v3.6.0 (2023-11-13)
### Feat
- add KeycloakAdmin.get_idp() (#478)
## v3.5.0 (2023-11-13)
### Feat
- Update dynamic client using registration access token (#491)
## v3.4.0 (2023-11-13)
### Feat
- add an optional search criteria to the get_realm_roles function (#504)
## v3.3.0 (2023-06-27)
### Feat ### Feat

10
README.md

@ -249,6 +249,9 @@ client = keycloak_admin.get_client(client_id="client_id")
# Get all roles for the realm or client # Get all roles for the realm or client
realm_roles = keycloak_admin.get_realm_roles() realm_roles = keycloak_admin.get_realm_roles()
# Get all roles for the realm or client that their names includes the searched text
realm_roles = keycloak_admin.get_realm_roles(search_text="CompanyA_")
# Get all roles for the client # Get all roles for the client
client_roles = keycloak_admin.get_client_roles(client_id="client_id") client_roles = keycloak_admin.get_client_roles(client_id="client_id")
@ -303,7 +306,7 @@ groups = keycloak_admin.get_groups()
group = keycloak_admin.get_group(group_id='group_id') group = keycloak_admin.get_group(group_id='group_id')
# Get group by name # Get group by name
group = keycloak_admin.get_group_by_path(path='/group/subgroup', search_in_subgroups=True)
group = keycloak_admin.get_group_by_path(path='/group/subgroup')
# Function to trigger user sync from provider # Function to trigger user sync from provider
sync_users(storage_id="storage_di", action="action") sync_users(storage_id="storage_di", action="action")
@ -337,9 +340,12 @@ keycloak_admin.get_client_roles_of_client_scope(client_id=another_client_id, cli
# Remove client roles assigned to client's scope # Remove client roles assigned to client's scope
keycloak_admin.delete_client_roles_of_client_scope(client_id=another_client_id, client_roles_owner_id=client_id, roles=client_roles) keycloak_admin.delete_client_roles_of_client_scope(client_id=another_client_id, client_roles_owner_id=client_id, roles=client_roles)
# Get all ID Providers
# Get all IDP Providers
idps = keycloak_admin.get_idps() idps = keycloak_admin.get_idps()
# Get a specific IDP Provider, using its alias
idp = keycloak_admin.get_idp("idp-alias")
# Create a new Realm # Create a new Realm
keycloak_admin.create_realm(payload={"realm": "demo"}, skip_exists=False) keycloak_admin.create_realm(payload={"realm": "demo"}, skip_exists=False)

3
pyproject.toml

@ -30,6 +30,7 @@ Documentation = "https://python-keycloak.readthedocs.io/en/latest/"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.7,<4.0" python = ">=3.7,<4.0"
setuptools = "*"
requests = ">=2.20.0" requests = ">=2.20.0"
python-jose = ">=3.3.0" python-jose = ">=3.3.0"
mock = {version = "^4.0.3", optional = true} mock = {version = "^4.0.3", optional = true}
@ -41,7 +42,7 @@ sphinx-rtd-theme = {version = "^1.0.0", optional = true}
readthedocs-sphinx-ext = {version = "^2.1.9", optional = true} readthedocs-sphinx-ext = {version = "^2.1.9", optional = true}
m2r2 = {version = "^0.3.2", optional = true} m2r2 = {version = "^0.3.2", optional = true}
sphinx-autoapi = {version = "^2.0.0", optional = true} sphinx-autoapi = {version = "^2.0.0", optional = true}
requests-toolbelt = ">=1.0.0"
requests-toolbelt = "*"
deprecation = ">=2.1.0" deprecation = ">=2.1.0"
[tool.poetry.extras] [tool.poetry.extras]

8
src/keycloak/connection.py

@ -43,8 +43,8 @@ class ConnectionManager(object):
:type headers: dict :type headers: dict
:param timeout: Timeout to use for requests to the server. :param timeout: Timeout to use for requests to the server.
:type timeout: int :type timeout: int
:param verify: Verify server SSL.
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param proxies: The proxies servers requests is sent by. :param proxies: The proxies servers requests is sent by.
:type proxies: dict :type proxies: dict
""" """
@ -58,8 +58,8 @@ class ConnectionManager(object):
:type headers: dict :type headers: dict
:param timeout: Timeout to use for requests to the server. :param timeout: Timeout to use for requests to the server.
:type timeout: int :type timeout: int
:param verify: Verify server SSL.
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param proxies: The proxies servers requests is sent by. :param proxies: The proxies servers requests is sent by.
:type proxies: dict :type proxies: dict
""" """

153
src/keycloak/keycloak_admin.py

@ -63,8 +63,8 @@ class KeycloakAdmin:
:type realm_name: str :type realm_name: str
:param client_id: client id :param client_id: client id
:type client_id: str :type client_id: str
:param verify: True if want check connection SSL
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param client_secret_key: client secret key :param client_secret_key: client secret key
(optional, required only for access type confidential) (optional, required only for access type confidential)
:type client_secret_key: str :type client_secret_key: str
@ -84,7 +84,7 @@ class KeycloakAdmin:
PAGE_SIZE = 100 PAGE_SIZE = 100
_auto_refresh_token = None _auto_refresh_token = None
_connection = None
_connection: Optional[KeycloakOpenIDConnection] = None
def __init__( def __init__(
self, self,
@ -119,8 +119,8 @@ class KeycloakAdmin:
:type realm_name: str :type realm_name: str
:param client_id: client id :param client_id: client id
:type client_id: str :type client_id: str
:param verify: True if want check connection SSL
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param client_secret_key: client secret key :param client_secret_key: client secret key
(optional, required only for access type confidential) (optional, required only for access type confidential)
:type client_secret_key: str :type client_secret_key: str
@ -204,7 +204,7 @@ class KeycloakAdmin:
self.connection.realm_name = value self.connection.realm_name = value
@property @property
def connection(self):
def connection(self) -> KeycloakOpenIDConnection:
"""Get connection. """Get connection.
:returns: Connection manager :returns: Connection manager
@ -213,7 +213,7 @@ class KeycloakAdmin:
return self._connection return self._connection
@connection.setter @connection.setter
def connection(self, value):
def connection(self, value: KeycloakOpenIDConnection) -> None:
self._connection = value self._connection = value
@property @property
@ -532,6 +532,29 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def partial_import_realm(self, realm_name, payload):
"""Partial import realm configuration from PartialImportRepresentation.
Realm partialImport is used for modifying configuration of existing realm.
PartialImportRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/#_partialimportrepresentation
:param realm_name: Realm name (not the realm id)
:type realm_name: str
:param payload: PartialImportRepresentation
:type payload: dict
:return: PartialImportResponse
:rtype: dict
"""
params_path = {"realm-name": realm_name}
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_REALM_PARTIAL_IMPORT.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[200])
def export_realm(self, export_clients=False, export_groups_and_role=False): def export_realm(self, export_clients=False, export_groups_and_role=False):
"""Export the realm configurations in the json format. """Export the realm configurations in the json format.
@ -773,6 +796,23 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path)) data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def get_idp(self, idp_alias):
"""Get IDP provider.
Get the representation of a specific IDP Provider.
IdentityProviderRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_identityproviderrepresentation
:param: idp_alias: alias for IdP to get
:type idp_alias: str
:return: IdentityProviderRepresentation
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias}
data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDP.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def delete_idp(self, idp_alias): def delete_idp(self, idp_alias):
"""Delete an ID Provider. """Delete an ID Provider.
@ -1314,7 +1354,9 @@ class KeycloakAdmin:
:rtype: dict :rtype: dict
""" """
params_path = {"realm-name": self.connection.realm_name, "path": path} params_path = {"realm-name": self.connection.realm_name, "path": path}
data_raw = self.raw_get(urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path))
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def create_group(self, payload, parent=None, skip_exists=False): def create_group(self, payload, parent=None, skip_exists=False):
@ -1775,7 +1817,7 @@ class KeycloakAdmin:
""" """
params_path = {"realm-name": self.realm_name, "id": client_id} params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_BASED_PERMISSION.format(**params_path), urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_BASED_PERMISSION.format(**params_path),
data=json.dumps(payload), data=json.dumps(payload),
) )
@ -2147,7 +2189,7 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def get_realm_roles(self, brief_representation=True):
def get_realm_roles(self, brief_representation=True, search_text=""):
"""Get all roles for the realm or client. """Get all roles for the realm or client.
RoleRepresentation RoleRepresentation
@ -2155,16 +2197,54 @@ class KeycloakAdmin:
:param brief_representation: whether to omit role attributes in the response :param brief_representation: whether to omit role attributes in the response
:type brief_representation: bool :type brief_representation: bool
:param search_text: optional search text to limit the returned result.
:type search_text: str
:return: Keycloak server response (RoleRepresentation) :return: Keycloak server response (RoleRepresentation)
:rtype: list :rtype: list
""" """
url = urls_patterns.URL_ADMIN_REALM_ROLES
params_path = {"realm-name": self.connection.realm_name} params_path = {"realm-name": self.connection.realm_name}
params = {"briefRepresentation": brief_representation} params = {"briefRepresentation": brief_representation}
data_raw = self.connection.raw_get( data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), **params urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path), **params
) )
# set the search_text path param, if it is a valid string
if search_text is not None and search_text.strip() != "":
params_path["search-text"] = search_text
url = urls_patterns.URL_ADMIN_REALM_ROLES_SEARCH
data_raw = self.connection.raw_get(url.format(**params_path), **params)
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def get_realm_role_groups(self, role_name, query=None, brief_representation=True):
"""Get role groups of realm by role name.
:param role_name: Name of the role.
:type role_name: str
:param query: Additional Query parameters
(see https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_parameters_226)
:type query: dict
:param brief_representation: whether to omit role attributes in the response
:type brief_representation: bool
:return: Keycloak Server Response (GroupRepresentation)
:rtype: list
"""
query = query or {}
params = {"briefRepresentation": brief_representation}
query.update(params)
params_path = {"realm-name": self.connection.realm_name, "role-name": role_name}
url = urls_patterns.URL_ADMIN_REALM_ROLES_GROUPS.format(**params_path)
if "first" in query or "max" in query:
return self.__fetch_paginated(url, query)
return self.__fetch_all(url, query)
def get_realm_role_members(self, role_name, query=None): def get_realm_role_members(self, role_name, query=None):
"""Get role members of realm by role name. """Get role members of realm by role name.
@ -2526,6 +2606,23 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def get_realm_role_by_id(self, role_id: str):
"""Get realm role by role id.
RoleRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_rolerepresentation
:param role_id: role's id, not name!
:type role_id: str
:return: role
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "role-id": role_id}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_ID.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
def update_realm_role(self, role_name, payload): def update_realm_role(self, role_name, payload):
"""Update a role for the realm by name. """Update a role for the realm by name.
@ -4147,6 +4244,36 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def create_client_authz_scope_permission(self, payload, client_id):
"""Create permissions for a authz scope.
Payload example::
payload={
"name": "My Permission Name",
"type": "scope",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"resources": [some_resource_id],
"scopes": [some_scope_id],
"policies": [some_policy_id],
}
:param payload: No Document
:type payload: dict
:param client_id: id in ClientRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:type client_id: str
:return: Keycloak server response
:rtype: bytes
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.raw_post(
urls_patterns.URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[201])
def update_client_authz_scope_permission(self, payload, client_id, scope_id): def update_client_authz_scope_permission(self, payload, client_id, scope_id):
"""Update permissions for a given scope. """Update permissions for a given scope.
@ -4388,7 +4515,7 @@ class KeycloakAdmin:
:rtype: dict :rtype: dict
""" """
params_path = {"realm-name": self.connection.realm_name} params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_KEYS_CACHE.format(**params_path), data="" urls_patterns.URL_ADMIN_CLEAR_KEYS_CACHE.format(**params_path), data=""
) )
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
@ -4400,7 +4527,7 @@ class KeycloakAdmin:
:rtype: dict :rtype: dict
""" """
params_path = {"realm-name": self.connection.realm_name} params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_REALM_CACHE.format(**params_path), data="" urls_patterns.URL_ADMIN_CLEAR_REALM_CACHE.format(**params_path), data=""
) )
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
@ -4412,7 +4539,7 @@ class KeycloakAdmin:
:rtype: dict :rtype: dict
""" """
params_path = {"realm-name": self.connection.realm_name} params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_USER_CACHE.format(**params_path), data="" urls_patterns.URL_ADMIN_CLEAR_USER_CACHE.format(**params_path), data=""
) )
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])

38
src/keycloak/keycloak_openid.py

@ -41,6 +41,7 @@ from .exceptions import (
KeycloakGetError, KeycloakGetError,
KeycloakInvalidTokenError, KeycloakInvalidTokenError,
KeycloakPostError, KeycloakPostError,
KeycloakPutError,
KeycloakRPTNotFound, KeycloakRPTNotFound,
raise_error_from_response, raise_error_from_response,
) )
@ -49,6 +50,7 @@ from .urls_patterns import (
URL_AUTH, URL_AUTH,
URL_CERTS, URL_CERTS,
URL_CLIENT_REGISTRATION, URL_CLIENT_REGISTRATION,
URL_CLIENT_UPDATE,
URL_ENTITLEMENT, URL_ENTITLEMENT,
URL_INTROSPECT, URL_INTROSPECT,
URL_LOGOUT, URL_LOGOUT,
@ -66,7 +68,7 @@ class KeycloakOpenID:
:param client_id: client id :param client_id: client id
:param realm_name: realm name :param realm_name: realm name
:param client_secret_key: client secret key :param client_secret_key: client secret key
:param verify: True if want check connection SSL
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:param custom_headers: dict of custom header to pass to each HTML request :param custom_headers: dict of custom header to pass to each HTML request
:param proxies: dict of proxies to sent the request by. :param proxies: dict of proxies to sent the request by.
:param timeout: connection timeout in seconds :param timeout: connection timeout in seconds
@ -93,8 +95,8 @@ class KeycloakOpenID:
:type realm_name: str :type realm_name: str
:param client_secret_key: client secret key :param client_secret_key: client secret key
:type client_secret_key: str :type client_secret_key: str
:param verify: True if want check connection SSL
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param custom_headers: dict of custom header to pass to each HTML request :param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict :type custom_headers: dict
:param proxies: dict of proxies to sent the request by. :param proxies: dict of proxies to sent the request by.
@ -343,7 +345,7 @@ class KeycloakOpenID:
def exchange_token( def exchange_token(
self, self,
token: str, token: str,
audience: str,
audience: Optional[str] = None,
subject: Optional[str] = None, subject: Optional[str] = None,
subject_token_type: Optional[str] = None, subject_token_type: Optional[str] = None,
subject_issuer: Optional[str] = None, subject_issuer: Optional[str] = None,
@ -711,3 +713,31 @@ class KeycloakOpenID:
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload) URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
) )
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def update_client(self, token: str, client_id: str, payload: dict):
"""Update a client.
ClientRepresentation:
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:param token: registration access token
:type token: str
:param client_id: Keycloak client id
:type client_id: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "client-id": client_id}
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
# Keycloak complains if the clientId is not set in the payload
if "clientId" not in payload:
payload["clientId"] = client_id
data_raw = self.connection.raw_put(
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPutError)

25
src/keycloak/openid_connection.py

@ -54,6 +54,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
_custom_headers = None _custom_headers = None
_user_realm_name = None _user_realm_name = None
_expires_at = None _expires_at = None
_keycloak_openid = None
def __init__( def __init__(
self, self,
@ -86,8 +87,8 @@ class KeycloakOpenIDConnection(ConnectionManager):
:type realm_name: str :type realm_name: str
:param client_id: client id :param client_id: client id
:type client_id: str :type client_id: str
:param verify: True if want check connection SSL
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param client_secret_key: client secret key :param client_secret_key: client secret key
(optional, required only for access type confidential) (optional, required only for access type confidential)
:type client_secret_key: str :type client_secret_key: str
@ -275,11 +276,16 @@ class KeycloakOpenIDConnection(ConnectionManager):
# merge custom headers to main headers # merge custom headers to main headers
self.headers.update(self.custom_headers) self.headers.update(self.custom_headers)
def get_token(self):
"""Get admin token.
@property
def keycloak_openid(self) -> KeycloakOpenID:
"""Get the KeycloakOpenID object.
The admin token is then set in the `token` attribute.
The KeycloakOpenID is used to refresh tokens
:returns: KeycloakOpenID
:rtype: KeycloakOpenID
""" """
if self._keycloak_openid is None:
if self.user_realm_name: if self.user_realm_name:
token_realm_name = self.user_realm_name token_realm_name = self.user_realm_name
elif self.realm_name: elif self.realm_name:
@ -287,7 +293,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
else: else:
token_realm_name = "master" token_realm_name = "master"
self.keycloak_openid = KeycloakOpenID(
self._keycloak_openid = KeycloakOpenID(
server_url=self.server_url, server_url=self.server_url,
client_id=self.client_id, client_id=self.client_id,
realm_name=token_realm_name, realm_name=token_realm_name,
@ -296,6 +302,13 @@ class KeycloakOpenIDConnection(ConnectionManager):
timeout=self.timeout, timeout=self.timeout,
) )
return self._keycloak_openid
def get_token(self):
"""Get admin token.
The admin token is then set in the `token` attribute.
"""
grant_type = [] grant_type = []
if self.client_secret_key: if self.client_secret_key:
grant_type.append("client_credentials") grant_type.append("client_credentials")

6
src/keycloak/urls_patterns.py

@ -126,6 +126,7 @@ URL_ADMIN_CLIENT_AUTHZ_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/{policy-id}"
URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/scopes" URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/scopes"
URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/resources" URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/resources"
URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope/{scope-id}" URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope/{scope-id}"
URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/client" URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/client"
URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user" URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user"
@ -140,13 +141,16 @@ URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER = URL_ADMIN_CLIENT_SCOPE + "/protocol-mappers
URL_ADMIN_CLIENT_SCOPES_MAPPERS = URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER + "/{protocol-mapper-id}" URL_ADMIN_CLIENT_SCOPES_MAPPERS = URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER + "/{protocol-mapper-id}"
URL_ADMIN_REALM_ROLES = "admin/realms/{realm-name}/roles" URL_ADMIN_REALM_ROLES = "admin/realms/{realm-name}/roles"
URL_ADMIN_REALM_ROLES_SEARCH = URL_ADMIN_REALM_ROLES + "?search={search-text}"
URL_ADMIN_REALM_ROLES_MEMBERS = URL_ADMIN_REALM_ROLES + "/{role-name}/users" URL_ADMIN_REALM_ROLES_MEMBERS = URL_ADMIN_REALM_ROLES + "/{role-name}/users"
URL_ADMIN_REALM_ROLES_GROUPS = URL_ADMIN_REALM_ROLES + "/{role-name}/groups"
URL_ADMIN_REALMS = "admin/realms" URL_ADMIN_REALMS = "admin/realms"
URL_ADMIN_REALM = "admin/realms/{realm-name}" URL_ADMIN_REALM = "admin/realms/{realm-name}"
URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances" URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances"
URL_ADMIN_IDP_MAPPERS = "admin/realms/{realm-name}/identity-provider/instances/{idp-alias}/mappers" URL_ADMIN_IDP_MAPPERS = "admin/realms/{realm-name}/identity-provider/instances/{idp-alias}/mappers"
URL_ADMIN_IDP_MAPPER_UPDATE = URL_ADMIN_IDP_MAPPERS + "/{mapper-id}" URL_ADMIN_IDP_MAPPER_UPDATE = URL_ADMIN_IDP_MAPPERS + "/{mapper-id}"
URL_ADMIN_IDP = "admin/realms/{realm-name}/identity-provider/instances/{alias}" URL_ADMIN_IDP = "admin/realms/{realm-name}/identity-provider/instances/{alias}"
URL_ADMIN_REALM_ROLES_ROLE_BY_ID = URL_ADMIN_REALM + "/roles-by-id/{role-id}"
URL_ADMIN_REALM_ROLES_ROLE_BY_NAME = "admin/realms/{realm-name}/roles/{role-name}" URL_ADMIN_REALM_ROLES_ROLE_BY_NAME = "admin/realms/{realm-name}/roles/{role-name}"
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = ( URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = (
"admin/realms/{realm-name}/roles/{role-name}/composites" "admin/realms/{realm-name}/roles/{role-name}/composites"
@ -156,6 +160,8 @@ URL_ADMIN_REALM_EXPORT = (
+ "exportGroupsAndRoles={export-groups-and-roles}" + "exportGroupsAndRoles={export-groups-and-roles}"
) )
URL_ADMIN_REALM_PARTIAL_IMPORT = "admin/realms/{realm-name}/partialImport"
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-default-client-scopes" URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-default-client-scopes"
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE = URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES + "/{id}" URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE = URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES + "/{id}"
URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-optional-client-scopes" URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-optional-client-scopes"

146
tests/test_keycloak_admin.py

@ -216,6 +216,53 @@ def test_import_export_realms(admin: KeycloakAdmin, realm: str):
assert err.match('500: b\'{"error":"unknown_error"}\'') assert err.match('500: b\'{"error":"unknown_error"}\'')
def test_partial_import_realm(admin: KeycloakAdmin, realm: str):
"""Test partial import of realm configuration.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
test_realm_role = str(uuid.uuid4())
test_user = str(uuid.uuid4())
test_client = str(uuid.uuid4())
admin.realm_name = realm
client_id = admin.create_client(payload={"name": test_client, "clientId": test_client})
realm_export = admin.export_realm(export_clients=True, export_groups_and_role=False)
client_config = [
client_entry for client_entry in realm_export["clients"] if client_entry["id"] == client_id
][0]
# delete before partial import
admin.delete_client(client_id)
payload = {
"ifResourceExists": "SKIP",
"id": realm_export["id"],
"realm": realm,
"clients": [client_config],
"roles": {"realm": [{"name": test_realm_role}]},
"users": [{"username": test_user, "email": f"{test_user}@test.test"}],
}
# check add
res = admin.partial_import_realm(realm_name=realm, payload=payload)
assert res["added"] == 3
# check skip
res = admin.partial_import_realm(realm_name=realm, payload=payload)
assert res["skipped"] == 3
# check overwrite
payload["ifResourceExists"] = "OVERWRITE"
res = admin.partial_import_realm(realm_name=realm, payload=payload)
assert res["overwritten"] == 3
def test_users(admin: KeycloakAdmin, realm: str): def test_users(admin: KeycloakAdmin, realm: str):
"""Test users. """Test users.
@ -395,6 +442,18 @@ def test_idps(admin: KeycloakAdmin, realm: str):
assert len(idps) == 1 assert len(idps) == 1
assert "github" == idps[0]["alias"] assert "github" == idps[0]["alias"]
# Test get idp
idp = admin.get_idp("github")
assert "github" == idp["alias"]
assert idp.get("config")
assert "test" == idp["config"]["clientId"]
assert "**********" == idp["config"]["clientSecret"]
# Test get idp fail
with pytest.raises(KeycloakGetError) as err:
admin.get_idp("does-not-exist")
assert err.match('404: b\'{"error":"HTTP 404 Not Found"}\'')
# Test IdP update # Test IdP update
res = admin.update_idp(idp_alias="github", payload=idps[0]) res = admin.update_idp(idp_alias="github", payload=idps[0])
@ -548,6 +607,7 @@ def test_server_info(admin: KeycloakAdmin):
"passwordPolicies", "passwordPolicies",
"enums", "enums",
"cryptoInfo", "cryptoInfo",
"features",
} }
), info.keys() ), info.keys()
@ -1103,6 +1163,12 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str):
assert "uma_authorization" in role_names, role_names assert "uma_authorization" in role_names, role_names
assert "offline_access" in role_names, role_names assert "offline_access" in role_names, role_names
# Test get realm roles with search text
searched_roles = admin.get_realm_roles(search_text="uma_a")
searched_role_names = [x["name"] for x in searched_roles]
assert "uma_authorization" in searched_role_names, searched_role_names
assert "offline_access" not in searched_role_names, searched_role_names
# Test empty members # Test empty members
with pytest.raises(KeycloakGetError) as err: with pytest.raises(KeycloakGetError) as err:
admin.get_realm_role_members(role_name="does-not-exist") admin.get_realm_role_members(role_name="does-not-exist")
@ -1119,6 +1185,11 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str):
role_id_2 = admin.create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True) role_id_2 = admin.create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True)
assert role_id == role_id_2 assert role_id == role_id_2
# Test get realm role by its id
role_id = admin.get_realm_role(role_name="test-realm-role")["id"]
res = admin.get_realm_role_by_id(role_id)
assert res["name"] == "test-realm-role"
# Test update realm role # Test update realm role
res = admin.update_realm_role( res = admin.update_realm_role(
role_name="test-realm-role", payload={"name": "test-realm-role-update"} role_name="test-realm-role", payload={"name": "test-realm-role-update"}
@ -1240,6 +1311,14 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str):
res = admin.get_composite_realm_roles_of_role(role_name=composite_role) res = admin.get_composite_realm_roles_of_role(role_name=composite_role)
assert len(res) == 0 assert len(res) == 0
# Test realm role group list
res = admin.get_realm_role_groups(role_name="test-realm-role-update")
assert len(res) == 1
assert res[0]["id"] == group_id
with pytest.raises(KeycloakGetError) as err:
admin.get_realm_role_groups(role_name="non-existent-role")
assert err.match('404: b\'{"error":"Could not find role"}\'')
# Test delete realm role # Test delete realm role
res = admin.delete_realm_role(role_name=composite_role) res = admin.delete_realm_role(role_name=composite_role)
assert res == dict(), res assert res == dict(), res
@ -1800,6 +1879,31 @@ def test_enable_token_exchange(admin: KeycloakAdmin, realm: str):
scope_id=token_exchange_permission_id, scope_id=token_exchange_permission_id,
) )
# Create permissions on the target client to reference this policy
admin.create_client_authz_scope_permission(
payload={
"id": token_exchange_permission_id,
"name": "test-permission",
"type": "scope",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"resources": [token_exchange_resource_id],
"scopes": [token_exchange_scope_id],
"policies": [client_policy_id],
},
client_id=realm_management_id,
)
permission_name = admin.get_client_authz_scope_permission(
client_id=realm_management_id, scope_id=token_exchange_permission_id
)["name"]
assert permission_name == "test-permission"
with pytest.raises(KeycloakPostError) as err:
admin.create_client_authz_scope_permission(
payload={"name": "test-permission", "scopes": [token_exchange_scope_id]},
client_id="realm_management_id",
)
assert err.match('404: b\'{"errorMessage":"Could not find client"}\'')
def test_email(admin: KeycloakAdmin, user: str): def test_email(admin: KeycloakAdmin, user: str):
"""Test email. """Test email.
@ -1869,7 +1973,19 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
admin.realm_name = realm admin.realm_name = realm
res = admin.get_authentication_flows() res = admin.get_authentication_flows()
assert len(res) == 8 or len(res) == 7, res
default_flows = len(res)
assert {x["alias"] for x in res}.issubset(
{
"reset credentials",
"browser",
"registration",
"http challenge",
"docker auth",
"direct grant",
"first broker login",
"clients",
}
)
assert set(res[0].keys()) == { assert set(res[0].keys()) == {
"alias", "alias",
"authenticationExecutions", "authenticationExecutions",
@ -1879,24 +1995,6 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
"providerId", "providerId",
"topLevel", "topLevel",
} }
assert {x["alias"] for x in res} == {
"reset credentials",
"browser",
"http challenge",
"registration",
"docker auth",
"direct grant",
"first broker login",
"clients",
} or {x["alias"] for x in res} == {
"reset credentials",
"browser",
"registration",
"docker auth",
"direct grant",
"first broker login",
"clients",
}
with pytest.raises(KeycloakGetError) as err: with pytest.raises(KeycloakGetError) as err:
admin.get_authentication_flow_for_id(flow_id="bad") admin.get_authentication_flow_for_id(flow_id="bad")
@ -1912,7 +2010,7 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
res = admin.copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser") res = admin.copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser")
assert res == b"", res assert res == b"", res
assert len(admin.get_authentication_flows()) == 9 or len(admin.get_authentication_flows()) == 8
assert len(admin.get_authentication_flows()) == (default_flows + 1)
# Test create # Test create
res = admin.create_authentication_flow( res = admin.create_authentication_flow(
@ -2031,7 +2129,7 @@ def test_authentication_configs(admin: KeycloakAdmin, realm: str):
# Test list of auth providers # Test list of auth providers
res = admin.get_authenticator_providers() res = admin.get_authenticator_providers()
assert len(res) == 38 or len(res) == 35
assert len(res) > 1
res = admin.get_authenticator_provider_config_description(provider_id="auth-cookie") res = admin.get_authenticator_provider_config_description(provider_id="auth-cookie")
assert res == { assert res == {
@ -2772,7 +2870,7 @@ def test_initial_access_token(
res = oid.register_client( res = oid.register_client(
token=res["token"], token=res["token"],
payload={ payload={
"name": client,
"name": "DynamicRegisteredClient",
"clientId": client, "clientId": client,
"enabled": True, "enabled": True,
"publicClient": False, "publicClient": False,
@ -2782,3 +2880,7 @@ def test_initial_access_token(
}, },
) )
assert res["clientId"] == client assert res["clientId"] == client
new_secret = str(uuid.uuid4())
res = oid.update_client(res["registrationAccessToken"], client, payload={"secret": new_secret})
assert res["secret"] == new_secret
Loading…
Cancel
Save