Browse Source

chore: Merge branch 'master' into ci/fix_tests

pull/506/head
Richard Nemeth 1 year ago
parent
commit
5e08005bf3
  1. 20
      CHANGELOG.md
  2. 19
      README.md
  3. 3
      pyproject.toml
  4. 8
      src/keycloak/connection.py
  5. 164
      src/keycloak/keycloak_admin.py
  6. 68
      src/keycloak/keycloak_openid.py
  7. 49
      src/keycloak/openid_connection.py
  8. 11
      src/keycloak/urls_patterns.py
  9. 138
      tests/test_keycloak_admin.py

20
CHANGELOG.md

@ -1,4 +1,22 @@
## v3.3.0 (2023-06-28)
## v3.6.0 (2023-11-13)
### Feat
- add KeycloakAdmin.get_idp() (#478)
## v3.5.0 (2023-11-13)
### Feat
- Update dynamic client using registration access token (#491)
## v3.4.0 (2023-11-13)
### Feat
- add an optional search criteria to the get_realm_roles function (#504)
## v3.3.0 (2023-06-27)
### Feat

19
README.md

@ -306,7 +306,7 @@ groups = keycloak_admin.get_groups()
group = keycloak_admin.get_group(group_id='group_id')
# Get group by name
group = keycloak_admin.get_group_by_path(path='/group/subgroup', search_in_subgroups=True)
group = keycloak_admin.get_group_by_path(path='/group/subgroup')
# Function to trigger user sync from provider
sync_users(storage_id="storage_di", action="action")
@ -340,9 +340,12 @@ keycloak_admin.get_client_roles_of_client_scope(client_id=another_client_id, cli
# Remove client roles assigned to client's scope
keycloak_admin.delete_client_roles_of_client_scope(client_id=another_client_id, client_roles_owner_id=client_id, roles=client_roles)
# Get all ID Providers
# Get all IDP Providers
idps = keycloak_admin.get_idps()
# Get a specific IDP Provider, using its alias
idp = keycloak_admin.get_idp("idp-alias")
# Create a new Realm
keycloak_admin.create_realm(payload={"realm": "demo"}, skip_exists=False)
@ -353,6 +356,18 @@ keycloak_admin.realm_name = "demo" # Change realm to 'demo'
keycloak_admin.get_users() # Get users in realm 'demo'
keycloak_admin.create_user(...) # Creates a new user in 'demo'
# Get User events
keycloak_admin.get_events(query={'type': 'LOGIN',
'user': user['id'],
'dateFrom': '2023-08-02'})
# Get Admin events
keycloak_admin.get_admin_events(query={'resourceTypes': 'USER',
'operationTypes': 'UPDATE',
'resourcePath': 'users/' + user['id'],
'dateFrom': '2023-08-02'
})
# KEYCLOAK UMA
from keycloak import KeycloakOpenIDConnection

3
pyproject.toml

@ -30,6 +30,7 @@ Documentation = "https://python-keycloak.readthedocs.io/en/latest/"
[tool.poetry.dependencies]
python = ">=3.7,<4.0"
setuptools = "*"
requests = ">=2.20.0"
python-jose = ">=3.3.0"
mock = {version = "^4.0.3", optional = true}
@ -41,7 +42,7 @@ sphinx-rtd-theme = {version = "^1.0.0", optional = true}
readthedocs-sphinx-ext = {version = "^2.1.9", optional = true}
m2r2 = {version = "^0.3.2", optional = true}
sphinx-autoapi = {version = "^2.0.0", optional = true}
requests-toolbelt = ">=1.0.0"
requests-toolbelt = "*"
deprecation = ">=2.1.0"
[tool.poetry.extras]

8
src/keycloak/connection.py

@ -43,8 +43,8 @@ class ConnectionManager(object):
:type headers: dict
:param timeout: Timeout to use for requests to the server.
:type timeout: int
:param verify: Verify server SSL.
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param proxies: The proxies servers requests is sent by.
:type proxies: dict
"""
@ -58,8 +58,8 @@ class ConnectionManager(object):
:type headers: dict
:param timeout: Timeout to use for requests to the server.
:type timeout: int
:param verify: Verify server SSL.
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param proxies: The proxies servers requests is sent by.
:type proxies: dict
"""

164
src/keycloak/keycloak_admin.py

@ -63,8 +63,8 @@ class KeycloakAdmin:
:type realm_name: str
:param client_id: client id
:type client_id: str
:param verify: True if want check connection SSL
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param client_secret_key: client secret key
(optional, required only for access type confidential)
:type client_secret_key: str
@ -84,7 +84,7 @@ class KeycloakAdmin:
PAGE_SIZE = 100
_auto_refresh_token = None
_connection = None
_connection: Optional[KeycloakOpenIDConnection] = None
def __init__(
self,
@ -119,8 +119,8 @@ class KeycloakAdmin:
:type realm_name: str
:param client_id: client id
:type client_id: str
:param verify: True if want check connection SSL
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param client_secret_key: client secret key
(optional, required only for access type confidential)
:type client_secret_key: str
@ -204,7 +204,7 @@ class KeycloakAdmin:
self.connection.realm_name = value
@property
def connection(self):
def connection(self) -> KeycloakOpenIDConnection:
"""Get connection.
:returns: Connection manager
@ -213,7 +213,7 @@ class KeycloakAdmin:
return self._connection
@connection.setter
def connection(self, value):
def connection(self, value: KeycloakOpenIDConnection) -> None:
self._connection = value
@property
@ -532,6 +532,29 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
def partial_import_realm(self, realm_name, payload):
"""Partial import realm configuration from PartialImportRepresentation.
Realm partialImport is used for modifying configuration of existing realm.
PartialImportRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/#_partialimportrepresentation
:param realm_name: Realm name (not the realm id)
:type realm_name: str
:param payload: PartialImportRepresentation
:type payload: dict
:return: PartialImportResponse
:rtype: dict
"""
params_path = {"realm-name": realm_name}
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_REALM_PARTIAL_IMPORT.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[200])
def export_realm(self, export_clients=False, export_groups_and_role=False):
"""Export the realm configurations in the json format.
@ -773,6 +796,23 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDPS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_idp(self, idp_alias):
"""Get IDP provider.
Get the representation of a specific IDP Provider.
IdentityProviderRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_identityproviderrepresentation
:param: idp_alias: alias for IdP to get
:type idp_alias: str
:return: IdentityProviderRepresentation
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "alias": idp_alias}
data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_IDP.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def delete_idp(self, idp_alias):
"""Delete an ID Provider.
@ -1314,7 +1354,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "path": path}
data_raw = self.raw_get(urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path))
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
def create_group(self, payload, parent=None, skip_exists=False):
@ -1775,7 +1817,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_BASED_PERMISSION.format(**params_path),
data=json.dumps(payload),
)
@ -2175,6 +2217,34 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(url.format(**params_path), **params)
return raise_error_from_response(data_raw, KeycloakGetError)
def get_realm_role_groups(self, role_name, query=None, brief_representation=True):
"""Get role groups of realm by role name.
:param role_name: Name of the role.
:type role_name: str
:param query: Additional Query parameters
(see https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_parameters_226)
:type query: dict
:param brief_representation: whether to omit role attributes in the response
:type brief_representation: bool
:return: Keycloak Server Response (GroupRepresentation)
:rtype: list
"""
query = query or {}
params = {"briefRepresentation": brief_representation}
query.update(params)
params_path = {"realm-name": self.connection.realm_name, "role-name": role_name}
url = urls_patterns.URL_ADMIN_REALM_ROLES_GROUPS.format(**params_path)
if "first" in query or "max" in query:
return self.__fetch_paginated(url, query)
return self.__fetch_all(url, query)
def get_realm_role_members(self, role_name, query=None):
"""Get role members of realm by role name.
@ -2536,6 +2606,23 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
def get_realm_role_by_id(self, role_id: str):
"""Get realm role by role id.
RoleRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_rolerepresentation
:param role_id: role's id, not name!
:type role_id: str
:return: role
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "role-id": role_id}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_REALM_ROLES_ROLE_BY_ID.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
def update_realm_role(self, role_name, payload):
"""Update a role for the realm by name.
@ -3849,6 +3936,27 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
def get_admin_events(self, query=None):
"""Get Administrative events.
Return a list of events, filtered according to query parameters
AdminEvents Representation array
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_getevents
https://www.keycloak.org/docs-api/22.0.1/rest-api/index.html#_get_adminrealmsrealmadmin_events
:param query: Additional query parameters
:type query: dict
:return: events list
:rtype: list
"""
query = query or dict()
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_ADMIN_EVENTS.format(**params_path), data=None, **query
)
return raise_error_from_response(data_raw, KeycloakGetError)
def get_events(self, query=None):
"""Get events.
@ -3865,7 +3973,7 @@ class KeycloakAdmin:
query = query or dict()
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_EVENTS.format(**params_path), data=None, **query
urls_patterns.URL_ADMIN_USER_EVENTS.format(**params_path), data=None, **query
)
return raise_error_from_response(data_raw, KeycloakGetError)
@ -4136,6 +4244,36 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakGetError)
def create_client_authz_scope_permission(self, payload, client_id):
"""Create permissions for a authz scope.
Payload example::
payload={
"name": "My Permission Name",
"type": "scope",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"resources": [some_resource_id],
"scopes": [some_scope_id],
"policies": [some_policy_id],
}
:param payload: No Document
:type payload: dict
:param client_id: id in ClientRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:type client_id: str
:return: Keycloak server response
:rtype: bytes
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.raw_post(
urls_patterns.URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[201])
def update_client_authz_scope_permission(self, payload, client_id, scope_id):
"""Update permissions for a given scope.
@ -4377,7 +4515,7 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_KEYS_CACHE.format(**params_path), data=""
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
@ -4389,7 +4527,7 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_REALM_CACHE.format(**params_path), data=""
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
@ -4401,7 +4539,7 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_USER_CACHE.format(**params_path), data=""
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])

68
src/keycloak/keycloak_openid.py

@ -41,6 +41,7 @@ from .exceptions import (
KeycloakGetError,
KeycloakInvalidTokenError,
KeycloakPostError,
KeycloakPutError,
KeycloakRPTNotFound,
raise_error_from_response,
)
@ -49,6 +50,8 @@ from .urls_patterns import (
URL_AUTH,
URL_CERTS,
URL_CLIENT_REGISTRATION,
URL_CLIENT_UPDATE,
URL_DEVICE,
URL_ENTITLEMENT,
URL_INTROSPECT,
URL_LOGOUT,
@ -66,7 +69,7 @@ class KeycloakOpenID:
:param client_id: client id
:param realm_name: realm name
:param client_secret_key: client secret key
:param verify: True if want check connection SSL
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:param custom_headers: dict of custom header to pass to each HTML request
:param proxies: dict of proxies to sent the request by.
:param timeout: connection timeout in seconds
@ -93,8 +96,8 @@ class KeycloakOpenID:
:type realm_name: str
:param client_secret_key: client secret key
:type client_secret_key: str
:param verify: True if want check connection SSL
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param custom_headers: dict of custom header to pass to each HTML request
:type custom_headers: dict
:param proxies: dict of proxies to sent the request by.
@ -343,7 +346,7 @@ class KeycloakOpenID:
def exchange_token(
self,
token: str,
audience: str,
audience: Optional[str] = None,
subject: Optional[str] = None,
subject_token_type: Optional[str] = None,
subject_issuer: Optional[str] = None,
@ -711,3 +714,60 @@ class KeycloakOpenID:
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError)
def device(self):
"""Get device authorization grant.
The device endpoint is used to obtain a user code verification and user authentication.
The response contains a device_code, user_code, verification_uri,
verification_uri_complete, expires_in (lifetime in seconds for device_code
and user_code), and polling interval.
Users can either follow the verification_uri and enter the user_code or
follow the verification_uri_complete.
After authenticating with valid credentials, users can obtain tokens using the
"urn:ietf:params:oauth:grant-type:device_code" grant_type and the device_code.
https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow
https://github.com/keycloak/keycloak-community/blob/main/design/oauth2-device-authorization-grant.md#how-to-try-it
:returns: Device Authorization Response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
payload = {
"client_id": self.client_id,
}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_DEVICE.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)
def update_client(self, token: str, client_id: str, payload: dict):
"""Update a client.
ClientRepresentation:
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:param token: registration access token
:type token: str
:param client_id: Keycloak client id
:type client_id: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "client-id": client_id}
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
# Keycloak complains if the clientId is not set in the payload
if "clientId" not in payload:
payload["clientId"] = client_id
data_raw = self.connection.raw_put(
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPutError)

49
src/keycloak/openid_connection.py

@ -54,6 +54,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
_custom_headers = None
_user_realm_name = None
_expires_at = None
_keycloak_openid = None
def __init__(
self,
@ -86,8 +87,8 @@ class KeycloakOpenIDConnection(ConnectionManager):
:type realm_name: str
:param client_id: client id
:type client_id: str
:param verify: True if want check connection SSL
:type verify: bool
:param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
:type verify: Union[bool,str]
:param client_secret_key: client secret key
(optional, required only for access type confidential)
:type client_secret_key: str
@ -275,27 +276,39 @@ class KeycloakOpenIDConnection(ConnectionManager):
# merge custom headers to main headers
self.headers.update(self.custom_headers)
@property
def keycloak_openid(self) -> KeycloakOpenID:
"""Get the KeycloakOpenID object.
The KeycloakOpenID is used to refresh tokens
:returns: KeycloakOpenID
:rtype: KeycloakOpenID
"""
if self._keycloak_openid is None:
if self.user_realm_name:
token_realm_name = self.user_realm_name
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
self._keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=token_realm_name,
verify=self.verify,
client_secret_key=self.client_secret_key,
timeout=self.timeout,
)
return self._keycloak_openid
def get_token(self):
"""Get admin token.
The admin token is then set in the `token` attribute.
"""
if self.user_realm_name:
token_realm_name = self.user_realm_name
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
self.keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=token_realm_name,
verify=self.verify,
client_secret_key=self.client_secret_key,
timeout=self.timeout,
)
grant_type = []
if self.client_secret_key:
grant_type.append("client_credentials")

11
src/keycloak/urls_patterns.py

@ -37,6 +37,7 @@ URL_AUTH = (
"{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}"
"&scope={scope}&state={state}"
)
URL_DEVICE = "realms/{realm-name}/protocol/openid-connect/auth/device"
URL_CLIENT_REGISTRATION = URL_REALM + "/clients-registrations/default"
URL_CLIENT_UPDATE = URL_CLIENT_REGISTRATION + "/{client-id}"
@ -126,6 +127,7 @@ URL_ADMIN_CLIENT_AUTHZ_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/{policy-id}"
URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/scopes"
URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/resources"
URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope/{scope-id}"
URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/client"
URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user"
@ -142,12 +144,14 @@ URL_ADMIN_CLIENT_SCOPES_MAPPERS = URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER + "/{protoc
URL_ADMIN_REALM_ROLES = "admin/realms/{realm-name}/roles"
URL_ADMIN_REALM_ROLES_SEARCH = URL_ADMIN_REALM_ROLES + "?search={search-text}"
URL_ADMIN_REALM_ROLES_MEMBERS = URL_ADMIN_REALM_ROLES + "/{role-name}/users"
URL_ADMIN_REALM_ROLES_GROUPS = URL_ADMIN_REALM_ROLES + "/{role-name}/groups"
URL_ADMIN_REALMS = "admin/realms"
URL_ADMIN_REALM = "admin/realms/{realm-name}"
URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances"
URL_ADMIN_IDP_MAPPERS = "admin/realms/{realm-name}/identity-provider/instances/{idp-alias}/mappers"
URL_ADMIN_IDP_MAPPER_UPDATE = URL_ADMIN_IDP_MAPPERS + "/{mapper-id}"
URL_ADMIN_IDP = "admin/realms/{realm-name}/identity-provider/instances/{alias}"
URL_ADMIN_REALM_ROLES_ROLE_BY_ID = URL_ADMIN_REALM + "/roles-by-id/{role-id}"
URL_ADMIN_REALM_ROLES_ROLE_BY_NAME = "admin/realms/{realm-name}/roles/{role-name}"
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = (
"admin/realms/{realm-name}/roles/{role-name}/composites"
@ -157,6 +161,8 @@ URL_ADMIN_REALM_EXPORT = (
+ "exportGroupsAndRoles={export-groups-and-roles}"
)
URL_ADMIN_REALM_PARTIAL_IMPORT = "admin/realms/{realm-name}/partialImport"
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-default-client-scopes"
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE = URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES + "/{id}"
URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-optional-client-scopes"
@ -193,8 +199,9 @@ URL_ADMIN_USER_FEDERATED_IDENTITY = (
"admin/realms/{realm-name}/users/{id}/federated-identity/{provider}"
)
URL_ADMIN_EVENTS = "admin/realms/{realm-name}/events"
URL_ADMIN_EVENTS_CONFIG = URL_ADMIN_EVENTS + "/config"
URL_ADMIN_USER_EVENTS = "admin/realms/{realm-name}/events"
URL_ADMIN_ADMIN_EVENTS = "admin/realms/{realm-name}/admin-events"
URL_ADMIN_EVENTS_CONFIG = URL_ADMIN_USER_EVENTS + "/config"
URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats"
URL_ADMIN_GROUPS_CLIENT_ROLES_COMPOSITE = URL_ADMIN_GROUPS_CLIENT_ROLES + "/composite"

138
tests/test_keycloak_admin.py

@ -216,6 +216,53 @@ def test_import_export_realms(admin: KeycloakAdmin, realm: str):
assert err.match('500: b\'{"error":"unknown_error"}\'')
def test_partial_import_realm(admin: KeycloakAdmin, realm: str):
"""Test partial import of realm configuration.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
test_realm_role = str(uuid.uuid4())
test_user = str(uuid.uuid4())
test_client = str(uuid.uuid4())
admin.realm_name = realm
client_id = admin.create_client(payload={"name": test_client, "clientId": test_client})
realm_export = admin.export_realm(export_clients=True, export_groups_and_role=False)
client_config = [
client_entry for client_entry in realm_export["clients"] if client_entry["id"] == client_id
][0]
# delete before partial import
admin.delete_client(client_id)
payload = {
"ifResourceExists": "SKIP",
"id": realm_export["id"],
"realm": realm,
"clients": [client_config],
"roles": {"realm": [{"name": test_realm_role}]},
"users": [{"username": test_user, "email": f"{test_user}@test.test"}],
}
# check add
res = admin.partial_import_realm(realm_name=realm, payload=payload)
assert res["added"] == 3
# check skip
res = admin.partial_import_realm(realm_name=realm, payload=payload)
assert res["skipped"] == 3
# check overwrite
payload["ifResourceExists"] = "OVERWRITE"
res = admin.partial_import_realm(realm_name=realm, payload=payload)
assert res["overwritten"] == 3
def test_users(admin: KeycloakAdmin, realm: str):
"""Test users.
@ -395,6 +442,18 @@ def test_idps(admin: KeycloakAdmin, realm: str):
assert len(idps) == 1
assert "github" == idps[0]["alias"]
# Test get idp
idp = admin.get_idp("github")
assert "github" == idp["alias"]
assert idp.get("config")
assert "test" == idp["config"]["clientId"]
assert "**********" == idp["config"]["clientSecret"]
# Test get idp fail
with pytest.raises(KeycloakGetError) as err:
admin.get_idp("does-not-exist")
assert err.match('404: b\'{"error":"HTTP 404 Not Found"}\'')
# Test IdP update
res = admin.update_idp(idp_alias="github", payload=idps[0])
@ -549,6 +608,7 @@ def test_server_info(admin: KeycloakAdmin):
"passwordPolicies",
"enums",
"cryptoInfo",
"features",
}
), info.keys()
@ -1126,6 +1186,11 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str):
role_id_2 = admin.create_realm_role(payload={"name": "test-realm-role"}, skip_exists=True)
assert role_id == role_id_2
# Test get realm role by its id
role_id = admin.get_realm_role(role_name="test-realm-role")["id"]
res = admin.get_realm_role_by_id(role_id)
assert res["name"] == "test-realm-role"
# Test update realm role
res = admin.update_realm_role(
role_name="test-realm-role", payload={"name": "test-realm-role-update"}
@ -1247,6 +1312,14 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str):
res = admin.get_composite_realm_roles_of_role(role_name=composite_role)
assert len(res) == 0
# Test realm role group list
res = admin.get_realm_role_groups(role_name="test-realm-role-update")
assert len(res) == 1
assert res[0]["id"] == group_id
with pytest.raises(KeycloakGetError) as err:
admin.get_realm_role_groups(role_name="non-existent-role")
assert err.match('404: b\'{"error":"Could not find role"}\'')
# Test delete realm role
res = admin.delete_realm_role(role_name=composite_role)
assert res == dict(), res
@ -1807,6 +1880,31 @@ def test_enable_token_exchange(admin: KeycloakAdmin, realm: str):
scope_id=token_exchange_permission_id,
)
# Create permissions on the target client to reference this policy
admin.create_client_authz_scope_permission(
payload={
"id": token_exchange_permission_id,
"name": "test-permission",
"type": "scope",
"logic": "POSITIVE",
"decisionStrategy": "UNANIMOUS",
"resources": [token_exchange_resource_id],
"scopes": [token_exchange_scope_id],
"policies": [client_policy_id],
},
client_id=realm_management_id,
)
permission_name = admin.get_client_authz_scope_permission(
client_id=realm_management_id, scope_id=token_exchange_permission_id
)["name"]
assert permission_name == "test-permission"
with pytest.raises(KeycloakPostError) as err:
admin.create_client_authz_scope_permission(
payload={"name": "test-permission", "scopes": [token_exchange_scope_id]},
client_id="realm_management_id",
)
assert err.match('404: b\'{"errorMessage":"Could not find client"}\'')
def test_email(admin: KeycloakAdmin, user: str):
"""Test email.
@ -1877,6 +1975,19 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
res = admin.get_authentication_flows()
assert len(res) <= 8, res
default_flows = len(res)
assert {x["alias"] for x in res}.issubset(
{
"reset credentials",
"browser",
"registration",
"http challenge",
"docker auth",
"direct grant",
"first broker login",
"clients",
}
)
assert set(res[0].keys()) == {
"alias",
"authenticationExecutions",
@ -1890,7 +2001,6 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
{
"reset credentials",
"browser",
"http challenge",
"registration",
"docker auth",
"direct grant",
@ -1914,7 +2024,7 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
res = admin.copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser")
assert res == b"", res
assert len(admin.get_authentication_flows()) <= 9
assert len(admin.get_authentication_flows()) == (default_flows + 1)
# Test create
res = admin.create_authentication_flow(
@ -2294,7 +2404,23 @@ def test_keys(admin: KeycloakAdmin, realm: str):
}
def test_events(admin: KeycloakAdmin, realm: str):
def test_admin_events(admin: KeycloakAdmin, realm: str):
"""Test events.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin.realm_name = realm
admin.create_client(payload={"name": "test", "clientId": "test"})
events = admin.get_admin_events()
assert events == list()
def test_user_events(admin: KeycloakAdmin, realm: str):
"""Test events.
:param admin: Keycloak Admin client
@ -2758,7 +2884,7 @@ def test_initial_access_token(
res = oid.register_client(
token=res["token"],
payload={
"name": client,
"name": "DynamicRegisteredClient",
"clientId": client,
"enabled": True,
"publicClient": False,
@ -2768,3 +2894,7 @@ def test_initial_access_token(
},
)
assert res["clientId"] == client
new_secret = str(uuid.uuid4())
res = oid.update_client(res["registrationAccessToken"], client, payload={"secret": new_secret})
assert res["secret"] == new_secret
Loading…
Cancel
Save