Browse Source

feat: add initial access token support and policy delete method

* feat: Add policy delete method

* feat: Added initial access token support
pull/426/head v2.14.0
mklassen 2 years ago
committed by GitHub
parent
commit
8f4b49a4d1
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      src/keycloak/keycloak_admin.py
  2. 22
      src/keycloak/keycloak_openid.py
  3. 38
      src/keycloak/urls_patterns.py
  4. 47
      tests/test_keycloak_admin.py

54
src/keycloak/keycloak_admin.py

@ -1661,6 +1661,42 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def delete_client_authz_policy(self, client_id, policy_id):
"""Delete a policy from client.
:param client_id: id in ClientRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:type client_id: str
:param policy_id: id in PolicyRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_policyrepresentation
:type policy_id: str
:return: Keycloak server response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "id": client_id, "policy-id": policy_id}
data_raw = self.connection.raw_delete(
urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def get_client_authz_policy(self, client_id, policy_id):
"""Get a policy from client.
:param client_id: id in ClientRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:type client_id: str
:param policy_id: id in PolicyRepresentation
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_policyrepresentation
:type policy_id: str
:return: Keycloak server response
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "id": client_id, "policy-id": policy_id}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
def get_client_service_account_user(self, client_id): def get_client_service_account_user(self, client_id):
"""Get service account user from client. """Get service account user from client.
@ -1812,6 +1848,24 @@ class KeycloakAdmin:
) )
return raise_error_from_response(data_raw, KeycloakDeleteError) return raise_error_from_response(data_raw, KeycloakDeleteError)
def create_initial_access_token(self, count: int = 1, expiration: int = 1):
"""Create an initial access token.
:param count: Number of clients that can be registered
:type count: int
:param expiration: Days until expireation
:type expiration: int
:return: initial access token
:rtype: str
"""
payload = {"count": count, "expiration": expiration}
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLIENT_INITIAL_ACCESS.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[200])
def create_client(self, payload, skip_exists=False): def create_client(self, payload, skip_exists=False):
"""Create a client. """Create a client.

22
src/keycloak/keycloak_openid.py

@ -47,6 +47,7 @@ from .uma_permissions import AuthStatus, build_permission_param
from .urls_patterns import ( from .urls_patterns import (
URL_AUTH, URL_AUTH,
URL_CERTS, URL_CERTS,
URL_CLIENT_REGISTRATION,
URL_ENTITLEMENT, URL_ENTITLEMENT,
URL_INTROSPECT, URL_INTROSPECT,
URL_LOGOUT, URL_LOGOUT,
@ -679,3 +680,24 @@ class KeycloakOpenID:
return AuthStatus( return AuthStatus(
is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed
) )
def register_client(self, token: str, payload: dict):
"""Create a client.
ClientRepresentation:
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:param token: Initial access token
:type token: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
data_raw = self.connection.raw_post(
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError)

38
src/keycloak/urls_patterns.py

@ -38,6 +38,9 @@ URL_AUTH = (
"&scope={scope}&state={state}" "&scope={scope}&state={state}"
) )
URL_CLIENT_REGISTRATION = URL_REALM + "/clients-registrations/default"
URL_CLIENT_UPDATE = URL_CLIENT_REGISTRATION + "/{client-id}"
# ADMIN URLS # ADMIN URLS
URL_ADMIN_USERS = "admin/realms/{realm-name}/users" URL_ADMIN_USERS = "admin/realms/{realm-name}/users"
URL_ADMIN_USERS_COUNT = "admin/realms/{realm-name}/users/count" URL_ADMIN_USERS_COUNT = "admin/realms/{realm-name}/users/count"
@ -83,6 +86,7 @@ URL_ADMIN_GROUP_CHILD = "admin/realms/{realm-name}/groups/{id}/children"
URL_ADMIN_GROUP_PERMISSIONS = "admin/realms/{realm-name}/groups/{id}/management/permissions" URL_ADMIN_GROUP_PERMISSIONS = "admin/realms/{realm-name}/groups/{id}/management/permissions"
URL_ADMIN_GROUP_MEMBERS = "admin/realms/{realm-name}/groups/{id}/members" URL_ADMIN_GROUP_MEMBERS = "admin/realms/{realm-name}/groups/{id}/members"
URL_ADMIN_CLIENT_INITIAL_ACCESS = "admin/realms/{realm-name}/clients-initial-access"
URL_ADMIN_CLIENTS = "admin/realms/{realm-name}/clients" URL_ADMIN_CLIENTS = "admin/realms/{realm-name}/clients"
URL_ADMIN_CLIENT = URL_ADMIN_CLIENTS + "/{id}" URL_ADMIN_CLIENT = URL_ADMIN_CLIENTS + "/{id}"
URL_ADMIN_CLIENT_ALL_SESSIONS = URL_ADMIN_CLIENT + "/user-sessions" URL_ADMIN_CLIENT_ALL_SESSIONS = URL_ADMIN_CLIENT + "/user-sessions"
@ -106,29 +110,21 @@ URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPE = (
URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPES + "/{client_scope_id}" URL_ADMIN_CLIENT_DEFAULT_CLIENT_SCOPES + "/{client_scope_id}"
) )
URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT + "/authz/resource-server/settings"
URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT + "/authz/resource-server/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT + "/authz/resource-server/permission?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy?max=-1&permission=false"
)
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/role?max=-1"
)
URL_ADMIN_CLIENT_AUTHZ = URL_ADMIN_CLIENT + "/authz/resource-server"
URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT_AUTHZ + "/settings"
URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT_AUTHZ + "/resource?max=-1"
URL_ADMIN_CLIENT_AUTHZ_SCOPES = URL_ADMIN_CLIENT_AUTHZ + "/scope?max=-1"
URL_ADMIN_CLIENT_AUTHZ_PERMISSIONS = URL_ADMIN_CLIENT_AUTHZ + "/permission?max=-1"
URL_ADMIN_CLIENT_AUTHZ_POLICIES = URL_ADMIN_CLIENT_AUTHZ + "/policy?max=-1&permission=false"
URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/role?max=-1"
URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = ( URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = (
URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1"
)
URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/{policy-id}/scopes"
)
URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = (
URL_ADMIN_CLIENT + "/authz/resource-server/policy/{policy-id}/resources"
)
URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = (
URL_ADMIN_CLIENT + "/authz/resource-server/permission/scope/{scope-id}"
URL_ADMIN_CLIENT_AUTHZ + "/permission/resource?max=-1"
) )
URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT + "/authz/resource-server/policy/client"
URL_ADMIN_CLIENT_AUTHZ_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/{policy-id}"
URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/scopes"
URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/resources"
URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope/{scope-id}"
URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/client"
URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user" URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user"
URL_ADMIN_CLIENT_CERTS = URL_ADMIN_CLIENT + "/certificates/{attr}" URL_ADMIN_CLIENT_CERTS = URL_ADMIN_CLIENT + "/certificates/{attr}"

47
tests/test_keycloak_admin.py

@ -1,6 +1,7 @@
"""Test the keycloak admin object.""" """Test the keycloak admin object."""
import copy import copy
import uuid
from typing import Tuple from typing import Tuple
import freezegun import freezegun
@ -832,6 +833,17 @@ def test_clients(admin: KeycloakAdmin, realm: str):
) == {"msg": "Already exists"} ) == {"msg": "Already exists"}
assert len(admin.get_client_authz_policies(client_id=auth_client_id)) == 2 assert len(admin.get_client_authz_policies(client_id=auth_client_id)) == 2
res = admin.create_client_authz_role_based_policy(
client_id=auth_client_id,
payload={"name": "test-authz-rb-policy-delete", "roles": [{"id": role_id}]},
)
res2 = admin.get_client_authz_policy(client_id=auth_client_id, policy_id=res["id"])
assert res["id"] == res2["id"]
admin.delete_client_authz_policy(client_id=auth_client_id, policy_id=res["id"])
with pytest.raises(KeycloakGetError) as err:
admin.get_client_authz_policy(client_id=auth_client_id, policy_id=res["id"])
assert err.match("404: b''")
# Test authz permissions # Test authz permissions
res = admin.get_client_authz_permissions(client_id=auth_client_id) res = admin.get_client_authz_permissions(client_id=auth_client_id)
assert len(res) == 1, res assert len(res) == 1, res
@ -2577,3 +2589,38 @@ def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None:
admin.realm_name = realm admin.realm_name = realm
res = admin.clear_user_cache() res = admin.clear_user_cache()
assert res == {} assert res == {}
def test_initial_access_token(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str]
) -> None:
"""Test initial access token and client creation.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
"""
res = admin.create_initial_access_token(2, 3)
assert "token" in res
assert res["count"] == 2
assert res["expiration"] == 3
oid, username, password = oid_with_credentials
client = str(uuid.uuid4())
secret = str(uuid.uuid4())
res = oid.register_client(
token=res["token"],
payload={
"name": client,
"clientId": client,
"enabled": True,
"publicClient": False,
"protocol": "openid-connect",
"secret": secret,
"clientAuthenticatorType": "client-secret",
},
)
assert res["clientId"] == client
Loading…
Cancel
Save