Browse Source

chore: Merge branch 'master' into feat/add-connection-pool-maxsize

pull/651/head
Richard Nemeth 4 weeks ago
parent
commit
e3383b59a5
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 4
      .github/workflows/daily.yaml
  2. 10
      .github/workflows/lint.yaml
  3. 2
      .github/workflows/publish.yaml
  4. 2
      .readthedocs.yaml
  5. 36
      CHANGELOG.md
  6. 2214
      poetry.lock
  7. 5
      pyproject.toml
  8. 9
      src/keycloak/authorization/role.py
  9. 23
      src/keycloak/connection.py
  10. 141
      src/keycloak/keycloak_admin.py
  11. 8
      src/keycloak/keycloak_openid.py
  12. 1
      src/keycloak/urls_patterns.py
  13. 7
      test_keycloak_init.sh
  14. 1
      tests/test_authorization.py
  15. 152
      tests/test_keycloak_admin.py
  16. 4
      tests/test_keycloak_openid.py

4
.github/workflows/daily.yaml

@ -10,7 +10,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
keycloak-version: ["22.0", "23.0", "24.0", "25.0", "26.0", "latest"] keycloak-version: ["22.0", "23.0", "24.0", "25.0", "26.0", "latest"]
env: env:
KEYCLOAK_DOCKER_IMAGE_TAG: ${{ matrix.keycloak-version }} KEYCLOAK_DOCKER_IMAGE_TAG: ${{ matrix.keycloak-version }}
@ -20,7 +20,7 @@ jobs:
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}
- uses: docker-practice/actions-setup-docker@master
- uses: docker/setup-docker-action@v4
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip

10
.github/workflows/lint.yaml

@ -20,7 +20,7 @@ jobs:
- name: Set up Python 3.13 - name: Set up Python 3.13
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: "3.13"
python-version: "3.14"
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
@ -40,7 +40,7 @@ jobs:
- name: Set up Python 3.13 - name: Set up Python 3.13
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: "3.13"
python-version: "3.14"
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
@ -55,7 +55,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
keycloak-version: ["22.0", "23.0", "24.0", "25.0", "26.0", "latest"] keycloak-version: ["22.0", "23.0", "24.0", "25.0", "26.0", "latest"]
needs: needs:
- check-commits - check-commits
@ -68,7 +68,7 @@ jobs:
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}
- uses: docker-practice/actions-setup-docker@master
- uses: docker/setup-docker-action@v4
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
@ -91,7 +91,7 @@ jobs:
- name: Set up Python 3.13 - name: Set up Python 3.13
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: "3.13"
python-version: "3.14"
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip

2
.github/workflows/publish.yaml

@ -15,7 +15,7 @@ jobs:
- name: Set up Python 3.13 - name: Set up Python 3.13
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: "3.13"
python-version: "3.14"
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip

2
.readthedocs.yaml

@ -6,7 +6,7 @@ sphinx:
build: build:
os: "ubuntu-24.04" os: "ubuntu-24.04"
tools: tools:
python: "3.13"
python: "3.14"
jobs: jobs:
post_create_environment: post_create_environment:
- python -m pip install poetry - python -m pip install poetry

36
CHANGELOG.md

@ -1,3 +1,39 @@
## v5.8.1 (2025-08-19)
### Fix
- prevent all httpx deprecation warnings (#666)
## v5.8.0 (2025-08-19)
### Feat
- implement endpoints returning the number of members in an organization (#665)
## v5.7.0 (2025-07-17)
### Feat
- add `get_composite_client_roles_of_role` (#660)
## v5.6.1 (2025-07-17)
### Fix
- fix tests (#661)
## v5.6.0 (2025-06-22)
### Feat
- add pagination support to get realm roles (#659)
## v5.5.1 (2025-05-25)
### Fix
- fix/latest version (#654)
## v5.5.0 (2025-04-09) ## v5.5.0 (2025-04-09)
### Feat ### Feat

2214
poetry.lock
File diff suppressed because it is too large
View File

5
pyproject.toml

@ -65,10 +65,7 @@ twine = ">=4.0.2"
freezegun = ">=1.2.2" freezegun = ">=1.2.2"
docutils = "<0.21" docutils = "<0.21"
ruff = ">=0.9.3" ruff = ">=0.9.3"
[[tool.poetry.source]]
name = "PyPI"
priority = "primary"
backports-asyncio-runner = { "version" = ">=1.2.0", "python" = ">=3.9,<3.11" }
[build-system] [build-system]
requires = ["poetry-core>=1.0.0"] requires = ["poetry-core>=1.0.0"]

9
src/keycloak/authorization/role.py

@ -78,3 +78,12 @@ class Role:
msg = f"Cannot compare Role with {type(other)}" msg = f"Cannot compare Role with {type(other)}"
raise NotImplementedError(msg) raise NotImplementedError(msg)
def __hash__(self) -> int:
"""
Hash method.
:returns: Hash value
:rtype: int
"""
return hash(f"{self.name}-{self.required}")

23
src/keycloak/connection.py

@ -418,7 +418,7 @@ class ConnectionManager:
method="POST", method="POST",
url=urljoin(self.base_url, path), url=urljoin(self.base_url, path),
params=self._filter_query_params(kwargs), params=self._filter_query_params(kwargs),
data=data,
**self._prepare_httpx_request_content(data),
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
) )
@ -444,7 +444,7 @@ class ConnectionManager:
return await self.async_s.put( return await self.async_s.put(
urljoin(self.base_url, path), urljoin(self.base_url, path),
params=self._filter_query_params(kwargs), params=self._filter_query_params(kwargs),
data=data,
**self._prepare_httpx_request_content(data),
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
) )
@ -475,7 +475,7 @@ class ConnectionManager:
return await self.async_s.request( return await self.async_s.request(
method="DELETE", method="DELETE",
url=urljoin(self.base_url, path), url=urljoin(self.base_url, path),
data=data or {},
**self._prepare_httpx_request_content(data or {}),
params=self._filter_query_params(kwargs), params=self._filter_query_params(kwargs),
headers=self.headers, headers=self.headers,
timeout=self.timeout, timeout=self.timeout,
@ -484,6 +484,23 @@ class ConnectionManager:
msg = "Can't connect to server" msg = "Can't connect to server"
raise KeycloakConnectionError(msg) from e raise KeycloakConnectionError(msg) from e
@staticmethod
def _prepare_httpx_request_content(data: dict | str | None) -> dict:
"""
Create the correct request content kwarg to `httpx.AsyncClient.request()`.
See https://www.python-httpx.org/compatibility/#request-content
:param data: the request content
:type data: dict | str | None
:returns: A dict mapping the correct kwarg to the request content
:rtype: dict
"""
if isinstance(data, str):
# Note: this could also accept bytes, Iterable[bytes], or AsyncIterable[bytes]
return {"content": data}
return {"data": data}
@staticmethod @staticmethod
def _filter_query_params(query_params: dict) -> dict: def _filter_query_params(query_params: dict) -> dict:
""" """

141
src/keycloak/keycloak_admin.py

@ -36,6 +36,7 @@ from . import urls_patterns
from .exceptions import ( from .exceptions import (
HTTP_ACCEPTED, HTTP_ACCEPTED,
HTTP_BAD_REQUEST, HTTP_BAD_REQUEST,
HTTP_CONFLICT,
HTTP_CREATED, HTTP_CREATED,
HTTP_NO_CONTENT, HTTP_NO_CONTENT,
HTTP_NOT_FOUND, HTTP_NOT_FOUND,
@ -385,8 +386,8 @@ class KeycloakAdmin:
return raise_error_from_response( return raise_error_from_response(
data_raw, data_raw,
KeycloakPostError, KeycloakPostError,
expected_codes=[HTTP_CREATED],
skip_exists=skip_exists,
expected_codes=[HTTP_CREATED]
+ ([HTTP_BAD_REQUEST, HTTP_CONFLICT] if skip_exists else []),
) )
def update_realm(self, realm_name: str, payload: dict) -> dict | bytes: def update_realm(self, realm_name: str, payload: dict) -> dict | bytes:
@ -908,6 +909,42 @@ class KeycloakAdmin:
return await self.a___fetch_all(url, query) return await self.a___fetch_all(url, query)
def get_organization_members_count(self, organization_id: str) -> int:
"""
Get the number of members in the organization.
:param organization_id: ID of the organization
:type organization_id: str
:return: Number of members in the organization
:rtype: int
"""
params_path = {
"realm-name": self.connection.realm_name,
"organization_id": organization_id,
}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_ORGANIZATION_MEMBERS_COUNT.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
async def a_get_organization_members_count(self, organization_id: str) -> int:
"""
Get the number of members in the organization asynchronously.
:param organization_id: ID of the organization
:type organization_id: str
:return: Number of members in the organization
:rtype: int
"""
params_path = {
"realm-name": self.connection.realm_name,
"organization_id": organization_id,
}
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_ORGANIZATION_MEMBERS_COUNT.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
def organization_user_add(self, user_id: str, organization_id: str) -> dict | bytes: def organization_user_add(self, user_id: str, organization_id: str) -> dict | bytes:
""" """
Add a user to an organization. Add a user to an organization.
@ -2915,7 +2952,9 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK]) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
def get_realm_roles(self, brief_representation: bool = True, search_text: str = "") -> list:
def get_realm_roles(
self, brief_representation: bool = True, search_text: str = "", query: dict | None = None
) -> list:
""" """
Get all roles for the realm or client. Get all roles for the realm or client.
@ -2926,20 +2965,23 @@ class KeycloakAdmin:
:type brief_representation: bool :type brief_representation: bool
:param search_text: optional search text to limit the returned result. :param search_text: optional search text to limit the returned result.
:type search_text: str :type search_text: str
:param query: Query parameters (optional)
:type query: dict
:return: Keycloak server response (RoleRepresentation) :return: Keycloak server response (RoleRepresentation)
:rtype: list :rtype: list
""" """
query = query or {}
params_path = {"realm-name": self.connection.realm_name} params_path = {"realm-name": self.connection.realm_name}
params = {"briefRepresentation": brief_representation} params = {"briefRepresentation": brief_representation}
url = urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path)
if search_text is not None and search_text.strip() != "": if search_text is not None and search_text.strip() != "":
params["search"] = search_text params["search"] = search_text
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path),
**params,
)
return raise_error_from_response(data_raw, KeycloakGetError)
if "first" in query and "max" in query:
return self.__fetch_paginated(url, query)
return self.__fetch_all(url, params)
def get_realm_role_groups( def get_realm_role_groups(
self, self,
@ -3629,6 +3671,27 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def get_composite_client_roles_of_role(self, client_id: str, role_name: str) -> list:
"""
Get composite roles of the client role.
:param client_id: The id of the client
:type client_id: str
:param role_name: The name of the role
:type role_name: str
:return: Keycloak server response (array RoleRepresentation)
:rtype: list
"""
params_path = {
"realm-name": self.connection.realm_name,
"id": client_id,
"role-name": role_name,
}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE.format(**params_path),
)
return raise_error_from_response(data_raw, KeycloakGetError)
def assign_realm_roles_to_client_scope(self, client_id: str, roles: str | list) -> bytes: def assign_realm_roles_to_client_scope(self, client_id: str, roles: str | list) -> bytes:
""" """
Assign realm roles to a client's scope. Assign realm roles to a client's scope.
@ -4028,14 +4091,14 @@ class KeycloakAdmin:
expected_codes=[HTTP_NO_CONTENT], expected_codes=[HTTP_NO_CONTENT],
) )
def get_all_roles_of_user(self, user_id: str) -> list:
def get_all_roles_of_user(self, user_id: str) -> dict:
""" """
Get all level roles for a user. Get all level roles for a user.
:param user_id: id of user :param user_id: id of user
:type user_id: str :type user_id: str
:return: Keycloak server response (array RoleRepresentation)
:rtype: list
:return: Keycloak server response (MappingsRepresentation)
:rtype: dict
""" """
params_path = {"realm-name": self.connection.realm_name, "id": user_id} params_path = {"realm-name": self.connection.realm_name, "id": user_id}
data_raw = self.connection.raw_get( data_raw = self.connection.raw_get(
@ -5202,7 +5265,7 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def get_client_secrets(self, client_id: str) -> list:
def get_client_secrets(self, client_id: str) -> dict:
""" """
Get representation of the client secrets. Get representation of the client secrets.
@ -5211,7 +5274,7 @@ class KeycloakAdmin:
:param client_id: id of client (not client-id) :param client_id: id of client (not client-id)
:type client_id: str :type client_id: str
:return: Keycloak server response (ClientRepresentation) :return: Keycloak server response (ClientRepresentation)
:rtype: list
:rtype: dict
""" """
params_path = {"realm-name": self.connection.realm_name, "id": client_id} params_path = {"realm-name": self.connection.realm_name, "id": client_id}
data_raw = self.connection.raw_get( data_raw = self.connection.raw_get(
@ -6185,8 +6248,8 @@ class KeycloakAdmin:
return raise_error_from_response( return raise_error_from_response(
data_raw, data_raw,
KeycloakPostError, KeycloakPostError,
expected_codes=[HTTP_CREATED],
skip_exists=skip_exists,
expected_codes=[HTTP_CREATED]
+ ([HTTP_BAD_REQUEST, HTTP_CONFLICT] if skip_exists else []),
) )
async def a_update_realm(self, realm_name: str, payload: dict) -> dict: async def a_update_realm(self, realm_name: str, payload: dict) -> dict:
@ -8181,9 +8244,7 @@ class KeycloakAdmin:
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK]) return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[HTTP_OK])
async def a_get_realm_roles( async def a_get_realm_roles(
self,
brief_representation: bool = True,
search_text: str = "",
self, brief_representation: bool = True, search_text: str = "", query: dict | None = None
) -> list: ) -> list:
""" """
Get all roles for the realm or client asynchronously. Get all roles for the realm or client asynchronously.
@ -8195,20 +8256,23 @@ class KeycloakAdmin:
:type brief_representation: bool :type brief_representation: bool
:param search_text: optional search text to limit the returned result. :param search_text: optional search text to limit the returned result.
:type search_text: str :type search_text: str
:param query: Query parameters (optional)
:type query: dict
:return: Keycloak server response (RoleRepresentation) :return: Keycloak server response (RoleRepresentation)
:rtype: list :rtype: list
""" """
query = query or {}
params_path = {"realm-name": self.connection.realm_name} params_path = {"realm-name": self.connection.realm_name}
params = {"briefRepresentation": brief_representation} params = {"briefRepresentation": brief_representation}
url = urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path)
if search_text is not None and search_text.strip() != "": if search_text is not None and search_text.strip() != "":
params["search"] = search_text params["search"] = search_text
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_REALM_ROLES.format(**params_path),
**params,
)
return raise_error_from_response(data_raw, KeycloakGetError)
if "first" in query and "max" in query:
return await self.a___fetch_paginated(url, query)
return await self.a___fetch_all(url, params)
async def a_get_realm_role_groups( async def a_get_realm_role_groups(
self, self,
@ -8893,6 +8957,27 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
async def a_get_composite_client_roles_of_role(self, client_id: str, role_name: str) -> list:
"""
Get composite roles of the client role.
:param client_id: The id of the client
:type client_id: str
:param role_name: The name of the role
:type role_name: str
:return: Keycloak server response (array RoleRepresentation)
:rtype: list
"""
params_path = {
"realm-name": self.connection.realm_name,
"id": client_id,
"role-name": role_name,
}
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE.format(**params_path),
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_assign_realm_roles_to_client_scope( async def a_assign_realm_roles_to_client_scope(
self, self,
client_id: str, client_id: str,
@ -9318,14 +9403,14 @@ class KeycloakAdmin:
expected_codes=[HTTP_NO_CONTENT], expected_codes=[HTTP_NO_CONTENT],
) )
async def a_get_all_roles_of_user(self, user_id: str) -> list:
async def a_get_all_roles_of_user(self, user_id: str) -> dict:
""" """
Get all level roles for a user asynchronously. Get all level roles for a user asynchronously.
:param user_id: id of user :param user_id: id of user
:type user_id: str :type user_id: str
:return: Keycloak server response (array RoleRepresentation)
:rtype: list
:return: Keycloak server response (MappingsRepresentation)
:rtype: dict
""" """
params_path = {"realm-name": self.connection.realm_name, "id": user_id} params_path = {"realm-name": self.connection.realm_name, "id": user_id}
data_raw = await self.connection.a_raw_get( data_raw = await self.connection.a_raw_get(
@ -10423,7 +10508,7 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
async def a_get_client_secrets(self, client_id: str) -> list:
async def a_get_client_secrets(self, client_id: str) -> dict:
""" """
Get representation of the client secrets asynchronously. Get representation of the client secrets asynchronously.
@ -10432,7 +10517,7 @@ class KeycloakAdmin:
:param client_id: id of client (not client-id) :param client_id: id of client (not client-id)
:type client_id: str :type client_id: str
:return: Keycloak server response (ClientRepresentation) :return: Keycloak server response (ClientRepresentation)
:rtype: list
:rtype: dict
""" """
params_path = {"realm-name": self.connection.realm_name, "id": client_id} params_path = {"realm-name": self.connection.realm_name, "id": client_id}
data_raw = await self.connection.a_raw_get( data_raw = await self.connection.a_raw_get(

8
src/keycloak/keycloak_openid.py

@ -795,7 +795,7 @@ class KeycloakOpenID:
return list(set(permissions)) return list(set(permissions))
def uma_permissions(self, token: str, permissions: str = "", **extra_payload: dict) -> dict:
def uma_permissions(self, token: str, permissions: str = "", **extra_payload: dict) -> list:
""" """
Get UMA permissions by user token with requested permissions. Get UMA permissions by user token with requested permissions.
@ -811,7 +811,7 @@ class KeycloakOpenID:
:param extra_payload: Additional payload data :param extra_payload: Additional payload data
:type extra_payload: dict :type extra_payload: dict
:returns: Keycloak server response :returns: Keycloak server response
:rtype: dict
:rtype: list
""" """
permission = build_permission_param(permissions) permission = build_permission_param(permissions)
@ -1521,7 +1521,7 @@ class KeycloakOpenID:
token: str, token: str,
permissions: str = "", permissions: str = "",
**extra_payload: dict, **extra_payload: dict,
) -> dict:
) -> list:
""" """
Get UMA permissions by user token with requested permissions asynchronously. Get UMA permissions by user token with requested permissions asynchronously.
@ -1537,7 +1537,7 @@ class KeycloakOpenID:
:param extra_payload: Additional payload data :param extra_payload: Additional payload data
:type extra_payload: dict :type extra_payload: dict
:returns: Keycloak server response :returns: Keycloak server response
:rtype: dict
:rtype: list
""" """
permission = build_permission_param(permissions) permission = build_permission_param(permissions)

1
src/keycloak/urls_patterns.py

@ -244,6 +244,7 @@ URL_ADMIN_FLOWS_EXECUTION_CONFIG = URL_ADMIN_FLOWS_EXECUTION + "/config"
URL_ADMIN_ORGANIZATIONS = URL_ADMIN_REALM + "/organizations" URL_ADMIN_ORGANIZATIONS = URL_ADMIN_REALM + "/organizations"
URL_ADMIN_ORGANIZATION_BY_ID = URL_ADMIN_ORGANIZATIONS + "/{organization_id}" URL_ADMIN_ORGANIZATION_BY_ID = URL_ADMIN_ORGANIZATIONS + "/{organization_id}"
URL_ADMIN_ORGANIZATION_MEMBERS = URL_ADMIN_ORGANIZATION_BY_ID + "/members" URL_ADMIN_ORGANIZATION_MEMBERS = URL_ADMIN_ORGANIZATION_BY_ID + "/members"
URL_ADMIN_ORGANIZATION_MEMBERS_COUNT = URL_ADMIN_ORGANIZATION_MEMBERS + "/count"
URL_ADMIN_ORGANIZATION_DEL_MEMBER_BY_ID = URL_ADMIN_ORGANIZATION_MEMBERS + "/{user_id}" URL_ADMIN_ORGANIZATION_DEL_MEMBER_BY_ID = URL_ADMIN_ORGANIZATION_MEMBERS + "/{user_id}"
URL_ADMIN_ORGANIZATION_IDPS = URL_ADMIN_ORGANIZATION_BY_ID + "/identity-providers" URL_ADMIN_ORGANIZATION_IDPS = URL_ADMIN_ORGANIZATION_BY_ID + "/identity-providers"
URL_ADMIN_ORGANIZATION_IDP_BY_ALIAS = URL_ADMIN_ORGANIZATION_IDPS + "/{idp_alias}" URL_ADMIN_ORGANIZATION_IDP_BY_ALIAS = URL_ADMIN_ORGANIZATION_IDPS + "/{idp_alias}"

7
test_keycloak_init.sh

@ -15,7 +15,12 @@ function keycloak_stop() {
function keycloak_start() { function keycloak_start() {
echo "Starting keycloak docker container" echo "Starting keycloak docker container"
PWD=$(pwd) PWD=$(pwd)
docker run -d --name unittest_keycloak -e KEYCLOAK_ADMIN="${KEYCLOAK_ADMIN}" -e KEYCLOAK_ADMIN_PASSWORD="${KEYCLOAK_ADMIN_PASSWORD}" -e KC_FEATURES="token-exchange,admin-fine-grained-authz" -p "${KEYCLOAK_PORT}:8080" -v $PWD/tests/providers:/opt/keycloak/providers "${KEYCLOAK_DOCKER_IMAGE}" start-dev
if [[ "$KEYCLOAK_DOCKER_IMAGE_TAG" == "22.0" || "$KEYCLOAK_DOCKER_IMAGE_TAG" == "23.0" ]]; then
KEYCLOAK_FEATURES="admin-fine-grained-authz,token-exchange"
else
KEYCLOAK_FEATURES="admin-fine-grained-authz:v1,token-exchange:v1"
fi
docker run --rm -d --name unittest_keycloak -e KEYCLOAK_ADMIN="${KEYCLOAK_ADMIN}" -e KEYCLOAK_ADMIN_PASSWORD="${KEYCLOAK_ADMIN_PASSWORD}" -p "${KEYCLOAK_PORT}:8080" -v $PWD/tests/providers:/opt/keycloak/providers "${KEYCLOAK_DOCKER_IMAGE}" start-dev --features="${KEYCLOAK_FEATURES}"
SECONDS=0 SECONDS=0
until curl --silent --output /dev/null localhost:$KEYCLOAK_PORT; do until curl --silent --output /dev/null localhost:$KEYCLOAK_PORT; do
sleep 5; sleep 5;

1
tests/test_authorization.py

@ -40,6 +40,7 @@ def test_authorization_objects() -> None:
assert not r.required assert not r.required
assert r.get_name() == "test" assert r.get_name() == "test"
assert r == r # noqa: PLR0124 assert r == r # noqa: PLR0124
assert hash(r) == hash("test-False")
assert r == "test" assert r == "test"
with pytest.raises(NotImplementedError) as err: with pytest.raises(NotImplementedError) as err:

152
tests/test_keycloak_admin.py

@ -184,11 +184,19 @@ def test_realms(admin: KeycloakAdmin) -> None:
# Create the same realm, should fail # Create the same realm, should fail
with pytest.raises(KeycloakPostError) as err: with pytest.raises(KeycloakPostError) as err:
res = admin.create_realm(payload={"realm": "test"}) res = admin.create_realm(payload={"realm": "test"})
assert err.match('409: b\'{"errorMessage":"Conflict detected. See logs for details"}\'')
assert (
b"Realm test already exists" in err.value.error_message
or b"Conflict detected" in err.value.error_message
)
# Create the same realm, skip_exists true # Create the same realm, skip_exists true
res = admin.create_realm(payload={"realm": "test"}, skip_exists=True) res = admin.create_realm(payload={"realm": "test"}, skip_exists=True)
assert res == {"msg": "Already exists"}, res
assert res in [
{"errorMessage": "Realm test already exists"},
{"msg": "Already exists"},
{"errorMessage": "Conflict detected. See logs for details"},
], res
# Get a single realm # Get a single realm
res = admin.get_realm(realm_name="test") res = admin.get_realm(realm_name="test")
@ -383,6 +391,8 @@ def test_organizations(admin: KeycloakAdmin, realm: str) -> None:
users = admin.get_organization_members(org_id) users = admin.get_organization_members(org_id)
assert len(users) == 1, users assert len(users) == 1, users
assert users[0]["id"] == user_id, users[0]["id"] assert users[0]["id"] == user_id, users[0]["id"]
num_users = admin.get_organization_members_count(org_id)
assert num_users == 1, num_users
user_orgs = admin.get_user_organizations(user_id) user_orgs = admin.get_user_organizations(user_id)
assert len(user_orgs) == 1, user_orgs assert len(user_orgs) == 1, user_orgs
@ -391,6 +401,8 @@ def test_organizations(admin: KeycloakAdmin, realm: str) -> None:
admin.organization_user_remove(user_id, org_id) admin.organization_user_remove(user_id, org_id)
users = admin.get_organization_members(org_id) users = admin.get_organization_members(org_id)
assert len(users) == 0, users assert len(users) == 0, users
num_users = admin.get_organization_members_count(org_id)
assert num_users == 0, num_users
for i in range(admin.PAGE_SIZE + 50): for i in range(admin.PAGE_SIZE + 50):
user_id = admin.create_user( user_id = admin.create_user(
@ -875,7 +887,8 @@ def test_server_info(admin: KeycloakAdmin) -> None:
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
""" """
info = admin.get_server_info() info = admin.get_server_info()
assert set(info.keys()).issubset(
keys = info.keys()
assert set(keys).issubset(
{ {
"systemInfo", "systemInfo",
"memoryInfo", "memoryInfo",
@ -892,8 +905,9 @@ def test_server_info(admin: KeycloakAdmin) -> None:
"passwordPolicies", "passwordPolicies",
"enums", "enums",
"cryptoInfo", "cryptoInfo",
"cpuInfo",
}, },
), info.keys()
)
def test_groups(admin: KeycloakAdmin, user: str) -> None: def test_groups(admin: KeycloakAdmin, user: str) -> None:
@ -1669,6 +1683,31 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str) -> None:
assert err.match(COULD_NOT_FIND_ROLE_REGEX) assert err.match(COULD_NOT_FIND_ROLE_REGEX)
def test_realm_roles_pagination(admin: KeycloakAdmin, realm: str) -> None:
"""
Test realm roles pagination.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin.change_current_realm(realm)
for ind in range(admin.PAGE_SIZE + 50 - 3):
role_name = f"role_{ind:03}"
admin.create_realm_role(payload={"name": role_name})
roles = admin.get_realm_roles()
assert len(roles) == admin.PAGE_SIZE + 50, len(roles)
roles = admin.get_realm_roles(query={"first": 100, "max": 20})
assert len(roles) == 20, len(roles)
roles = admin.get_realm_roles(query={"first": 120, "max": 50})
assert len(roles) == 30, len(roles)
@pytest.mark.parametrize( @pytest.mark.parametrize(
("testcase", "arg_brief_repr", "includes_attributes"), ("testcase", "arg_brief_repr", "includes_attributes"),
[ [
@ -2226,7 +2265,13 @@ def test_client_roles(admin: KeycloakAdmin, client: str) -> None:
) )
assert res == {} assert res == {}
# Test composite client roles
# Test get composite client roles of role before adding
res = admin.get_composite_client_roles_of_role(
client_id=client, role_name="client-role-test-update"
)
assert len(res) == 0
# Test add composite client roles to role
with pytest.raises(KeycloakPostError) as err: with pytest.raises(KeycloakPostError) as err:
admin.add_composite_client_roles_to_role( admin.add_composite_client_roles_to_role(
client_role_id=client, client_role_id=client,
@ -2244,6 +2289,15 @@ def test_client_roles(admin: KeycloakAdmin, client: str) -> None:
"composite" "composite"
] ]
# Test get composite client roles of role after adding
res = admin.get_composite_client_roles_of_role(
client_id=client, role_name="client-role-test-update"
)
assert len(res) == 1
with pytest.raises(KeycloakGetError) as err:
admin.get_composite_client_roles_of_role(client_id=client, role_name="bad")
assert err.match(COULD_NOT_FIND_ROLE_REGEX)
# Test removal of composite client roles # Test removal of composite client roles
with pytest.raises(KeycloakDeleteError) as err: with pytest.raises(KeycloakDeleteError) as err:
admin.remove_composite_client_roles_from_role( admin.remove_composite_client_roles_from_role(
@ -2581,7 +2635,7 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str) -> None:
# Test flow executions # Test flow executions
res = admin.get_authentication_flow_executions(flow_alias="browser") res = admin.get_authentication_flow_executions(flow_alias="browser")
assert len(res) in [8, 12], res
assert len(res) in [8, 12, 14, 15], res
with pytest.raises(KeycloakGetError) as err: with pytest.raises(KeycloakGetError) as err:
admin.get_authentication_flow_executions(flow_alias="bad") admin.get_authentication_flow_executions(flow_alias="bad")
@ -2724,7 +2778,7 @@ def test_authentication_configs(admin: KeycloakAdmin, realm: str) -> None:
# Test list of auth providers # Test list of auth providers
res = admin.get_authenticator_providers() res = admin.get_authenticator_providers()
assert len(res) <= 40
assert len(res) <= 42
res = admin.get_authenticator_provider_config_description(provider_id="auth-cookie") res = admin.get_authenticator_provider_config_description(provider_id="auth-cookie")
assert res == { assert res == {
@ -3287,7 +3341,7 @@ def test_get_bruteforce_status_for_user(
:param realm: Keycloak realm :param realm: Keycloak realm
:type realm: str :type realm: str
""" """
oid, username, password = oid_with_credentials
oid, username, _ = oid_with_credentials
admin.change_current_realm(realm) admin.change_current_realm(realm)
# Turn on bruteforce protection # Turn on bruteforce protection
@ -3325,7 +3379,7 @@ def test_clear_bruteforce_attempts_for_user(
:param realm: Keycloak realm :param realm: Keycloak realm
:type realm: str :type realm: str
""" """
oid, username, password = oid_with_credentials
oid, username, _ = oid_with_credentials
admin.change_current_realm(realm) admin.change_current_realm(realm)
# Turn on bruteforce protection # Turn on bruteforce protection
@ -3366,7 +3420,7 @@ def test_clear_bruteforce_attempts_for_all_users(
:param realm: Keycloak realm :param realm: Keycloak realm
:type realm: str :type realm: str
""" """
oid, username, password = oid_with_credentials
oid, username, _ = oid_with_credentials
admin.change_current_realm(realm) admin.change_current_realm(realm)
# Turn on bruteforce protection # Turn on bruteforce protection
@ -3527,7 +3581,7 @@ def test_initial_access_token(
assert res["count"] == 2 assert res["count"] == 2
assert res["expiration"] == 3 assert res["expiration"] == 3
oid, username, password = oid_with_credentials
oid, _, _ = oid_with_credentials
client = str(uuid.uuid4()) client = str(uuid.uuid4())
secret = str(uuid.uuid4()) secret = str(uuid.uuid4())
@ -3639,11 +3693,19 @@ async def test_a_realms(admin: KeycloakAdmin) -> None:
# Create the same realm, should fail # Create the same realm, should fail
with pytest.raises(KeycloakPostError) as err: with pytest.raises(KeycloakPostError) as err:
res = await admin.a_create_realm(payload={"realm": "test"}) res = await admin.a_create_realm(payload={"realm": "test"})
assert err.match('409: b\'{"errorMessage":"Conflict detected. See logs for details"}\'')
assert (
b"Realm test already exists" in err.value.error_message
or b"Conflict detected" in err.value.error_message
)
# Create the same realm, skip_exists true # Create the same realm, skip_exists true
res = await admin.a_create_realm(payload={"realm": "test"}, skip_exists=True) res = await admin.a_create_realm(payload={"realm": "test"}, skip_exists=True)
assert res == {"msg": "Already exists"}, res
assert res in [
{"errorMessage": "Realm test already exists"},
{"msg": "Already exists"},
{"errorMessage": "Conflict detected. See logs for details"},
], res
# Get a single realm # Get a single realm
res = await admin.a_get_realm(realm_name="test") res = await admin.a_get_realm(realm_name="test")
@ -3843,6 +3905,8 @@ async def a_test_organizations(admin: KeycloakAdmin, realm: str) -> None:
users = await admin.a_get_organization_members(org_id) users = await admin.a_get_organization_members(org_id)
assert len(users) == 1, users assert len(users) == 1, users
assert users[0]["id"] == user_id, users[0]["id"] assert users[0]["id"] == user_id, users[0]["id"]
num_users = await admin.a_get_organization_members_count(org_id)
assert num_users == 1, num_users
user_orgs = await admin.a_get_user_organizations(user_id) user_orgs = await admin.a_get_user_organizations(user_id)
assert len(user_orgs) == 1, user_orgs assert len(user_orgs) == 1, user_orgs
@ -3851,6 +3915,8 @@ async def a_test_organizations(admin: KeycloakAdmin, realm: str) -> None:
await admin.a_organization_user_remove(user_id, org_id) await admin.a_organization_user_remove(user_id, org_id)
users = await admin.a_get_organization_members(org_id) users = await admin.a_get_organization_members(org_id)
assert len(users) == 0, users assert len(users) == 0, users
num_users = await admin.a_get_organization_members_count(org_id)
assert num_users == 0, num_users
for i in range(admin.PAGE_SIZE + 50): for i in range(admin.PAGE_SIZE + 50):
user_id = await admin.a_create_user( user_id = await admin.a_create_user(
@ -4356,7 +4422,8 @@ async def test_a_server_info(admin: KeycloakAdmin) -> None:
:type admin: KeycloakAdmin :type admin: KeycloakAdmin
""" """
info = await admin.a_get_server_info() info = await admin.a_get_server_info()
assert set(info.keys()).issubset(
keys = info.keys()
assert set(keys).issubset(
{ {
"systemInfo", "systemInfo",
"memoryInfo", "memoryInfo",
@ -4373,8 +4440,9 @@ async def test_a_server_info(admin: KeycloakAdmin) -> None:
"passwordPolicies", "passwordPolicies",
"enums", "enums",
"cryptoInfo", "cryptoInfo",
"cpuInfo",
}, },
), info.keys()
)
@pytest.mark.asyncio @pytest.mark.asyncio
@ -5194,6 +5262,32 @@ async def test_a_realm_roles(admin: KeycloakAdmin, realm: str) -> None:
assert err.match(COULD_NOT_FIND_ROLE_REGEX) assert err.match(COULD_NOT_FIND_ROLE_REGEX)
@pytest.mark.asyncio
async def test_a_realm_roles_pagination(admin: KeycloakAdmin, realm: str) -> None:
"""
Test realm roles pagination.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin.change_current_realm(realm)
for ind in range(admin.PAGE_SIZE + 50 - 3):
role_name = f"role_{ind:03}"
admin.create_realm_role(payload={"name": role_name})
roles = await admin.a_get_realm_roles()
assert len(roles) == admin.PAGE_SIZE + 50, len(roles)
roles = await admin.a_get_realm_roles(query={"first": 100, "max": 20})
assert len(roles) == 20, len(roles)
roles = await admin.a_get_realm_roles(query={"first": 120, "max": 50})
assert len(roles) == 30, len(roles)
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.parametrize( @pytest.mark.parametrize(
("testcase", "arg_brief_repr", "includes_attributes"), ("testcase", "arg_brief_repr", "includes_attributes"),
@ -5801,7 +5895,13 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str) -> None:
) )
assert res == {} assert res == {}
# Test composite client roles
# Test get composite client roles of role before adding
res = await admin.a_get_composite_client_roles_of_role(
client_id=client, role_name="client-role-test-update"
)
assert len(res) == 0
# Test add composite client roles to role
with pytest.raises(KeycloakPostError) as err: with pytest.raises(KeycloakPostError) as err:
await admin.a_add_composite_client_roles_to_role( await admin.a_add_composite_client_roles_to_role(
client_role_id=client, client_role_id=client,
@ -5819,6 +5919,12 @@ async def test_a_client_roles(admin: KeycloakAdmin, client: str) -> None:
"composite" "composite"
] ]
# Test get composite client roles of role after adding
res = await admin.a_get_composite_client_roles_of_role(
client_id=client, role_name="client-role-test-update"
)
assert len(res) == 1
# Test removal of composite client roles # Test removal of composite client roles
with pytest.raises(KeycloakDeleteError) as err: with pytest.raises(KeycloakDeleteError) as err:
await admin.a_remove_composite_client_roles_from_role( await admin.a_remove_composite_client_roles_from_role(
@ -6066,7 +6172,7 @@ async def test_a_email_query_param_handling(admin: KeycloakAdmin, user: str) ->
mock_put.assert_awaited_once_with( mock_put.assert_awaited_once_with(
ANY, ANY,
data='["UPDATE_PASSWORD"]',
content='["UPDATE_PASSWORD"]',
params={"client_id": "update-account-client-id", "redirect_uri": "https://example.com"}, params={"client_id": "update-account-client-id", "redirect_uri": "https://example.com"},
headers=ANY, headers=ANY,
timeout=60, timeout=60,
@ -6236,7 +6342,7 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str) -> None:
# Test flow executions # Test flow executions
res = await admin.a_get_authentication_flow_executions(flow_alias="browser") res = await admin.a_get_authentication_flow_executions(flow_alias="browser")
assert len(res) in [8, 12], res
assert len(res) in [8, 12, 14, 15], res
with pytest.raises(KeycloakGetError) as err: with pytest.raises(KeycloakGetError) as err:
await admin.a_get_authentication_flow_executions(flow_alias="bad") await admin.a_get_authentication_flow_executions(flow_alias="bad")
@ -6384,7 +6490,7 @@ async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str) -> Non
# Test list of auth providers # Test list of auth providers
res = await admin.a_get_authenticator_providers() res = await admin.a_get_authenticator_providers()
assert len(res) <= 40
assert len(res) <= 42
res = await admin.a_get_authenticator_provider_config_description(provider_id="auth-cookie") res = await admin.a_get_authenticator_provider_config_description(provider_id="auth-cookie")
assert res == { assert res == {
@ -6970,7 +7076,7 @@ async def test_a_get_bruteforce_status_for_user(
:param realm: Keycloak realm :param realm: Keycloak realm
:type realm: str :type realm: str
""" """
oid, username, password = oid_with_credentials
oid, username, _ = oid_with_credentials
await admin.a_change_current_realm(realm) await admin.a_change_current_realm(realm)
# Turn on bruteforce protection # Turn on bruteforce protection
@ -7009,7 +7115,7 @@ async def test_a_clear_bruteforce_attempts_for_user(
:param realm: Keycloak realm :param realm: Keycloak realm
:type realm: str :type realm: str
""" """
oid, username, password = oid_with_credentials
oid, username, _ = oid_with_credentials
await admin.a_change_current_realm(realm) await admin.a_change_current_realm(realm)
# Turn on bruteforce protection # Turn on bruteforce protection
@ -7051,7 +7157,7 @@ async def test_a_clear_bruteforce_attempts_for_all_users(
:param realm: Keycloak realm :param realm: Keycloak realm
:type realm: str :type realm: str
""" """
oid, username, password = oid_with_credentials
oid, username, _ = oid_with_credentials
await admin.a_change_current_realm(realm) await admin.a_change_current_realm(realm)
# Turn on bruteforce protection # Turn on bruteforce protection
@ -7225,7 +7331,7 @@ async def test_a_initial_access_token(
assert res["count"] == 2 assert res["count"] == 2
assert res["expiration"] == 3 assert res["expiration"] == 3
oid, username, password = oid_with_credentials
oid, _, _ = oid_with_credentials
client = str(uuid.uuid4()) client = str(uuid.uuid4())
secret = str(uuid.uuid4()) secret = str(uuid.uuid4())

4
tests/test_keycloak_openid.py

@ -392,7 +392,7 @@ def test_load_authorization_config(
server with client credentials server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
""" """
oid, username, password = oid_with_credentials_authz
oid, _, _ = oid_with_credentials_authz
oid.load_authorization_config(path="tests/data/authz_settings.json") oid.load_authorization_config(path="tests/data/authz_settings.json")
assert "test-authz-rb-policy" in oid.authorization.policies assert "test-authz-rb-policy" in oid.authorization.policies
@ -937,7 +937,7 @@ async def test_a_load_authorization_config(
server with client credentials server with client credentials
:type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] :type oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]
""" """
oid, username, password = oid_with_credentials_authz
oid, _, _ = oid_with_credentials_authz
await oid.a_load_authorization_config(path="tests/data/authz_settings.json") await oid.a_load_authorization_config(path="tests/data/authz_settings.json")
assert "test-authz-rb-policy" in oid.authorization.policies assert "test-authz-rb-policy" in oid.authorization.policies

Loading…
Cancel
Save