From 667d1e088e352f4262cc5cc35d6f69b337d9d513 Mon Sep 17 00:00:00 2001 From: Erik Cederstrand Date: Mon, 13 Jun 2022 12:50:17 +0200 Subject: [PATCH 1/2] feat: support token exchange config via admin API This adds support for the basic endpoints necessary to configure client-to-client token exchange. The /authz API is lacking official documentation. Basic docs added to docstrings instead. --- src/keycloak/keycloak_admin.py | 143 +++++++++++++++++++++++++++++++++ src/keycloak/urls_patterns.py | 11 +++ 2 files changed, 154 insertions(+) diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 7f82679..942edc9 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -2774,3 +2774,146 @@ class KeycloakAdmin: params_path = {"realm-name": self.realm_name} data_raw = self.raw_get(urls_patterns.URL_ADMIN_CLIENT_SESSION_STATS.format(**params_path)) return raise_error_from_response(data_raw, KeycloakGetError) + + def get_client_management_permissions(self, client_id): + """ + Get management permissions for a client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :return: Keycloak server response + """ + params_path = {"realm-name": self.realm_name, "id": client_id} + data_raw = self.raw_get( + urls_patterns.URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + def update_client_management_permissions(self, payload, client_id): + """ + Update management permissions for a client. + + ManagementPermissionReference + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_managementpermissionreference + + :param payload: ManagementPermissionReference + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :return: Keycloak server response + + + Payload example:: + + payload={ + "enabled": true + } + """ + params_path = {"realm-name": self.realm_name, "id": client_id} + data_raw = self.raw_put( + urls_patterns.URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[200]) + + def get_client_authz_policy_scopes(self, client_id, policy_id): + """ + Get scopes for a given policy. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :param policy_id: No Document + :return: Keycloak server response + """ + params_path = {"realm-name": self.realm_name, "id": client_id, "policy-id": policy_id} + data_raw = self.raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + def get_client_authz_policy_resources(self, client_id, policy_id): + """ + Get resources for a given policy. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :param policy_id: No Document + :return: Keycloak server response + """ + params_path = {"realm-name": self.realm_name, "id": client_id, "policy-id": policy_id} + data_raw = self.raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + def get_client_authz_scope_permission(self, client_id, scope_id): + """ + Get permissions for a given scope. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :param scope_id: No Document + :return: Keycloak server response + """ + params_path = {"realm-name": self.realm_name, "id": client_id, "scope-id": scope_id} + data_raw = self.raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + def update_client_authz_scope_permission(self, payload, client_id, scope_id): + """ + Update permissions for a given scope. + + :param payload: No Document + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :param scope_id: No Document + :return: Keycloak server response + + + Payload example:: + + payload={ + "id": scope_id, + "name": "My Permission Name", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "resources": [some_resource_id], + "scopes": [some_scope_id], + "policies": [some_policy_id], + } + """ + params_path = {"realm-name": self.realm_name, "id": client_id, "scope-id": scope_id} + data_raw = self.raw_put( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[201]) + + def create_client_authz_client_policy(self, payload, client_id): + """ + Create a new policy for a given client. + + :param payload: No Document + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :return: Keycloak server response (RoleRepresentation) + + + Payload example:: + + payload={ + "type": "client", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "name": "My Policy", + "clients": [other_client_id], + } + """ + params_path = {"realm-name": self.realm_name, "id": client_id} + data_raw = self.raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) diff --git a/src/keycloak/urls_patterns.py b/src/keycloak/urls_patterns.py index 16f348a..3ec134c 100644 --- a/src/keycloak/urls_patterns.py +++ b/src/keycloak/urls_patterns.py @@ -87,6 +87,7 @@ URL_ADMIN_CLIENT_ROLE = URL_ADMIN_CLIENT + "/roles/{role-name}" URL_ADMIN_CLIENT_ROLES_COMPOSITE_CLIENT_ROLE = URL_ADMIN_CLIENT_ROLE + "/composites" URL_ADMIN_CLIENT_ROLE_MEMBERS = URL_ADMIN_CLIENT + "/roles/{role-name}/users" URL_ADMIN_CLIENT_ROLE_GROUPS = URL_ADMIN_CLIENT + "/roles/{role-name}/groups" +URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS = URL_ADMIN_CLIENT + "/management/permissions" URL_ADMIN_CLIENT_AUTHZ_SETTINGS = URL_ADMIN_CLIENT + "/authz/resource-server/settings" URL_ADMIN_CLIENT_AUTHZ_RESOURCES = URL_ADMIN_CLIENT + "/authz/resource-server/resource?max=-1" @@ -101,6 +102,16 @@ URL_ADMIN_CLIENT_AUTHZ_ROLE_BASED_POLICY = ( URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = ( URL_ADMIN_CLIENT + "/authz/resource-server/permission/resource?max=-1" ) +URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = ( + URL_ADMIN_CLIENT + "/authz/resource-server/policy/{policy-id}/scopes" +) +URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = ( + URL_ADMIN_CLIENT + "/authz/resource-server/policy/{policy-id}/resources" +) +URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION = ( + URL_ADMIN_CLIENT + "/authz/resource-server/permission/scope/{scope-id}" +) +URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY = URL_ADMIN_CLIENT + "/authz/resource-server/policy/client" URL_ADMIN_CLIENT_SERVICE_ACCOUNT_USER = URL_ADMIN_CLIENT + "/service-account-user" URL_ADMIN_CLIENT_CERTS = URL_ADMIN_CLIENT + "/certificates/{attr}" From 39706bcc68df8ad3c54e4611dfbb95fd80ebb3ed Mon Sep 17 00:00:00 2001 From: Erik Cederstrand Date: Mon, 13 Jun 2022 14:17:38 +0200 Subject: [PATCH 2/2] ci: add test case for token exchange setup --- tests/test_keycloak_admin.py | 80 ++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index ad4e2ce..74cdc14 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -1176,6 +1176,86 @@ def test_client_roles(admin: KeycloakAdmin, client: str): assert err.match('404: b\'{"error":"Could not find role"}\'') +def test_enable_token_exchange(admin: KeycloakAdmin, realm: str): + # Test enabling token exchange between two confidential clients + admin.realm_name = realm + + # Create test clients + source_client_id = admin.create_client( + payload={"name": "Source Client", "clientId": "source-client"} + ) + target_client_id = admin.create_client( + payload={"name": "Target Client", "clientId": "target-client"} + ) + for c in admin.get_clients(): + if c["clientId"] == "realm-management": + realm_management_id = c["id"] + break + else: + raise AssertionError("Missing realm management client") + + # Enable permissions on the Superset client + admin.update_client_management_permissions( + payload={"enabled": True}, client_id=target_client_id + ) + + # Fetch various IDs and strings needed when creating the permission + token_exchange_permission_id = admin.get_client_management_permissions( + client_id=target_client_id + )["scopePermissions"]["token-exchange"] + scopes = admin.get_client_authz_policy_scopes( + client_id=realm_management_id, policy_id=token_exchange_permission_id + ) + + for s in scopes: + if s["name"] == "token-exchange": + token_exchange_scope_id = s["id"] + break + else: + raise AssertionError("Missing token-exchange scope") + + resources = admin.get_client_authz_policy_resources( + client_id=realm_management_id, policy_id=token_exchange_permission_id + ) + for r in resources: + if r["name"] == f"client.resource.{target_client_id}": + token_exchange_resource_id = r["_id"] + break + else: + raise AssertionError("Missing client resource") + + # Create a client policy for source client + client_policy_id = admin.create_client_authz_client_policy( + payload={ + "type": "client", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "name": "Exchange source client token with target client token", + "clients": [source_client_id], + }, + client_id=realm_management_id, + )["id"] + + # Update permissions on the target client to reference this policy + permission_name = admin.get_client_authz_scope_permission( + client_id=realm_management_id, scope_id=token_exchange_permission_id + )["name"] + admin.update_client_authz_scope_permission( + payload={ + "id": token_exchange_permission_id, + "name": permission_name, + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "resources": [token_exchange_resource_id], + "scopes": [token_exchange_scope_id], + "policies": [client_policy_id], + }, + client_id=realm_management_id, + scope_id=token_exchange_permission_id, + ) + + def test_email(admin: KeycloakAdmin, user: str): # Emails will fail as we don't have SMTP test setup with pytest.raises(KeycloakPutError) as err: