Browse Source

Merge pull request #309 from marcospereirampj/feature/cicd

Feature/cicd
pull/312/head
Richard Nemeth 3 years ago
committed by GitHub
parent
commit
0953622b94
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      .circleci/config.yml
  2. 32
      .github/workflows/bump.yaml
  3. 27
      .github/workflows/daily.yaml
  4. 90
      .github/workflows/lint.yaml
  5. 33
      .github/workflows/publish.yaml
  6. 2
      .gitignore
  7. 16
      .pre-commit-config.yaml
  8. 10
      .readthedocs.yaml
  9. 8
      .releaserc.json
  10. 31
      CHANGELOG.md
  11. 1
      CODEOWNERS
  12. 86
      CONTRIBUTING.md
  13. 3
      MANIFEST.in
  14. 48
      README.md
  15. 5
      dev-requirements.txt
  16. 9
      docs-requirements.txt
  17. 80
      docs/source/conf.py
  18. 303
      docs/source/index.rst
  19. 1
      docs/source/readme.rst
  20. 7
      keycloak/__init__.py
  21. 24
      keycloak/_version.py
  22. 57
      keycloak/authorization/__init__.py
  23. 14
      keycloak/authorization/permission.py
  24. 12
      keycloak/authorization/policy.py
  25. 185
      keycloak/connection.py
  26. 26
      keycloak/exceptions.py
  27. 1334
      keycloak/keycloak_admin.py
  28. 185
      keycloak/keycloak_openid.py
  29. 191
      keycloak/tests/test_connection.py
  30. 78
      keycloak/urls_patterns.py
  31. 6
      pyproject.toml
  32. 6
      requirements.txt
  33. 65
      setup.py
  34. 35
      test_keycloak_init.sh
  35. 0
      tests/__init__.py
  36. 61
      tests/conftest.py
  37. 1241
      tests/test_keycloak_admin.py
  38. 26
      tests/test_urls_patterns.py
  39. 4
      tox.env
  40. 48
      tox.ini

37
.circleci/config.yml

@ -1,37 +0,0 @@
version: 2
jobs:
build:
docker:
- image: circleci/python:3.6.1
working_directory: ~/repo
steps:
- checkout
- restore_cache:
keys:
- v1-dependencies-{{ checksum "requirements.txt" }}
# fallback to using the latest cache if no exact match is found
- v1-dependencies-
- run:
name: install dependencies
command: |
python3 -m venv venv
. venv/bin/activate
pip install -r requirements.txt
- save_cache:
paths:
- ./venv
key: v1-dependencies-{{ checksum "requirements.txt" }}
- run:
name: run tests
command: |
. venv/bin/activate
python3 -m unittest discover
- store_artifacts:
path: test-reports
destination: test-reports

32
.github/workflows/bump.yaml

@ -0,0 +1,32 @@
name: Bump version
on:
workflow_run:
workflows: [ "Lint" ]
branches: [ master ]
types:
- completed
jobs:
tag-version:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
token: ${{ secrets.PAT_TOKEN }}
- uses: actions/setup-node@v3
with:
node-version: 18
- name: determine-version
run: |
VERSION=$(npx semantic-release --branches master --dry-run | { grep -i 'the next release version is' || test $? = 1; } | sed -E 's/.* ([[:digit:].]+)$/\1/')
echo "VERSION=$VERSION" >> $GITHUB_ENV
id: version
- uses: rickstaa/action-create-tag@v1
continue-on-error: true
env:
GITHUB_TOKEN: ${{ secrets.PAT_TOKEN }}
with:
tag: v${{ env.VERSION }}
message: "Releasing v${{ env.VERSION }}"
github_token: ${{ secrets.PAT_TOKEN }}

27
.github/workflows/daily.yaml

@ -0,0 +1,27 @@
name: Daily check
on:
schedule:
- cron: '0 4 * * *'
jobs:
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- uses: docker-practice/actions-setup-docker@master
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Run tests
run: |
tox -e tests

90
.github/workflows/lint.yaml

@ -0,0 +1,90 @@
name: Lint
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
check-commits:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: webiny/action-conventional-commits@v1.0.3
check-linting:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Check linting, formatting
run: |
tox -e check
check-docs:
runs-on: ubuntu-latest
needs:
- check-commits
- check-linting
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Check documentation build
run: |
tox -e docs
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
needs:
- check-commits
- check-linting
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- uses: docker-practice/actions-setup-docker@master
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Run tests
run: |
tox -e tests
build:
runs-on: ubuntu-latest
needs: test
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox
- name: Run build
run: |
tox -e build

33
.github/workflows/publish.yaml

@ -0,0 +1,33 @@
name: Publish
on:
push:
tags:
- 'v*'
jobs:
publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install tox wheel twine
- name: Apply the tag version
run: |
version=${{ github.ref_name }}
sed -i 's/__version__ = .*/__version__ = "'${version:1}'"/' keycloak/_version.py
- name: Run build
run: |
tox -e build
- name: Publish to PyPi
env:
TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }}
TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }}
run: |
twine upload -u $TWINE_USERNAME -p $TWINE_PASSWORD dist/*

2
.gitignore

@ -104,3 +104,5 @@ ENV/
main.py
main2.py
s3air-authz-config.json
.vscode
_build

16
.pre-commit-config.yaml

@ -0,0 +1,16 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/compilerla/conventional-pre-commit
rev: v1.2.0
hooks:
- id: conventional-pre-commit
stages: [ commit-msg ]
args: [ ] # optional: list of Conventional Commits types to allow

10
.readthedocs.yaml

@ -0,0 +1,10 @@
version: 2
build:
os: "ubuntu-20.04"
tools:
python: "3.10"
python:
install:
- requirements: docs-requirements.txt

8
.releaserc.json

@ -0,0 +1,8 @@
{
"plugins": ["@semantic-release/commit-analyzer"],
"verifyConditions": false,
"npmPublish": false,
"publish": false,
"fail": false,
"success": false
}

31
CHANGELOG.md

@ -1,45 +1,44 @@
Changelog
============
# Changelog
All notable changes to this project will be documented in this file.
## [0.5.0] - 2017-08-21
* Basic functions for Keycloak API (well_know, token, userinfo, logout, certs,
entitlement, instropect)
- Basic functions for Keycloak API (well_know, token, userinfo, logout, certs,
entitlement, instropect)
## [0.6.0] - 2017-08-23
* Added load authorization settings
- Added load authorization settings
## [0.7.0] - 2017-08-23
* Added polices
- Added polices
## [0.8.0] - 2017-08-23
* Added permissions
- Added permissions
## [0.9.0] - 2017-09-05
* Added functions for Admin Keycloak API
- Added functions for Admin Keycloak API
## [0.10.0] - 2017-10-23
* Updated libraries versions
* Updated Docs
- Updated libraries versions
- Updated Docs
## [0.11.0] - 2017-12-12
* Changed Instropect RPT
- Changed Instropect RPT
## [0.12.0] - 2018-01-25
* Add groups functions
* Add Admin Tasks for user and client role management
* Function to trigger user sync from provider
- Add groups functions
- Add Admin Tasks for user and client role management
- Function to trigger user sync from provider
## [0.12.1] - 2018-08-04
* Add get_idps
* Rework group functions
- Add get_idps
- Rework group functions

1
CODEOWNERS

@ -0,0 +1 @@
* @ryshoooo @marcospereirampj

86
CONTRIBUTING.md

@ -0,0 +1,86 @@
# Contributing
Welcome to the Python Keycloak contributing guidelines. We are all more than happy to receive
any contributions to the repository and want to thank you in advance for your contributions!
This document outlines the process and the guidelines on how contributions work for this repository.
## Setting up the dev environment
The development environment is mainly up to the developer. Our recommendations are to create a python
virtual environment and install the necessary requirements. Example
```sh
python -m venv venv
source venv/bin/activate
python -m pip install -U pip
python -m pip install -r requirements.txt
python -m pip install -r dev-requirements.txt
```
## Running checks and tests
We're utilizing `tox` for most of the testing workflows. However we also have an external dependency on `docker`.
We're using docker to spin up a local keycloak instance which we run our test cases against. This is to avoid
a lot of unnecessary mocking and yet have immediate feedback from the actual Keycloak instance. All of the setup
is done for you with the tox environments, all you need is to have both tox and docker installed
(`tox` is included in the `dev-requirements.txt`).
To run the unit tests, simply run
```sh
tox -e tests
```
The project is also adhering to strict linting (flake8) and formatting (black + isort). You can always check that
your code changes adhere to the format by running
```sh
tox -e check
```
If the check fails, you'll see an error message specifying what went wrong. To simplify things, you can also run
```sh
tox -e apply-check
```
which will apply isort and black formatting for you in the repository. The flake8 problems however need to be resolved
manually by the developer.
Additionally we require that the documentation pages are built without warnings. This check is also run via tox, using
the command
```sh
tox -e docs
```
The check is also run in the CICD pipelines. We require that the documentation pages built from the code docstrings
do not create visually "bad" pages.
## Conventional commits
Commits to this project must adhere to the [Conventional Commits
specification](https://www.conventionalcommits.org/en/v1.0.0/) that will allow
us to automate version bumps and changelog entry creation.
After cloning this repository, you must install the pre-commit hook for
conventional commits (this is included in the `dev-requirements.txt`)
```sh
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install pre-commit
pre-commit install --install-hooks -t pre-commit -t pre-push -t commit-msg
```
## How to contribute
1. Fork this repository, develop and test your changes
2. Make sure that your changes do not decrease the test coverage
3. Make sure you're commits follow the conventional commits
4. Submit a pull request
## How to release
The CICD pipelines are set up for the repository. When a PR is merged, a new version of the library
will be automatically deployed to the PyPi server, meaning you'll be able to see your changes immediately.

3
MANIFEST.in

@ -1 +1,4 @@
include LICENSE
include requirements.txt
include dev-requirements.txt
include docs-requirements.txt

48
README.md

@ -1,9 +1,7 @@
[![CircleCI](https://circleci.com/gh/marcospereirampj/python-keycloak/tree/master.svg?style=svg)](https://circleci.com/gh/marcospereirampj/python-keycloak/tree/master)
[![CircleCI](https://github.com/marcospereirampj/python-keycloak/actions/workflows/daily.yaml/badge.svg)](https://github.com/marcospereirampj/python-keycloak/)
[![Documentation Status](https://readthedocs.org/projects/python-keycloak/badge/?version=latest)](http://python-keycloak.readthedocs.io/en/latest/?badge=latest)
Python Keycloak
====================
# Python Keycloak
For review- see https://github.com/marcospereirampj/python-keycloak
@ -13,24 +11,27 @@ For review- see https://github.com/marcospereirampj/python-keycloak
### Via Pypi Package:
``` $ pip install python-keycloak ```
`$ pip install python-keycloak`
### Manually
``` $ python setup.py install ```
`$ python setup.py install`
## Dependencies
python-keycloak depends on:
* Python 3
* [requests](https://requests.readthedocs.io)
* [python-jose](http://python-jose.readthedocs.io/en/latest/)
- Python 3
- [requests](https://requests.readthedocs.io)
- [python-jose](http://python-jose.readthedocs.io/en/latest/)
- [urllib3](https://urllib3.readthedocs.io/en/stable/)
### Tests Dependencies
* unittest
* [httmock](https://github.com/patrys/httmock)
- [tox](https://tox.readthedocs.io/)
- [pytest](https://docs.pytest.org/en/latest/)
- [pytest-cov](https://github.com/pytest-dev/pytest-cov)
- [wheel](https://github.com/pypa/wheel)
## Bug reports
@ -43,18 +44,19 @@ The documentation for python-keycloak is available on [readthedocs](http://pytho
## Contributors
* [Agriness Team](http://www.agriness.com/pt/)
* [Marcos Pereira](marcospereira.mpj@gmail.com)
* [Martin Devlin](https://bitbucket.org/devlinmpearson/)
* [Shon T. Urbas](https://bitbucket.org/surbas/)
* [Markus Spanier](https://bitbucket.org/spanierm/)
* [Remco Kranenburg](https://bitbucket.org/Remco47/)
* [Armin](https://bitbucket.org/arminfelder/)
* [njordr](https://bitbucket.org/njordr/)
* [Josha Inglis](https://bitbucket.org/joshainglis/)
* [Alex](https://bitbucket.org/alex_zel/)
* [Ewan Jone](https://bitbucket.org/kisamoto/)
* [Lukas Martini](https://github.com/lutoma)
- [Agriness Team](http://www.agriness.com/pt/)
- [Marcos Pereira](marcospereira.mpj@gmail.com)
- [Martin Devlin](https://bitbucket.org/devlinmpearson/)
- [Shon T. Urbas](https://bitbucket.org/surbas/)
- [Markus Spanier](https://bitbucket.org/spanierm/)
- [Remco Kranenburg](https://bitbucket.org/Remco47/)
- [Armin](https://bitbucket.org/arminfelder/)
- [njordr](https://bitbucket.org/njordr/)
- [Josha Inglis](https://bitbucket.org/joshainglis/)
- [Alex](https://bitbucket.org/alex_zel/)
- [Ewan Jone](https://bitbucket.org/kisamoto/)
- [Lukas Martini](https://github.com/lutoma)
- [Adamatics](https://www.adamatics.com)
## Usage

5
dev-requirements.txt

@ -0,0 +1,5 @@
tox
pytest
pytest-cov
wheel
pre-commit

9
docs-requirements.txt

@ -0,0 +1,9 @@
mock
alabaster
commonmark
recommonmark
sphinx
sphinx-rtd-theme
readthedocs-sphinx-ext
m2r2
sphinx-autoapi

80
docs/source/conf.py

@ -22,6 +22,8 @@
# sys.path.insert(0, os.path.abspath('.'))
import sphinx_rtd_theme
from keycloak import __version__
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
@ -32,37 +34,45 @@ import sphinx_rtd_theme
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.todo',
'sphinx.ext.viewcode',
"sphinx.ext.autodoc",
"sphinx.ext.intersphinx",
"sphinx.ext.todo",
"sphinx.ext.viewcode",
"m2r2",
"autoapi.extension",
]
autoapi_type = "python"
autoapi_dirs = ["../../keycloak"]
autoapi_root = "reference"
autoapi_keep_files = False
autoapi_add_toctree_entry = False
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
source_suffix = ".rst"
# The master toctree document.
master_doc = 'index'
master_doc = "index"
# General information about the project.
project = 'python-keycloak'
copyright = '2017, Marcos Pereira'
author = 'Marcos Pereira'
project = "python-keycloak"
copyright = "2017, Marcos Pereira"
author = "Marcos Pereira"
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '0.27.1'
version = __version__
# The full version, including alpha/beta/rc tags.
release = '0.27.1'
release = __version__
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
@ -74,13 +84,13 @@ language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
add_function_parentheses = False
add_module_names = True
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
pygments_style = "sphinx"
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True
@ -91,7 +101,7 @@ todo_include_todos = True
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
html_theme = "sphinx_rtd_theme"
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# Theme options are theme-specific and customize the look and feel of a theme
@ -103,7 +113,7 @@ html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# html_static_path = ["_static"]
html_use_smartypants = False
@ -116,7 +126,7 @@ html_show_copyright = True
#
# This is required for the alabaster theme
# refs: http://alabaster.readthedocs.io/en/latest/installation.html#sidebars
#html_sidebars = {
# html_sidebars = {
# '**': [
# 'about.html',
# 'navigation.html',
@ -124,13 +134,13 @@ html_show_copyright = True
# 'searchbox.html',
# 'donate.html',
# ]
#}
# }
# -- Options for HTMLHelp output ------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'python-keycloakdoc'
htmlhelp_basename = "python-keycloakdoc"
# -- Options for LaTeX output ---------------------------------------------
@ -139,15 +149,12 @@ latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
@ -157,8 +164,13 @@ latex_elements = {
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'python-keycloak.tex', 'python-keycloak Documentation',
'Marcos Pereira', 'manual'),
(
master_doc,
"python-keycloak.tex",
"python-keycloak Documentation",
"Marcos Pereira",
"manual",
)
]
@ -166,10 +178,7 @@ latex_documents = [
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'python-keycloak', 'python-keycloak Documentation',
[author], 1)
]
man_pages = [(master_doc, "python-keycloak", "python-keycloak Documentation", [author], 1)]
# -- Options for Texinfo output -------------------------------------------
@ -178,10 +187,13 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'python-keycloak', 'python-keycloak Documentation',
author, 'python-keycloak', 'One line description of project.',
'Miscellaneous'),
(
master_doc,
"python-keycloak",
"python-keycloak Documentation",
author,
"python-keycloak",
"One line description of project.",
"Miscellaneous",
)
]

303
docs/source/index.rst

@ -3,305 +3,12 @@
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
.. image:: https://readthedocs.org/projects/adamatics-keycloak/badge/?version=latest
:target: https://adamatics-keycloak.readthedocs.io/en/latest/?badge=latest
.. mdinclude:: ../../README.md
.. toctree::
:maxdepth: 2
:caption: Contents:
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
.. image:: https://readthedocs.org/projects/python-keycloak/badge/?version=latest
:target: http://python-keycloak.readthedocs.io/en/latest/?badge=latest
Welcome to python-keycloak's documentation!
===========================================
**python-keycloak** is a Python package providing access to the Keycloak API.
Installation
==================
Via Pypi Package::
$ pip install python-keycloak
Manually::
$ python setup.py install
Dependencies
==================
python-keycloak depends on:
* Python 3
* `requests <http://docs.python-requests.org/en/master/>`_
* `python-jose <http://python-jose.readthedocs.io/en/latest/>`_
Tests Dependencies
------------------
* unittest
* `httmock <https://github.com/patrys/httmock>`_
Bug reports
==================
Please report bugs and feature requests at
`https://github.com/marcospereirampj/python-keycloak/issues <https://github.com/marcospereirampj/python-keycloak/issues>`_
Documentation
==================
The documentation for python-keycloak is available on `readthedocs <http://python-keycloak.readthedocs.io>`_.
Contributors
==================
* `Agriness Team <http://www.agriness.com/pt/>`_
* `Marcos Pereira <marcospereira.mpj@gmail.com>`_
* `Martin Devlin <martin.devlin@pearson.com>`_
* `Shon T. Urbas <shon.urbas@gmail.com>`_
* `Markus Spanier <https://bitbucket.org/spanierm/>`_
* `Remco Kranenburg <https://bitbucket.org/Remco47/>`_
* `Armin <https://bitbucket.org/arminfelder/>`_
* `Njordr <https://bitbucket.org/njordr/>`_
* `Josha Inglis <https://bitbucket.org/joshainglis/>`_
* `Alex <https://bitbucket.org/alex_zel/>`_
* `Ewan Jone <https://bitbucket.org/kisamoto/>`_
Usage
=====
Main methods::
# KEYCLOAK OPENID
from keycloak import KeycloakOpenID
# Configure client
keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/auth/",
client_id="example_client",
realm_name="example_realm",
client_secret_key="secret",
verify=True)
# Optionally, you can pass custom headers that will be added to all HTTP calls
# keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/auth/",
# client_id="example_client",
# realm_name="example_realm",
# client_secret_key="secret",
# verify=True,
# custom_headers={'CustomHeader': 'value'})
# Optionally, you can pass proxies as well that will be used in all HTTP calls. See requests documentation for more details_
# `requests-proxies <https://2.python-requests.org/en/master/user/advanced/#id10>`_.
# keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/auth/",
# client_id="example_client",
# realm_name="example_realm",
# client_secret_key="secret",
# verify=True,
# proxies={'http': 'http://10.10.1.10:3128', 'https': 'http://10.10.1.10:1080'})
# Get WellKnow
config_well_know = keycloak_openid.well_know()
# Get Token
token = keycloak_openid.token("user", "password")
token = keycloak_openid.token("user", "password", totp="012345")
# Get Userinfo
userinfo = keycloak_openid.userinfo(token['access_token'])
# Refresh token
token = keycloak_openid.refresh_token(token['refresh_token'])
# Logout
keycloak_openid.logout(token['refresh_token'])
# Get Certs
certs = keycloak_openid.certs()
# Get RPT (Entitlement)
token = keycloak_openid.token("user", "password")
rpt = keycloak_openid.entitlement(token['access_token'], "resource_id")
# Instropect RPT
token_rpt_info = keycloak_openid.introspect(keycloak_openid.introspect(token['access_token'], rpt=rpt['rpt'],
token_type_hint="requesting_party_token"))
# Introspect Token
token_info = keycloak_openid.introspect(token['access_token']))
# Decode Token
KEYCLOAK_PUBLIC_KEY = "secret"
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options)
# Get permissions by token
token = keycloak_openid.token("user", "password")
keycloak_openid.load_authorization_config("example-authz-config.json")
policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode', key=KEYCLOAK_PUBLIC_KEY)
permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect')
# KEYCLOAK ADMIN
from keycloak import KeycloakAdmin
keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/",
username='example-admin',
password='secret',
realm_name="example_realm",
verify=True)
# Optionally, you can pass custom headers that will be added to all HTTP calls
#keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/",
# username='example-admin',
# password='secret',
# realm_name="example_realm",
# verify=True,
# custom_headers={'CustomHeader': 'value'})
#
# You can also authenticate with client_id and client_secret
#keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/",
# client_id="example_client",
# client_secret_key="secret",
# realm_name="example_realm",
# verify=True,
# custom_headers={'CustomHeader': 'value'})
# Add user
new_user = keycloak_admin.create_user({"email": "example@example.com",
"username": "example@example.com",
"enabled": True,
"firstName": "Example",
"lastName": "Example",
"realmRoles": ["user_default", ],
"attributes": {"example": "1,2,3,3,"}})
# Add user and set password
new_user = keycloak_admin.create_user({"email": "example@example.com",
"username": "example@example.com",
"enabled": True,
"firstName": "Example",
"lastName": "Example",
"credentials": [{"value": "secret","type": "password",}],
"realmRoles": ["user_default", ],
"attributes": {"example": "1,2,3,3,"}})
# User counter
count_users = keycloak_admin.users_count()
# Get users Returns a list of users, filtered according to query parameters
users = keycloak_admin.get_users({})
# Get user ID from name
user-id-keycloak = keycloak_admin.get_user_id("example@example.com")
# Get User
user = keycloak_admin.get_user("user-id-keycloak")
# Update User
response = keycloak_admin.update_user(user_id="user-id-keycloak",
payload={'firstName': 'Example Update'})
# Update User Password
response = set_user_password(user_id="user-id-keycloak", password="secret", temporary=True)
# Delete User
response = keycloak_admin.delete_user(user_id="user-id-keycloak")
# Get consents granted by the user
consents = keycloak_admin.consents_user(user_id="user-id-keycloak")
# Send User Action
response = keycloak_admin.send_update_account(user_id="user-id-keycloak",
payload=json.dumps(['UPDATE_PASSWORD']))
# Send Verify Email
response = keycloak_admin.send_verify_email(user_id="user-id-keycloak")
# Get sessions associated with the user
sessions = keycloak_admin.get_sessions(user_id="user-id-keycloak")
# Get themes, social providers, auth providers, and event listeners available on this server
server_info = keycloak_admin.get_server_info()
# Get clients belonging to the realm Returns a list of clients belonging to the realm
clients = keycloak_admin.get_clients()
# Get client - id (not client-id) from client by name
client_id=keycloak_admin.get_client_id("my-client")
# Get representation of the client - id of client (not client-id)
client = keycloak_admin.get_client(client_id="client_id")
# Get all roles for the realm or client
realm_roles = keycloak_admin.get_realm_roles()
# Get all roles for the client
client_roles = keycloak_admin.get_client_roles(client_id="client_id")
# Get client role
role = keycloak_admin.get_client_role(client_id="client_id", role_name="role_name")
# Warning: Deprecated
# Get client role id from name
role_id = keycloak_admin.get_client_role_id(client_id="client_id", role_name="test")
# Create client role
keycloak_admin.create_client_role(client_id="client_id", {'name': 'roleName', 'clientRole': True})
# Get client role id from name
role_id = keycloak_admin.get_client_role_id(client_id=client_id, role_name="test")
# Get all roles for the realm or client
realm_roles = keycloak_admin.get_roles()
# Assign client role to user. Note that BOTH role_name and role_id appear to be required.
keycloak_admin.assign_client_role(client_id="client_id", user_id="user_id", role_id="role_id", role_name="test")
# Assign realm roles to user. Note that BOTH role_name and role_id appear to be required.
keycloak_admin.assign_realm_roles(client_id="client_id", user_id="user_id", roles=[{"roles_representation"}])
# Delete realm roles of user. Note that BOTH role_name and role_id appear to be required.
keycloak_admin.deletes_realm_roles_of_user(user_id="user_id", roles=[{"roles_representation"}])
# Create new group
group = keycloak_admin.create_group(name="Example Group")
# Get all groups
groups = keycloak_admin.get_groups()
# Get group
group = keycloak_admin.get_group(group_id='group_id')
# Get group by path
group = keycloak_admin.get_group_by_path(path='/group/subgroup', search_in_subgroups=True)
# Function to trigger user sync from provider
sync_users(storage_id="storage_di", action="action")
# List public RSA keys
components = keycloak_admin.keys
# List all keys
components = keycloak_admin.get_components(query={"parent":"example_realm", "type":"org.keycloak.keys.KeyProvider"})
# Create a new RSA key
component = keycloak_admin.create_component({"name":"rsa-generated","providerId":"rsa-generated","providerType":"org.keycloak.keys.KeyProvider","parentId":"example_realm","config":{"priority":["100"],"enabled":["true"],"active":["true"],"algorithm":["RS256"],"keySize":["2048"]}})
# Update the key
component_details['config']['active'] = ["false"]
keycloak_admin.update_component(component['id'])
# Delete the key
keycloak_admin.delete_component(component['id'])
readme
reference/keycloak/index

1
docs/source/readme.rst

@ -0,0 +1 @@
.. mdinclude:: ../../README.md

7
keycloak/__init__.py

@ -21,5 +21,8 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from .keycloak_admin import *
from .keycloak_openid import *
from ._version import __version__
from .keycloak_admin import KeycloakAdmin
from .keycloak_openid import KeycloakOpenID
__all__ = ["KeycloakAdmin", "KeycloakOpenID", "__version__"]

24
keycloak/_version.py

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
__version__ = "0.0.0"

57
keycloak/authorization/__init__.py

@ -38,7 +38,7 @@ class Authorization:
"""
def __init__(self):
self._policies = {}
self.policies = {}
@property
def policies(self):
@ -53,41 +53,46 @@ class Authorization:
Load policies, roles and permissions (scope/resources).
:param data: keycloak authorization data (dict)
:return:
:returns: None
"""
for pol in data['policies']:
if pol['type'] == 'role':
policy = Policy(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
config_roles = json.loads(pol['config']['roles'])
for pol in data["policies"]:
if pol["type"] == "role":
policy = Policy(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
config_roles = json.loads(pol["config"]["roles"])
for role in config_roles:
policy.add_role(Role(name=role['id'],
required=role['required']))
policy.add_role(Role(name=role["id"], required=role["required"]))
self.policies[policy.name] = policy
if pol['type'] == 'scope':
permission = Permission(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
if pol["type"] == "scope":
permission = Permission(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
permission.scopes = ast.literal_eval(pol['config']['scopes'])
permission.scopes = ast.literal_eval(pol["config"]["scopes"])
for policy_name in ast.literal_eval(pol['config']['applyPolicies']):
for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]):
self.policies[policy_name].add_permission(permission)
if pol['type'] == 'resource':
permission = Permission(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
decision_strategy=pol['decisionStrategy'])
if pol["type"] == "resource":
permission = Permission(
name=pol["name"],
type=pol["type"],
logic=pol["logic"],
decision_strategy=pol["decisionStrategy"],
)
permission.resources = ast.literal_eval(pol['config'].get('resources', "[]"))
permission.resources = ast.literal_eval(pol["config"].get("resources", "[]"))
for policy_name in ast.literal_eval(pol['config']['applyPolicies']):
for policy_name in ast.literal_eval(pol["config"]["applyPolicies"]):
if self.policies.get(policy_name) is not None:
self.policies[policy_name].add_permission(permission)

14
keycloak/authorization/permission.py

@ -26,15 +26,19 @@ class Permission:
"""
Consider this simple and very common permission:
A permission associates the object being protected with the policies that must be evaluated to determine whether access is granted.
A permission associates the object being protected with the policies that must be evaluated to
determine whether access is granted.
X CAN DO Y ON RESOURCE Z
where
X represents one or more users, roles, or groups, or a combination of them. You can
where
- X represents one or more users, roles, or groups, or a combination of them. You can
also use claims and context here.
Y represents an action to be performed, for example, write, view, and so on.
Z represents a protected resource, for example, "/accounts".
- Y represents an action to be performed, for example, write, view, and so on.
- Z represents a protected resource, for example, "/accounts".
https://keycloak.gitbooks.io/documentation/authorization_services/topics/permission/overview.html

12
keycloak/authorization/policy.py

@ -29,9 +29,10 @@ class Policy:
A policy defines the conditions that must be satisfied to grant access to an object.
Unlike permissions, you do not specify the object being protected but rather the conditions
that must be satisfied for access to a given object (for example, resource, scope, or both).
Policies are strongly related to the different access control mechanisms (ACMs) that you can use to
protect your resources. With policies, you can implement strategies for attribute-based access control
(ABAC), role-based access control (RBAC), context-based access control, or any combination of these.
Policies are strongly related to the different access control mechanisms (ACMs) that you can
use to protect your resources. With policies, you can implement strategies for attribute-based
access control (ABAC), role-based access control (RBAC), context-based access control, or any
combination of these.
https://keycloak.gitbooks.io/documentation/authorization_services/topics/policy/overview.html
@ -98,9 +99,10 @@ class Policy:
:param role: keycloak role.
:return:
"""
if self.type != 'role':
if self.type != "role":
raise KeycloakAuthorizationConfigError(
"Can't add role. Policy type is different of role")
"Can't add role. Policy type is different of role"
)
self._roles.append(role)
def add_permission(self, permission):

185
keycloak/connection.py

@ -29,17 +29,18 @@ except ImportError:
import requests
from requests.adapters import HTTPAdapter
from .exceptions import (KeycloakConnectionError)
from .exceptions import KeycloakConnectionError
class ConnectionManager(object):
""" Represents a simple server connection.
Args:
base_url (str): The server URL.
headers (dict): The header parameters of the requests to the server.
timeout (int): Timeout to use for requests to the server.
verify (bool): Verify server SSL.
proxies (dict): The proxies servers requests is sent by.
"""
Represents a simple server connection.
:param base_url: (str) The server URL.
:param headers: (dict) The header parameters of the requests to the server.
:param timeout: (int) Timeout to use for requests to the server.
:param verify: (bool) Verify server SSL.
:param proxies: (dict) The proxies servers requests is sent by.
"""
def __init__(self, base_url, headers={}, timeout=60, verify=True, proxies=None):
@ -52,11 +53,11 @@ class ConnectionManager(object):
# retry once to reset connection with Keycloak after tomcat's ConnectionTimeout
# see https://github.com/marcospereirampj/python-keycloak/issues/36
for protocol in ('https://', 'http://'):
for protocol in ("https://", "http://"):
adapter = HTTPAdapter(max_retries=1)
# adds POST to retry whitelist
allowed_methods = set(adapter.max_retries.allowed_methods)
allowed_methods.add('POST')
allowed_methods.add("POST")
adapter.max_retries.allowed_methods = frozenset(allowed_methods)
self._s.mount(protocol, adapter)
@ -69,7 +70,7 @@ class ConnectionManager(object):
@property
def base_url(self):
""" Return base url in use for requests to the server. """
"""Return base url in use for requests to the server."""
return self._base_url
@base_url.setter
@ -79,7 +80,7 @@ class ConnectionManager(object):
@property
def timeout(self):
""" Return timeout in use for request to the server. """
"""Return timeout in use for request to the server."""
return self._timeout
@timeout.setter
@ -89,7 +90,7 @@ class ConnectionManager(object):
@property
def verify(self):
""" Return verify in use for request to the server. """
"""Return verify in use for request to the server."""
return self._verify
@verify.setter
@ -99,7 +100,7 @@ class ConnectionManager(object):
@property
def headers(self):
""" Return header request to the server. """
"""Return header request to the server."""
return self._headers
@headers.setter
@ -108,122 +109,116 @@ class ConnectionManager(object):
self._headers = value
def param_headers(self, key):
""" Return a specific header parameter.
:arg
key (str): Header parameters key.
:return:
If the header parameters exist, return its value.
"""
Return a specific header parameter.
:param key: (str) Header parameters key.
:returns: If the header parameters exist, return its value.
"""
return self.headers.get(key)
def clean_headers(self):
""" Clear header parameters. """
"""Clear header parameters."""
self.headers = {}
def exist_param_headers(self, key):
""" Check if the parameter exists in the header.
:arg
key (str): Header parameters key.
:return:
If the header parameters exist, return True.
"""Check if the parameter exists in the header.
:param key: (str) Header parameters key.
:returns: If the header parameters exist, return True.
"""
return self.param_headers(key) is not None
def add_param_headers(self, key, value):
""" Add a single parameter inside the header.
:arg
key (str): Header parameters key.
value (str): Value to be added.
"""Add a single parameter inside the header.
:param key: (str) Header parameters key.
:param value: (str) Value to be added.
"""
self.headers[key] = value
def del_param_headers(self, key):
""" Remove a specific parameter.
:arg
key (str): Key of the header parameters.
"""Remove a specific parameter.
:param key: (str) Key of the header parameters.
"""
self.headers.pop(key, None)
def raw_get(self, path, **kwargs):
""" Submit get request to the path.
:arg
path (str): Path for request.
:return
Response the request.
:exception
HttpError: Can't connect to server.
"""Submit get request to the path.
:param path: (str) Path for request.
:returns: Response the request.
:raises: HttpError Can't connect to server.
"""
try:
return self._s.get(urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.get(
urljoin(self.base_url, path),
params=kwargs,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_post(self, path, data, **kwargs):
""" Submit post request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
:return
Response the request.
:exception
HttpError: Can't connect to server.
"""Submit post request to the path.
:param path: (str) Path for request.
:param data: (dict) Payload for request.
:returns: Response the request.
:raises: HttpError Can't connect to server.
"""
try:
return self._s.post(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.post(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_put(self, path, data, **kwargs):
""" Submit put request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
:return
Response the request.
:exception
HttpError: Can't connect to server.
"""Submit put request to the path.
:param path: (str) Path for request.
:param data: (dict) Payload for request.
:returns: Response the request.
:raises: HttpError Can't connect to server.
"""
try:
return self._s.put(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.put(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)
def raw_delete(self, path, data={}, **kwargs):
""" Submit delete request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
:return
Response the request.
:exception
HttpError: Can't connect to server.
"""Submit delete request to the path.
:param path: (str) Path for request.
:param data: (dict) Payload for request.
:returns: Response the request.
:raises: HttpError Can't connect to server.
"""
try:
return self._s.delete(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify)
return self._s.delete(
urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.headers,
timeout=self.timeout,
verify=self.verify,
)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
raise KeycloakConnectionError("Can't connect to server (%s)" % e)

26
keycloak/exceptions.py

@ -25,8 +25,7 @@ import requests
class KeycloakError(Exception):
def __init__(self, error_message="", response_code=None,
response_body=None):
def __init__(self, error_message="", response_code=None, response_body=None):
Exception.__init__(self, error_message)
@ -56,10 +55,23 @@ class KeycloakOperationError(KeycloakError):
class KeycloakDeprecationError(KeycloakError):
pass
class KeycloakGetError(KeycloakOperationError):
pass
class KeycloakPostError(KeycloakOperationError):
pass
class KeycloakPutError(KeycloakOperationError):
pass
class KeycloakDeleteError(KeycloakOperationError):
pass
class KeycloakSecretNotFound(KeycloakOperationError):
pass
@ -90,10 +102,10 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
return response.content
if skip_exists and response.status_code == 409:
return {"Already exists"}
return {"msg": "Already exists"}
try:
message = response.json()['message']
message = response.json()["message"]
except (KeyError, ValueError):
message = response.content
@ -103,6 +115,6 @@ def raise_error_from_response(response, error, expected_codes=None, skip_exists=
if response.status_code == 401:
error = KeycloakAuthenticationError
raise error(error_message=message,
response_code=response.status_code,
response_body=response.content)
raise error(
error_message=message, response_code=response.status_code, response_body=response.content
)

1334
keycloak/keycloak_admin.py
File diff suppressed because it is too large
View File

185
keycloak/keycloak_openid.py

@ -27,48 +27,59 @@ from jose import jwt
from .authorization import Authorization
from .connection import ConnectionManager
from .exceptions import raise_error_from_response, KeycloakGetError, \
KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError, KeycloakDeprecationError
from .exceptions import (
KeycloakAuthorizationConfigError,
KeycloakDeprecationError,
KeycloakGetError,
KeycloakInvalidTokenError,
KeycloakRPTNotFound,
raise_error_from_response,
)
from .urls_patterns import (
URL_REALM,
URL_AUTH,
URL_CERTS,
URL_ENTITLEMENT,
URL_INTROSPECT,
URL_LOGOUT,
URL_REALM,
URL_TOKEN,
URL_USERINFO,
URL_WELL_KNOWN,
URL_LOGOUT,
URL_CERTS,
URL_ENTITLEMENT,
URL_INTROSPECT
)
class KeycloakOpenID:
def __init__(self, server_url, realm_name, client_id, client_secret_key=None, verify=True, custom_headers=None, proxies=None):
"""
:param server_url: Keycloak server url
:param client_id: client id
:param realm_name: realm name
:param client_secret_key: client secret key
:param verify: True if want check connection SSL
:param custom_headers: dict of custom header to pass to each HTML request
:param proxies: dict of proxies to sent the request by.
"""
self._client_id = client_id
self._client_secret_key = client_secret_key
self._realm_name = realm_name
headers = dict()
if custom_headers is not None:
# merge custom headers to main headers
headers.update(custom_headers)
self._connection = ConnectionManager(base_url=server_url,
headers=headers,
timeout=60,
verify=verify,
proxies=proxies)
self._authorization = Authorization()
"""
Keycloak OpenID client.
:param server_url: Keycloak server url
:param client_id: client id
:param realm_name: realm name
:param client_secret_key: client secret key
:param verify: True if want check connection SSL
:param custom_headers: dict of custom header to pass to each HTML request
:param proxies: dict of proxies to sent the request by.
"""
def __init__(
self,
server_url,
realm_name,
client_id,
client_secret_key=None,
verify=True,
custom_headers=None,
proxies=None,
):
self.client_id = client_id
self.client_secret_key = client_secret_key
self.realm_name = realm_name
headers = custom_headers if custom_headers is not None else dict()
self.connection = ConnectionManager(
base_url=server_url, headers=headers, timeout=60, verify=verify, proxies=proxies
)
self.authorization = Authorization()
@property
def client_id(self):
@ -138,7 +149,7 @@ class KeycloakOpenID:
:param kwargs:
:return:
"""
if method_token_info == 'introspect':
if method_token_info == "introspect":
token_info = self.introspect(token)
else:
token_info = self.decode_token(token, **kwargs)
@ -146,11 +157,11 @@ class KeycloakOpenID:
return token_info
def well_know(self):
""" The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
"""The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to
the OpenID Connect implementation in Keycloak.
:return It lists endpoints and other configuration options relevant.
:return It lists endpoints and other configuration options relevant.
"""
params_path = {"realm-name": self.realm_name}
@ -165,12 +176,23 @@ class KeycloakOpenID:
:return:
"""
params_path = {"authorization-endpoint": self.well_know()['authorization_endpoint'],
"client-id": self.client_id,
"redirect-uri": redirect_uri}
params_path = {
"authorization-endpoint": self.well_know()["authorization_endpoint"],
"client-id": self.client_id,
"redirect-uri": redirect_uri,
}
return URL_AUTH.format(**params_path)
def token(self, username="", password="", grant_type=["password"], code="", redirect_uri="", totp=None, **extra):
def token(
self,
username="",
password="",
grant_type=["password"],
code="",
redirect_uri="",
totp=None,
**extra
):
"""
The token endpoint is used to obtain tokens. Tokens can either be obtained by
exchanging an authorization code or by supplying credentials directly depending on
@ -183,14 +205,19 @@ class KeycloakOpenID:
:param password:
:param grant_type:
:param code:
:param redirect_uri
:param totp
:param redirect_uri:
:param totp:
:return:
"""
params_path = {"realm-name": self.realm_name}
payload = {"username": username, "password": password,
"client_id": self.client_id, "grant_type": grant_type,
"code": code, "redirect_uri": redirect_uri}
payload = {
"username": username,
"password": password,
"client_id": self.client_id,
"grant_type": grant_type,
"code": code,
"redirect_uri": redirect_uri,
}
if extra:
payload.update(extra)
@ -198,8 +225,7 @@ class KeycloakOpenID:
payload["totp"] = totp
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def refresh_token(self, refresh_token, grant_type=["refresh_token"]):
@ -216,10 +242,13 @@ class KeycloakOpenID:
:return:
"""
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "grant_type": grant_type, "refresh_token": refresh_token}
payload = {
"client_id": self.client_id,
"grant_type": grant_type,
"refresh_token": refresh_token,
}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def userinfo(self, token):
@ -250,8 +279,7 @@ class KeycloakOpenID:
payload = {"client_id": self.client_id, "refresh_token": refresh_token}
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204])
@ -277,16 +305,15 @@ class KeycloakOpenID:
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_REALM.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)['public_key']
return raise_error_from_response(data_raw, KeycloakGetError)["public_key"]
def entitlement(self, token, resource_server_id):
"""
Client applications can use a specific endpoint to obtain a special security token
called a requesting party token (RPT). This token consists of all the entitlements
(or permissions) for a user as a result of the evaluation of the permissions and authorization
policies associated with the resources being requested. With an RPT, client applications can
gain access to protected resources at the resource server.
(or permissions) for a user as a result of the evaluation of the permissions and
authorization policies associated with the resources being requested. With an RPT,
client applications can gain access to protected resources at the resource server.
:return:
"""
@ -301,8 +328,8 @@ class KeycloakOpenID:
def introspect(self, token, rpt=None, token_type_hint=None):
"""
The introspection endpoint is used to retrieve the active state of a token. It is can only be
invoked by confidential clients.
The introspection endpoint is used to retrieve the active state of a token.
It is can only be invoked by confidential clients.
https://tools.ietf.org/html/rfc7662
@ -316,7 +343,7 @@ class KeycloakOpenID:
payload = {"client_id": self.client_id, "token": token}
if token_type_hint == 'requesting_party_token':
if token_type_hint == "requesting_party_token":
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
self.connection.add_param_headers("Authorization", "Bearer " + token)
@ -325,12 +352,11 @@ class KeycloakOpenID:
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def decode_token(self, token, key, algorithms=['RS256'], **kwargs):
def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
"""
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
structure that represents a cryptographic key. This specification
@ -347,8 +373,7 @@ class KeycloakOpenID:
:return:
"""
return jwt.decode(token, key, algorithms=algorithms,
audience=self.client_id, **kwargs)
return jwt.decode(token, key, algorithms=algorithms, audience=self.client_id, **kwargs)
def load_authorization_config(self, path):
"""
@ -357,12 +382,12 @@ class KeycloakOpenID:
:param path: settings file (json)
:return:
"""
authorization_file = open(path, 'r')
authorization_file = open(path, "r")
authorization_json = json.loads(authorization_file.read())
self.authorization.load_config(authorization_json)
authorization_file.close()
def get_policies(self, token, method_token_info='introspect', **kwargs):
def get_policies(self, token, method_token_info="introspect", **kwargs):
"""
Get policies by user token
@ -377,12 +402,10 @@ class KeycloakOpenID:
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == 'introspect' and not token_info['active']:
raise KeycloakInvalidTokenError(
"Token expired or invalid."
)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info['resource_access'].get(self.client_id)
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
@ -390,13 +413,13 @@ class KeycloakOpenID:
policies = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources['roles']:
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
policies.append(policy)
return list(set(policies))
def get_permissions(self, token, method_token_info='introspect', **kwargs):
def get_permissions(self, token, method_token_info="introspect", **kwargs):
"""
Get permission by user token
@ -413,12 +436,10 @@ class KeycloakOpenID:
token_info = self._token_info(token, method_token_info, **kwargs)
if method_token_info == 'introspect' and not token_info['active']:
raise KeycloakInvalidTokenError(
"Token expired or invalid."
)
if method_token_info == "introspect" and not token_info["active"]:
raise KeycloakInvalidTokenError("Token expired or invalid.")
user_resources = token_info['resource_access'].get(self.client_id)
user_resources = token_info["resource_access"].get(self.client_id)
if not user_resources:
return None
@ -426,7 +447,7 @@ class KeycloakOpenID:
permissions = []
for policy_name, policy in self.authorization.policies.items():
for role in user_resources['roles']:
for role in user_resources["roles"]:
if self._build_name_role(role) in policy.roles:
permissions += policy.permissions

191
keycloak/tests/test_connection.py

@ -1,191 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest import mock
from httmock import urlmatch, response, HTTMock, all_requests
from keycloak import KeycloakAdmin, KeycloakOpenID
from ..connection import ConnectionManager
try:
import unittest
except ImportError:
import unittest2 as unittest
class TestConnection(unittest.TestCase):
def setUp(self):
self._conn = ConnectionManager(
base_url="http://localhost/",
headers={},
timeout=60)
@all_requests
def response_content_success(self, url, request):
headers = {'content-type': 'application/json'}
content = b'response_ok'
return response(200, content, headers, None, 5, request)
def test_raw_get(self):
with HTTMock(self.response_content_success):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b'response_ok')
self.assertEqual(resp.status_code, 200)
def test_raw_post(self):
@urlmatch(path="/known_path", method="post")
def response_post_success(url, request):
headers = {'content-type': 'application/json'}
content = 'response'.encode("utf-8")
return response(201, content, headers, None, 5, request)
with HTTMock(response_post_success):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 201)
def test_raw_put(self):
@urlmatch(netloc="localhost", path="/known_path", method="put")
def response_put_success(url, request):
headers = {'content-type': 'application/json'}
content = 'response'.encode("utf-8")
return response(200, content, headers, None, 5, request)
with HTTMock(response_put_success):
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 200)
def test_raw_get_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="get")
def response_get_fail(url, request):
headers = {'content-type': 'application/json'}
content = "404 page not found".encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_get_fail):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b"404 page not found")
self.assertEqual(resp.status_code, 404)
def test_raw_post_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="post")
def response_post_fail(url, request):
headers = {'content-type': 'application/json'}
content = str(["Start can't be blank"]).encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_post_fail):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8"))
self.assertEqual(resp.status_code, 404)
def test_raw_put_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="put")
def response_put_fail(url, request):
headers = {'content-type': 'application/json'}
content = str(["Start can't be blank"]).encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_put_fail):
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8"))
self.assertEqual(resp.status_code, 404)
def test_add_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
def test_del_param_headers(self):
self._conn.add_param_headers("test", "value")
self._conn.del_param_headers("test")
self.assertEqual(self._conn.headers, {})
def test_clean_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
self._conn.clean_headers()
self.assertEqual(self._conn.headers, {})
def test_exist_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
def test_KeycloakAdmin_custom_header(self):
class FakeToken:
@staticmethod
def get(string_val):
return "faketoken"
fake_token = FakeToken()
with mock.patch.object(KeycloakOpenID, "__init__", return_value=None) as mock_keycloak_open_id:
with mock.patch("keycloak.keycloak_openid.KeycloakOpenID.token", return_value=fake_token):
with mock.patch("keycloak.connection.ConnectionManager.__init__", return_value=None) as mock_connection_manager:
with mock.patch("keycloak.connection.ConnectionManager.__del__", return_value=None) as mock_connection_manager_delete:
server_url = "https://localhost/auth/"
username = "admin"
password = "secret"
realm_name = "master"
headers = {
'Custom': 'test-custom-header'
}
KeycloakAdmin(server_url=server_url,
username=username,
password=password,
realm_name=realm_name,
verify=False,
custom_headers=headers)
mock_keycloak_open_id.assert_called_with(server_url=server_url,
realm_name=realm_name,
client_id='admin-cli',
client_secret_key=None,
verify=False,
custom_headers=headers)
expected_header = {'Authorization': 'Bearer faketoken',
'Content-Type': 'application/json',
'Custom': 'test-custom-header'
}
mock_connection_manager.assert_called_with(base_url=server_url,
headers=expected_header,
timeout=60,
verify=False)
mock_connection_manager_delete.assert_called_once_with()

78
keycloak/urls_patterns.py

@ -30,7 +30,9 @@ URL_LOGOUT = "realms/{realm-name}/protocol/openid-connect/logout"
URL_CERTS = "realms/{realm-name}/protocol/openid-connect/certs"
URL_INTROSPECT = "realms/{realm-name}/protocol/openid-connect/token/introspect"
URL_ENTITLEMENT = "realms/{realm-name}/authz/entitlement/{resource-server-id}"
URL_AUTH = "{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}"
URL_AUTH = (
"{authorization-endpoint}?client_id={client-id}&response_type=code&redirect_uri={redirect-uri}"
)
# ADMIN URLS
URL_ADMIN_USERS = "admin/realms/{realm-name}/users"
@ -41,17 +43,28 @@ URL_ADMIN_SEND_UPDATE_ACCOUNT = "admin/realms/{realm-name}/users/{id}/execute-ac
URL_ADMIN_SEND_VERIFY_EMAIL = "admin/realms/{realm-name}/users/{id}/send-verify-email"
URL_ADMIN_RESET_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
URL_ADMIN_GET_SESSIONS = "admin/realms/{realm-name}/users/{id}/sessions"
URL_ADMIN_USER_CLIENT_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}"
URL_ADMIN_USER_CLIENT_ROLES = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}"
)
URL_ADMIN_USER_REALM_ROLES = "admin/realms/{realm-name}/users/{id}/role-mappings/realm"
URL_ADMIN_USER_REALM_ROLES_AVAILABLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm/available"
URL_ADMIN_USER_REALM_ROLES_COMPOSITE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm/composite"
URL_ADMIN_USER_REALM_ROLES_AVAILABLE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/realm/available"
)
URL_ADMIN_USER_REALM_ROLES_COMPOSITE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/realm/composite"
)
URL_ADMIN_GROUPS_REALM_ROLES = "admin/realms/{realm-name}/groups/{id}/role-mappings/realm"
URL_ADMIN_GROUPS_CLIENT_ROLES = "admin/realms/{realm-name}/groups/{id}/role-mappings/clients/{client-id}"
URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/available"
URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = "admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/composite"
URL_ADMIN_GROUPS_CLIENT_ROLES = (
"admin/realms/{realm-name}/groups/{id}/role-mappings/clients/{client-id}"
)
URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/available"
)
URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE = (
"admin/realms/{realm-name}/users/{id}/role-mappings/clients/{client-id}/composite"
)
URL_ADMIN_USER_GROUP = "admin/realms/{realm-name}/users/{id}/groups/{group-id}"
URL_ADMIN_USER_GROUPS = "admin/realms/{realm-name}/users/{id}/groups"
URL_ADMIN_USER_PASSWORD = "admin/realms/{realm-name}/users/{id}/reset-password"
URL_ADMIN_USER_CREDENTIALS = "admin/realms/{realm-name}/users/{id}/credentials"
URL_ADMIN_USER_CREDENTIAL = "admin/realms/{realm-name}/users/{id}/credentials/{credential_id}"
URL_ADMIN_USER_LOGOUT = "admin/realms/{realm-name}/users/{id}/logout"
@ -73,14 +86,21 @@ URL_ADMIN_CLIENT_ROLES = URL_ADMIN_CLIENT + "/roles"
URL_ADMIN_CLIENT_ROLE = URL_ADMIN_CLIENT + "/roles/{role-name}"
URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE = URL_ADMIN_CLIENT_ROLE + "/composites"
URL_ADMIN_CLIENT_ROLE_MEMBERS = URL_ADMIN_CLIENT + "/roles/{role-name}/users"
URL_ADMIN_CLIENT_ROLE_GROUPS = URL_ADMIN_CLIENT + "/roles/{role-name}/groups"
URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT + "/authz/resource-server/settings"
URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT + "/authz/resource-server/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT + "/authz/resource-server/permission?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1"
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1&permission=false"
)
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
)
URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = (
URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1"
)
URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user"
URL_ADMIN_CLIENT_CERTS = URL_ADMIN_CLIENT + "/certificates/{attr}"
@ -101,8 +121,13 @@ URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances"
URL_ADMIN_IDP_MAPPERS = "admin/realms/{realm-name}/identity-provider/instances/{idp-alias}/mappers"
URL_ADMIN_IDP = "admin/realms//{realm-name}/identity-provider/instances/{alias}"
URL_ADMIN_REALM_ROLES_ROLE_BY_NAME = "admin/realms/{realm-name}/roles/{role-name}"
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = "admin/realms/{realm-name}/roles/{role-name}/composites"
URL_ADMIN_REALM_EXPORT = "admin/realms/{realm-name}/partial-export?exportClients={export-clients}&exportGroupsAndRoles={export-groups-and-roles}"
URL_ADMIN_REALM_ROLES_COMPOSITE_REALM_ROLE = (
"admin/realms/{realm-name}/roles/{role-name}/composites"
)
URL_ADMIN_REALM_EXPORT = (
"admin/realms/{realm-name}/partial-export?exportClients={export-clients}&"
+ "exportGroupsAndRoles={export-groups-and-roles}"
)
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES = URL_ADMIN_REALM + "/default-default-client-scopes"
URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE = URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES + "/{id}"
@ -113,10 +138,22 @@ URL_ADMIN_FLOWS = "admin/realms/{realm-name}/authentication/flows"
URL_ADMIN_FLOW = URL_ADMIN_FLOWS + "/{id}"
URL_ADMIN_FLOWS_ALIAS = "admin/realms/{realm-name}/authentication/flows/{flow-id}"
URL_ADMIN_FLOWS_COPY = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/copy"
URL_ADMIN_FLOWS_EXECUTIONS = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions"
URL_ADMIN_FLOWS_EXECUTIONS = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions"
)
URL_ADMIN_FLOWS_EXECUTION = "admin/realms/{realm-name}/authentication/executions/{id}"
URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/execution"
URL_ADMIN_FLOWS_EXECUTIONS_FLOW = "admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/flow"
URL_ADMIN_FLOWS_EXECUTIONS_EXECUTION = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/execution"
)
URL_ADMIN_FLOWS_EXECUTIONS_FLOW = (
"admin/realms/{realm-name}/authentication/flows/{flow-alias}/executions/flow"
)
URL_ADMIN_AUTHENTICATOR_PROVIDERS = (
"admin/realms/{realm-name}/authentication/authenticator-providers"
)
URL_ADMIN_AUTHENTICATOR_CONFIG_DESCRIPTION = (
"admin/realms/{realm-name}/authentication/config-description/{provider-id}"
)
URL_ADMIN_AUTHENTICATOR_CONFIG = "admin/realms/{realm-name}/authentication/config/{id}"
URL_ADMIN_COMPONENTS = "admin/realms/{realm-name}/components"
@ -124,10 +161,9 @@ URL_ADMIN_COMPONENT = "admin/realms/{realm-name}/components/{component-id}"
URL_ADMIN_KEYS = "admin/realms/{realm-name}/keys"
URL_ADMIN_USER_FEDERATED_IDENTITIES = "admin/realms/{realm-name}/users/{id}/federated-identity"
URL_ADMIN_USER_FEDERATED_IDENTITY = "admin/realms/{realm-name}/users/{id}/federated-identity/{provider}"
URL_ADMIN_USER_FEDERATED_IDENTITY = (
"admin/realms/{realm-name}/users/{id}/federated-identity/{provider}"
)
URL_ADMIN_EVENTS = 'admin/realms/{realm-name}/events'
URL_ADMIN_DELETE_USER_ROLE = "admin/realms/{realm-name}/users/{id}/role-mappings/realm"
URL_ADMIN_EVENTS = "admin/realms/{realm-name}/events"
URL_ADMIN_CLIENT_SESSION_STATS = "admin/realms/{realm-name}/client-session-stats"

6
pyproject.toml

@ -0,0 +1,6 @@
[tool.black]
line-length = 99
[tool.isort]
line_length = 99
profile = "black"

6
requirements.txt

@ -1,7 +1,3 @@
requests>=2.20.0
httmock>=1.2.5
python-jose>=1.4.0
twine==1.13.0
jose~=1.0.0
setuptools~=54.2.0
urllib3>=1.26.5
urllib3>=1.26.0

65
setup.py

@ -1,31 +1,56 @@
# -*- coding: utf-8 -*-
import re
from setuptools import setup
with open("README.md", "r") as fh:
long_description = fh.read()
with open("requirements.txt", "r") as fh:
reqs = fh.read().split("\n")
with open("dev-requirements.txt", "r") as fh:
dev_reqs = fh.read().split("\n")
with open("docs-requirements.txt", "r") as fh:
docs_reqs = fh.read().split("\n")
VERSIONFILE = "keycloak/_version.py"
verstrline = open(VERSIONFILE, "rt").read()
VSRE = r"^__version__ = ['\"]([^'\"]*)['\"]"
mo = re.search(VSRE, verstrline, re.M)
if mo:
verstr = mo.group(1)
else:
raise RuntimeError("Unable to find version string in %s." % (VERSIONFILE,))
setup(
name='python-keycloak',
version='0.27.1',
url='https://github.com/marcospereirampj/python-keycloak',
license='The MIT License',
author='Marcos Pereira',
author_email='marcospereira.mpj@gmail.com',
keywords='keycloak openid',
description='python-keycloak is a Python package providing access to the Keycloak API.',
name="python-keycloak",
version=verstr,
url="https://github.com/marcospereirampj/python-keycloak",
license="The MIT License",
author="Marcos Pereira, Richard Nemeth",
author_email="marcospereira.mpj@gmail.com; ryshoooo@gmail.com",
keywords="keycloak openid oidc",
description="python-keycloak is a Python package providing access to the Keycloak API.",
long_description=long_description,
long_description_content_type="text/markdown",
packages=['keycloak', 'keycloak.authorization', 'keycloak.tests'],
install_requires=['requests>=2.20.0', 'python-jose>=1.4.0'],
tests_require=['httmock>=1.2.5'],
packages=["keycloak"],
install_requires=reqs,
tests_require=dev_reqs,
extras_require={"docs": docs_reqs},
python_requires=">=3.7",
project_urls={
"Documentation": "https://python-keycloak.readthedocs.io/en/latest/",
"Issue tracker": "https://github.com/marcospereirampj/python-keycloak/issues",
},
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: MIT License',
'Development Status :: 3 - Alpha',
'Operating System :: MacOS',
'Operating System :: Unix',
'Operating System :: Microsoft :: Windows',
'Topic :: Utilities'
]
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Operating System :: MacOS",
"Operating System :: Unix",
"Operating System :: Microsoft :: Windows",
"Topic :: Utilities",
],
)

35
test_keycloak_init.sh

@ -0,0 +1,35 @@
#!/usr/bin/env bash
CMD_ARGS=$1
KEYCLOAK_DOCKER_IMAGE="quay.io/keycloak/keycloak:latest"
echo "${CMD_ARGS}"
function keycloak_stop() {
docker stop unittest_keycloak &> /dev/null
docker rm unittest_keycloak &> /dev/null
}
function keycloak_start() {
echo "Starting keycloak docker container"
docker run -d --name unittest_keycloak -e KEYCLOAK_ADMIN="${KEYCLOAK_ADMIN}" -e KEYCLOAK_ADMIN_PASSWORD="${KEYCLOAK_ADMIN_PASSWORD}" -p "${KEYCLOAK_PORT}:8080" "${KEYCLOAK_DOCKER_IMAGE}" start-dev
SECONDS=0
until curl localhost:$KEYCLOAK_PORT; do
sleep 5;
if [ ${SECONDS} -gt 180 ]; then
echo "Timeout exceeded";
exit 1;
fi
done
}
# Ensuring that keycloak is stopped in case of CTRL-C
trap keycloak_stop err exit
keycloak_stop # In case it did not shut down correctly last time.
keycloak_start
eval ${CMD_ARGS}
RETURN_VALUE=$?
exit ${RETURN_VALUE}

0
keycloak/tests/__init__.py → tests/__init__.py

61
tests/conftest.py

@ -0,0 +1,61 @@
import os
import uuid
import pytest
from keycloak import KeycloakAdmin
@pytest.fixture
def env():
class KeycloakTestEnv(object):
KEYCLOAK_HOST = os.environ["KEYCLOAK_HOST"]
KEYCLOAK_PORT = os.environ["KEYCLOAK_PORT"]
KEYCLOAK_ADMIN = os.environ["KEYCLOAK_ADMIN"]
KEYCLOAK_ADMIN_PASSWORD = os.environ["KEYCLOAK_ADMIN_PASSWORD"]
return KeycloakTestEnv()
@pytest.fixture
def admin(env):
return KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
)
@pytest.fixture
def realm(admin: KeycloakAdmin) -> str:
realm_name = str(uuid.uuid4())
admin.create_realm(payload={"realm": realm_name})
yield realm_name
admin.delete_realm(realm_name=realm_name)
@pytest.fixture
def user(admin: KeycloakAdmin, realm: str) -> str:
admin.realm_name = realm
username = str(uuid.uuid4())
user_id = admin.create_user(payload={"username": username, "email": f"{username}@test.test"})
yield user_id
admin.delete_user(user_id=user_id)
@pytest.fixture
def group(admin: KeycloakAdmin, realm: str) -> str:
admin.realm_name = realm
group_name = str(uuid.uuid4())
group_id = admin.create_group(payload={"name": group_name})
yield group_id
admin.delete_group(group_id=group_id)
@pytest.fixture
def client(admin: KeycloakAdmin, realm: str) -> str:
admin.realm_name = realm
client = str(uuid.uuid4())
client_id = admin.create_client(payload={"name": client, "clientId": client})
yield client_id
admin.delete_client(client_id=client_id)

1241
tests/test_keycloak_admin.py
File diff suppressed because it is too large
View File

26
tests/test_urls_patterns.py

@ -0,0 +1,26 @@
from keycloak import urls_patterns
def test_correctness_of_patterns():
"""Test that there are no duplicate url patterns."""
# Test that the patterns are present
urls = [x for x in dir(urls_patterns) if not x.startswith("__")]
assert len(urls) >= 0
# Test that all patterns start with URL_
for url in urls:
assert url.startswith("URL_"), f"The url pattern {url} does not begin with URL_"
# Test that the patterns have unique names
seen_urls = list()
for url in urls:
assert url not in seen_urls, f"The url pattern {url} is present twice."
seen_urls.append(url)
# Test that the pattern values are unique
seen_url_values = list()
for url in urls:
url_value = urls_patterns.__dict__[url]
assert url_value not in seen_url_values, f"The url {url} has a duplicate value {url_value}"
seen_url_values.append(url_value)

4
tox.env

@ -0,0 +1,4 @@
KEYCLOAK_ADMIN=admin
KEYCLOAK_ADMIN_PASSWORD=admin
KEYCLOAK_HOST={env:KEYCLOAK_HOST:localhost}
KEYCLOAK_PORT=8080

48
tox.ini

@ -0,0 +1,48 @@
[tox]
envlist = check, apply-check, docs, tests, build
[testenv]
install_command = pip install {opts} {packages}
[testenv:check]
deps =
black
isort
flake8
commands =
black --check --diff keycloak tests docs
isort -c --df keycloak tests docs
flake8 keycloak tests docs
[testenv:apply-check]
deps =
black
isort
flake8
commands =
black -C keycloak tests docs
black keycloak tests docs
isort keycloak tests docs
[testenv:docs]
deps =
.[docs]
commands =
python -m sphinx -T -E -W -b html -d _build/doctrees -D language=en ./docs/source _build/html
[testenv:tests]
setenv = file|tox.env
deps =
-rrequirements.txt
-rdev-requirements.txt
commands =
./test_keycloak_init.sh "pytest -vv --cov=keycloak --cov-report term-missing {posargs}"
[testenv:build]
deps =
-rdev-requirements.txt
commands =
python setup.py sdist bdist_wheel
[flake8]
max-line-length = 99
Loading…
Cancel
Save