Browse Source

feat: fixed admin client to pass the tests

pull/309/head
Richard Nemeth 3 years ago
parent
commit
b911d94db9
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 23
      keycloak/_version.py
  2. 166
      keycloak/keycloak_admin.py
  3. 8
      keycloak/urls_patterns.py

23
keycloak/_version.py

@ -1 +1,24 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
__version__ = "0.0.0" __version__ = "0.0.0"

166
keycloak/keycloak_admin.py

@ -61,6 +61,7 @@ from .urls_patterns import (
URL_ADMIN_CLIENT_SCOPES_MAPPERS, URL_ADMIN_CLIENT_SCOPES_MAPPERS,
URL_ADMIN_CLIENT_SECRETS, URL_ADMIN_CLIENT_SECRETS,
URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER, URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER,
URL_ADMIN_CLIENT_ROLE_GROUPS,
URL_ADMIN_CLIENTS, URL_ADMIN_CLIENTS,
URL_ADMIN_COMPONENT, URL_ADMIN_COMPONENT,
URL_ADMIN_COMPONENTS, URL_ADMIN_COMPONENTS,
@ -68,7 +69,6 @@ from .urls_patterns import (
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES, URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES,
URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE, URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE,
URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES, URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES,
URL_ADMIN_DELETE_USER_ROLE,
URL_ADMIN_EVENTS, URL_ADMIN_EVENTS,
URL_ADMIN_FLOW, URL_ADMIN_FLOW,
URL_ADMIN_FLOWS, URL_ADMIN_FLOWS,
@ -123,6 +123,23 @@ from .urls_patterns import (
class KeycloakAdmin: class KeycloakAdmin:
"""
Keycloak Admin client.
:param server_url: Keycloak server url
:param username: admin username
:param password: admin password
:param totp: Time based OTP
:param realm_name: realm name
:param client_id: client id
:param verify: True if want check connection SSL
:param client_secret_key: client secret key
(optional, required only for access type confidential)
:param custom_headers: dict of custom header to pass to each HTML request
:param user_realm_name: The realm name of the user, if different from realm_name
:param auto_refresh_token: list of methods that allows automatic token refresh.
Ex: ['get', 'put', 'post', 'delete']
"""
PAGE_SIZE = 100 PAGE_SIZE = 100
@ -154,20 +171,6 @@ class KeycloakAdmin:
user_realm_name=None, user_realm_name=None,
auto_refresh_token=None, auto_refresh_token=None,
): ):
"""
:param server_url: Keycloak server url
:param username: admin username
:param password: admin password
:param totp: Time based OTP
:param realm_name: realm name
:param client_id: client id
:param verify: True if want check connection SSL
:param client_secret_key: client secret key (optional, required only for access type confidential)
:param custom_headers: dict of custom header to pass to each HTML request
:param user_realm_name: The realm name of the user, if different from realm_name
:param auto_refresh_token: list of methods that allows automatic token refresh. ex: ['get', 'put', 'post', 'delete']
"""
self.server_url = server_url self.server_url = server_url
self.username = username self.username = username
self.password = password self.password = password
@ -378,6 +381,20 @@ class KeycloakAdmin:
data_raw = self.raw_get(URL_ADMIN_REALMS) data_raw = self.raw_get(URL_ADMIN_REALMS)
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def get_realm(self, realm_name):
"""
Get a specific realm.
RealmRepresentation:
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_realmrepresentation
:param realm_name: Realm name (not the realm id)
:return: RealmRepresentation
"""
params_path = {"realm-name": realm_name}
data_raw = self.raw_get(URL_ADMIN_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200])
def create_realm(self, payload, skip_exists=False): def create_realm(self, payload, skip_exists=False):
""" """
Create a realm Create a realm
@ -495,7 +512,7 @@ class KeycloakAdmin:
data_raw = self.raw_delete(URL_ADMIN_IDP.format(**params_path)) data_raw = self.raw_delete(URL_ADMIN_IDP.format(**params_path))
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def create_user(self, payload, exist_ok=True):
def create_user(self, payload, exist_ok=False):
""" """
Create a new user. Username must be unique Create a new user. Username must be unique
@ -530,6 +547,33 @@ class KeycloakAdmin:
data_raw = self.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path)) data_raw = self.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def user_logout(self, user_id):
"""
Logs out user.
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_logout
:param user_id: User id
:return:
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.raw_post(URL_ADMIN_USER_LOGOUT.format(**params_path), data="")
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
def user_consents(self, user_id):
"""
Get consents granted by the user
UserConsentRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_userconsentrepresentation
:param user_id: User id
:return: List of UserConsentRepresentations
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.raw_get(URL_ADMIN_USER_CONSENTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_user_id(self, username): def get_user_id(self, username):
""" """
Get internal keycloak user id from username Get internal keycloak user id from username
@ -923,7 +967,7 @@ class KeycloakAdmin:
GroupRepresentation GroupRepresentation
https://www.keycloak.org/docs-api/8.0/rest-api/#_grouprepresentation https://www.keycloak.org/docs-api/8.0/rest-api/#_grouprepresentation
:return: Http response
:return: Group ID for newly created group, otherwise None
""" """
if parent is None: if parent is None:
@ -937,9 +981,14 @@ class KeycloakAdmin:
URL_ADMIN_GROUP_CHILD.format(**params_path), data=json.dumps(payload) URL_ADMIN_GROUP_CHILD.format(**params_path), data=json.dumps(payload)
) )
return raise_error_from_response(
raise_error_from_response(
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
) )
try:
_last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
except KeyError:
return
def update_group(self, group_id, payload): def update_group(self, group_id, payload):
""" """
@ -1229,18 +1278,27 @@ class KeycloakAdmin:
""" """
Create a client Create a client
ClientRepresentation: https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_clientrepresentation
ClientRepresentation:
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:param skip_exists: If true then do not raise an error if client already exists :param skip_exists: If true then do not raise an error if client already exists
:param payload: ClientRepresentation :param payload: ClientRepresentation
:return: Keycloak server response (UserRepresentation)
:return: Client ID
""" """
if skip_exists:
client_id = self.get_client_id(client_name=payload["name"])
if client_id is not None:
return client_id
params_path = {"realm-name": self.realm_name} params_path = {"realm-name": self.realm_name}
data_raw = self.raw_post(URL_ADMIN_CLIENTS.format(**params_path), data=json.dumps(payload)) data_raw = self.raw_post(URL_ADMIN_CLIENTS.format(**params_path), data=json.dumps(payload))
return raise_error_from_response(
raise_error_from_response(
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
) )
_last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
def update_client(self, client_id, payload): def update_client(self, client_id, payload):
""" """
@ -1368,21 +1426,46 @@ class KeycloakAdmin:
Create a client role Create a client role
RoleRepresentation RoleRepresentation
https://www.keycloak.org/docs-api/8.0/rest-api/index.html#_rolerepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_rolerepresentation
:param client_role_id: id of client (not client-id) :param client_role_id: id of client (not client-id)
:param payload: RoleRepresentation :param payload: RoleRepresentation
:param skip_exists: If true then do not raise an error if client role already exists :param skip_exists: If true then do not raise an error if client role already exists
:return: Keycloak server response (RoleRepresentation)
:return: Client role name
""" """
if skip_exists:
res = self.get_client_role(client_id=client_role_id, role_name=payload["name"])
if res:
return res["name"]
params_path = {"realm-name": self.realm_name, "id": client_role_id} params_path = {"realm-name": self.realm_name, "id": client_role_id}
data_raw = self.raw_post( data_raw = self.raw_post(
URL_ADMIN_CLIENT_ROLES.format(**params_path), data=json.dumps(payload) URL_ADMIN_CLIENT_ROLES.format(**params_path), data=json.dumps(payload)
) )
return raise_error_from_response(
raise_error_from_response(
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
) )
_last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
def update_client_role(self, client_role_id, role_name, payload):
"""
Update a client role
RoleRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_rolerepresentation
:param client_role_id: id of client (not client-id)
:param role_name: role's name (not id!)
:param payload: RoleRepresentation
"""
params_path = {"realm-name": self.realm_name, "id": client_role_id, "role-name": role_name}
data_raw = self.raw_put(
URL_ADMIN_CLIENT_ROLE.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
def add_composite_client_roles_to_role(self, client_role_id, role_name, roles): def add_composite_client_roles_to_role(self, client_role_id, role_name, roles):
""" """
@ -1444,22 +1527,41 @@ class KeycloakAdmin:
params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name} params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name}
return self.__fetch_all(URL_ADMIN_CLIENT_ROLE_MEMBERS.format(**params_path), query) return self.__fetch_all(URL_ADMIN_CLIENT_ROLE_MEMBERS.format(**params_path), query)
def get_client_role_groups(self, client_id, role_name, **query):
"""
Get group members by client role .
:param client_id: The client id
:param role_name: the name of role to be queried.
:param query: Additional query parameters
(see https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clients_resource)
:return: Keycloak server response
"""
params_path = {"realm-name": self.realm_name, "id": client_id, "role-name": role_name}
return self.__fetch_all(URL_ADMIN_CLIENT_ROLE_GROUPS.format(**params_path), query)
def create_realm_role(self, payload, skip_exists=False): def create_realm_role(self, payload, skip_exists=False):
""" """
Create a new role for the realm or client Create a new role for the realm or client
:param payload: The role (use RoleRepresentation) :param payload: The role (use RoleRepresentation)
:param skip_exists: If true then do not raise an error if realm role already exists :param skip_exists: If true then do not raise an error if realm role already exists
:return Keycloak server response
:return: Realm role name
""" """
if skip_exists:
role = self.get_realm_role(role_name=payload["name"])
if role is not None:
return role["name"]
params_path = {"realm-name": self.realm_name} params_path = {"realm-name": self.realm_name}
data_raw = self.raw_post( data_raw = self.raw_post(
URL_ADMIN_REALM_ROLES.format(**params_path), data=json.dumps(payload) URL_ADMIN_REALM_ROLES.format(**params_path), data=json.dumps(payload)
) )
return raise_error_from_response(
raise_error_from_response(
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
) )
_last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
def get_realm_role(self, role_name): def get_realm_role(self, role_name):
""" """
@ -2506,18 +2608,6 @@ class KeycloakAdmin:
data_raw = self.raw_get(URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)) data_raw = self.raw_get(URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def delete_user_realm_role(self, user_id, payload):
"""
Delete realm-level role mappings
DELETE admin/realms/{realm-name}/users/{id}/role-mappings/realm
"""
params_path = {"realm-name": self.realm_name, "id": str(user_id)}
data_raw = self.raw_delete(
URL_ADMIN_DELETE_USER_ROLE.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204])
def get_client_sessions_stats(self): def get_client_sessions_stats(self):
""" """
Get current session count for all clients with active sessions Get current session count for all clients with active sessions

8
keycloak/urls_patterns.py

@ -65,7 +65,6 @@ URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = (
) )
URL_ADMIN_USER_GROUP = "admin/realms/{realm-name}/users/{id}/groups/{group-id}" URL_ADMIN_USER_GROUP = "admin/realms/{realm-name}/users/{id}/groups/{group-id}"
URL_ADMIN_USER_GROUPS = "admin/realms/{realm-name}/users/{id}/groups" URL_ADMIN_USER_GROUPS = "admin/realms/{realm-name}/users/{id}/groups"
URL_ADMIN_USER_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
URL_ADMIN_USER_CREDENTIALS = "admin/realms/{realm-name}/users/{id}/credentials" URL_ADMIN_USER_CREDENTIALS = "admin/realms/{realm-name}/users/{id}/credentials"
URL_ADMIN_USER_CREDENTIAL = "admin/realms/{realm-name}/users/{id}/credentials/{credential_id}" URL_ADMIN_USER_CREDENTIAL = "admin/realms/{realm-name}/users/{id}/credentials/{credential_id}"
URL_ADMIN_USER_LOGOUT = "admin/realms/{realm-name}/users/{id}/logout" URL_ADMIN_USER_LOGOUT = "admin/realms/{realm-name}/users/{id}/logout"
@ -87,12 +86,15 @@ URL_ADMIN_CLIENT_ROLES = URL_ADMIN_CLIENT + "/roles"
URL_ADMIN_CLIENT_ROLE = URL_ADMIN_CLIENT + "/roles/{role-name}" URL_ADMIN_CLIENT_ROLE = URL_ADMIN_CLIENT + "/roles/{role-name}"
URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE = URL_ADMIN_CLIENT_ROLE + "/composites" URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE = URL_ADMIN_CLIENT_ROLE + "/composites"
URL_ADMIN_CLIENT_ROLE_MEMBERS = URL_ADMIN_CLIENT + "/roles/{role-name}/users" URL_ADMIN_CLIENT_ROLE_MEMBERS = URL_ADMIN_CLIENT + "/roles/{role-name}/users"
URL_ADMIN_CLIENT_ROLE_GROUPS = URL_ADMIN_CLIENT + "/roles/{role-name}/groups"
URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT + "/authz/resource-server/settings" URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT + "/authz/resource-server/settings"
URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/resource?max=-1" URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT + "/authz/resource-server/scope?max=-1" URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT + "/authz/resource-server/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT + "/authz/resource-server/permission?max=-1" URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT + "/authz/resource-server/permission?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1&permission=false"
)
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = ( URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1" URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
) )
@ -155,6 +157,4 @@ URL_ADMIN_USER_FEDERATED_IDENTITY = (
) )
URL_ADMIN_EVENTS = "admin/realms/{realm-name}/events" URL_ADMIN_EVENTS = "admin/realms/{realm-name}/events"
URL_ADMIN_DELETE_USER_ROLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm"
URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats" URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats"
Loading…
Cancel
Save