Browse Source

chore: Merge pull request #627 from marcospereirampj/feat/fixing_issues

Feat/fixing issues
542-keycloakadmin-not-recovering-from-keycloakauthenticationerror v5.1.0
Richard Nemeth 1 week ago
committed by GitHub
parent
commit
d0d2b60320
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 12
      poetry.lock
  2. 67
      src/keycloak/keycloak_admin.py
  3. 38
      src/keycloak/keycloak_openid.py
  4. 12
      src/keycloak/keycloak_uma.py
  5. 10
      tests/test_keycloak_admin.py

12
poetry.lock

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand.
[[package]]
name = "alabaster"
@ -1415,20 +1415,20 @@ dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments
[[package]]
name = "pytest-asyncio"
version = "0.24.0"
version = "0.25.0"
description = "Pytest support for asyncio"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
files = [
{file = "pytest_asyncio-0.24.0-py3-none-any.whl", hash = "sha256:a811296ed596b69bf0b6f3dc40f83bcaf341b155a269052d82efa2b25ac7037b"},
{file = "pytest_asyncio-0.24.0.tar.gz", hash = "sha256:d081d828e576d85f875399194281e92bf8a68d60d72d1a2faf2feddb6c46b276"},
{file = "pytest_asyncio-0.25.0-py3-none-any.whl", hash = "sha256:db5432d18eac6b7e28b46dcd9b69921b55c3b1086e85febfe04e70b18d9e81b3"},
{file = "pytest_asyncio-0.25.0.tar.gz", hash = "sha256:8c0610303c9e0442a5db8604505fc0f545456ba1528824842b37b4a626cbf609"},
]
[package.dependencies]
pytest = ">=8.2,<9"
[package.extras]
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1)"]
testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"]
[[package]]

67
src/keycloak/keycloak_admin.py

@ -619,7 +619,7 @@ class KeycloakAdmin:
users = self.get_users(query={"username": lower_user_name, "max": 1, "exact": True})
return users[0]["id"] if len(users) == 1 else None
def get_user(self, user_id):
def get_user(self, user_id, user_profile_metadata=False):
"""Get representation of the user.
UserRepresentation
@ -627,10 +627,15 @@ class KeycloakAdmin:
:param user_id: User id
:type user_id: str
:param user_profile_metadata: Whether to include user profile metadata in the response
:type user_profile_metadata: bool
:return: UserRepresentation
"""
params_path = {"realm-name": self.connection.realm_name, "id": user_id}
data_raw = self.connection.raw_get(urls_patterns.URL_ADMIN_USER.format(**params_path))
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_USER.format(**params_path),
userProfileMetadata=user_profile_metadata,
)
return raise_error_from_response(data_raw, KeycloakGetError)
def get_user_groups(self, user_id, query=None, brief_representation=True):
@ -1149,7 +1154,7 @@ class KeycloakAdmin:
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
return raise_error_from_response(data_raw, KeycloakGetError, [200, 404])
def create_group(self, payload, parent=None, skip_exists=False):
"""Create a group in the Realm.
@ -2124,9 +2129,7 @@ class KeycloakAdmin:
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_role(self, client_id, role_name):
"""Get client role id by name.
This is required for further actions with this role.
"""Get client role by name.
RoleRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation
@ -2135,8 +2138,8 @@ class KeycloakAdmin:
:type client_id: str
:param role_name: role's name (not id!)
:type role_name: str
:return: role_id
:rtype: str
:return: Role object
:rtype: dict
"""
params_path = {
"realm-name": self.connection.realm_name,
@ -3963,7 +3966,7 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
def get_client_all_sessions(self, client_id):
def get_client_all_sessions(self, client_id, query=None):
"""Get sessions associated with the client.
UserSessionRepresentation
@ -3971,14 +3974,18 @@ class KeycloakAdmin:
:param client_id: id of client
:type client_id: str
:param query: Additional query parameters
:type query: dict
:return: UserSessionRepresentation
:rtype: list
"""
query = query or {}
params_path = {"realm-name": self.connection.realm_name, "id": client_id}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
url = urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)
if "first" in query or "max" in query:
return self.__fetch_paginated(url, query)
return self.__fetch_all(url, query)
def get_client_sessions_stats(self):
"""Get current session count for all clients with active sessions.
@ -4922,7 +4929,7 @@ class KeycloakAdmin:
)
return users[0]["id"] if len(users) == 1 else None
async def a_get_user(self, user_id):
async def a_get_user(self, user_id, user_profile_metadata=False):
"""Get representation of the user asynchronously.
UserRepresentation
@ -4930,11 +4937,14 @@ class KeycloakAdmin:
:param user_id: User id
:type user_id: str
:param user_profile_metadata: whether to include user profile metadata in the response
:type user_profile_metadata: bool
:return: UserRepresentation
"""
params_path = {"realm-name": self.connection.realm_name, "id": user_id}
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_USER.format(**params_path)
urls_patterns.URL_ADMIN_USER.format(**params_path),
userProfileMetadata=user_profile_metadata,
)
return raise_error_from_response(data_raw, KeycloakGetError)
@ -5460,7 +5470,7 @@ class KeycloakAdmin:
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
return raise_error_from_response(data_raw, KeycloakGetError, [200, 404])
async def a_create_group(self, payload, parent=None, skip_exists=False):
"""Create a group in the Realm asynchronously.
@ -6445,19 +6455,14 @@ class KeycloakAdmin:
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_get_client_role(self, client_id, role_name):
"""Get client role id by name asynchronously.
This is required for further actions with this role.
RoleRepresentation
https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_rolerepresentation
"""Get client role by name asynchronously.
:param client_id: id of client (not client-id)
:type client_id: str
:param role_name: role's name (not id!)
:type role_name: str
:return: role_id
:rtype: str
:return: Role object
:rtype: dict
"""
params_path = {
"realm-name": self.connection.realm_name,
@ -8299,7 +8304,7 @@ class KeycloakAdmin:
)
return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204])
async def a_get_client_all_sessions(self, client_id):
async def a_get_client_all_sessions(self, client_id, query=None):
"""Get sessions associated with the client asynchronously.
UserSessionRepresentation
@ -8307,14 +8312,18 @@ class KeycloakAdmin:
:param client_id: id of client
:type client_id: str
:param query: Additional query parameters
:type query: dict
:return: UserSessionRepresentation
:rtype: list
"""
query = query or {}
params_path = {"realm-name": self.connection.realm_name, "id": client_id}
data_raw = await self.connection.a_raw_get(
urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
url = urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path)
if "first" in query or "max" in query:
return await self.a___fetch_paginated(url, query)
return await self.a___fetch_all(url, query)
async def a_get_client_sessions_stats(self):
"""Get current session count for all clients with active sessions asynchronously.

38
src/keycloak/keycloak_openid.py

@ -731,7 +731,7 @@ class KeycloakOpenID:
return list(set(permissions))
def uma_permissions(self, token, permissions=""):
def uma_permissions(self, token, permissions="", **extra_payload):
"""Get UMA permissions by user token with requested permissions.
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
@ -743,6 +743,8 @@ class KeycloakOpenID:
:type token: str
:param permissions: list of uma permissions list(resource:scope) requested by the user
:type permissions: str
:param extra_payload: Additional payload data
:type extra_payload: dict
:returns: Keycloak server response
:rtype: dict
"""
@ -754,6 +756,7 @@ class KeycloakOpenID:
"permission": permission,
"response_mode": "permissions",
"audience": self.client_id,
**extra_payload,
}
orig_bearer = self.connection.headers.get("Authorization")
@ -800,13 +803,13 @@ class KeycloakOpenID:
raise
for resource_struct in granted:
resource = resource_struct["rsname"]
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes: # pragma: no cover
needed.discard("{}#{}".format(resource, scope))
for resource in (resource_struct["rsname"], resource_struct["rsid"]):
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes: # pragma: no cover
needed.discard("{}#{}".format(resource, scope))
return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
@ -1394,7 +1397,7 @@ class KeycloakOpenID:
return list(set(permissions))
async def a_uma_permissions(self, token, permissions=""):
async def a_uma_permissions(self, token, permissions="", **extra_payload):
"""Get UMA permissions by user token with requested permissions asynchronously.
The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be
@ -1406,6 +1409,8 @@ class KeycloakOpenID:
:type token: str
:param permissions: list of uma permissions list(resource:scope) requested by the user
:type permissions: str
:param extra_payload: Additional payload data
:type extra_payload: dict
:returns: Keycloak server response
:rtype: dict
"""
@ -1417,6 +1422,7 @@ class KeycloakOpenID:
"permission": list(permission), # httpx does not handle `set` correctly
"response_mode": "permissions",
"audience": self.client_id,
**extra_payload,
}
orig_bearer = self.connection.headers.get("Authorization")
@ -1463,13 +1469,13 @@ class KeycloakOpenID:
raise
for resource_struct in granted:
resource = resource_struct["rsname"]
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes: # pragma: no cover
needed.discard("{}#{}".format(resource, scope))
for resource in (resource_struct["rsname"], resource_struct["rsid"]):
scopes = resource_struct.get("scopes", None)
if not scopes:
needed.discard(resource)
continue
for scope in scopes: # pragma: no cover
needed.discard("{}#{}".format(resource, scope))
return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed

12
src/keycloak/keycloak_uma.py

@ -318,7 +318,7 @@ class KeycloakUMA:
)
return raise_error_from_response(data_raw, KeycloakPostError)
def permissions_check(self, token, permissions: Iterable[UMAPermission]):
def permissions_check(self, token, permissions: Iterable[UMAPermission], **extra_payload):
"""Check UMA permissions by user token with requested permissions.
The token endpoint is used to check UMA permissions from Keycloak. It can only be
@ -330,6 +330,8 @@ class KeycloakUMA:
:type token: str
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
:param extra_payload: extra payload data
:type extra_payload: dict
:returns: Keycloak decision
:rtype: boolean
"""
@ -338,6 +340,7 @@ class KeycloakUMA:
"permission": ",".join(str(permission) for permission in permissions),
"response_mode": "decision",
"audience": self.connection.client_id,
**extra_payload,
}
# Everyone always has the null set of permissions
@ -657,7 +660,9 @@ class KeycloakUMA:
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_permissions_check(self, token, permissions: Iterable[UMAPermission]):
async def a_permissions_check(
self, token, permissions: Iterable[UMAPermission], **extra_payload
):
"""Check UMA permissions by user token with requested permissions asynchronously.
The token endpoint is used to check UMA permissions from Keycloak. It can only be
@ -669,6 +674,8 @@ class KeycloakUMA:
:type token: str
:param permissions: Iterable of uma permissions to validate the token against
:type permissions: Iterable[UMAPermission]
:param extra_payload: extra payload data
:type extra_payload: dict
:returns: Keycloak decision
:rtype: boolean
"""
@ -677,6 +684,7 @@ class KeycloakUMA:
"permission": ",".join(str(permission) for permission in permissions),
"response_mode": "decision",
"audience": self.connection.client_id,
**extra_payload,
}
# Everyone always has the null set of permissions

10
tests/test_keycloak_admin.py

@ -845,9 +845,8 @@ def test_groups(admin: KeycloakAdmin, user: str):
assert res is not None, res
assert res["id"] == subgroup_id_1, res
with pytest.raises(KeycloakGetError) as err:
admin.get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1/test")
assert err.match('404: b\'{"error":"Group path does not exist".*}\'')
res = admin.get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1/test")
assert res["error"] == "Group path does not exist"
res = admin.get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1")
assert res is not None, res
@ -3947,9 +3946,8 @@ async def test_a_groups(admin: KeycloakAdmin, user: str):
assert res is not None, res
assert res["id"] == subgroup_id_1, res
with pytest.raises(KeycloakGetError) as err:
await admin.a_get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1/test")
assert err.match('404: b\'{"error":"Group path does not exist".*}\'')
res = await admin.a_get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1/test")
assert res["error"] == "Group path does not exist"
res = await admin.a_get_group_by_path(path="/main-group/subgroup-2/subsubgroup-1")
assert res is not None, res

Loading…
Cancel
Save