Browse Source

Merged in stackstorm-keycloak (pull request #1)

Add some actions for a Stackstorm pack

Approved-by: Marcos Pereira <marcospereira.mpj@gmail.com>
Approved-by: Rangel Torrezan <rangel.iflex@gmail.com>
pull/12/head
Martin Devlin 7 years ago
committed by Rangel Torrezan
parent
commit
22ad89611a
  1. 21
      README.md
  2. 26
      keycloak/connection.py
  3. 2
      keycloak/exceptions.py
  4. 191
      keycloak/keycloak_admin.py
  5. 9
      keycloak/keycloak_openid.py
  6. 4
      keycloak/urls_patterns.py
  7. 3
      requirements.txt

21
README.md

@ -3,6 +3,8 @@
Python Keycloak
====================
For review- see https://bitbucket.org/agriness/python-keycloak
**python-keycloak** is a Python package providing access to the Keycloak API.
## Installation
@ -113,6 +115,9 @@ count_users = keycloak_admin.users_count()
# Get users Returns a list of users, filtered according to query parameters
users = keycloak_admin.get_users({})
# Get user ID from name
user-id-keycloak = keycloak_admin.get_user_id("example@example.com")
# Get User
user = keycloak_admin.get_user("user-id-keycloak")
@ -142,14 +147,24 @@ server_info = keycloak_admin.get_server_info()
# Get clients belonging to the realm Returns a list of clients belonging to the realm
clients = keycloak_admin.get_clients()
# Get client - id (not client-id) from client by name
client_id=keycloak_admin.get_client_id("my-client")
# Get representation of the client - id of client (not client-id)
client = keycloak_admin.get_client(client_id='id-client')
client = keycloak_admin.get_client(client_id=client_id)
# Get all roles for the client
client_roles = keycloak_admin.get_client_role(client_id='id-client')
client_roles = keycloak_admin.get_client_role(client_id=client_id)
# Create client role
keycloak_admin.create_client_role(client_id, "test")
# Get client role id from name
role_id = keycloak_admin.get_client_role_id(client_id=client_id, role_name="test")
# Get all roles for the realm or client
realm_roles = keycloak_admin.get_roles()
```
# Assign client role to user. Note that BOTH role_name and role_id appear to be required.
keycloak_admin.assign_client_role(client_id=client_id, user_id=user_id, role_id=role_id, role_name="test")
```

26
keycloak/connection.py

@ -30,12 +30,14 @@ class ConnectionManager(object):
base_url (str): The server URL.
headers (dict): The header parameters of the requests to the server.
timeout (int): Timeout to use for requests to the server.
verify (bool): Verify server SSL.
"""
def __init__(self, base_url, headers={}, timeout=60):
def __init__(self, base_url, headers={}, timeout=60, verify=True):
self._base_url = base_url
self._headers = headers
self._timeout = timeout
self._verify = verify
@property
def base_url(self):
@ -57,6 +59,16 @@ class ConnectionManager(object):
""" """
self._timeout = value
@property
def verify(self):
""" Return verify in use for request to the server. """
return self._verify
@verify.setter
def verify(self, value):
""" """
self._verify = value
@property
def headers(self):
""" Return header request to the server. """
@ -118,7 +130,8 @@ class ConnectionManager(object):
return requests.get(urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout)
timeout=self.timeout,
verify=self.verify)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
@ -138,7 +151,8 @@ class ConnectionManager(object):
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout)
timeout=self.timeout,
verify=self.verify)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
@ -158,7 +172,8 @@ class ConnectionManager(object):
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout)
timeout=self.timeout,
verify=self.verify)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
@ -177,7 +192,8 @@ class ConnectionManager(object):
return requests.delete(urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout)
timeout=self.timeout,
verify=self.verify)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)

2
keycloak/exceptions.py

@ -14,9 +14,9 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from json.decoder import JSONDecodeError
import requests
from simplejson import JSONDecodeError
class KeycloakError(Exception):

191
keycloak/keycloak_admin.py

@ -17,7 +17,7 @@
from .urls_patterns import URL_ADMIN_USERS_COUNT, URL_ADMIN_USER, URL_ADMIN_USER_CONSENTS, \
URL_ADMIN_SEND_UPDATE_ACCOUNT, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_GET_SESSIONS, \
URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENTS, URL_ADMIN_CLIENT, URL_ADMIN_CLIENT_ROLES, URL_ADMIN_REALM_ROLES
URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENTS, URL_ADMIN_CLIENT, URL_ADMIN_CLIENT_ROLES, URL_ADMIN_REALM_ROLES, URL_ADMIN_USER_CLIENT_ROLES
from .keycloak_openid import KeycloakOpenID
from .exceptions import raise_error_from_response, KeycloakGetError
@ -32,20 +32,21 @@ import json
class KeycloakAdmin:
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli'):
def __init__(self, server_url, verify, username, password, realm_name='master', client_id='admin-cli'):
self._username = username
self._password = password
self._client_id = client_id
self._realm_name = realm_name
# Get token Admin
keycloak_openid = KeycloakOpenID(server_url, client_id, realm_name)
keycloak_openid = KeycloakOpenID(server_url=server_url, client_id=client_id, realm_name=realm_name, verify=verify)
self._token = keycloak_openid.token(username, password)
self._connection = ConnectionManager(base_url=server_url,
headers={'Authorization': 'Bearer ' + self.token.get('access_token'),
'Content-Type': 'application/json'},
timeout=60)
timeout=60,
verify=verify)
@property
def realm_name(self):
@ -105,7 +106,7 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path), **query)
return raise_error_from_response(data_raw, KeycloakGetError)
def create_user(self, payload):
def create_user(self, username, email='', firstName='', lastName='', emailVerified=False, enabled=True):
"""
Create a new user Username must be unique
@ -114,11 +115,17 @@ class KeycloakAdmin:
:param payload: UserRepresentation
:return: UserRepresentation
"""
data={}
data["username"]=username
data["email"]=email
data["firstName"]=firstName
data["lastName"]=lastName
data["emailVerified"]=emailVerified
data["enabled"]=enabled
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_USERS.format(**params_path),
data=json.dumps(payload))
data=json.dumps(data))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201)
def users_count(self):
@ -131,6 +138,29 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(URL_ADMIN_USERS_COUNT.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_user_id(self, username):
"""
Get internal keycloak user id from username
This is required for further actions against this user.
:param username:
clientId in UserRepresentation
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_userrepresentation
:return: user_id (uuid as string)
"""
params_path = {"realm-name": self.realm_name, "username": username}
data_raw = self.connection.raw_get(URL_ADMIN_USERS.format(**params_path))
data_content = raise_error_from_response(data_raw, KeycloakGetError)
for user in data_content:
thisusername = json.dumps(user["username"]).strip('"')
if thisusername == username:
return json.dumps(user["id"]).strip('"')
return None
def get_user(self, user_id):
"""
Get representation of the user
@ -145,7 +175,7 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(URL_ADMIN_USER.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def update_user(self, user_id, payload):
def update_user(self, user_id, username, email='', firstName='', lastName='', emailVerified=False, enabled=True):
"""
Update the user
@ -154,9 +184,17 @@ class KeycloakAdmin:
:return: Http response
"""
data={}
data["username"]=username
data["email"]=email
data["firstName"]=firstName
data["lastName"]=lastName
data["emailVerified"]=emailVerified
data["enabled"]=enabled
params_path = {"realm-name": self.realm_name}
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.connection.raw_put(URL_ADMIN_USER.format(**params_path),
data=json.dumps(payload))
data=json.dumps(data))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def delete_user(self, user_id):
@ -259,6 +297,28 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_id(self, client_id_name):
"""
Get internal keycloak client id from client-id.
This is required for further actions against this client.
:param client_id_name:
clientId in ClientRepresentation
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_clientrepresentation
:return: client_id (uuid as string)
"""
params_path = {"realm-name": self.realm_name, "clientId": client_id_name}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENTS.format(**params_path))
data_content = raise_error_from_response(data_raw, KeycloakGetError)
for client in data_content:
client_id = json.dumps(client["clientId"]).strip('"')
if client_id == client_id_name:
return json.dumps(client["id"]).strip('"')
return None
def get_client(self, client_id):
"""
Get representation of the client
@ -274,7 +334,44 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_role(self, client_id):
def create_client(self, name, client_id, redirect_urls, protocol="openid-connect", public_client=True, direct_access_grants=True):
"""
Create a client
:param name: name of client, payload (ClientRepresentation)
ClientRepresentation
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_clientrepresentation
"""
data={}
data["name"]=name
data["clientId"]=client_id
data["redirectUris"]=redirect_urls
data["protocol"]=protocol
data["publicClient"]=public_client
data["directAccessGrantsEnabled"]=direct_access_grants
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_CLIENTS.format(**params_path),
data=json.dumps(data))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201)
def delete_client(self, client_id):
"""
Get representation of the client
ClientRepresentation
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_clientrepresentation
:param client_id: id of client (not client-id)
:return: ClientRepresentation
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_delete(URL_ADMIN_CLIENT.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def get_client_roles(self, client_id):
"""
Get all roles for the client
@ -289,6 +386,29 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_role_id(self, client_id, role_name):
"""
Get client role id
This is required for further actions with this role.
RoleRepresentation
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_rolerepresentation
:param client_id: id of client (not client-id), role_name: name of role
:return: role_id
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_get(URL_ADMIN_CLIENT_ROLES.format(**params_path))
data_content = raise_error_from_response(data_raw, KeycloakGetError)
for role in data_content:
this_role_name = json.dumps(role["name"]).strip('"')
if this_role_name == role_name:
return json.dumps(role["id"]).strip('"')
return None
def get_roles(self):
"""
Get all roles for the realm or client
@ -302,3 +422,54 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(URL_ADMIN_REALM_ROLES.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def create_client_role(self, client_id, role_name):
"""
Create a client role
:param client_id: id of client (not client-id), payload (RoleRepresentation)
RoleRepresentation
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_rolerepresentation
"""
data={}
data["name"]=role_name
data["clientRole"]=True
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_post(URL_ADMIN_CLIENT_ROLES.format(**params_path),
data=json.dumps(data))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201)
def delete_client_role(self, client_id, role_name):
"""
Create a client role
:param client_id: id of client (not client-id), payload (RoleRepresentation)
RoleRepresentation
http://www.keycloak.org/docs-api/3.3/rest-api/index.html#_rolerepresentation
"""
data={}
data["name"]=role_name
data["clientRole"]=True
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.connection.raw_delete(URL_ADMIN_CLIENT_ROLES.format(**params_path) + "/" + role_name,
data=json.dumps(data))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def assign_client_role(self, user_id, client_id, role_id, role_name):
"""
Assign a client role to a user
:param client_id: id of client (not client-id), user_id: id of user, client_id: id of client containing role, role_id: client role id, role_name: client role name)
"""
payload=[{}]
payload[0]["id"]=role_id
payload[0]["name"]=role_name
params_path = {"realm-name": self.realm_name, "id": user_id, "client-id": client_id}
data_raw = self.connection.raw_post(URL_ADMIN_USER_CLIENT_ROLES.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)

9
keycloak/keycloak_openid.py

@ -34,14 +34,14 @@ import json
class KeycloakOpenID:
def __init__(self, server_url, client_id, realm_name, client_secret_key=None):
def __init__(self, server_url, verify, client_id, realm_name, client_secret_key=None):
self._client_id = client_id
self._client_secret_key = client_secret_key
self._realm_name = realm_name
self._connection = ConnectionManager(base_url=server_url,
headers={},
timeout=60)
timeout=60,
verify=verify)
self._authorization = Authorization()
@ -360,6 +360,3 @@ class KeycloakOpenID:
permissions += policy.permissions
return list(set(permissions))

4
keycloak/urls_patterns.py

@ -33,6 +33,8 @@ URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-ac
URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
URL_ADMIN_GET_SESSIONS = "admin/realms/{realm-name}/users/{id}/sessions"
URL_ADMIN_USER_CLIENT_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}"
URL_ADMIN_SERVER_INFO = "admin/serverinfo"
URL_ADMIN_CLIENTS = "admin/realms/{realm-name}/clients"
@ -40,5 +42,3 @@ URL_ADMIN_CLIENT = "admin/realms/{realm-name}/clients/{id}"
URL_ADMIN_CLIENT_ROLES = "admin/realms/{realm-name}/clients/{id}/roles"
URL_ADMIN_REALM_ROLES = "admin/realms/{realm-name}/roles"

3
requirements.txt

@ -1,3 +1,4 @@
requests==2.18.3
httmock==1.2.5
python-jose==1.3.2
python-jose==1.3.2
simplejson
Loading…
Cancel
Save