diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 20e5883..bf84239 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -1632,6 +1632,43 @@ class KeycloakAdmin: data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) + def create_client_authz_policy(self, client_id, payload, skip_exists=False): + """Create an authz policy of client. + + Payload example:: + + payload={ + "name": "Policy-time-based", + "type": "time", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "hourEnd": "18", + "hour": "9" + } + } + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :type client_id: str + :param payload: No Document + :type payload: dict + :param skip_exists: Skip creation in case the object exists + :type skip_exists: bool + :return: Keycloak server response + :rtype: bytes + + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + + data_raw = self.connection.raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICIES.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + def create_client_authz_group_based_policy(self, client_id, payload, skip_exists=False): """Create group-based policy of client. @@ -1828,6 +1865,48 @@ class KeycloakAdmin: data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists ) + def create_client_authz_scope_based_permission(self, client_id, payload, skip_exists=False): + """Create scope-based permission of client. + + Payload example:: + + payload={ + "type": "resource", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "name": "Permission-Name", + "resources": [ + resource_id + ], + "policies": [ + policy_id + ], + "scopes": [ + scope_id + ] + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation + :type client_id: str + :param payload: PolicyRepresentation + https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_policyrepresentation + :type payload: dict + :param skip_exists: Skip creation in case the object already exists + :type skip_exists: bool + :return: Keycloak server response + :rtype: bytes + + """ + params_path = {"realm-name": self.realm_name, "id": client_id} + + data_raw = self.raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_BASED_PERMISSION.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + def get_client_authz_scopes(self, client_id): """Get scopes from client. diff --git a/src/keycloak/urls_patterns.py b/src/keycloak/urls_patterns.py index 3ea6427..66f2ecc 100644 --- a/src/keycloak/urls_patterns.py +++ b/src/keycloak/urls_patterns.py @@ -125,6 +125,7 @@ URL_ADMIN_CLIENT_AUTHZ_SCOPE_BASED_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permi URL_ADMIN_CLIENT_AUTHZ_RESOURCE_BASED_PERMISSION = ( URL_ADMIN_CLIENT_AUTHZ + "/permission/resource?max=-1" ) +URL_ADMIN_CLIENT_AUTHZ_SCOPE_BASED_PERMISSION = URL_ADMIN_CLIENT_AUTHZ + "/permission/scope?max=-1" URL_ADMIN_CLIENT_AUTHZ_POLICY = URL_ADMIN_CLIENT_AUTHZ + "/policy/{policy-id}" URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/scopes" URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES = URL_ADMIN_CLIENT_AUTHZ_POLICY + "/resources" @@ -216,6 +217,5 @@ URL_ADMIN_CLEAR_KEYS_CACHE = URL_ADMIN_REALM + "/clear-keys-cache" URL_ADMIN_CLEAR_REALM_CACHE = URL_ADMIN_REALM + "/clear-realm-cache" URL_ADMIN_CLEAR_USER_CACHE = URL_ADMIN_REALM + "/clear-user-cache" - # UMA URLS URL_UMA_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/uma2-configuration" diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index 7d6d572..fdbc56b 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -897,6 +897,37 @@ def test_clients(admin: KeycloakAdmin, realm: str): admin.get_client_authz_policy(client_id=auth_client_id, policy_id=res["id"]) assert err.match("404: b''") + res = admin.create_client_authz_policy( + client_id=auth_client_id, + payload={ + "name": "test-authz-policy", + "type": "time", + "config": {"hourEnd": "18", "hour": "9"}, + }, + ) + assert res["name"] == "test-authz-policy", res + + with pytest.raises(KeycloakPostError) as err: + admin.create_client_authz_policy( + client_id=auth_client_id, + payload={ + "name": "test-authz-policy", + "type": "time", + "config": {"hourEnd": "18", "hour": "9"}, + }, + ) + assert err.match('409: b\'{"error":"Policy with name') + assert admin.create_client_authz_policy( + client_id=auth_client_id, + payload={ + "name": "test-authz-policy", + "type": "time", + "config": {"hourEnd": "18", "hour": "9"}, + }, + skip_exists=True, + ) == {"msg": "Already exists"} + assert len(admin.get_client_authz_policies(client_id=auth_client_id)) == 3 + # Test authz permissions res = admin.get_client_authz_permissions(client_id=auth_client_id) assert len(res) == 1, res @@ -939,6 +970,7 @@ def test_clients(admin: KeycloakAdmin, realm: str): client_id=auth_client_id, payload={"name": "test-authz-scope"} ) assert res["name"] == "test-authz-scope", res + test_scope_id = res["id"] with pytest.raises(KeycloakPostError) as err: admin.create_client_authz_scopes( @@ -953,6 +985,40 @@ def test_clients(admin: KeycloakAdmin, realm: str): assert len(res) == 1 assert {x["name"] for x in res} == {"test-authz-scope"} + res = admin.create_client_authz_scope_based_permission( + client_id=auth_client_id, + payload={ + "name": "test-permission-sb", + "resources": [test_resource_id], + "scopes": [test_scope_id], + }, + ) + assert res, res + assert res["name"] == "test-permission-sb" + assert res["resources"] == [test_resource_id] + assert res["scopes"] == [test_scope_id] + + with pytest.raises(KeycloakPostError) as err: + admin.create_client_authz_scope_based_permission( + client_id=auth_client_id, + payload={ + "name": "test-permission-sb", + "resources": [test_resource_id], + "scopes": [test_scope_id], + }, + ) + assert err.match('409: b\'{"error":"Policy with name') + assert admin.create_client_authz_scope_based_permission( + client_id=auth_client_id, + payload={ + "name": "test-permission-sb", + "resources": [test_resource_id], + "scopes": [test_scope_id], + }, + skip_exists=True, + ) == {"msg": "Already exists"} + assert len(admin.get_client_authz_permissions(client_id=auth_client_id)) == 3 + # Test service account user res = admin.get_client_service_account_user(client_id=auth_client_id) assert res["username"] == "service-account-authz-client", res