Browse Source

feat: initial setup of CICD and linting

pull/309/head
Richard Nemeth 2 years ago
parent
commit
cc82e6a874
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 37
      .circleci/config.yml
  2. 32
      .github/workflows/bump.yaml
  3. 27
      .github/workflows/daily.yaml
  4. 90
      .github/workflows/lint.yaml
  5. 33
      .github/workflows/publish.yaml
  6. 3
      .gitignore
  7. 16
      .pre-commit-config.yaml
  8. 10
      .readthedocs.yaml
  9. 8
      .releaserc.json
  10. 31
      CHANGELOG.md
  11. 1
      CODEOWNERS
  12. 86
      CONTRIBUTING.md
  13. 3
      MANIFEST.in
  14. 5
      dev-requirements.txt
  15. 9
      docs-requirements.txt
  16. 70
      docs/source/conf.py
  17. 1
      keycloak/_version.py
  18. 53
      keycloak/authorization/__init__.py
  19. 5
      keycloak/authorization/policy.py
  20. 102
      keycloak/connection.py
  21. 26
      keycloak/exceptions.py
  22. 626
      keycloak/keycloak_admin.py
  23. 140
      keycloak/keycloak_openid.py
  24. 191
      keycloak/tests/test_connection.py
  25. 59
      keycloak/urls_patterns.py
  26. 6
      pyproject.toml
  27. 6
      requirements.txt
  28. 65
      setup.py
  29. 35
      test_keycloak_init.sh
  30. 0
      tests/__init__.py
  31. 61
      tests/conftest.py
  32. 1201
      tests/test_keycloak_admin.py
  33. 26
      tests/test_urls_patterns.py
  34. 4
      tox.env
  35. 48
      tox.ini

37
.circleci/config.yml

@ -1,37 +0,0 @@
version: 2
jobs:
build:
docker:
- image: circleci/python:3.6.1
working_directory: ~/repo
steps:
- checkout
- restore_cache:
keys:
- v1-dependencies-{{ checksum "requirements.txt" }}
# fallback to using the latest cache if no exact match is found
- v1-dependencies-
- run:
name: install dependencies
command: |
python3 -m venv venv
. venv/bin/activate
pip install -r requirements.txt
- save_cache:
paths:
- ./venv
key: v1-dependencies-{{ checksum "requirements.txt" }}
- run:
name: run tests
command: |
. venv/bin/activate
python3 -m unittest discover
- store_artifacts:
path: test-reports
destination: test-reports

32
.github/workflows/bump.yaml

@ -0,0 +1,32 @@
name: Bump version
on:
workflow_run:
workflows: [ "Lint" ]
branches: [ master ]
types:
- completed
jobs:
tag-version:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
token: ${{ secrets.PAT_TOKEN }}
- uses: actions/setup-node@v3
with:
node-version: 18
- name: determine-version
run: |
VERSION=$(npx semantic-release --branches master --dry-run | { grep -i 'the next release version is' || test $? = 1; } | sed -E 's/.* ([[:digit:].]+)$/\1/')
echo "VERSION=$VERSION" >> $GITHUB_ENV
id: version
- uses: rickstaa/action-create-tag@v1
continue-on-error: true
env:
GITHUB_TOKEN: ${{ secrets.PAT_TOKEN }}
with:
tag: v${{ env.VERSION }}
message: "Releasing v${{ env.VERSION }}"
github_token: ${{ secrets.PAT_TOKEN }}

27
.github/workflows/daily.yaml

@ -0,0 +1,27 @@
name: Daily check
on:
schedule:
- cron: '0 4 * * *'
jobs:
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- uses: docker-practice/actions-setup-docker@master
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Run tests
run: |
tox -e tests

90
.github/workflows/lint.yaml

@ -0,0 +1,90 @@
name: Lint
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
check-commits:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: webiny/action-conventional-commits@v1.0.3
check-linting:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Check linting, formatting
run: |
tox -e check
check-docs:
runs-on: ubuntu-latest
needs:
- check-commits
- check-linting
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Check documentation build
run: |
tox -e docs
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
needs:
- check-commits
- check-linting
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- uses: docker-practice/actions-setup-docker@master
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Run tests
run: |
tox -e tests
build:
runs-on: ubuntu-latest
needs: test
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Run build
run: |
tox -e build

33
.github/workflows/publish.yaml

@ -0,0 +1,33 @@
name: Publish
on:
push:
tags:
- 'v*'
jobs:
publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox wheel twine
- name: Apply the tag version
run: |
version=${{ github.ref_name }}
sed -i 's/__version__ = .*/__version__ = "'${version:1}'"/' keycloak/_version.py
- name: Run build
run: |
tox -e build
- name: Publish to PyPi
env:
TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }}
TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }}
run: |
twine upload -u $TWINE_USERNAME -p $TWINE_PASSWORD dist/*

3
.gitignore

@ -103,4 +103,5 @@ ENV/
.idea/
main.py
main2.py
s3air-authz-config.json
s3air-authz-config.json
.vscode

16
.pre-commit-config.yaml

@ -0,0 +1,16 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/compilerla/conventional-pre-commit
rev: v1.2.0
hooks:
- id: conventional-pre-commit
stages: [ commit-msg ]
args: [ ] # optional: list of Conventional Commits types to allow

10
.readthedocs.yaml

@ -0,0 +1,10 @@
version: 2
build:
os: "ubuntu-20.04"
tools:
python: "3.10"
python:
install:
- requirements: docs-requirements.txt

8
.releaserc.json

@ -0,0 +1,8 @@
{
"plugins": ["@semantic-release/commit-analyzer"],
"verifyConditions": false,
"npmPublish": false,
"publish": false,
"fail": false,
"success": false
}

31
CHANGELOG.md

@ -1,45 +1,44 @@
Changelog
============
# Changelog
All notable changes to this project will be documented in this file.
## [0.5.0] - 2017-08-21
* Basic functions for Keycloak API (well_know, token, userinfo, logout, certs,
entitlement, instropect)
- Basic functions for Keycloak API (well_know, token, userinfo, logout, certs,
entitlement, instropect)
## [0.6.0] - 2017-08-23
* Added load authorization settings
- Added load authorization settings
## [0.7.0] - 2017-08-23
* Added polices
- Added polices
## [0.8.0] - 2017-08-23
* Added permissions
- Added permissions
## [0.9.0] - 2017-09-05
* Added functions for Admin Keycloak API
- Added functions for Admin Keycloak API
## [0.10.0] - 2017-10-23
* Updated libraries versions
* Updated Docs
- Updated libraries versions
- Updated Docs
## [0.11.0] - 2017-12-12
* Changed Instropect RPT
- Changed Instropect RPT
## [0.12.0] - 2018-01-25
* Add groups functions
* Add Admin Tasks for user and client role management
* Function to trigger user sync from provider
- Add groups functions
- Add Admin Tasks for user and client role management
- Function to trigger user sync from provider
## [0.12.1] - 2018-08-04
* Add get_idps
* Rework group functions
- Add get_idps
- Rework group functions

1
CODEOWNERS

@ -0,0 +1 @@
* @ryshoooo @marcospereirampj

86
CONTRIBUTING.md

@ -0,0 +1,86 @@
# Contributing
Welcome to the Python Keycloak contributing guidelines. We are all more than happy to receive
any contributions to the repository and want to thank you in advance for your contributions!
This document outlines the process and the guidelines on how contributions work for this repository.
## Setting up the dev environment
The development environment is mainly up to the developer. Our recommendations are to create a python
virtual environment and install the necessary requirements. Example
```sh
python -m venv venv
source venv/bin/activate
python -m pip install -U pip
python -m pip install -r requirements.txt
python -m pip install -r dev-requirements.txt
```
## Running checks and tests
We're utilizing `tox` for most of the testing workflows. However we also have an external dependency on `docker`.
We're using docker to spin up a local keycloak instance which we run our test cases against. This is to avoid
a lot of unnecessary mocking and yet have immediate feedback from the actual Keycloak instance. All of the setup
is done for you with the tox environments, all you need is to have both tox and docker installed
(`tox` is included in the `dev-requirements.txt`).
To run the unit tests, simply run
```sh
tox -e tests
```
The project is also adhering to strict linting (flake8) and formatting (black + isort). You can always check that
your code changes adhere to the format by running
```sh
tox -e check
```
If the check fails, you'll see an error message specifying what went wrong. To simplify things, you can also run
```sh
tox -e apply-check
```
which will apply isort and black formatting for you in the repository. The flake8 problems however need to be resolved
manually by the developer.
Additionally we require that the documentation pages are built without warnings. This check is also run via tox, using
the command
```sh
tox -e docs
```
The check is also run in the CICD pipelines. We require that the documentation pages built from the code docstrings
do not create visually "bad" pages.
## Conventional commits
Commits to this project must adhere to the [Conventional Commits
specification](https://www.conventionalcommits.org/en/v1.0.0/) that will allow
us to automate version bumps and changelog entry creation.
After cloning this repository, you must install the pre-commit hook for
conventional commits (this is included in the `dev-requirements.txt`)
```sh
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install pre-commit
pre-commit install --install-hooks -t pre-commit -t pre-push -t commit-msg
```
## How to contribute
1. Fork this repository, develop and test your changes
2. Make sure that your changes do not decrease the test coverage
3. Make sure you're commits follow the conventional commits
4. Submit a pull request
## How to release
The CICD pipelines are set up for the repository. When a PR is merged, a new version of the library
will be automatically deployed to the PyPi server, meaning you'll be able to see your changes immediately.

3
MANIFEST.in

@ -1 +1,4 @@
include LICENSE
include requirements.txt
include dev-requirements.txt
include docs-requirements.txt

5
dev-requirements.txt

@ -0,0 +1,5 @@
tox
pytest
pytest-cov
wheel
pre-commit

9
docs-requirements.txt

@ -0,0 +1,9 @@
mock
alabaster
commonmark
recommonmark
sphinx
sphinx-rtd-theme
readthedocs-sphinx-ext
m2r2
sphinx-autoapi

70
docs/source/conf.py

@ -32,37 +32,37 @@ import sphinx_rtd_theme
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.todo',
'sphinx.ext.viewcode',
"sphinx.ext.autodoc",
"sphinx.ext.intersphinx",
"sphinx.ext.todo",
"sphinx.ext.viewcode",
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
source_suffix = ".rst"
# The master toctree document.
master_doc = 'index'
master_doc = "index"
# General information about the project.
project = 'python-keycloak'
copyright = '2017, Marcos Pereira'
author = 'Marcos Pereira'
project = "python-keycloak"
copyright = "2017, Marcos Pereira"
author = "Marcos Pereira"
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '0.27.1'
version = "0.27.1"
# The full version, including alpha/beta/rc tags.
release = '0.27.1'
release = "0.27.1"
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
@ -74,13 +74,13 @@ language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
add_function_parentheses = False
add_module_names = True
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
pygments_style = "sphinx"
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True
@ -91,7 +91,7 @@ todo_include_todos = True
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
html_theme = "sphinx_rtd_theme"
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# Theme options are theme-specific and customize the look and feel of a theme
@ -103,7 +103,7 @@ html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
html_static_path = ["_static"]
html_use_smartypants = False
@ -116,7 +116,7 @@ html_show_copyright = True
#
# This is required for the alabaster theme
# refs: http://alabaster.readthedocs.io/en/latest/installation.html#sidebars
#html_sidebars = {
# html_sidebars = {
# '**': [
# 'about.html',
# 'navigation.html',
@ -124,13 +124,13 @@ html_show_copyright = True
# 'searchbox.html',
# 'donate.html',
# ]
#}
# }
# -- Options for HTMLHelp output ------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'python-keycloakdoc'
htmlhelp_basename = "python-keycloakdoc"
# -- Options for LaTeX output ---------------------------------------------
@ -139,15 +139,12 @@ latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
@ -157,8 +154,13 @@ latex_elements = {
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'python-keycloak.tex', 'python-keycloak Documentation',
'Marcos Pereira', 'manual'),
(
master_doc,
"python-keycloak.tex",
"python-keycloak Documentation",
"Marcos Pereira",
"manual",
)
]
@ -166,10 +168,7 @@ latex_documents = [
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'python-keycloak', 'python-keycloak Documentation',
[author], 1)
]
man_pages = [(master_doc, "python-keycloak", "python-keycloak Documentation", [author], 1)]
# -- Options for Texinfo output -------------------------------------------
@ -178,10 +177,13 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'python-keycloak', 'python-keycloak Documentation',
author, 'python-keycloak', 'One line description of project.',
'Miscellaneous'),
(
master_doc,
"python-keycloak",
"python-keycloak Documentation",
author,
"python-keycloak",
"One line description of project.",
"Miscellaneous",
)
]

1
keycloak/_version.py

@ -0,0 +1 @@
__version__ = "0.0.0"

53
keycloak/authorization/__init__.py

@ -55,39 +55,44 @@ class Authorization:
:param data: keycloak authorization data (dict)
:return:
"""
for pol in data['policies']:
if pol['type'] == 'role':
policy = Policy(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
config_roles = json.loads(pol['config']['roles'])
for pol in data["policies"]:
if pol["type"] == "role":
policy = Policy(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
config_roles = json.loads(pol["config"]["roles"])
for role in config_roles:
policy.add_role(Role(name=role['id'],
required=role['required']))
policy.add_role(Role(name=role["id"], required=role["required"]))
self.policies[policy.name] = policy
if pol['type'] == 'scope':
permission = Permission(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
if pol["type"] == "scope":
permission = Permission(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
permission.scopes = ast.literal_eval(pol['config']['scopes'])
permission.scopes = ast.literal_eval(pol["config"]["scopes"])
for policy_name in ast.literal_eval(pol['config']['applyPolicies']):
for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]):
self.policies[policy_name].add_permission(permission)
if pol['type'] == 'resource':
permission = Permission(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
if pol["type"] == "resource":
permission = Permission(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
permission.resources = ast.literal_eval(pol['config'].get('resources', "[]"))
permission.resources = ast.literal_eval(pol["config"].get("resources", "[]"))
for policy_name in ast.literal_eval(pol['config']['applyPolicies']):
for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]):
if self.policies.get(policy_name) is not None:
self.policies[policy_name].add_permission(permission)

5
keycloak/authorization/policy.py

@ -98,9 +98,10 @@ class Policy:
:param role: keycloak role.
:return:
"""
if self.type != 'role':
if self.type != "role":
raise KeycloakAuthorizationConfigError(
"Can't add role. Policy type is different of role")
"Can't add role. Policy type is different of role"
)
self._roles.append(role)
def add_permission(self, permission):

102
keycloak/connection.py

@ -29,11 +29,11 @@ except ImportError:
import requests
from requests.adapters import HTTPAdapter
from .exceptions import (KeycloakConnectionError)
from .exceptions import KeycloakConnectionError
class ConnectionManager(object):
""" Represents a simple server connection.
"""Represents a simple server connection.
Args:
base_url (str): The server URL.
headers (dict): The header parameters of the requests to the server.
@ -52,15 +52,15 @@ class ConnectionManager(object):
# retry once to reset connection with Keycloak after tomcat's ConnectionTimeout
# see https://github.com/marcospereirampj/python-keycloak/issues/36
for protocol in ('https://', 'http://'):
for protocol in ("https://", "http://"):
adapter = HTTPAdapter(max_retries=1)
# adds POST to retry whitelist
allowed_methods = set(adapter.max_retries.allowed_methods)
allowed_methods.add('POST')
allowed_methods.add("POST")
adapter.max_retries.allowed_methods = frozenset(allowed_methods)
self._s.mount(protocol, adapter)
if proxies:
self._s.proxies.update(proxies)
@ -69,7 +69,7 @@ class ConnectionManager(object):
@property
def base_url(self):
""" Return base url in use for requests to the server. """
"""Return base url in use for requests to the server."""
return self._base_url
@base_url.setter
@ -79,7 +79,7 @@ class ConnectionManager(object):
@property
def timeout(self):
""" Return timeout in use for request to the server. """
"""Return timeout in use for request to the server."""
return self._timeout
@timeout.setter
@ -89,7 +89,7 @@ class ConnectionManager(object):
@property
def verify(self):
""" Return verify in use for request to the server. """
"""Return verify in use for request to the server."""
return self._verify
@verify.setter
@ -99,7 +99,7 @@ class ConnectionManager(object):
@property
def headers(self):
""" Return header request to the server. """
"""Return header request to the server."""
return self._headers
@headers.setter
@ -108,7 +108,7 @@ class ConnectionManager(object):
self._headers = value
def param_headers(self, key):
""" Return a specific header parameter.
"""Return a specific header parameter.
:arg
key (str): Header parameters key.
:return:
@ -117,11 +117,11 @@ class ConnectionManager(object):
return self.headers.get(key)
def clean_headers(self):
""" Clear header parameters. """
"""Clear header parameters."""
self.headers = {}
def exist_param_headers(self, key):
""" Check if the parameter exists in the header.
"""Check if the parameter exists in the header.
:arg
key (str): Header parameters key.
:return:
@ -130,7 +130,7 @@ class ConnectionManager(object):
return self.param_headers(key) is not None
def add_param_headers(self, key, value):
""" Add a single parameter inside the header.
"""Add a single parameter inside the header.
:arg
key (str): Header parameters key.
value (str): Value to be added.
@ -138,14 +138,14 @@ class ConnectionManager(object):
self.headers[key] = value
def del_param_headers(self, key):
""" Remove a specific parameter.
"""Remove a specific parameter.
:arg
key (str): Key of the header parameters.
"""
self.headers.pop(key, None)
def raw_get(self, path, **kwargs):
""" Submit get request to the path.
"""Submit get request to the path.
:arg
path (str): Path for request.
:return
@ -155,17 +155,18 @@ class ConnectionManager(object):
"""
try:
return self._s.get(urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.get(
urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_post(self, path, data, **kwargs):
""" Submit post request to the path.
"""Submit post request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
@ -175,18 +176,19 @@ class ConnectionManager(object):
HttpError: Can't connect to server.
"""
try:
return self._s.post(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.post(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_put(self, path, data, **kwargs):
""" Submit put request to the path.
"""Submit put request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
@ -196,18 +198,19 @@ class ConnectionManager(object):
HttpError: Can't connect to server.
"""
try:
return self._s.put(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.put(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_delete(self, path, data={}, **kwargs):
""" Submit delete request to the path.
"""Submit delete request to the path.
:arg
path (str): Path for request.
@ -218,12 +221,13 @@ class ConnectionManager(object):
HttpError: Can't connect to server.
"""
try:
return self._s.delete(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.delete(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)

26
keycloak/exceptions.py

@ -25,8 +25,7 @@ import requests
class KeycloakError(Exception):
def __init__(self, error_message="", response_code=None,
response_body=None):
def __init__(self, error_message="", response_code=None, response_body=None):
Exception.__init__(self, error_message)
@ -56,10 +55,23 @@ class KeycloakOperationError(KeycloakError):
class KeycloakDeprecationError(KeycloakError):
pass
class KeycloakGetError(KeycloakOperationError):
pass
class KeycloakPostError(KeycloakOperationError):
pass
class KeycloakPutError(KeycloakOperationError):
pass
class KeycloakDeleteError(KeycloakOperationError):
pass
class KeycloakSecretNotFound(KeycloakOperationError):
pass
@ -90,10 +102,10 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
return response.content
if skip_exists and response.status_code == 409:
return {"Already exists"}
return {"msg": "Already exists"}
try:
message = response.json()['message']
message = response.json()["message"]
except (KeyError, ValueError):
message = response.content
@ -103,6 +115,6 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
if response.status_code == 401:
error = KeycloakAuthenticationError
raise error(error_message=message,
response_code=response.status_code,
response_body=response.content)
raise error(
error_message=message, response_code=response.status_code, response_body=response.content
)

626
keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

140
keycloak/keycloak_openid.py

@ -27,24 +27,38 @@ from jose import jwt
from .authorization import Authorization
from .connection import ConnectionManager
from .exceptions import raise_error_from_response, KeycloakGetError, \
KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError, KeycloakDeprecationError
from .exceptions import (
KeycloakAuthorizationConfigError,
KeycloakDeprecationError,
KeycloakGetError,
KeycloakInvalidTokenError,
KeycloakRPTNotFound,
raise_error_from_response,
)
from .urls_patterns import (
URL_REALM,
URL_AUTH,
URL_CERTS,
URL_ENTITLEMENT,
URL_INTROSPECT,
URL_LOGOUT,
URL_REALM,
URL_TOKEN,
URL_USERINFO,
URL_WELL_KNOWN,
URL_LOGOUT,
URL_CERTS,
URL_ENTITLEMENT,
URL_INTROSPECT
)
class KeycloakOpenID:
def __init__(self, server_url, realm_name, client_id, client_secret_key=None, verify=True, custom_headers=None, proxies=None):
def __init__(
self,
server_url,
realm_name,
client_id,
client_secret_key=None,
verify=True,
custom_headers=None,
proxies=None,
):
"""
:param server_url: Keycloak server url
@ -62,11 +76,9 @@ class KeycloakOpenID:
if custom_headers is not None:
# merge custom headers to main headers
headers.update(custom_headers)
self._connection = ConnectionManager(base_url=server_url,
headers=headers,
timeout=60,
verify=verify,
proxies=proxies)
self._connection = ConnectionManager(
base_url=server_url, headers=headers, timeout=60, verify=verify, proxies=proxies
)
self._authorization = Authorization()
@ -138,7 +150,7 @@ class KeycloakOpenID:
:param kwargs:
:return:
"""
if method_token_info == 'introspect':
if method_token_info == "introspect":
token_info = self.introspect(token)
else:
token_info = self.decode_token(token, **kwargs)
@ -146,11 +158,11 @@ class KeycloakOpenID:
return token_info
def well_know(self):
""" The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
"""The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
:return It lists endpoints and other configuration options relevant.
:return It lists endpoints and other configuration options relevant.
"""
params_path = {"realm-name": self.realm_name}
@ -165,12 +177,23 @@ class KeycloakOpenID:
:return:
"""
params_path = {"authorization-endpoint": self.well_know()['authorization_endpoint'],
"client-id": self.client_id,
"redirect-uri": redirect_uri}
params_path = {
"authorization-endpoint": self.well_know()["authorization_endpoint"],
"client-id": self.client_id,
"redirect-uri": redirect_uri,
}
return URL_AUTH.format(**params_path)
def token(self, username="", password="", grant_type=["password"], code="", redirect_uri="", totp=None, **extra):
def token(
self,
username="",
password="",
grant_type=["password"],
code="",
redirect_uri="",
totp=None,
**extra
):
"""
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
@ -188,9 +211,14 @@ class KeycloakOpenID:
:return:
"""
params_path = {"realm-name": self.realm_name}
payload = {"username": username, "password": password,
"client_id": self.client_id, "grant_type": grant_type,
"code": code, "redirect_uri": redirect_uri}
payload = {
"username": username,
"password": password,
"client_id": self.client_id,
"grant_type": grant_type,
"code": code,
"redirect_uri": redirect_uri,
}
if extra:
payload.update(extra)
@ -198,8 +226,7 @@ class KeycloakOpenID:
payload["totp"] = totp
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def refresh_token(self, refresh_token, grant_type=["refresh_token"]):
@ -216,10 +243,13 @@ class KeycloakOpenID:
:return:
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "grant_type": grant_type, "refresh_token": refresh_token}
payload = {
"client_id": self.client_id,
"grant_type": grant_type,
"refresh_token": refresh_token,
}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def userinfo(self, token):
@ -250,8 +280,7 @@ class KeycloakOpenID:
payload = {"client_id": self.client_id, "refresh_token": refresh_token}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204])
@ -268,7 +297,7 @@ class KeycloakOpenID:
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_CERTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def public_key(self):
"""
The public key is exposed by the realm page directly.
@ -277,8 +306,7 @@ class KeycloakOpenID:
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)['public_key']
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"]
def entitlement(self, token, resource_server_id):
"""
@ -293,8 +321,8 @@ class KeycloakOpenID:
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path))
if data_raw.status_code == 404:
if data_raw.status_code == 404:
return raise_error_from_response(data_raw, KeycloakDeprecationError)
return raise_error_from_response(data_raw, KeycloakGetError)
@ -316,7 +344,7 @@ class KeycloakOpenID:
payload = {"client_id": self.client_id, "token": token}
if token_type_hint == 'requesting_party_token':
if token_type_hint == "requesting_party_token":
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
self.connection.add_param_headers("Authorization", "Bearer " + token)
@ -325,12 +353,11 @@ class KeycloakOpenID:
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def decode_token(self, token, key, algorithms=['RS256'], **kwargs):
def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
"""
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
structure that represents a cryptographic key. This specification
@ -347,8 +374,7 @@ class KeycloakOpenID:
:return:
"""
return jwt.decode(token, key, algorithms=algorithms,
audience=self.client_id, **kwargs)
return jwt.decode(token, key, algorithms=algorithms, audience=self.client_id, **kwargs)
def load_authorization_config(self, path):
"""
@ -357,12 +383,12 @@ class KeycloakOpenID:
:param path: settings file (json)
:return:
"""
authorization_file = open(path, 'r')
authorization_file = open(path, "r")
authorization_json = json.loads(authorization_file.read())
self.authorization.load_config(authorization_json)
authorization_file.close()
def get_policies(self, token, method_token_info='introspect', **kwargs):
def get_policies(self, token, method_token_info="introspect", **kwargs):
"""
Get policies by user token
@ -377,12 +403,10 @@ class KeycloakOpenID:
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == 'introspect' and not token_info['active']:
raise KeycloakInvalidTokenError(
"Token expired or invalid."
)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info['resource_access'].get(self.client_id)
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
@ -390,13 +414,13 @@ class KeycloakOpenID:
policies = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources['roles']:
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
policies.append(policy)
return list(set(policies))
def get_permissions(self, token, method_token_info='introspect', **kwargs):
def get_permissions(self, token, method_token_info="introspect", **kwargs):
"""
Get permission by user token
@ -413,12 +437,10 @@ class KeycloakOpenID:
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == 'introspect' and not token_info['active']:
raise KeycloakInvalidTokenError(
"Token expired or invalid."
)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info['resource_access'].get(self.client_id)
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
@ -426,7 +448,7 @@ class KeycloakOpenID:
permissions = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources['roles']:
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
permissions += policy.permissions

191
keycloak/tests/test_connection.py

@ -1,191 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest import mock
from httmock import urlmatch, response, HTTMock, all_requests
from keycloak import KeycloakAdmin, KeycloakOpenID
from ..connection import ConnectionManager
try:
import unittest
except ImportError:
import unittest2 as unittest
class TestConnection(unittest.TestCase):
def setUp(self):
self._conn = ConnectionManager(
base_url="http://localhost/",
headers={},
timeout=60)
@all_requests
def response_content_success(self, url, request):
headers = {'content-type': 'application/json'}
content = b'response_ok'
return response(200, content, headers, None, 5, request)
def test_raw_get(self):
with HTTMock(self.response_content_success):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b'response_ok')
self.assertEqual(resp.status_code, 200)
def test_raw_post(self):
@urlmatch(path="/known_path", method="post")
def response_post_success(url, request):
headers = {'content-type': 'application/json'}
content = 'response'.encode("utf-8")
return response(201, content, headers, None, 5, request)
with HTTMock(response_post_success):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 201)
def test_raw_put(self):
@urlmatch(netloc="localhost", path="/known_path", method="put")
def response_put_success(url, request):
headers = {'content-type': 'application/json'}
content = 'response'.encode("utf-8")
return response(200, content, headers, None, 5, request)
with HTTMock(response_put_success):
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 200)
def test_raw_get_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="get")
def response_get_fail(url, request):
headers = {'content-type': 'application/json'}
content = "404 page not found".encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_get_fail):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b"404 page not found")
self.assertEqual(resp.status_code, 404)
def test_raw_post_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="post")
def response_post_fail(url, request):
headers = {'content-type': 'application/json'}
content = str(["Start can't be blank"]).encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_post_fail):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8"))
self.assertEqual(resp.status_code, 404)
def test_raw_put_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="put")
def response_put_fail(url, request):
headers = {'content-type': 'application/json'}
content = str(["Start can't be blank"]).encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_put_fail):
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8"))
self.assertEqual(resp.status_code, 404)
def test_add_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
def test_del_param_headers(self):
self._conn.add_param_headers("test", "value")
self._conn.del_param_headers("test")
self.assertEqual(self._conn.headers, {})
def test_clean_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
self._conn.clean_headers()
self.assertEqual(self._conn.headers, {})
def test_exist_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
def test_KeycloakAdmin_custom_header(self):
class FakeToken:
@staticmethod
def get(string_val):
return "faketoken"
fake_token = FakeToken()
with mock.patch.object(KeycloakOpenID, "__init__", return_value=None) as mock_keycloak_open_id:
with mock.patch("keycloak.keycloak_openid.KeycloakOpenID.token", return_value=fake_token):
with mock.patch("keycloak.connection.ConnectionManager.__init__", return_value=None) as mock_connection_manager:
with mock.patch("keycloak.connection.ConnectionManager.__del__", return_value=None) as mock_connection_manager_delete:
server_url = "https://localhost/auth/"
username = "admin"
password = "secret"
realm_name = "master"
headers = {
'Custom': 'test-custom-header'
}
KeycloakAdmin(server_url=server_url,
username=username,
password=password,
realm_name=realm_name,
verify=False,
custom_headers=headers)
mock_keycloak_open_id.assert_called_with(server_url=server_url,
realm_name=realm_name,
client_id='admin-cli',
client_secret_key=None,
verify=False,
custom_headers=headers)
expected_header = {'Authorization': 'Bearer faketoken',
'Content-Type': 'application/json',
'Custom': 'test-custom-header'
}
mock_connection_manager.assert_called_with(base_url=server_url,
headers=expected_header,
timeout=60,
verify=False)
mock_connection_manager_delete.assert_called_once_with()

59
keycloak/urls_patterns.py

@ -30,7 +30,9 @@ URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout"
URL_CERTS = "realms/{realm-name}/protocol/openid-connect/certs"
URL_INTROSPECT = "realms/{realm-name}/protocol/openid-connect/token/introspect"
URL_ENTITLEMENT = "realms/{realm-name}/authz/entitlement/{resource-server-id}"
URL_AUTH = "{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}"
URL_AUTH = (
"{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}"
)
# ADMIN URLS
URL_ADMIN_USERS = "admin/realms/{realm-name}/users"
@ -41,14 +43,26 @@ URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-ac
URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
URL_ADMIN_GET_SESSIONS = "admin/realms/{realm-name}/users/{id}/sessions"
URL_ADMIN_USER_CLIENT_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}"
URL_ADMIN_USER_CLIENT_ROLES = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}"
)
URL_ADMIN_USER_REALM_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/realm"
URL_ADMIN_USER_REALM_ROLES_AVAILABLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm/available"
URL_ADMIN_USER_REALM_ROLES_COMPOSITE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm/composite"
URL_ADMIN_USER_REALM_ROLES_AVAILABLE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/realm/available"
)
URL_ADMIN_USER_REALM_ROLES_COMPOSITE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/realm/composite"
)
URL_ADMIN_GROUPS_REALM_ROLES = "admin/realms/{realm-name}/groups/{id}/role-mappings/realm"
URL_ADMIN_GROUPS_CLIENT_ROLES = "admin/realms/{realm-name}/groups/{id}/role-mappings/clients/{client-id}"
URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/available"
URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/composite"
URL_ADMIN_GROUPS_CLIENT_ROLES = (
"admin/realms/{realm-name}/groups/{id}/role-mappings/clients/{client-id}"
)
URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/available"
)
URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/composite"
)
URL_ADMIN_USER_GROUP = "admin/realms/{realm-name}/users/{id}/groups/{group-id}"
URL_ADMIN_USER_GROUPS = "admin/realms/{realm-name}/users/{id}/groups"
URL_ADMIN_USER_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
@ -79,8 +93,12 @@ URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/re
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT + "/authz/resource-server/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT + "/authz/resource-server/permission?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1"
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
)
URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = (
URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1"
)
URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user"
URL_ADMIN_CLIENT_CERTS = URL_ADMIN_CLIENT + "/certificates/{attr}"
@ -101,7 +119,9 @@ URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances"
URL_ADMIN_IDP_MAPPERS = "admin/realms/{realm-name}/identity-provider/instances/{idp-alias}/mappers"
URL_ADMIN_IDP = "admin/realms//{realm-name}/identity-provider/instances/{alias}"
URL_ADMIN_REALM_ROLES_ROLE_BY_NAME = "admin/realms/{realm-name}/roles/{role-name}"
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = "admin/realms/{realm-name}/roles/{role-name}/composites"
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = (
"admin/realms/{realm-name}/roles/{role-name}/composites"
)
URL_ADMIN_REALM_EXPORT = "admin/realms/{realm-name}/partial-export?exportClients={export-clients}&exportGroupsAndRoles={export-groups-and-roles}"
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-default-client-scopes"
@ -113,10 +133,16 @@ URL_ADMIN_FLOWS = "admin/realms/{realm-name}/authentication/flows"
URL_ADMIN_FLOW = URL_ADMIN_FLOWS + "/{id}"
URL_ADMIN_FLOWS_ALIAS = "admin/realms/{realm-name}/authentication/flows/{flow-id}"
URL_ADMIN_FLOWS_COPY = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/copy"
URL_ADMIN_FLOWS_EXECUTIONS = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions"
URL_ADMIN_FLOWS_EXECUTIONS = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions"
)
URL_ADMIN_FLOWS_EXECUTION = "admin/realms/{realm-name}/authentication/executions/{id}"
URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/execution"
URL_ADMIN_FLOWS_EXECUTIONS_FLOW = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/flow"
URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/execution"
)
URL_ADMIN_FLOWS_EXECUTIONS_FLOW = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/flow"
)
URL_ADMIN_AUTHENTICATOR_CONFIG = "admin/realms/{realm-name}/authentication/config/{id}"
URL_ADMIN_COMPONENTS = "admin/realms/{realm-name}/components"
@ -124,10 +150,11 @@ URL_ADMIN_COMPONENT = "admin/realms/{realm-name}/components/{component-id}"
URL_ADMIN_KEYS = "admin/realms/{realm-name}/keys"
URL_ADMIN_USER_FEDERATED_IDENTITIES = "admin/realms/{realm-name}/users/{id}/federated-identity"
URL_ADMIN_USER_FEDERATED_IDENTITY = "admin/realms/{realm-name}/users/{id}/federated-identity/{provider}"
URL_ADMIN_USER_FEDERATED_IDENTITY = (
"admin/realms/{realm-name}/users/{id}/federated-identity/{provider}"
)
URL_ADMIN_EVENTS = 'admin/realms/{realm-name}/events'
URL_ADMIN_EVENTS = "admin/realms/{realm-name}/events"
URL_ADMIN_DELETE_USER_ROLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm"
URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats"

6
pyproject.toml

@ -0,0 +1,6 @@
[tool.black]
line-length = 99
[tool.isort]
line_length = 99
profile = "black"

6
requirements.txt

@ -1,7 +1,3 @@
requests>=2.20.0
httmock>=1.2.5
python-jose>=1.4.0
twine==1.13.0
jose~=1.0.0
setuptools~=54.2.0
urllib3>=1.26.5
urllib3>=1.26.0

65
setup.py

@ -1,31 +1,56 @@
# -*- coding: utf-8 -*-
import re
from setuptools import setup
with open("README.md", "r") as fh:
long_description = fh.read()
with open("requirements.txt", "r") as fh:
reqs = fh.read().split("\n")
with open("dev-requirements.txt", "r") as fh:
dev_reqs = fh.read().split("\n")
with open("docs-requirements.txt", "r") as fh:
docs_reqs = fh.read().split("\n")
VERSIONFILE = "keycloak/_version.py"
verstrline = open(VERSIONFILE, "rt").read()
VSRE = r"^__version__ = ['\"]([^'\"]*)['\"]"
mo = re.search(VSRE, verstrline, re.M)
if mo:
verstr = mo.group(1)
else:
raise RuntimeError("Unable to find version string in %s." % (VERSIONFILE,))
setup(
name='python-keycloak',
version='0.27.1',
url='https://github.com/marcospereirampj/python-keycloak',
license='The MIT License',
author='Marcos Pereira',
author_email='marcospereira.mpj@gmail.com',
keywords='keycloak openid',
description='python-keycloak is a Python package providing access to the Keycloak API.',
name="python-keycloak",
version=verstr,
url="https://github.com/marcospereirampj/python-keycloak",
license="The MIT License",
author="Marcos Pereira, Richard Nemeth",
author_email="marcospereira.mpj@gmail.com; ryshoooo@gmail.com",
keywords="keycloak openid oidc",
description="python-keycloak is a Python package providing access to the Keycloak API.",
long_description=long_description,
long_description_content_type="text/markdown",
packages=['keycloak', 'keycloak.authorization', 'keycloak.tests'],
install_requires=['requests>=2.20.0', 'python-jose>=1.4.0'],
tests_require=['httmock>=1.2.5'],
packages=["keycloak"],
install_requires=reqs,
tests_require=dev_reqs,
extras_require={"docs": docs_reqs},
python_requires=">=3.7",
project_urls={
"Documentation": "https://python-keycloak.readthedocs.io/en/latest/",
"Issue tracker": "https://github.com/marcospereirampj/python-keycloak/issues",
},
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: MIT License',
'Development Status :: 3 - Alpha',
'Operating System :: MacOS',
'Operating System :: Unix',
'Operating System :: Microsoft :: Windows',
'Topic :: Utilities'
]
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Operating System :: MacOS",
"Operating System :: Unix",
"Operating System :: Microsoft :: Windows",
"Topic :: Utilities",
],
)

35
test_keycloak_init.sh

@ -0,0 +1,35 @@
#!/usr/bin/env bash
CMD_ARGS=$1
KEYCLOAK_DOCKER_IMAGE="quay.io/keycloak/keycloak:latest"
echo "${CMD_ARGS}"
function keycloak_stop() {
docker stop unittest_keycloak &> /dev/null
docker rm unittest_keycloak &> /dev/null
}
function keycloak_start() {
echo "Starting keycloak docker container"
docker run -d --name unittest_keycloak -e KEYCLOAK_ADMIN="${KEYCLOAK_ADMIN}" -e KEYCLOAK_ADMIN_PASSWORD="${KEYCLOAK_ADMIN_PASSWORD}" -p "${KEYCLOAK_PORT}:8080" "${KEYCLOAK_DOCKER_IMAGE}" start-dev
SECONDS=0
until curl localhost:$KEYCLOAK_PORT; do
sleep 5;
if [ ${SECONDS} -gt 180 ]; then
echo "Timeout exceeded";
exit 1;
fi
done
}
# Ensuring that keycloak is stopped in case of CTRL-C
trap keycloak_stop err exit
keycloak_stop # In case it did not shut down correctly last time.
keycloak_start
eval ${CMD_ARGS}
RETURN_VALUE=$?
exit ${RETURN_VALUE}

0
keycloak/tests/__init__.py → tests/__init__.py

61
tests/conftest.py

@ -0,0 +1,61 @@
import os
import uuid
import pytest
from keycloak import KeycloakAdmin
@pytest.fixture
def env():
class KeycloakTestEnv(object):
KEYCLOAK_HOST = os.environ["KEYCLOAK_HOST"]
KEYCLOAK_PORT = os.environ["KEYCLOAK_PORT"]
KEYCLOAK_ADMIN = os.environ["KEYCLOAK_ADMIN"]
KEYCLOAK_ADMIN_PASSWORD = os.environ["KEYCLOAK_ADMIN_PASSWORD"]
return KeycloakTestEnv()
@pytest.fixture
def admin(env):
return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
)
@pytest.fixture
def realm(admin: KeycloakAdmin) -> str:
realm_name = str(uuid.uuid4())
admin.create_realm(payload={"realm": realm_name})
yield realm_name
admin.delete_realm(realm_name=realm_name)
@pytest.fixture
def user(admin: KeycloakAdmin, realm: str) -> str:
admin.realm_name = realm
username = str(uuid.uuid4())
user_id = admin.create_user(payload={"username": username, "email": f"{username}@test.test"})
yield user_id
admin.delete_user(user_id=user_id)
@pytest.fixture
def group(admin: KeycloakAdmin, realm: str) -> str:
admin.realm_name = realm
group_name = str(uuid.uuid4())
group_id = admin.create_group(payload={"name": group_name})
yield group_id
admin.delete_group(group_id=group_id)
@pytest.fixture
def client(admin: KeycloakAdmin, realm: str) -> str:
admin.realm_name = realm
client = str(uuid.uuid4())
client_id = admin.create_client(payload={"name": client, "clientId": client})
yield client_id
admin.delete_client(client_id=client_id)

1201
tests/test_keycloak_admin.py
File diff suppressed because it is too large
View File

26
tests/test_urls_patterns.py

@ -0,0 +1,26 @@
from keycloak import urls_patterns
def test_correctness_of_patterns():
"""Test that there are no duplicate url patterns."""
# Test that the patterns are present
urls = [x for x in dir(urls_patterns) if not x.startswith("__")]
assert len(urls) >= 0
# Test that all patterns start with URL_
for url in urls:
assert url.startswith("URL_"), f"The url pattern {url} does not begin with URL_"
# Test that the patterns have unique names
seen_urls = list()
for url in urls:
assert url not in seen_urls, f"The url pattern {url} is present twice."
seen_urls.append(url)
# Test that the pattern values are unique
seen_url_values = list()
for url in urls:
url_value = urls_patterns.__dict__[url]
assert url_value not in seen_url_values, f"The url {url} has a duplicate value {url_value}"
seen_url_values.append(url_value)

4
tox.env

@ -0,0 +1,4 @@
KEYCLOAK_ADMIN=admin
KEYCLOAK_ADMIN_PASSWORD=admin
KEYCLOAK_HOST={env:KEYCLOAK_HOST:localhost}
KEYCLOAK_PORT=8080

48
tox.ini

@ -0,0 +1,48 @@
[tox]
envlist = check, apply-check, docs, tests, build
[testenv]
install_command = pip install {opts} {packages}
[testenv:check]
deps =
black
isort
flake8
commands =
black --check --diff keycloak tests docs
isort -c --df keycloak tests docs
flake8 keycloak tests docs
[testenv:apply-check]
deps =
black
isort
flake8
commands =
black -C keycloak tests docs
black keycloak tests docs
isort keycloak tests docs
[testenv:docs]
deps =
.[docs]
commands =
python -m sphinx -T -E -W -b html -d _build/doctrees -D language=en ./docs/source _build/html
[testenv:tests]
setenv = file|tox.env
deps =
-rrequirements.txt
-rdev-requirements.txt
commands =
./test_keycloak_init.sh "pytest -vv --cov=keycloak --cov-report term-missing {posargs}"
[testenv:build]
deps =
-rdev-requirements.txt
commands =
python setup.py sdist bdist_wheel
[flake8]
max-line-length = 99
Loading…
Cancel
Save