|
@ -17,7 +17,7 @@ |
|
|
|
|
|
|
|
|
from .urls_patterns import URL_ADMIN_USERS_COUNT, URL_ADMIN_USER, URL_ADMIN_USER_CONSENTS, \ |
|
|
from .urls_patterns import URL_ADMIN_USERS_COUNT, URL_ADMIN_USER, URL_ADMIN_USER_CONSENTS, \ |
|
|
URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_GET_SESSIONS, \ |
|
|
URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_GET_SESSIONS, \ |
|
|
URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENTS, URL_ADMIN_CLIENT, URL_ADMIN_CLIENT_ROLES, URL_ADMIN_REALM_ROLES |
|
|
|
|
|
|
|
|
URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENTS, URL_ADMIN_CLIENT, URL_ADMIN_CLIENT_ROLES, URL_ADMIN_REALM_ROLES, URL_ADMIN_USER_CLIENT_ROLES |
|
|
from .keycloak_openid import KeycloakOpenID |
|
|
from .keycloak_openid import KeycloakOpenID |
|
|
|
|
|
|
|
|
from .exceptions import raise_error_from_response, KeycloakGetError |
|
|
from .exceptions import raise_error_from_response, KeycloakGetError |
|
@ -32,20 +32,21 @@ import json |
|
|
|
|
|
|
|
|
class KeycloakAdmin: |
|
|
class KeycloakAdmin: |
|
|
|
|
|
|
|
|
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli'): |
|
|
|
|
|
|
|
|
def __init__(self, server_url, verify, username, password, realm_name='master', client_id='admin-cli'): |
|
|
self._username = username |
|
|
self._username = username |
|
|
self._password = password |
|
|
self._password = password |
|
|
self._client_id = client_id |
|
|
self._client_id = client_id |
|
|
self._realm_name = realm_name |
|
|
self._realm_name = realm_name |
|
|
|
|
|
|
|
|
# Get token Admin |
|
|
# Get token Admin |
|
|
keycloak_openid = KeycloakOpenID(server_url, client_id, realm_name) |
|
|
|
|
|
|
|
|
keycloak_openid = KeycloakOpenID(server_url=server_url, client_id=client_id, realm_name=realm_name, verify=verify) |
|
|
self._token = keycloak_openid.token(username, password) |
|
|
self._token = keycloak_openid.token(username, password) |
|
|
|
|
|
|
|
|
self._connection = ConnectionManager(base_url=server_url, |
|
|
self._connection = ConnectionManager(base_url=server_url, |
|
|
headers={'Authorization': 'Bearer ' + self.token.get('access_token'), |
|
|
headers={'Authorization': 'Bearer ' + self.token.get('access_token'), |
|
|
'Content-Type': 'application/json'}, |
|
|
'Content-Type': 'application/json'}, |
|
|
timeout=60) |
|
|
|
|
|
|
|
|
timeout=60, |
|
|
|
|
|
verify=verify) |
|
|
|
|
|
|
|
|
@property |
|
|
@property |
|
|
def realm_name(self): |
|
|
def realm_name(self): |
|
@ -105,7 +106,7 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path), **query) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path), **query) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def create_user(self, payload): |
|
|
|
|
|
|
|
|
def create_user(self, username, email='', firstName='', lastName='', emailVerified=False, enabled=True): |
|
|
""" |
|
|
""" |
|
|
Create a new user Username must be unique |
|
|
Create a new user Username must be unique |
|
|
|
|
|
|
|
@ -114,11 +115,17 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
:param payload: UserRepresentation |
|
|
:param payload: UserRepresentation |
|
|
|
|
|
|
|
|
:return: UserRepresentation |
|
|
|
|
|
""" |
|
|
""" |
|
|
|
|
|
data={} |
|
|
|
|
|
data["username"]=username |
|
|
|
|
|
data["email"]=email |
|
|
|
|
|
data["firstName"]=firstName |
|
|
|
|
|
data["lastName"]=lastName |
|
|
|
|
|
data["emailVerified"]=emailVerified |
|
|
|
|
|
data["enabled"]=enabled |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
data=json.dumps(payload)) |
|
|
|
|
|
|
|
|
data=json.dumps(data)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
|
def users_count(self): |
|
|
def users_count(self): |
|
@ -131,6 +138,29 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_user_id(self, username): |
|
|
|
|
|
""" |
|
|
|
|
|
Get internal keycloak user id from username |
|
|
|
|
|
This is required for further actions against this user. |
|
|
|
|
|
|
|
|
|
|
|
:param username: |
|
|
|
|
|
clientId in UserRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_userrepresentation |
|
|
|
|
|
|
|
|
|
|
|
:return: user_id (uuid as string) |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "username": username} |
|
|
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path)) |
|
|
|
|
|
data_content = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
for user in data_content: |
|
|
|
|
|
thisusername = json.dumps(user["username"]).strip('"') |
|
|
|
|
|
if thisusername == username: |
|
|
|
|
|
return json.dumps(user["id"]).strip('"') |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def get_user(self, user_id): |
|
|
def get_user(self, user_id): |
|
|
""" |
|
|
""" |
|
|
Get representation of the user |
|
|
Get representation of the user |
|
@ -145,7 +175,7 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USER.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USER.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def update_user(self, user_id, payload): |
|
|
|
|
|
|
|
|
def update_user(self, user_id, username, email='', firstName='', lastName='', emailVerified=False, enabled=True): |
|
|
""" |
|
|
""" |
|
|
Update the user |
|
|
Update the user |
|
|
|
|
|
|
|
@ -154,9 +184,17 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
:return: Http response |
|
|
:return: Http response |
|
|
""" |
|
|
""" |
|
|
|
|
|
data={} |
|
|
|
|
|
data["username"]=username |
|
|
|
|
|
data["email"]=email |
|
|
|
|
|
data["firstName"]=firstName |
|
|
|
|
|
data["lastName"]=lastName |
|
|
|
|
|
data["emailVerified"]=emailVerified |
|
|
|
|
|
data["enabled"]=enabled |
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER.format(**params_path), |
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER.format(**params_path), |
|
|
data=json.dumps(payload)) |
|
|
|
|
|
|
|
|
data=json.dumps(data)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
|
def delete_user(self, user_id): |
|
|
def delete_user(self, user_id): |
|
@ -259,6 +297,28 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def get_client_id(self, client_id_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Get internal keycloak client id from client-id. |
|
|
|
|
|
This is required for further actions against this client. |
|
|
|
|
|
|
|
|
|
|
|
:param client_id_name: |
|
|
|
|
|
clientId in ClientRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_clientrepresentation |
|
|
|
|
|
|
|
|
|
|
|
:return: client_id (uuid as string) |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "clientId": client_id_name} |
|
|
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path)) |
|
|
|
|
|
data_content = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
for client in data_content: |
|
|
|
|
|
client_id = json.dumps(client["clientId"]).strip('"') |
|
|
|
|
|
if client_id == client_id_name: |
|
|
|
|
|
return json.dumps(client["id"]).strip('"') |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def get_client(self, client_id): |
|
|
def get_client(self, client_id): |
|
|
""" |
|
|
""" |
|
|
Get representation of the client |
|
|
Get representation of the client |
|
@ -274,7 +334,44 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def get_client_role(self, client_id): |
|
|
|
|
|
|
|
|
def create_client(self, name, client_id, redirect_urls, protocol="openid-connect", public_client=True, direct_access_grants=True): |
|
|
|
|
|
""" |
|
|
|
|
|
Create a client |
|
|
|
|
|
|
|
|
|
|
|
:param name: name of client, payload (ClientRepresentation) |
|
|
|
|
|
|
|
|
|
|
|
ClientRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_clientrepresentation |
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
|
data={} |
|
|
|
|
|
data["name"]=name |
|
|
|
|
|
data["clientId"]=client_id |
|
|
|
|
|
data["redirectUris"]=redirect_urls |
|
|
|
|
|
data["protocol"]=protocol |
|
|
|
|
|
data["publicClient"]=public_client |
|
|
|
|
|
data["directAccessGrantsEnabled"]=direct_access_grants |
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_CLIENTS.format(**params_path), |
|
|
|
|
|
data=json.dumps(data)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
|
|
|
|
def delete_client(self, client_id): |
|
|
|
|
|
""" |
|
|
|
|
|
Get representation of the client |
|
|
|
|
|
|
|
|
|
|
|
ClientRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_clientrepresentation |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client (not client-id) |
|
|
|
|
|
|
|
|
|
|
|
:return: ClientRepresentation |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
|
|
|
|
def get_client_roles(self, client_id): |
|
|
""" |
|
|
""" |
|
|
Get all roles for the client |
|
|
Get all roles for the client |
|
|
|
|
|
|
|
@ -289,6 +386,29 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def get_client_role_id(self, client_id, role_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Get client role id |
|
|
|
|
|
This is required for further actions with this role. |
|
|
|
|
|
|
|
|
|
|
|
RoleRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_rolerepresentation |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client (not client-id), role_name: name of role |
|
|
|
|
|
|
|
|
|
|
|
:return: role_id |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path)) |
|
|
|
|
|
data_content = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
for role in data_content: |
|
|
|
|
|
this_role_name = json.dumps(role["name"]).strip('"') |
|
|
|
|
|
if this_role_name == role_name: |
|
|
|
|
|
return json.dumps(role["id"]).strip('"') |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def get_roles(self): |
|
|
def get_roles(self): |
|
|
""" |
|
|
""" |
|
|
Get all roles for the realm or client |
|
|
Get all roles for the realm or client |
|
@ -302,3 +422,54 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def create_client_role(self, client_id, role_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Create a client role |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client (not client-id), payload (RoleRepresentation) |
|
|
|
|
|
|
|
|
|
|
|
RoleRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_rolerepresentation |
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
|
data={} |
|
|
|
|
|
data["name"]=role_name |
|
|
|
|
|
data["clientRole"]=True |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_CLIENT_ROLES.format(**params_path), |
|
|
|
|
|
data=json.dumps(data)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
|
|
|
|
def delete_client_role(self, client_id, role_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Create a client role |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client (not client-id), payload (RoleRepresentation) |
|
|
|
|
|
|
|
|
|
|
|
RoleRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_rolerepresentation |
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
|
data={} |
|
|
|
|
|
data["name"]=role_name |
|
|
|
|
|
data["clientRole"]=True |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_CLIENT_ROLES.format(**params_path) + "/" + role_name, |
|
|
|
|
|
data=json.dumps(data)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
|
|
|
|
def assign_client_role(self, user_id, client_id, role_id, role_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Assign a client role to a user |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client (not client-id), user_id: id of user, client_id: id of client containing role, role_id: client role id, role_name: client role name) |
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
|
payload=[{}] |
|
|
|
|
|
payload[0]["id"]=role_id |
|
|
|
|
|
payload[0]["name"]=role_name |
|
|
|
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), |
|
|
|
|
|
data=json.dumps(payload)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |