Browse Source

fix: flake8, black and immediate issues resolved

pull/307/head
Richard Nemeth 3 years ago
parent
commit
5297485926
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 3
      .gitignore
  2. 6
      keycloak/__init__.py
  3. 53
      keycloak/authorization/__init__.py
  4. 3
      keycloak/authorization/permission.py
  5. 12
      keycloak/authorization/policy.py
  6. 102
      keycloak/connection.py
  7. 12
      keycloak/exceptions.py
  8. 687
      keycloak/keycloak_admin.py
  9. 150
      keycloak/keycloak_openid.py
  10. 65
      keycloak/urls_patterns.py
  11. 9
      pyproject.toml
  12. 12
      setup.cfg
  13. 38
      setup.py
  14. 0
      tests/__init__.py
  15. 122
      tests/test_connection.py

3
.gitignore

@ -103,4 +103,5 @@ ENV/
.idea/
main.py
main2.py
s3air-authz-config.json
s3air-authz-config.json
.vscode

6
keycloak/__init__.py

@ -21,5 +21,7 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from .keycloak_admin import *
from .keycloak_openid import *
from .keycloak_admin import KeycloakAdmin
from .keycloak_openid import KeycloakOpenID
__all__ = ["KeycloakAdmin", "KeycloakOpenID"]

53
keycloak/authorization/__init__.py

@ -55,39 +55,44 @@ class Authorization:
:param data: keycloak authorization data (dict)
:return:
"""
for pol in data['policies']:
if pol['type'] == 'role':
policy = Policy(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
config_roles = json.loads(pol['config']['roles'])
for pol in data["policies"]:
if pol["type"] == "role":
policy = Policy(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
config_roles = json.loads(pol["config"]["roles"])
for role in config_roles:
policy.add_role(Role(name=role['id'],
required=role['required']))
policy.add_role(Role(name=role["id"], required=role["required"]))
self.policies[policy.name] = policy
if pol['type'] == 'scope':
permission = Permission(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
if pol["type"] == "scope":
permission = Permission(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
permission.scopes = ast.literal_eval(pol['config']['scopes'])
permission.scopes = ast.literal_eval(pol["config"]["scopes"])
for policy_name in ast.literal_eval(pol['config']['applyPolicies']):
for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]):
self.policies[policy_name].add_permission(permission)
if pol['type'] == 'resource':
permission = Permission(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
if pol["type"] == "resource":
permission = Permission(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
permission.resources = ast.literal_eval(pol['config'].get('resources', "[]"))
permission.resources = ast.literal_eval(pol["config"].get("resources", "[]"))
for policy_name in ast.literal_eval(pol['config']['applyPolicies']):
for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]):
if self.policies.get(policy_name) is not None:
self.policies[policy_name].add_permission(permission)

3
keycloak/authorization/permission.py

@ -26,7 +26,8 @@ class Permission:
"""
Consider this simple and very common permission:
A permission associates the object being protected with the policies that must be evaluated to determine whether access is granted.
A permission associates the object being protected with the policies that must be evaluated to
determine whether access is granted.
X CAN DO Y ON RESOURCE Z

12
keycloak/authorization/policy.py

@ -29,9 +29,10 @@ class Policy:
A policy defines the conditions that must be satisfied to grant access to an object.
Unlike permissions, you do not specify the object being protected but rather the conditions
that must be satisfied for access to a given object (for example, resource, scope, or both).
Policies are strongly related to the different access control mechanisms (ACMs) that you can use to
protect your resources. With policies, you can implement strategies for attribute-based access control
(ABAC), role-based access control (RBAC), context-based access control, or any combination of these.
Policies are strongly related to the different access control mechanisms (ACMs) that you can
use to protect your resources. With policies, you can implement strategies for attribute-based
access control (ABAC), role-based access control (RBAC), context-based access control, or any
combination of these.
https://keycloak.gitbooks.io/documentation/authorization_services/topics/policy/overview.html
@ -98,9 +99,10 @@ class Policy:
:param role: keycloak role.
:return:
"""
if self.type != 'role':
if self.type != "role":
raise KeycloakAuthorizationConfigError(
"Can't add role. Policy type is different of role")
"Can't add role. Policy type is different of role"
)
self._roles.append(role)
def add_permission(self, permission):

102
keycloak/connection.py

@ -29,11 +29,11 @@ except ImportError:
import requests
from requests.adapters import HTTPAdapter
from .exceptions import (KeycloakConnectionError)
from .exceptions import KeycloakConnectionError
class ConnectionManager(object):
""" Represents a simple server connection.
"""Represents a simple server connection.
Args:
base_url (str): The server URL.
headers (dict): The header parameters of the requests to the server.
@ -52,15 +52,15 @@ class ConnectionManager(object):
# retry once to reset connection with Keycloak after tomcat's ConnectionTimeout
# see https://github.com/marcospereirampj/python-keycloak/issues/36
for protocol in ('https://', 'http://'):
for protocol in ("https://", "http://"):
adapter = HTTPAdapter(max_retries=1)
# adds POST to retry whitelist
allowed_methods = set(adapter.max_retries.allowed_methods)
allowed_methods.add('POST')
allowed_methods.add("POST")
adapter.max_retries.allowed_methods = frozenset(allowed_methods)
self._s.mount(protocol, adapter)
if proxies:
self._s.proxies.update(proxies)
@ -69,7 +69,7 @@ class ConnectionManager(object):
@property
def base_url(self):
""" Return base url in use for requests to the server. """
"""Return base url in use for requests to the server."""
return self._base_url
@base_url.setter
@ -79,7 +79,7 @@ class ConnectionManager(object):
@property
def timeout(self):
""" Return timeout in use for request to the server. """
"""Return timeout in use for request to the server."""
return self._timeout
@timeout.setter
@ -89,7 +89,7 @@ class ConnectionManager(object):
@property
def verify(self):
""" Return verify in use for request to the server. """
"""Return verify in use for request to the server."""
return self._verify
@verify.setter
@ -99,7 +99,7 @@ class ConnectionManager(object):
@property
def headers(self):
""" Return header request to the server. """
"""Return header request to the server."""
return self._headers
@headers.setter
@ -108,7 +108,7 @@ class ConnectionManager(object):
self._headers = value
def param_headers(self, key):
""" Return a specific header parameter.
"""Return a specific header parameter.
:arg
key (str): Header parameters key.
:return:
@ -117,11 +117,11 @@ class ConnectionManager(object):
return self.headers.get(key)
def clean_headers(self):
""" Clear header parameters. """
"""Clear header parameters."""
self.headers = {}
def exist_param_headers(self, key):
""" Check if the parameter exists in the header.
"""Check if the parameter exists in the header.
:arg
key (str): Header parameters key.
:return:
@ -130,7 +130,7 @@ class ConnectionManager(object):
return self.param_headers(key) is not None
def add_param_headers(self, key, value):
""" Add a single parameter inside the header.
"""Add a single parameter inside the header.
:arg
key (str): Header parameters key.
value (str): Value to be added.
@ -138,14 +138,14 @@ class ConnectionManager(object):
self.headers[key] = value
def del_param_headers(self, key):
""" Remove a specific parameter.
"""Remove a specific parameter.
:arg
key (str): Key of the header parameters.
"""
self.headers.pop(key, None)
def raw_get(self, path, **kwargs):
""" Submit get request to the path.
"""Submit get request to the path.
:arg
path (str): Path for request.
:return
@ -155,17 +155,18 @@ class ConnectionManager(object):
"""
try:
return self._s.get(urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.get(
urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_post(self, path, data, **kwargs):
""" Submit post request to the path.
"""Submit post request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
@ -175,18 +176,19 @@ class ConnectionManager(object):
HttpError: Can't connect to server.
"""
try:
return self._s.post(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.post(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_put(self, path, data, **kwargs):
""" Submit put request to the path.
"""Submit put request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
@ -196,18 +198,19 @@ class ConnectionManager(object):
HttpError: Can't connect to server.
"""
try:
return self._s.put(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.put(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_delete(self, path, data={}, **kwargs):
""" Submit delete request to the path.
"""Submit delete request to the path.
:arg
path (str): Path for request.
@ -218,12 +221,13 @@ class ConnectionManager(object):
HttpError: Can't connect to server.
"""
try:
return self._s.delete(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.delete(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)

12
keycloak/exceptions.py

@ -25,8 +25,7 @@ import requests
class KeycloakError(Exception):
def __init__(self, error_message="", response_code=None,
response_body=None):
def __init__(self, error_message="", response_code=None, response_body=None):
Exception.__init__(self, error_message)
@ -56,6 +55,7 @@ class KeycloakOperationError(KeycloakError):
class KeycloakDeprecationError(KeycloakError):
pass
class KeycloakGetError(KeycloakOperationError):
pass
@ -93,7 +93,7 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
return {"Already exists"}
try:
message = response.json()['message']
message = response.json()["message"]
except (KeyError, ValueError):
message = response.content
@ -103,6 +103,6 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
if response.status_code == 401:
error = KeycloakAuthenticationError
raise error(error_message=message,
response_code=response.status_code,
response_body=response.content)
raise error(
error_message=message, response_code=response.status_code, response_body=response.content
)

687
keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

150
keycloak/keycloak_openid.py

@ -27,24 +27,38 @@ from jose import jwt
from .authorization import Authorization
from .connection import ConnectionManager
from .exceptions import raise_error_from_response, KeycloakGetError, \
KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError, KeycloakDeprecationError
from .exceptions import (
KeycloakAuthorizationConfigError,
KeycloakDeprecationError,
KeycloakGetError,
KeycloakInvalidTokenError,
KeycloakRPTNotFound,
raise_error_from_response,
)
from .urls_patterns import (
URL_REALM,
URL_AUTH,
URL_CERTS,
URL_ENTITLEMENT,
URL_INTROSPECT,
URL_LOGOUT,
URL_REALM,
URL_TOKEN,
URL_USERINFO,
URL_WELL_KNOWN,
URL_LOGOUT,
URL_CERTS,
URL_ENTITLEMENT,
URL_INTROSPECT
)
class KeycloakOpenID:
def __init__(self, server_url, realm_name, client_id, client_secret_key=None, verify=True, custom_headers=None, proxies=None):
def __init__(
self,
server_url,
realm_name,
client_id,
client_secret_key=None,
verify=True,
custom_headers=None,
proxies=None,
):
"""
:param server_url: Keycloak server url
@ -62,11 +76,9 @@ class KeycloakOpenID:
if custom_headers is not None:
# merge custom headers to main headers
headers.update(custom_headers)
self._connection = ConnectionManager(base_url=server_url,
headers=headers,
timeout=60,
verify=verify,
proxies=proxies)
self._connection = ConnectionManager(
base_url=server_url, headers=headers, timeout=60, verify=verify, proxies=proxies
)
self._authorization = Authorization()
@ -138,7 +150,7 @@ class KeycloakOpenID:
:param kwargs:
:return:
"""
if method_token_info == 'introspect':
if method_token_info == "introspect":
token_info = self.introspect(token)
else:
token_info = self.decode_token(token, **kwargs)
@ -146,11 +158,11 @@ class KeycloakOpenID:
return token_info
def well_know(self):
""" The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
"""The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
:return It lists endpoints and other configuration options relevant.
:return It lists endpoints and other configuration options relevant.
"""
params_path = {"realm-name": self.realm_name}
@ -165,12 +177,23 @@ class KeycloakOpenID:
:return:
"""
params_path = {"authorization-endpoint": self.well_know()['authorization_endpoint'],
"client-id": self.client_id,
"redirect-uri": redirect_uri}
params_path = {
"authorization-endpoint": self.well_know()["authorization_endpoint"],
"client-id": self.client_id,
"redirect-uri": redirect_uri,
}
return URL_AUTH.format(**params_path)
def token(self, username="", password="", grant_type=["password"], code="", redirect_uri="", totp=None, **extra):
def token(
self,
username="",
password="",
grant_type=["password"],
code="",
redirect_uri="",
totp=None,
**extra
):
"""
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
@ -188,9 +211,14 @@ class KeycloakOpenID:
:return:
"""
params_path = {"realm-name": self.realm_name}
payload = {"username": username, "password": password,
"client_id": self.client_id, "grant_type": grant_type,
"code": code, "redirect_uri": redirect_uri}
payload = {
"username": username,
"password": password,
"client_id": self.client_id,
"grant_type": grant_type,
"code": code,
"redirect_uri": redirect_uri,
}
if extra:
payload.update(extra)
@ -198,8 +226,7 @@ class KeycloakOpenID:
payload["totp"] = totp
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def refresh_token(self, refresh_token, grant_type=["refresh_token"]):
@ -216,10 +243,13 @@ class KeycloakOpenID:
:return:
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "grant_type": grant_type, "refresh_token": refresh_token}
payload = {
"client_id": self.client_id,
"grant_type": grant_type,
"refresh_token": refresh_token,
}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def userinfo(self, token):
@ -250,8 +280,7 @@ class KeycloakOpenID:
payload = {"client_id": self.client_id, "refresh_token": refresh_token}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204])
@ -268,7 +297,7 @@ class KeycloakOpenID:
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_CERTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def public_key(self):
"""
The public key is exposed by the realm page directly.
@ -277,32 +306,31 @@ class KeycloakOpenID:
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)['public_key']
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"]
def entitlement(self, token, resource_server_id):
"""
Client applications can use a specific endpoint to obtain a special security token
called a requesting party token (RPT). This token consists of all the entitlements
(or permissions) for a user as a result of the evaluation of the permissions and authorization
policies associated with the resources being requested. With an RPT, client applications can
gain access to protected resources at the resource server.
(or permissions) for a user as a result of the evaluation of the permissions and
authorization policies associated with the resources being requested. With an RPT,
client applications can gain access to protected resources at the resource server.
:return:
"""
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path))
if data_raw.status_code == 404:
if data_raw.status_code == 404:
return raise_error_from_response(data_raw, KeycloakDeprecationError)
return raise_error_from_response(data_raw, KeycloakGetError)
def introspect(self, token, rpt=None, token_type_hint=None):
"""
The introspection endpoint is used to retrieve the active state of a token. It is can only be
invoked by confidential clients.
The introspection endpoint is used to retrieve the active state of a token.
It is can only be invoked by confidential clients.
https://tools.ietf.org/html/rfc7662
@ -316,7 +344,7 @@ class KeycloakOpenID:
payload = {"client_id": self.client_id, "token": token}
if token_type_hint == 'requesting_party_token':
if token_type_hint == "requesting_party_token":
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
self.connection.add_param_headers("Authorization", "Bearer " + token)
@ -325,12 +353,11 @@ class KeycloakOpenID:
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def decode_token(self, token, key, algorithms=['RS256'], **kwargs):
def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
"""
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
structure that represents a cryptographic key. This specification
@ -347,8 +374,7 @@ class KeycloakOpenID:
:return:
"""
return jwt.decode(token, key, algorithms=algorithms,
audience=self.client_id, **kwargs)
return jwt.decode(token, key, algorithms=algorithms, audience=self.client_id, **kwargs)
def load_authorization_config(self, path):
"""
@ -357,12 +383,12 @@ class KeycloakOpenID:
:param path: settings file (json)
:return:
"""
authorization_file = open(path, 'r')
authorization_file = open(path, "r")
authorization_json = json.loads(authorization_file.read())
self.authorization.load_config(authorization_json)
authorization_file.close()
def get_policies(self, token, method_token_info='introspect', **kwargs):
def get_policies(self, token, method_token_info="introspect", **kwargs):
"""
Get policies by user token
@ -377,12 +403,10 @@ class KeycloakOpenID:
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == 'introspect' and not token_info['active']:
raise KeycloakInvalidTokenError(
"Token expired or invalid."
)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info['resource_access'].get(self.client_id)
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
@ -390,13 +414,13 @@ class KeycloakOpenID:
policies = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources['roles']:
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
policies.append(policy)
return list(set(policies))
def get_permissions(self, token, method_token_info='introspect', **kwargs):
def get_permissions(self, token, method_token_info="introspect", **kwargs):
"""
Get permission by user token
@ -413,12 +437,10 @@ class KeycloakOpenID:
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == 'introspect' and not token_info['active']:
raise KeycloakInvalidTokenError(
"Token expired or invalid."
)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info['resource_access'].get(self.client_id)
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
@ -426,7 +448,7 @@ class KeycloakOpenID:
permissions = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources['roles']:
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
permissions += policy.permissions

65
keycloak/urls_patterns.py

@ -30,7 +30,9 @@ URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout"
URL_CERTS = "realms/{realm-name}/protocol/openid-connect/certs"
URL_INTROSPECT = "realms/{realm-name}/protocol/openid-connect/token/introspect"
URL_ENTITLEMENT = "realms/{realm-name}/authz/entitlement/{resource-server-id}"
URL_AUTH = "{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}"
URL_AUTH = (
"{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}"
)
# ADMIN URLS
URL_ADMIN_USERS = "admin/realms/{realm-name}/users"
@ -41,14 +43,26 @@ URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-ac
URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
URL_ADMIN_GET_SESSIONS = "admin/realms/{realm-name}/users/{id}/sessions"
URL_ADMIN_USER_CLIENT_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}"
URL_ADMIN_USER_CLIENT_ROLES = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}"
)
URL_ADMIN_USER_REALM_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/realm"
URL_ADMIN_USER_REALM_ROLES_AVAILABLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm/available"
URL_ADMIN_USER_REALM_ROLES_COMPOSITE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm/composite"
URL_ADMIN_USER_REALM_ROLES_AVAILABLE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/realm/available"
)
URL_ADMIN_USER_REALM_ROLES_COMPOSITE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/realm/composite"
)
URL_ADMIN_GROUPS_REALM_ROLES = "admin/realms/{realm-name}/groups/{id}/role-mappings/realm"
URL_ADMIN_GROUPS_CLIENT_ROLES = "admin/realms/{realm-name}/groups/{id}/role-mappings/clients/{client-id}"
URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/available"
URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/composite"
URL_ADMIN_GROUPS_CLIENT_ROLES = (
"admin/realms/{realm-name}/groups/{id}/role-mappings/clients/{client-id}"
)
URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/available"
)
URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/composite"
)
URL_ADMIN_USER_GROUP = "admin/realms/{realm-name}/users/{id}/groups/{group-id}"
URL_ADMIN_USER_GROUPS = "admin/realms/{realm-name}/users/{id}/groups"
URL_ADMIN_USER_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
@ -73,14 +87,19 @@ URL_ADMIN_CLIENT_ROLES = URL_ADMIN_CLIENT + "/roles"
URL_ADMIN_CLIENT_ROLE = URL_ADMIN_CLIENT + "/roles/{role-name}"
URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE = URL_ADMIN_CLIENT_ROLE + "/composites"
URL_ADMIN_CLIENT_ROLE_MEMBERS = URL_ADMIN_CLIENT + "/roles/{role-name}/users"
URL_ADMIN_CLIENT_ROLE_GROUPS = URL_ADMIN_CLIENT + "/roles/{role-name}/groups"
URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT + "/authz/resource-server/settings"
URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT + "/authz/resource-server/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT + "/authz/resource-server/permission?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1"
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
)
URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = (
URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1"
)
URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user"
URL_ADMIN_CLIENT_CERTS = URL_ADMIN_CLIENT + "/certificates/{attr}"
@ -101,8 +120,13 @@ URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances"
URL_ADMIN_IDP_MAPPERS = "admin/realms/{realm-name}/identity-provider/instances/{idp-alias}/mappers"
URL_ADMIN_IDP = "admin/realms//{realm-name}/identity-provider/instances/{alias}"
URL_ADMIN_REALM_ROLES_ROLE_BY_NAME = "admin/realms/{realm-name}/roles/{role-name}"
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = "admin/realms/{realm-name}/roles/{role-name}/composites"
URL_ADMIN_REALM_EXPORT = "admin/realms/{realm-name}/partial-export?exportClients={export-clients}&exportGroupsAndRoles={export-groups-and-roles}"
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = (
"admin/realms/{realm-name}/roles/{role-name}/composites"
)
URL_ADMIN_REALM_EXPORT = (
"admin/realms/{realm-name}/partial-export?exportClients={export-clients"
+ "}&exportGroupsAndRoles={export-groups-and-roles}"
)
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-default-client-scopes"
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE = URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES + "/{id}"
@ -113,10 +137,16 @@ URL_ADMIN_FLOWS = "admin/realms/{realm-name}/authentication/flows"
URL_ADMIN_FLOW = URL_ADMIN_FLOWS + "/{id}"
URL_ADMIN_FLOWS_ALIAS = "admin/realms/{realm-name}/authentication/flows/{flow-id}"
URL_ADMIN_FLOWS_COPY = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/copy"
URL_ADMIN_FLOWS_EXECUTIONS = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions"
URL_ADMIN_FLOWS_EXECUTIONS = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions"
)
URL_ADMIN_FLOWS_EXECUTION = "admin/realms/{realm-name}/authentication/executions/{id}"
URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/execution"
URL_ADMIN_FLOWS_EXECUTIONS_FLOW = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/flow"
URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/execution"
)
URL_ADMIN_FLOWS_EXECUTIONS_FLOW = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/flow"
)
URL_ADMIN_AUTHENTICATOR_CONFIG = "admin/realms/{realm-name}/authentication/config/{id}"
URL_ADMIN_COMPONENTS = "admin/realms/{realm-name}/components"
@ -124,10 +154,11 @@ URL_ADMIN_COMPONENT = "admin/realms/{realm-name}/components/{component-id}"
URL_ADMIN_KEYS = "admin/realms/{realm-name}/keys"
URL_ADMIN_USER_FEDERATED_IDENTITIES = "admin/realms/{realm-name}/users/{id}/federated-identity"
URL_ADMIN_USER_FEDERATED_IDENTITY = "admin/realms/{realm-name}/users/{id}/federated-identity/{provider}"
URL_ADMIN_USER_FEDERATED_IDENTITY = (
"admin/realms/{realm-name}/users/{id}/federated-identity/{provider}"
)
URL_ADMIN_EVENTS = 'admin/realms/{realm-name}/events'
URL_ADMIN_EVENTS = "admin/realms/{realm-name}/events"
URL_ADMIN_DELETE_USER_ROLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm"
URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats"

9
pyproject.toml

@ -0,0 +1,9 @@
[tool.black]
line-length = 99
[tool.isort]
line_length = 99
profile = "black"
[tool.flake8]
max-line-length = 99

12
setup.cfg

@ -1,2 +1,12 @@
[metadata]
description-file = README.md
description-file = README.md
[tool.black]
line-length = 99
[tool.isort]
line_length = 99
profile = "black"
[tool.flake8]
max-line-length = 99

38
setup.py

@ -6,26 +6,26 @@ with open("README.md", "r") as fh:
long_description = fh.read()
setup(
name='python-keycloak',
version='0.27.0',
url='https://github.com/marcospereirampj/python-keycloak',
license='The MIT License',
author='Marcos Pereira',
author_email='marcospereira.mpj@gmail.com',
keywords='keycloak openid',
description='python-keycloak is a Python package providing access to the Keycloak API.',
name="python-keycloak",
version="0.27.0",
url="https://github.com/marcospereirampj/python-keycloak",
license="The MIT License",
author="Marcos Pereira",
author_email="marcospereira.mpj@gmail.com",
keywords="keycloak openid",
description="python-keycloak is a Python package providing access to the Keycloak API.",
long_description=long_description,
long_description_content_type="text/markdown",
packages=['keycloak', 'keycloak.authorization', 'keycloak.tests'],
install_requires=['requests>=2.20.0', 'python-jose>=1.4.0'],
tests_require=['httmock>=1.2.5'],
packages=["keycloak", "keycloak.authorization", "keycloak.tests"],
install_requires=["requests>=2.20.0", "python-jose>=1.4.0", "urllib>=1.26.0"],
tests_require=["httmock>=1.2.5"],
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: MIT License',
'Development Status :: 3 - Alpha',
'Operating System :: MacOS',
'Operating System :: Unix',
'Operating System :: Microsoft :: Windows',
'Topic :: Utilities'
]
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Operating System :: MacOS",
"Operating System :: Unix",
"Operating System :: Microsoft :: Windows",
"Topic :: Utilities",
],
)

0
keycloak/tests/__init__.py → tests/__init__.py

122
keycloak/tests/test_connection.py → tests/test_connection.py

@ -19,7 +19,7 @@ from unittest import mock
from httmock import urlmatch, response, HTTMock, all_requests
from keycloak import KeycloakAdmin, KeycloakOpenID
from ..connection import ConnectionManager
from keycloak.connection import ConnectionManager
try:
import unittest
@ -28,55 +28,49 @@ except ImportError:
class TestConnection(unittest.TestCase):
def setUp(self):
self._conn = ConnectionManager(
base_url="http://localhost/",
headers={},
timeout=60)
self._conn = ConnectionManager(base_url="http://localhost/", headers={}, timeout=60)
@all_requests
def response_content_success(self, url, request):
headers = {'content-type': 'application/json'}
content = b'response_ok'
headers = {"content-type": "application/json"}
content = b"response_ok"
return response(200, content, headers, None, 5, request)
def test_raw_get(self):
with HTTMock(self.response_content_success):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b'response_ok')
self.assertEqual(resp.content, b"response_ok")
self.assertEqual(resp.status_code, 200)
def test_raw_post(self):
@urlmatch(path="/known_path", method="post")
def response_post_success(url, request):
headers = {'content-type': 'application/json'}
content = 'response'.encode("utf-8")
headers = {"content-type": "application/json"}
content = "response".encode("utf-8")
return response(201, content, headers, None, 5, request)
with HTTMock(response_post_success):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, b'response')
resp = self._conn.raw_post("/known_path", {"field": "value"})
self.assertEqual(resp.content, b"response")
self.assertEqual(resp.status_code, 201)
def test_raw_put(self):
@urlmatch(netloc="localhost", path="/known_path", method="put")
def response_put_success(url, request):
headers = {'content-type': 'application/json'}
content = 'response'.encode("utf-8")
headers = {"content-type": "application/json"}
content = "response".encode("utf-8")
return response(200, content, headers, None, 5, request)
with HTTMock(response_put_success):
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, b'response')
resp = self._conn.raw_put("/known_path", {"field": "value"})
self.assertEqual(resp.content, b"response")
self.assertEqual(resp.status_code, 200)
def test_raw_get_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="get")
def response_get_fail(url, request):
headers = {'content-type': 'application/json'}
headers = {"content-type": "application/json"}
content = "404 page not found".encode("utf-8")
return response(404, content, headers, None, 5, request)
@ -89,33 +83,30 @@ class TestConnection(unittest.TestCase):
def test_raw_post_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="post")
def response_post_fail(url, request):
headers = {'content-type': 'application/json'}
headers = {"content-type": "application/json"}
content = str(["Start can't be blank"]).encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_post_fail):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
resp = self._conn.raw_post("/known_path", {"field": "value"})
self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8"))
self.assertEqual(resp.status_code, 404)
def test_raw_put_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="put")
def response_put_fail(url, request):
headers = {'content-type': 'application/json'}
headers = {"content-type": "application/json"}
content = str(["Start can't be blank"]).encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_put_fail):
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
resp = self._conn.raw_put("/known_path", {"field": "value"})
self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8"))
self.assertEqual(resp.status_code, 404)
def test_add_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
self.assertEqual(self._conn.headers, {"test": "value"})
def test_del_param_headers(self):
self._conn.add_param_headers("test", "value")
@ -124,8 +115,7 @@ class TestConnection(unittest.TestCase):
def test_clean_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
self.assertEqual(self._conn.headers, {"test": "value"})
self._conn.clean_headers()
self.assertEqual(self._conn.headers, {})
@ -141,11 +131,9 @@ class TestConnection(unittest.TestCase):
def test_get_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
self.assertEqual(self._conn.headers, {"test": "value"})
def test_KeycloakAdmin_custom_header(self):
class FakeToken:
@staticmethod
def get(string_val):
@ -153,39 +141,49 @@ class TestConnection(unittest.TestCase):
fake_token = FakeToken()
with mock.patch.object(KeycloakOpenID, "__init__", return_value=None) as mock_keycloak_open_id:
with mock.patch("keycloak.keycloak_openid.KeycloakOpenID.token", return_value=fake_token):
with mock.patch("keycloak.connection.ConnectionManager.__init__", return_value=None) as mock_connection_manager:
with mock.patch("keycloak.connection.ConnectionManager.__del__", return_value=None) as mock_connection_manager_delete:
with mock.patch.object(
KeycloakOpenID, "__init__", return_value=None
) as mock_keycloak_open_id:
with mock.patch(
"keycloak.keycloak_openid.KeycloakOpenID.token", return_value=fake_token
):
with mock.patch(
"keycloak.connection.ConnectionManager.__init__", return_value=None
) as mock_connection_manager:
with mock.patch(
"keycloak.connection.ConnectionManager.__del__", return_value=None
) as mock_connection_manager_delete:
server_url = "https://localhost/auth/"
username = "admin"
password = "secret"
realm_name = "master"
headers = {
'Custom': 'test-custom-header'
headers = {"Custom": "test-custom-header"}
KeycloakAdmin(
server_url=server_url,
username=username,
password=password,
realm_name=realm_name,
verify=False,
custom_headers=headers,
)
mock_keycloak_open_id.assert_called_with(
server_url=server_url,
realm_name=realm_name,
client_id="admin-cli",
client_secret_key=None,
verify=False,
custom_headers=headers,
)
expected_header = {
"Authorization": "Bearer faketoken",
"Content-Type": "application/json",
"Custom": "test-custom-header",
}
KeycloakAdmin(server_url=server_url,
username=username,
password=password,
realm_name=realm_name,
verify=False,
custom_headers=headers)
mock_keycloak_open_id.assert_called_with(server_url=server_url,
realm_name=realm_name,
client_id='admin-cli',
client_secret_key=None,
verify=False,
custom_headers=headers)
expected_header = {'Authorization': 'Bearer faketoken',
'Content-Type': 'application/json',
'Custom': 'test-custom-header'
}
mock_connection_manager.assert_called_with(base_url=server_url,
headers=expected_header,
timeout=60,
verify=False)
mock_connection_manager.assert_called_with(
base_url=server_url, headers=expected_header, timeout=60, verify=False
)
mock_connection_manager_delete.assert_called_once_with()
Loading…
Cancel
Save