Browse Source

style: fixed docstrings everywhere

pull/354/head
Richard Nemeth 2 years ago
parent
commit
5e6c775735
  1. 44
      poetry.lock
  2. 2
      src/keycloak/__init__.py
  3. 10
      src/keycloak/authorization/__init__.py
  4. 14
      src/keycloak/authorization/permission.py
  5. 20
      src/keycloak/authorization/policy.py
  6. 9
      src/keycloak/authorization/role.py
  7. 15
      src/keycloak/connection.py
  8. 609
      src/keycloak/keycloak_admin.py
  9. 75
      src/keycloak/keycloak_openid.py
  10. 42
      src/keycloak/uma_permissions.py
  11. 2
      src/keycloak/urls_patterns.py
  12. 1
      tests/__init__.py
  13. 29
      tests/test_keycloak_admin.py
  14. 28
      tests/test_uma_permissions.py
  15. 3
      tests/test_urls_patterns.py

44
poetry.lock

@ -22,7 +22,7 @@ test = ["coverage", "flake8", "pexpect", "wheel"]
[[package]]
name = "astroid"
version = "2.11.6"
version = "2.11.7"
description = "An abstract syntax tree for Python with inference support."
category = "main"
optional = true
@ -36,7 +36,7 @@ wrapt = ">=1.11,<2"
[[package]]
name = "atomicwrites"
version = "1.4.0"
version = "1.4.1"
description = "Atomic file writes."
category = "dev"
optional = false
@ -208,7 +208,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
[[package]]
name = "ecdsa"
version = "0.17.0"
version = "0.18.0"
description = "ECDSA cryptographic signature library (pure python)"
category = "main"
optional = false
@ -460,7 +460,7 @@ testing = ["pytest", "pytest-benchmark"]
[[package]]
name = "pre-commit"
version = "2.19.0"
version = "2.20.0"
description = "A framework for managing and maintaining multi-language pre-commit hooks."
category = "dev"
optional = false
@ -871,7 +871,7 @@ python-versions = ">=3.7"
[[package]]
name = "tomlkit"
version = "0.11.0"
version = "0.11.1"
description = "Style preserving TOML library"
category = "dev"
optional = false
@ -926,11 +926,11 @@ python-versions = ">=3.5"
[[package]]
name = "urllib3"
version = "1.26.9"
version = "1.26.10"
description = "HTTP library with thread-safe connection pooling, file post, and more."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4"
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, <4"
[package.extras]
brotli = ["brotlicffi (>=0.8.0)", "brotli (>=1.0.9)", "brotlipy (>=0.6.0)"]
@ -1001,14 +1001,8 @@ argcomplete = [
{file = "argcomplete-1.12.3-py2.py3-none-any.whl", hash = "sha256:291f0beca7fd49ce285d2f10e4c1c77e9460cf823eef2de54df0c0fec88b0d81"},
{file = "argcomplete-1.12.3.tar.gz", hash = "sha256:2c7dbffd8c045ea534921e63b0be6fe65e88599990d8dc408ac8c542b72a5445"},
]
astroid = [
{file = "astroid-2.11.6-py3-none-any.whl", hash = "sha256:ba33a82a9a9c06a5ceed98180c5aab16e29c285b828d94696bf32d6015ea82a9"},
{file = "astroid-2.11.6.tar.gz", hash = "sha256:4f933d0bf5e408b03a6feb5d23793740c27e07340605f236496cd6ce552043d6"},
]
atomicwrites = [
{file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"},
{file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"},
]
astroid = []
atomicwrites = []
attrs = [
{file = "attrs-21.4.0-py2.py3-none-any.whl", hash = "sha256:2d27e3784d7a565d36ab851fe94887c5eccd6a463168875832a1be79c82828b4"},
{file = "attrs-21.4.0.tar.gz", hash = "sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd"},
@ -1125,10 +1119,7 @@ docutils = [
{file = "docutils-0.17.1-py2.py3-none-any.whl", hash = "sha256:cf316c8370a737a022b72b56874f6602acf974a37a9fba42ec2876387549fc61"},
{file = "docutils-0.17.1.tar.gz", hash = "sha256:686577d2e4c32380bb50cbb22f575ed742d58168cee37e99117a854bcd88f125"},
]
ecdsa = [
{file = "ecdsa-0.17.0-py2.py3-none-any.whl", hash = "sha256:5cf31d5b33743abe0dfc28999036c849a69d548f994b535e527ee3cb7f3ef676"},
{file = "ecdsa-0.17.0.tar.gz", hash = "sha256:b9f500bb439e4153d0330610f5d26baaf18d17b8ced1bc54410d189385ea68aa"},
]
ecdsa = []
filelock = [
{file = "filelock-3.7.1-py3-none-any.whl", hash = "sha256:37def7b658813cda163b56fc564cdc75e86d338246458c4c28ae84cabefa2404"},
{file = "filelock-3.7.1.tar.gz", hash = "sha256:3a0fd85166ad9dbab54c9aec96737b744106dc5f15c0b09a6744a445299fcf04"},
@ -1290,10 +1281,7 @@ pluggy = [
{file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"},
{file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"},
]
pre-commit = [
{file = "pre_commit-2.19.0-py2.py3-none-any.whl", hash = "sha256:10c62741aa5704faea2ad69cb550ca78082efe5697d6f04e5710c3c229afdd10"},
{file = "pre_commit-2.19.0.tar.gz", hash = "sha256:4233a1e38621c87d9dda9808c6606d7e7ba0e087cd56d3fe03202a01d2919615"},
]
pre-commit = []
prompt-toolkit = [
{file = "prompt_toolkit-3.0.30-py3-none-any.whl", hash = "sha256:d8916d3f62a7b67ab353a952ce4ced6a1d2587dfe9ef8ebc30dd7c386751f289"},
{file = "prompt_toolkit-3.0.30.tar.gz", hash = "sha256:859b283c50bde45f5f97829f77a4674d1c1fcd88539364f1b28a37805cfd89c0"},
@ -1463,10 +1451,7 @@ tomli = [
{file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"},
{file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"},
]
tomlkit = [
{file = "tomlkit-0.11.0-py3-none-any.whl", hash = "sha256:0f4050db66fd445b885778900ce4dd9aea8c90c4721141fde0d6ade893820ef1"},
{file = "tomlkit-0.11.0.tar.gz", hash = "sha256:71ceb10c0eefd8b8f11fe34e8a51ad07812cb1dc3de23247425fbc9ddc47b9dd"},
]
tomlkit = []
tox = [
{file = "tox-3.25.1-py2.py3-none-any.whl", hash = "sha256:c38e15f4733683a9cc0129fba078633e07eb0961f550a010ada879e95fb32632"},
{file = "tox-3.25.1.tar.gz", hash = "sha256:c138327815f53bc6da4fe56baec5f25f00622ae69ef3fe4e1e385720e22486f9"},
@ -1505,10 +1490,7 @@ unidecode = [
{file = "Unidecode-1.3.4-py3-none-any.whl", hash = "sha256:afa04efcdd818a93237574791be9b2817d7077c25a068b00f8cff7baa4e59257"},
{file = "Unidecode-1.3.4.tar.gz", hash = "sha256:8e4352fb93d5a735c788110d2e7ac8e8031eb06ccbfe8d324ab71735015f9342"},
]
urllib3 = [
{file = "urllib3-1.26.9-py2.py3-none-any.whl", hash = "sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14"},
{file = "urllib3-1.26.9.tar.gz", hash = "sha256:aabaf16477806a5e1dd19aa41f8c2b7950dd3c746362d7e3223dbe6de6ac448e"},
]
urllib3 = []
virtualenv = [
{file = "virtualenv-20.15.1-py2.py3-none-any.whl", hash = "sha256:b30aefac647e86af6d82bfc944c556f8f1a9c90427b2fb4e3bfbf338cb82becf"},
{file = "virtualenv-20.15.1.tar.gz", hash = "sha256:288171134a2ff3bfb1a2f54f119e77cd1b81c29fc1265a2356f3e8d14c7d58c4"},

2
src/keycloak/__init__.py

@ -21,6 +21,8 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Python-Keycloak library."""
from ._version import __version__
from .connection import ConnectionManager
from .exceptions import (

10
src/keycloak/authorization/__init__.py

@ -21,6 +21,8 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Authorization module."""
import ast
import json
@ -30,18 +32,19 @@ from .role import Role
class Authorization:
"""
Keycloak Authorization (policies, roles, scopes and resources).
"""Keycloak Authorization (policies, roles, scopes and resources).
https://keycloak.gitbooks.io/documentation/authorization_services/index.html
"""
def __init__(self):
"""Init method."""
self.policies = {}
@property
def policies(self):
"""Get policies."""
return self._policies
@policies.setter
@ -49,8 +52,7 @@ class Authorization:
self._policies = value
def load_config(self, data):
"""
Load policies, roles and permissions (scope/resources).
"""Load policies, roles and permissions (scope/resources).
:param data: keycloak authorization data (dict)
:returns: None

14
src/keycloak/authorization/permission.py

@ -21,9 +21,12 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak authorization Permission module."""
class Permission:
"""
"""Base permission class.
Consider this simple and very common permission:
A permission associates the object being protected with the policies that must be evaluated to
@ -45,6 +48,7 @@ class Permission:
"""
def __init__(self, name, type, logic, decision_strategy):
"""Init method."""
self._name = name
self._type = type
self._logic = logic
@ -53,13 +57,16 @@ class Permission:
self._scopes = []
def __repr__(self):
"""Repr method."""
return "<Permission: %s (%s)>" % (self.name, self.type)
def __str__(self):
"""Str method."""
return "Permission: %s (%s)" % (self.name, self.type)
@property
def name(self):
"""Get name."""
return self._name
@name.setter
@ -68,6 +75,7 @@ class Permission:
@property
def type(self):
"""Get type."""
return self._type
@type.setter
@ -76,6 +84,7 @@ class Permission:
@property
def logic(self):
"""Get logic."""
return self._logic
@logic.setter
@ -84,6 +93,7 @@ class Permission:
@property
def decision_strategy(self):
"""Get decision strategy."""
return self._decision_strategy
@decision_strategy.setter
@ -92,6 +102,7 @@ class Permission:
@property
def resources(self):
"""Get resources."""
return self._resources
@resources.setter
@ -100,6 +111,7 @@ class Permission:
@property
def scopes(self):
"""Get scopes."""
return self._scopes
@scopes.setter

20
src/keycloak/authorization/policy.py

@ -21,11 +21,14 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak authorization Policy module."""
from ..exceptions import KeycloakAuthorizationConfigError
class Policy:
"""
"""Base policy class.
A policy defines the conditions that must be satisfied to grant access to an object.
Unlike permissions, you do not specify the object being protected but rather the conditions
that must be satisfied for access to a given object (for example, resource, scope, or both).
@ -39,6 +42,7 @@ class Policy:
"""
def __init__(self, name, type, logic, decision_strategy):
"""Init method."""
self._name = name
self._type = type
self._logic = logic
@ -47,13 +51,16 @@ class Policy:
self._permissions = []
def __repr__(self):
"""Repr method."""
return "<Policy: %s (%s)>" % (self.name, self.type)
def __str__(self):
"""Str method."""
return "Policy: %s (%s)" % (self.name, self.type)
@property
def name(self):
"""Get name."""
return self._name
@name.setter
@ -62,6 +69,7 @@ class Policy:
@property
def type(self):
"""Get type."""
return self._type
@type.setter
@ -70,6 +78,7 @@ class Policy:
@property
def logic(self):
"""Get logic."""
return self._logic
@logic.setter
@ -78,6 +87,7 @@ class Policy:
@property
def decision_strategy(self):
"""Get decision strategy."""
return self._decision_strategy
@decision_strategy.setter
@ -86,15 +96,16 @@ class Policy:
@property
def roles(self):
"""Get roles."""
return self._roles
@property
def permissions(self):
"""Get permissions."""
return self._permissions
def add_role(self, role):
"""
Add keycloak role in policy.
"""Add keycloak role in policy.
:param role: keycloak role.
:return:
@ -106,8 +117,7 @@ class Policy:
self._roles.append(role)
def add_permission(self, permission):
"""
Add keycloak permission in policy.
"""Add keycloak permission in policy.
:param permission: keycloak permission.
:return:

9
src/keycloak/authorization/role.py

@ -21,25 +21,30 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""The authorization Role module."""
class Role:
"""
"""Authorization Role base class.
Roles identify a type or category of user. Admin, user,
manager, and employee are all typical roles that may exist in an organization.
https://keycloak.gitbooks.io/documentation/server_admin/topics/roles.html
"""
def __init__(self, name, required=False):
"""Init method."""
self.name = name
self.required = required
@property
def get_name(self):
"""Get name."""
return self.name
def __eq__(self, other):
"""Eq method."""
if isinstance(other, str):
return self.name == other
return NotImplemented

15
src/keycloak/connection.py

@ -21,6 +21,8 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Connection manager module."""
try:
from urllib.parse import urljoin
except ImportError:
@ -33,8 +35,7 @@ from .exceptions import KeycloakConnectionError
class ConnectionManager(object):
"""
Represents a simple server connection.
"""Represents a simple server connection.
:param base_url: (str) The server URL.
:param headers: (dict) The header parameters of the requests to the server.
@ -44,6 +45,7 @@ class ConnectionManager(object):
"""
def __init__(self, base_url, headers={}, timeout=60, verify=True, proxies=None):
"""Init method."""
self._base_url = base_url
self._headers = headers
self._timeout = timeout
@ -66,6 +68,7 @@ class ConnectionManager(object):
self._s.proxies.update(proxies)
def __del__(self):
"""Del method."""
self._s.close()
@property
@ -75,7 +78,6 @@ class ConnectionManager(object):
@base_url.setter
def base_url(self, value):
""" """
self._base_url = value
@property
@ -85,7 +87,6 @@ class ConnectionManager(object):
@timeout.setter
def timeout(self, value):
""" """
self._timeout = value
@property
@ -95,7 +96,6 @@ class ConnectionManager(object):
@verify.setter
def verify(self, value):
""" """
self._verify = value
@property
@ -105,12 +105,10 @@ class ConnectionManager(object):
@headers.setter
def headers(self, value):
""" """
self._headers = value
def param_headers(self, key):
"""
Return a specific header parameter.
"""Return a specific header parameter.
:param key: (str) Header parameters key.
:returns: If the header parameters exist, return its value.
@ -151,7 +149,6 @@ class ConnectionManager(object):
:returns: Response the request.
:raises: HttpError Can't connect to server.
"""
try:
return self._s.get(
urljoin(self.base_url, path),

609
src/keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

75
src/keycloak/keycloak_openid.py

@ -21,6 +21,12 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak OpenID module.
The module contains mainly the implementation of KeycloakOpenID class, the main
class to handle authentication and token manipulation.
"""
import json
from jose import jwt
@ -52,8 +58,7 @@ from .urls_patterns import (
class KeycloakOpenID:
"""
Keycloak OpenID client.
"""Keycloak OpenID client.
:param server_url: Keycloak server url
:param client_id: client id
@ -75,6 +80,7 @@ class KeycloakOpenID:
proxies=None,
timeout=60,
):
"""Init method."""
self.client_id = client_id
self.client_secret_key = client_secret_key
self.realm_name = realm_name
@ -87,6 +93,7 @@ class KeycloakOpenID:
@property
def client_id(self):
"""Get client id."""
return self._client_id
@client_id.setter
@ -95,6 +102,7 @@ class KeycloakOpenID:
@property
def client_secret_key(self):
"""Get the client secret key."""
return self._client_secret_key
@client_secret_key.setter
@ -103,6 +111,7 @@ class KeycloakOpenID:
@property
def realm_name(self):
"""Get the realm name."""
return self._realm_name
@realm_name.setter
@ -111,6 +120,7 @@ class KeycloakOpenID:
@property
def connection(self):
"""Get connection."""
return self._connection
@connection.setter
@ -119,6 +129,7 @@ class KeycloakOpenID:
@property
def authorization(self):
"""Get authorization."""
return self._authorization
@authorization.setter
@ -126,8 +137,7 @@ class KeycloakOpenID:
self._authorization = value
def _add_secret_key(self, payload):
"""
Add secret key if exist.
"""Add secret key if exists.
:param payload:
:return:
@ -138,7 +148,7 @@ class KeycloakOpenID:
return payload
def _build_name_role(self, role):
"""
"""Build name of a role.
:param role:
:return:
@ -146,7 +156,7 @@ class KeycloakOpenID:
return self.client_id + "/" + role
def _token_info(self, token, method_token_info, **kwargs):
"""
"""Getter for the token data.
:param token:
:param method_token_info:
@ -161,19 +171,20 @@ class KeycloakOpenID:
return token_info
def well_known(self):
"""The most important endpoint to understand is the well-known configuration
"""Get the well_known object.
The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
:return It lists endpoints and other configuration options relevant.
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def auth_url(self, redirect_uri):
"""
"""Get the authentication URL endpoint.
http://openid.net/specs/openid-connect-core-1_0.html#AuthorizationEndpoint
@ -196,7 +207,8 @@ class KeycloakOpenID:
totp=None,
**extra
):
"""
"""Retrieve user token.
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
what flow is used. The token endpoint is also used to obtain new access tokens
@ -232,7 +244,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
def refresh_token(self, refresh_token, grant_type=["refresh_token"]):
"""
"""Refresh the user token.
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
what flow is used. The token endpoint is also used to obtain new access tokens
@ -255,7 +268,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
def exchange_token(self, token: str, client_id: str, audience: str, subject: str) -> dict:
"""
"""Exchange user token.
Use a token to obtain an entirely different token. See
https://www.keycloak.org/docs/latest/securing_apps/index.html#_token-exchange
@ -279,7 +293,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
def userinfo(self, token):
"""
"""Get the user info object.
The userinfo endpoint returns standard claims about the authenticated user,
and is protected by a bearer token.
@ -294,8 +309,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError)
def logout(self, refresh_token):
"""
The logout endpoint logs out the authenticated user.
"""Log out the authenticated user.
:param refresh_token:
:return:
"""
@ -306,7 +321,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
def certs(self):
"""
"""Get certificates.
The certificate endpoint returns the public keys enabled by the realm, encoded as a
JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled
for verifying tokens.
@ -320,7 +336,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError)
def public_key(self):
"""
"""Retrieve the public key.
The public key is exposed by the realm page directly.
:return:
@ -330,7 +347,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"]
def entitlement(self, token, resource_server_id):
"""
"""Get entitlements from the token.
Client applications can use a specific endpoint to obtain a special security token
called a requesting party token (RPT). This token consists of all the entitlements
(or permissions) for a user as a result of the evaluation of the permissions and
@ -349,7 +367,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover
def introspect(self, token, rpt=None, token_type_hint=None):
"""
"""Introspect the user token.
The introspection endpoint is used to retrieve the active state of a token.
It is can only be invoked by confidential clients.
@ -377,7 +396,8 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
"""
"""Decode user token.
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
structure that represents a cryptographic key. This specification
also defines a JWK Set JSON data structure that represents a set of
@ -395,8 +415,7 @@ class KeycloakOpenID:
return jwt.decode(token, key, algorithms=algorithms, audience=self.client_id, **kwargs)
def load_authorization_config(self, path):
"""
Load Keycloak settings (authorization)
"""Load Keycloak settings (authorization).
:param path: settings file (json)
:return:
@ -407,8 +426,7 @@ class KeycloakOpenID:
self.authorization.load_config(authorization_json)
def get_policies(self, token, method_token_info="introspect", **kwargs):
"""
Get policies by user token
"""Get policies by user token.
:param token: user token
:return: policies list
@ -438,8 +456,7 @@ class KeycloakOpenID:
return list(set(policies))
def get_permissions(self, token, method_token_info="introspect", **kwargs):
"""
Get permission by user token
"""Get permission by user token.
:param token: user token
:param method_token_info: Decode token method
@ -471,8 +488,7 @@ class KeycloakOpenID:
return list(set(permissions))
def uma_permissions(self, token, permissions=""):
"""
Get UMA permissions by user token with requested permissions
"""Get UMA permissions by user token with requested permissions.
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
invoked by confidential clients.
@ -499,8 +515,7 @@ class KeycloakOpenID:
return raise_error_from_response(data_raw, KeycloakPostError)
def has_uma_access(self, token, permissions):
"""
Determine whether user has uma permissions with specified user token
"""Determine whether user has uma permissions with specified user token.
:param token: user token
:param permissions: list of uma permissions (resource:scope)

42
src/keycloak/uma_permissions.py

@ -21,11 +21,14 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""User-managed access permissions module."""
from keycloak.exceptions import KeycloakPermissionFormatError, PermissionDefinitionError
class UMAPermission:
"""A class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
Usage example:
@ -36,9 +39,16 @@ class UMAPermission:
>>> print(permission)
'Users#delete'
:param permission: Permission
:type permission: UMAPermission
:param resource: Resource
:type resource: str
:param scope: Scope
:type scope: str
"""
def __init__(self, permission=None, resource="", scope=""):
"""Init method."""
self.resource = resource
self.scope = scope
@ -53,21 +63,26 @@ class UMAPermission:
self.scope = str(permission.scope)
def __str__(self):
"""Str method."""
scope = self.scope
if scope:
scope = "#" + scope
return "{}{}".format(self.resource, scope)
def __eq__(self, __o: object) -> bool:
"""Eq method."""
return str(self) == str(__o)
def __repr__(self) -> str:
"""Repr method."""
return self.__str__()
def __hash__(self) -> int:
"""Hash method."""
return hash(str(self))
def __call__(self, permission=None, resource="", scope="") -> object:
"""Call method."""
result_resource = self.resource
result_scope = self.scope
@ -91,36 +106,58 @@ class UMAPermission:
class Resource(UMAPermission):
"""An UMAPermission Resource class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
:param resource: Resource
:type resource: str
"""
def __init__(self, resource):
"""Init method."""
super().__init__(resource=resource)
class Scope(UMAPermission):
"""An UMAPermission Scope class to conveniently assembly permissions.
The class itself is callable, and will return the assembled permission.
:param scope: Scope
:type scope: str
"""
def __init__(self, scope):
"""Init method."""
super().__init__(scope=scope)
class AuthStatus:
"""A class that represents the authorization/login status of a user associated with a token.
This has to evaluate to True if and only if the user is properly authorized
for the requested resource."""
for the requested resource.
:param is_logged_in: Is logged in indicator
:type is_logged_in: bool
:param is_authorized: Is authorized indicator
:type is_authorized: bool
:param missing_permissions: Missing permissions
:type missing_permissions: set
"""
def __init__(self, is_logged_in, is_authorized, missing_permissions):
"""Init method."""
self.is_logged_in = is_logged_in
self.is_authorized = is_authorized
self.missing_permissions = missing_permissions
def __bool__(self):
"""Bool method."""
return self.is_authorized
def __repr__(self):
"""Repr method."""
return (
f"AuthStatus("
f"is_authorized={self.is_authorized}, "
@ -130,8 +167,7 @@ class AuthStatus:
def build_permission_param(permissions):
"""
Transform permissions to a set, so they are usable for requests
"""Transform permissions to a set, so they are usable for requests.
:param permissions: either str (resource#scope),
iterable[str] (resource#scope),

2
src/keycloak/urls_patterns.py

@ -21,6 +21,8 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Keycloak URL patterns."""
# OPENID URLS
URL_REALM = "realms/{realm-name}"
URL_WELL_KNOWN = "realms/{realm-name}/.well-known/openid-configuration"

1
tests/__init__.py

@ -0,0 +1 @@
"""Tests module."""

29
tests/test_keycloak_admin.py

@ -1,3 +1,5 @@
"""Test the keycloak admin object."""
import pytest
import keycloak
@ -13,10 +15,12 @@ from keycloak.exceptions import (
def test_keycloak_version():
"""Test version."""
assert keycloak.__version__, keycloak.__version__
def test_keycloak_admin_bad_init(env):
"""Test keycloak admin bad init."""
with pytest.raises(TypeError) as err:
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
@ -37,6 +41,7 @@ def test_keycloak_admin_bad_init(env):
def test_keycloak_admin_init(env):
"""Test keycloak admin init."""
admin = KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
@ -111,6 +116,7 @@ def test_keycloak_admin_init(env):
def test_realms(admin: KeycloakAdmin):
"""Test realms."""
# Get realms
realms = admin.get_realms()
assert len(realms) == 1, realms
@ -175,6 +181,7 @@ def test_realms(admin: KeycloakAdmin):
def test_import_export_realms(admin: KeycloakAdmin, realm: str):
"""Test import and export of realms."""
admin.realm_name = realm
realm_export = admin.export_realm(export_clients=True, export_groups_and_role=True)
@ -192,6 +199,7 @@ def test_import_export_realms(admin: KeycloakAdmin, realm: str):
def test_users(admin: KeycloakAdmin, realm: str):
"""Test users."""
admin.realm_name = realm
# Check no users present
@ -283,6 +291,7 @@ def test_users(admin: KeycloakAdmin, realm: str):
def test_users_pagination(admin: KeycloakAdmin, realm: str):
"""Test user pagination."""
admin.realm_name = realm
for ind in range(admin.PAGE_SIZE + 50):
@ -300,6 +309,7 @@ def test_users_pagination(admin: KeycloakAdmin, realm: str):
def test_idps(admin: KeycloakAdmin, realm: str):
"""Test IDPs."""
admin.realm_name = realm
# Create IDP
@ -371,6 +381,7 @@ def test_idps(admin: KeycloakAdmin, realm: str):
def test_user_credentials(admin: KeycloakAdmin, user: str):
"""Test user credentials."""
res = admin.set_user_password(user_id=user, password="booya", temporary=True)
assert res == dict(), res
@ -398,6 +409,7 @@ def test_user_credentials(admin: KeycloakAdmin, user: str):
def test_social_logins(admin: KeycloakAdmin, user: str):
"""Test social logins."""
res = admin.add_user_social_login(
user_id=user, provider_id="gitlab", provider_userid="test", provider_username="test"
)
@ -437,6 +449,7 @@ def test_social_logins(admin: KeycloakAdmin, user: str):
def test_server_info(admin: KeycloakAdmin):
"""Test server info."""
info = admin.get_server_info()
assert set(info.keys()) == {
"systemInfo",
@ -456,6 +469,7 @@ def test_server_info(admin: KeycloakAdmin):
def test_groups(admin: KeycloakAdmin, user: str):
"""Test groups."""
# Test get groups
groups = admin.get_groups()
assert len(groups) == 0
@ -599,6 +613,7 @@ def test_groups(admin: KeycloakAdmin, user: str):
def test_clients(admin: KeycloakAdmin, realm: str):
"""Test clients."""
admin.realm_name = realm
# Test get clients
@ -860,6 +875,7 @@ def test_clients(admin: KeycloakAdmin, realm: str):
def test_realm_roles(admin: KeycloakAdmin, realm: str):
"""Test realm roles."""
admin.realm_name = realm
# Test get realm roles
@ -1015,6 +1031,7 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str):
def test_client_roles(admin: KeycloakAdmin, client: str):
"""Test client roles."""
# Test get client roles
res = admin.get_client_roles(client_id=client)
assert len(res) == 0
@ -1177,6 +1194,7 @@ def test_client_roles(admin: KeycloakAdmin, client: str):
def test_enable_token_exchange(admin: KeycloakAdmin, realm: str):
"""Test enable token exchange."""
# Test enabling token exchange between two confidential clients
admin.realm_name = realm
@ -1265,6 +1283,7 @@ def test_enable_token_exchange(admin: KeycloakAdmin, realm: str):
def test_email(admin: KeycloakAdmin, user: str):
"""Test email."""
# Emails will fail as we don't have SMTP test setup
with pytest.raises(KeycloakPutError) as err:
admin.send_update_account(user_id=user, payload=dict())
@ -1277,6 +1296,7 @@ def test_email(admin: KeycloakAdmin, user: str):
def test_get_sessions(admin: KeycloakAdmin):
"""Test get sessions."""
sessions = admin.get_sessions(user_id=admin.get_user_id(username=admin.username))
assert len(sessions) >= 1
with pytest.raises(KeycloakGetError) as err:
@ -1285,6 +1305,7 @@ def test_get_sessions(admin: KeycloakAdmin):
def test_get_client_installation_provider(admin: KeycloakAdmin, client: str):
"""Test get client installation provider."""
with pytest.raises(KeycloakGetError) as err:
admin.get_client_installation_provider(client_id=client, provider_id="bad")
assert err.match('404: b\'{"error":"Unknown Provider"}\'')
@ -1303,6 +1324,7 @@ def test_get_client_installation_provider(admin: KeycloakAdmin, client: str):
def test_auth_flows(admin: KeycloakAdmin, realm: str):
"""Test auth flows."""
admin.realm_name = realm
res = admin.get_authentication_flows()
@ -1449,6 +1471,7 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
def test_authentication_configs(admin: KeycloakAdmin, realm: str):
"""Test authentication configs."""
admin.realm_name = realm
# Test list of auth providers
@ -1480,6 +1503,7 @@ def test_authentication_configs(admin: KeycloakAdmin, realm: str):
def test_sync_users(admin: KeycloakAdmin, realm: str):
"""Test sync users."""
admin.realm_name = realm
# Only testing the error message
@ -1489,6 +1513,7 @@ def test_sync_users(admin: KeycloakAdmin, realm: str):
def test_client_scopes(admin: KeycloakAdmin, realm: str):
"""Test client scopes."""
admin.realm_name = realm
# Test get client scopes
@ -1626,6 +1651,7 @@ def test_client_scopes(admin: KeycloakAdmin, realm: str):
def test_components(admin: KeycloakAdmin, realm: str):
"""Test components."""
admin.realm_name = realm
# Test get components
@ -1676,6 +1702,7 @@ def test_components(admin: KeycloakAdmin, realm: str):
def test_keys(admin: KeycloakAdmin, realm: str):
"""Test keys."""
admin.realm_name = realm
assert set(admin.get_keys()["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"}
assert {k["algorithm"] for k in admin.get_keys()["keys"]} == {
@ -1687,6 +1714,7 @@ def test_keys(admin: KeycloakAdmin, realm: str):
def test_events(admin: KeycloakAdmin, realm: str):
"""Test events."""
admin.realm_name = realm
events = admin.get_events()
@ -1706,6 +1734,7 @@ def test_events(admin: KeycloakAdmin, realm: str):
def test_auto_refresh(admin: KeycloakAdmin, realm: str):
"""Test auto refresh token."""
# Test get refresh
admin.auto_refresh_token = list()
admin.connection = ConnectionManager(

28
tests/test_uma_permissions.py

@ -14,6 +14,9 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Test uma permissions."""
import re
import pytest
@ -23,30 +26,35 @@ from keycloak.uma_permissions import Resource, Scope, build_permission_param
def test_resource_with_scope_obj():
"""Test resource with scope."""
r = Resource("Resource1")
s = Scope("Scope1")
assert r(s) == "Resource1#Scope1"
def test_scope_with_resource_obj():
"""Test scope with resource."""
r = Resource("Resource1")
s = Scope("Scope1")
assert s(r) == "Resource1#Scope1"
def test_resource_scope_str():
"""Test resource scope as string."""
r = Resource("Resource1")
s = "Scope1"
assert r(scope=s) == "Resource1#Scope1"
def test_scope_resource_str():
"""Test scope resource as string."""
r = "Resource1"
s = Scope("Scope1")
assert s(resource=r) == "Resource1#Scope1"
def test_resource_scope_list():
"""Test resource scope as list."""
r = Resource("Resource1")
s = ["Scope1"]
with pytest.raises(PermissionDefinitionError) as err:
@ -55,94 +63,114 @@ def test_resource_scope_list():
def test_build_permission_none():
"""Test build permission param with None."""
assert build_permission_param(None) == set()
def test_build_permission_empty_str():
"""Test build permission param with an empty string."""
assert build_permission_param("") == set()
def test_build_permission_empty_list():
"""Test build permission param with an empty list."""
assert build_permission_param([]) == set()
def test_build_permission_empty_tuple():
"""Test build permission param with an empty tuple."""
assert build_permission_param(()) == set()
def test_build_permission_empty_set():
"""Test build permission param with an empty set."""
assert build_permission_param(set()) == set()
def test_build_permission_empty_dict():
"""Test build permission param with an empty dict."""
assert build_permission_param({}) == set()
def test_build_permission_str():
"""Test build permission param as string."""
assert build_permission_param("resource1") == {"resource1"}
def test_build_permission_list_str():
"""Test build permission param with list of strings."""
assert build_permission_param(["res1#scope1", "res1#scope2"]) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_str():
"""Test build permission param with tuple of strings."""
assert build_permission_param(("res1#scope1", "res1#scope2")) == {"res1#scope1", "res1#scope2"}
def test_build_permission_set_str():
"""Test build permission param with set of strings."""
assert build_permission_param({"res1#scope1", "res1#scope2"}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_str():
"""Test build permission param with dictionary."""
assert build_permission_param({"res1": "scope1"}) == {"res1#scope1"}
def test_build_permission_tuple_dict_str_list_str():
"""Test build permission param with dictionary of list."""
assert build_permission_param({"res1": ["scope1", "scope2"]}) == {"res1#scope1", "res1#scope2"}
def test_build_permission_tuple_dict_str_list_str2():
"""Test build permission param with mutliple-keyed dictionary."""
assert build_permission_param(
{"res1": ["scope1", "scope2"], "res2": ["scope2", "scope3"]}
) == {"res1#scope1", "res1#scope2", "res2#scope2", "res2#scope3"}
def test_build_permission_uma():
"""Test build permission param with UMA."""
assert build_permission_param(Resource("res1")(Scope("scope1"))) == {"res1#scope1"}
def test_build_permission_uma_list():
"""Test build permission param with list of UMAs."""
assert build_permission_param(
[Resource("res1")(Scope("scope1")), Resource("res1")(Scope("scope2"))]
) == {"res1#scope1", "res1#scope2"}
def test_build_permission_misbuilt_dict_str_list_list_str():
"""Test bad build of permission param from dictionary."""
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": [["scope1", "scope2"]]})
assert err.match(re.escape("misbuilt permission {'res1': [['scope1', 'scope2']]}"))
def test_build_permission_misbuilt_list_list_str():
"""Test bad build of permission param from list."""
with pytest.raises(KeycloakPermissionFormatError) as err:
print(build_permission_param([["scope1", "scope2"]]))
assert err.match(re.escape("misbuilt permission [['scope1', 'scope2']]"))
def test_build_permission_misbuilt_list_set_str():
"""Test bad build of permission param from set."""
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1", "scope2"}])
assert err.match("misbuilt permission.*")
def test_build_permission_misbuilt_set_set_str():
"""Test bad build of permission param from list of set."""
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param([{"scope1"}])
assert err.match(re.escape("misbuilt permission [{'scope1'}]"))
def test_build_permission_misbuilt_dict_non_iterable():
"""Test bad build of permission param from non-iterable."""
with pytest.raises(KeycloakPermissionFormatError) as err:
build_permission_param({"res1": 5})
assert err.match(re.escape("misbuilt permission {'res1': 5}"))

3
tests/test_urls_patterns.py

@ -1,9 +1,10 @@
"""Test URL patterns."""
from keycloak import urls_patterns
def test_correctness_of_patterns():
"""Test that there are no duplicate url patterns."""
# Test that the patterns are present
urls = [x for x in dir(urls_patterns) if not x.startswith("__")]
assert len(urls) >= 0

Loading…
Cancel
Save