|
|
@ -15,12 +15,16 @@ |
|
|
|
# You should have received a copy of the GNU Lesser General Public License |
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
|
|
|
|
|
|
# Unless otherwise stated in the comments, "id", in e.g. user_id, refers to the internal Keycloak server ID, usually a uuid string |
|
|
|
# Unless otherwise stated in the comments, "id", in e.g. user_id, refers to the |
|
|
|
# internal Keycloak server ID, usually a uuid string |
|
|
|
|
|
|
|
from .urls_patterns import URL_ADMIN_USERS_COUNT, URL_ADMIN_USER, URL_ADMIN_USER_CONSENTS, \ |
|
|
|
from .urls_patterns import \ |
|
|
|
URL_ADMIN_USERS_COUNT, URL_ADMIN_USER, URL_ADMIN_USER_CONSENTS, \ |
|
|
|
URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_GET_SESSIONS, \ |
|
|
|
URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENTS, URL_ADMIN_CLIENT, URL_ADMIN_CLIENT_ROLES, URL_ADMIN_REALM_ROLES, URL_ADMIN_USER_CLIENT_ROLES, \ |
|
|
|
URL_ADMIN_GROUP, URL_ADMIN_GROUPS, URL_ADMIN_GROUP_CHILD, URL_ADMIN_USER_GROUP, URL_ADMIN_USER_PASSWORD, URL_ADMIN_GROUP_PERMISSIONS |
|
|
|
URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENTS, URL_ADMIN_CLIENT, URL_ADMIN_CLIENT_ROLES, URL_ADMIN_REALM_ROLES, \ |
|
|
|
URL_ADMIN_USER_CLIENT_ROLES, URL_ADMIN_GROUP, URL_ADMIN_GROUPS, URL_ADMIN_GROUP_CHILD, URL_ADMIN_USER_GROUP,\ |
|
|
|
URL_ADMIN_USER_PASSWORD, URL_ADMIN_GROUP_PERMISSIONS |
|
|
|
|
|
|
|
from .keycloak_openid import KeycloakOpenID |
|
|
|
|
|
|
|
from .exceptions import raise_error_from_response, KeycloakGetError |
|
|
@ -109,47 +113,21 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path), **query) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def create_user(self, username, email='', firstName='', lastName='', emailVerified=False, enabled=True, password=None, passwordTemp=False, skip_exists=False): |
|
|
|
def create_user(self, payload): |
|
|
|
""" |
|
|
|
Create a new user Username must be unique |
|
|
|
|
|
|
|
UserRepresentation |
|
|
|
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_userrepresentation |
|
|
|
|
|
|
|
:param data: Http response |
|
|
|
:param payload: UserRepresentation |
|
|
|
|
|
|
|
:return: UserRepresentation |
|
|
|
""" |
|
|
|
data={} |
|
|
|
data["username"]=username |
|
|
|
data["email"]=email |
|
|
|
data["firstName"]=firstName |
|
|
|
data["lastName"]=lastName |
|
|
|
data["emailVerified"]=emailVerified |
|
|
|
data["enabled"]=enabled |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
|
|
|
|
exists = self.get_user_id(username=username) |
|
|
|
|
|
|
|
if exists is not None: |
|
|
|
return str(exists) |
|
|
|
|
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_USERS.format(**params_path), |
|
|
|
data=json.dumps(data)) |
|
|
|
create_resp = raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists) |
|
|
|
|
|
|
|
if password is not None: |
|
|
|
user_id = self.get_user_id(username) |
|
|
|
data={} |
|
|
|
data["value"]=password |
|
|
|
data["type"]="password" |
|
|
|
data["temporary"]=passwordTemp |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER_PASSWORD.format(**params_path), |
|
|
|
data=json.dumps(data)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
else: |
|
|
|
return create_resp |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
def users_count(self): |
|
|
|
""" |
|
|
@ -161,7 +139,6 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
|
|
|
def get_user_id(self, username): |
|
|
|
""" |
|
|
|
Get internal keycloak user id from username |
|
|
@ -198,40 +175,19 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_USER.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def update_user(self, user_id, username, email='', firstName='', lastName='', emailVerified=False, enabled=True, password=None, passwordTemp=False): |
|
|
|
def update_user(self, user_id, payload): |
|
|
|
""" |
|
|
|
Update the user |
|
|
|
|
|
|
|
:param user_id: User id |
|
|
|
:param data: UserRepresentation |
|
|
|
:param payload: UserRepresentation |
|
|
|
|
|
|
|
:return: Http response |
|
|
|
""" |
|
|
|
data={} |
|
|
|
data["username"]=username |
|
|
|
data["email"]=email |
|
|
|
data["firstName"]=firstName |
|
|
|
data["lastName"]=lastName |
|
|
|
data["emailVerified"]=emailVerified |
|
|
|
data["enabled"]=enabled |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER.format(**params_path), |
|
|
|
data=json.dumps(data)) |
|
|
|
update_resp = raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
if password is not None: |
|
|
|
user_id = self.get_user_id(username) |
|
|
|
data={} |
|
|
|
data["value"]=password |
|
|
|
data["type"]="password" |
|
|
|
data["temporary"]=passwordTemp |
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER_PASSWORD.format(**params_path), |
|
|
|
data=json.dumps(data)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
else: |
|
|
|
return update_resp |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def delete_user(self, user_id): |
|
|
|
""" |
|
|
@ -245,6 +201,26 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.connection.raw_delete(URL_ADMIN_USER.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204) |
|
|
|
|
|
|
|
def set_user_password(self, user_id, password, temporary=True): |
|
|
|
""" |
|
|
|
Set up a password for the user. If temporary is True, the user will have to reset |
|
|
|
the temporary password next time they log in. |
|
|
|
|
|
|
|
http://www.keycloak.org/docs-api/3.2/rest-api/#_users_resource |
|
|
|
http://www.keycloak.org/docs-api/3.2/rest-api/#_credentialrepresentation |
|
|
|
|
|
|
|
:param user_id: User id |
|
|
|
:param password: New password |
|
|
|
:param temporary: True if password is temporary |
|
|
|
|
|
|
|
:return: |
|
|
|
""" |
|
|
|
payload = {"type": "password", "temporary": temporary, "value": password} |
|
|
|
params_path = {"realm-name": self.realm_name, "id": user_id} |
|
|
|
data_raw = self.connection.raw_put(URL_ADMIN_USER_PASSWORD.format(**params_path), |
|
|
|
data=json.dumps(payload)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=200) |
|
|
|
|
|
|
|
def consents_user(self, user_id): |
|
|
|
""" |
|
|
|
Get consents granted by the user |
|
|
@ -362,25 +338,27 @@ class KeycloakAdmin: |
|
|
|
:return: GroupID (string) |
|
|
|
""" |
|
|
|
if parent is not None: |
|
|
|
params_path = {"realm-name": self.realm_name, "id": parent} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_GROUP.format(**params_path)) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
data_content = [] |
|
|
|
data_content.append(res) |
|
|
|
params_path = {"realm-name": self.realm_name, "id": parent} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_GROUP.format(**params_path)) |
|
|
|
res = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
data_content = [] |
|
|
|
data_content.append(res) |
|
|
|
else: |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_GROUPS.format(**params_path)) |
|
|
|
data_content = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_GROUPS.format(**params_path)) |
|
|
|
data_content = raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
for group in data_content: |
|
|
|
thisgroupname = json.dumps(group["name"]).strip('"') |
|
|
|
thisgrouppath = json.dumps(group["path"]).strip('"') |
|
|
|
if (thisgroupname == name and name is not None) or (thisgrouppath == path and path is not None): |
|
|
|
return json.dumps(group["id"]).strip('"') |
|
|
|
for subgroup in group["subGroups"]: |
|
|
|
thisgrouppath = json.dumps(subgroup["path"]).strip('"') |
|
|
|
if (thisgrouppath == path and path is not None) or (thisgrouppath == name and name is not None): |
|
|
|
return json.dumps(subgroup["id"]).strip('"') |
|
|
|
thisgroupname = json.dumps(group["name"]).strip('"') |
|
|
|
thisgrouppath = json.dumps(group["path"]).strip('"') |
|
|
|
if (thisgroupname == name and name is not None) or (thisgrouppath == path and path is not None): |
|
|
|
return json.dumps(group["id"]).strip('"') |
|
|
|
for subgroup in group["subGroups"]: |
|
|
|
thisgrouppath = json.dumps(subgroup["path"]).strip('"') |
|
|
|
|
|
|
|
if (thisgrouppath == path and path is not None) or (thisgrouppath == name and name is not None): |
|
|
|
return json.dumps(subgroup["id"]).strip('"') |
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
def create_group(self, name=None, client_roles={}, realm_roles=[], sub_groups=[], path=None, parent=None, skip_exists=False): |
|
|
@ -539,7 +517,8 @@ class KeycloakAdmin: |
|
|
|
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT.format(**params_path)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
def create_client(self, name, client_id, redirect_uris, protocol="openid-connect", public_client=True, direct_access_grants=True, skip_exists=False): |
|
|
|
def create_client(self, name, client_id, redirect_uris, protocol="openid-connect", public_client=True, |
|
|
|
direct_access_grants=True): |
|
|
|
""" |
|
|
|
Create a client |
|
|
|
|
|
|
@ -563,7 +542,7 @@ class KeycloakAdmin: |
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
data_raw = self.connection.raw_post(URL_ADMIN_CLIENTS.format(**params_path), |
|
|
|
data=json.dumps(data)) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists) |
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201) |
|
|
|
|
|
|
|
def delete_client(self, client_id): |
|
|
|
""" |
|
|
@ -671,7 +650,8 @@ class KeycloakAdmin: |
|
|
|
""" |
|
|
|
Assign a client role to a user |
|
|
|
|
|
|
|
:param client_id: id of client (not client-id), user_id: id of user, client_id: id of client containing role, role_id: client role id, role_name: client role name) |
|
|
|
:param client_id: id of client (not client-id), user_id: id of user, client_id: id of client containing role, |
|
|
|
role_id: client role id, role_name: client role name) |
|
|
|
|
|
|
|
""" |
|
|
|
payload=[{}] |
|
|
|