|
@ -17,7 +17,7 @@ |
|
|
|
|
|
|
|
|
from .urls_patterns import URL_ADMIN_USERS_COUNT, URL_ADMIN_USER, URL_ADMIN_USER_CONSENTS, \ |
|
|
from .urls_patterns import URL_ADMIN_USERS_COUNT, URL_ADMIN_USER, URL_ADMIN_USER_CONSENTS, \ |
|
|
URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_GET_SESSIONS, \ |
|
|
URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_GET_SESSIONS, \ |
|
|
URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENTS, URL_ADMIN_CLIENT, URL_ADMIN_CLIENT_ROLES, URL_ADMIN_REALM_ROLES |
|
|
|
|
|
|
|
|
URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENTS, URL_ADMIN_CLIENT, URL_ADMIN_CLIENT_ROLES, URL_ADMIN_REALM_ROLES, URL_ADMIN_USER_CLIENT_ROLES |
|
|
from .keycloak_openid import KeycloakOpenID |
|
|
from .keycloak_openid import KeycloakOpenID |
|
|
|
|
|
|
|
|
from .exceptions import raise_error_from_response, KeycloakGetError |
|
|
from .exceptions import raise_error_from_response, KeycloakGetError |
|
@ -105,7 +105,7 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path), **query) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path), **query) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def create_user(self, payload): |
|
|
|
|
|
|
|
|
def create_user(self, username, email='', firstName='', lastName='', emailVerified=False, enabled=True): |
|
|
""" |
|
|
""" |
|
|
Create a new user Username must be unique |
|
|
Create a new user Username must be unique |
|
|
|
|
|
|
|
@ -114,11 +114,17 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
:param payload: UserRepresentation |
|
|
:param payload: UserRepresentation |
|
|
|
|
|
|
|
|
:return: UserRepresentation |
|
|
|
|
|
""" |
|
|
""" |
|
|
|
|
|
data={} |
|
|
|
|
|
data["username"]=username |
|
|
|
|
|
data["email"]=email |
|
|
|
|
|
data["firstName"]=firstName |
|
|
|
|
|
data["lastName"]=lastName |
|
|
|
|
|
data["emailVerified"]=emailVerified |
|
|
|
|
|
data["enabled"]=enabled |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
data=json.dumps(payload)) |
|
|
|
|
|
|
|
|
data=json.dumps(data)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
|
def users_count(self): |
|
|
def users_count(self): |
|
@ -131,6 +137,29 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_user_id(self, username): |
|
|
|
|
|
""" |
|
|
|
|
|
Get internal keycloak user id from username |
|
|
|
|
|
This is required for further actions against this user. |
|
|
|
|
|
|
|
|
|
|
|
:param username: |
|
|
|
|
|
clientId in UserRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_userrepresentation |
|
|
|
|
|
|
|
|
|
|
|
:return: user_id (uuid as string) |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "username": username} |
|
|
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path)) |
|
|
|
|
|
data_content = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
for user in data_content: |
|
|
|
|
|
thisusername = json.dumps(user["username"]).strip('"') |
|
|
|
|
|
if thisusername == username: |
|
|
|
|
|
return json.dumps(user["id"]).strip('"') |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def get_user(self, user_id): |
|
|
def get_user(self, user_id): |
|
|
""" |
|
|
""" |
|
|
Get representation of the user |
|
|
Get representation of the user |
|
@ -145,7 +174,7 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USER.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USER.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def update_user(self, user_id, payload): |
|
|
|
|
|
|
|
|
def update_user(self, user_id, username, email='', firstName='', lastName='', emailVerified=False, enabled=True): |
|
|
""" |
|
|
""" |
|
|
Update the user |
|
|
Update the user |
|
|
|
|
|
|
|
@ -154,9 +183,17 @@ class KeycloakAdmin: |
|
|
|
|
|
|
|
|
:return: Http response |
|
|
:return: Http response |
|
|
""" |
|
|
""" |
|
|
|
|
|
data={} |
|
|
|
|
|
data["username"]=username |
|
|
|
|
|
data["email"]=email |
|
|
|
|
|
data["firstName"]=firstName |
|
|
|
|
|
data["lastName"]=lastName |
|
|
|
|
|
data["emailVerified"]=emailVerified |
|
|
|
|
|
data["enabled"]=enabled |
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER.format(**params_path), |
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER.format(**params_path), |
|
|
data=json.dumps(payload)) |
|
|
|
|
|
|
|
|
data=json.dumps(data)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
|
def delete_user(self, user_id): |
|
|
def delete_user(self, user_id): |
|
@ -259,6 +296,28 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def get_client_id(self, client_id_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Get internal keycloak client id from client-id. |
|
|
|
|
|
This is required for further actions against this client. |
|
|
|
|
|
|
|
|
|
|
|
:param client_id_name: |
|
|
|
|
|
clientId in ClientRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_clientrepresentation |
|
|
|
|
|
|
|
|
|
|
|
:return: client_id (uuid as string) |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "clientId": client_id_name} |
|
|
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path)) |
|
|
|
|
|
data_content = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
for client in data_content: |
|
|
|
|
|
client_id = json.dumps(client["clientId"]).strip('"') |
|
|
|
|
|
if client_id == client_id_name: |
|
|
|
|
|
return json.dumps(client["id"]).strip('"') |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def get_client(self, client_id): |
|
|
def get_client(self, client_id): |
|
|
""" |
|
|
""" |
|
|
Get representation of the client |
|
|
Get representation of the client |
|
@ -274,7 +333,7 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def get_client_role(self, client_id): |
|
|
|
|
|
|
|
|
def get_client_roles(self, client_id): |
|
|
""" |
|
|
""" |
|
|
Get all roles for the client |
|
|
Get all roles for the client |
|
|
|
|
|
|
|
@ -289,6 +348,29 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def get_client_role_id(self, client_id, role_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Get client role id |
|
|
|
|
|
This is required for further actions with this role. |
|
|
|
|
|
|
|
|
|
|
|
RoleRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_rolerepresentation |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client (not client-id), role_name: name of role |
|
|
|
|
|
|
|
|
|
|
|
:return: role_id |
|
|
|
|
|
""" |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path)) |
|
|
|
|
|
data_content = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
for role in data_content: |
|
|
|
|
|
this_role_name = json.dumps(role["name"]).strip('"') |
|
|
|
|
|
if this_role_name == role_name: |
|
|
|
|
|
return json.dumps(role["id"]).strip('"') |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def get_roles(self): |
|
|
def get_roles(self): |
|
|
""" |
|
|
""" |
|
|
Get all roles for the realm or client |
|
|
Get all roles for the realm or client |
|
@ -302,3 +384,36 @@ class KeycloakAdmin: |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path)) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def create_client_role(self, client_id, role_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Create a client role |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client (not client-id), payload (RoleRepresentation) |
|
|
|
|
|
|
|
|
|
|
|
RoleRepresentation |
|
|
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_rolerepresentation |
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
|
data={} |
|
|
|
|
|
data["name"]=role_name |
|
|
|
|
|
data["clientRole"]=True |
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_CLIENT_ROLES.format(**params_path), |
|
|
|
|
|
data=json.dumps(data)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
|
|
|
|
def assign_client_role(self, user_id, client_id, role_id, role_name): |
|
|
|
|
|
""" |
|
|
|
|
|
Assign a client role to a user |
|
|
|
|
|
|
|
|
|
|
|
:param client_id: id of client (not client-id), user_id: id of user, client_id: id of client containing role, role_id: client role id, role_name: client role name) |
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
|
payload=[{}] |
|
|
|
|
|
payload[0]["id"]=role_id |
|
|
|
|
|
payload[0]["name"]=role_name |
|
|
|
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id} |
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path), |
|
|
|
|
|
data=json.dumps(payload)) |
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |