diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index d8dc62f..43dcf1e 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -4299,7 +4299,7 @@ class KeycloakAdmin: query = query or {} return raise_error_from_response(await self.connection.a_raw_get(url, **query), KeycloakGetError) - def a_get_current_realm(self) -> str: + async def a_get_current_realm(self) -> str: """Return the currently configured realm. :returns: Currently configured realm name @@ -4307,7 +4307,7 @@ class KeycloakAdmin: """ return self.connection.realm_name - def a_change_current_realm(self, realm_name: str) -> None: + async def a_change_current_realm(self, realm_name: str) -> None: """Change the current realm. :param realm_name: The name of the realm to be configured as current @@ -5520,7 +5520,7 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - async def get_client_authz_resources(self, client_id): + async def a_get_client_authz_resources(self, client_id): """Get resources from client. :param client_id: id in ClientRepresentation @@ -6840,7 +6840,7 @@ class KeycloakAdmin: ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) - async def get_group_realm_roles(self, group_id, brief_representation=True): + async def a_get_group_realm_roles(self, group_id, brief_representation=True): """Get all realm roles for a group. :param group_id: id of the group @@ -7211,3 +7211,1136 @@ class KeycloakAdmin: urls_patterns.URL_ADMIN_FLOWS_EXECUTION.format(**params_path) ) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_create_authentication_flow_subflow(self, payload, flow_alias, skip_exists=False): + """Create a new sub authentication flow for a given authentication flow. + + AuthenticationFlowRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticationflowrepresentation + + :param payload: AuthenticationFlowRepresentation + :type payload: dict + :param flow_alias: The flow alias + :type flow_alias: str + :param skip_exists: Do not raise an error if authentication flow already exists + :type skip_exists: bool + :return: Keycloak server response (RoleRepresentation) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "flow-alias": flow_alias} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_FLOWS_EXECUTIONS_FLOW.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + + async def a_get_authenticator_providers(self): + """Get authenticator providers list. + + :return: Authenticator providers + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_AUTHENTICATOR_PROVIDERS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_authenticator_provider_config_description(self, provider_id): + """Get authenticator's provider configuration description. + + AuthenticatorConfigInfoRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticatorconfiginforepresentation + + :param provider_id: Provider Id + :type provider_id: str + :return: AuthenticatorConfigInfoRepresentation + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "provider-id": provider_id} + data_raw = self.connection.raw_get( + urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG_DESCRIPTION.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_authenticator_config(self, config_id): + """Get authenticator configuration. + + Returns all configuration details. + + :param config_id: Authenticator config id + :type config_id: str + :return: Response(json) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": config_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_update_authenticator_config(self, payload, config_id): + """Update an authenticator configuration. + + AuthenticatorConfigRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authenticatorconfigrepresentation + + :param payload: AuthenticatorConfigRepresentation + :type payload: dict + :param config_id: Authenticator config id + :type config_id: str + :return: Response(json) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": config_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_delete_authenticator_config(self, config_id): + """Delete a authenticator configuration. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_authentication_management_resource + + :param config_id: Authenticator config id + :type config_id: str + :return: Keycloak server Response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": config_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_AUTHENTICATOR_CONFIG.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_sync_users(self, storage_id, action): + """Trigger user sync from provider. + + :param storage_id: The id of the user storage provider + :type storage_id: str + :param action: Action can be "triggerFullSync" or "triggerChangedUsersSync" + :type action: str + :return: Keycloak server response + :rtype: bytes + """ + data = {"action": action} + params_query = {"action": action} + + params_path = {"realm-name": self.connection.realm_name, "id": storage_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_USER_STORAGE.format(**params_path), + data=json.dumps(data), + **params_query, + ) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_get_client_scopes(self): + """Get client scopes. + + Get representation of the client scopes for the realm where we are connected to + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getclientscopes + + :return: Keycloak server response Array of (ClientScopeRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_scope(self, client_scope_id): + """Get client scope. + + Get representation of the client scopes for the realm where we are connected to + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getclientscopes + + :param client_scope_id: The id of the client scope + :type client_scope_id: str + :return: Keycloak server response (ClientScopeRepresentation) + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "scope-id": client_scope_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_scope_by_name(self, client_scope_name): + """Get client scope by name. + + Get representation of the client scope identified by the client scope name. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getclientscopes + :param client_scope_name: (str) Name of the client scope + :type client_scope_name: str + :returns: ClientScopeRepresentation or None + :rtype: dict + """ + client_scopes = await self.a_get_client_scopes() + for client_scope in client_scopes: + if client_scope["name"] == client_scope_name: + return client_scope + + return None + + async def a_create_client_scope(self, payload, skip_exists=False): + """Create a client scope. + + ClientScopeRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getclientscopes + + :param payload: ClientScopeRepresentation + :type payload: dict + :param skip_exists: If true then do not raise an error if client scope already exists + :type skip_exists: bool + :return: Client scope id + :rtype: str + """ + if skip_exists: + exists = self.get_client_scope_by_name(client_scope_name=payload["name"]) + + if exists is not None: + return exists["id"] + + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_SCOPES.format(**params_path), data=json.dumps(payload) + ) + raise_error_from_response( + data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists + ) + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + + async def a_update_client_scope(self, client_scope_id, payload): + """Update a client scope. + + ClientScopeRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_client_scopes_resource + + :param client_scope_id: The id of the client scope + :type client_scope_id: str + :param payload: ClientScopeRepresentation + :type payload: dict + :return: Keycloak server response (ClientScopeRepresentation) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "scope-id": client_scope_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_delete_client_scope(self, client_scope_id): + """Delete existing client scope. + + ClientScopeRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_client_scopes_resource + + :param client_scope_id: The id of the client scope + :type client_scope_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "scope-id": client_scope_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_SCOPE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_mappers_from_client_scope(self, client_scope_id): + """Get a list of all mappers connected to the client scope. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_protocol_mappers_resource + :param client_scope_id: Client scope id + :type client_scope_id: str + :returns: Keycloak server response (ProtocolMapperRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "scope-id": client_scope_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) + + async def a_add_mapper_to_client_scope(self, client_scope_id, payload): + """Add a mapper to a client scope. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_create_mapper + + :param client_scope_id: The id of the client scope + :type client_scope_id: str + :param payload: ProtocolMapperRepresentation + :type payload: dict + :return: Keycloak server Response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "scope-id": client_scope_id} + + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_SCOPES_ADD_MAPPER.format(**params_path), + data=json.dumps(payload), + ) + + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + + async def a_delete_mapper_from_client_scope(self, client_scope_id, protocol_mapper_id): + """Delete a mapper from a client scope. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_delete_mapper + + :param client_scope_id: The id of the client scope + :type client_scope_id: str + :param protocol_mapper_id: Protocol mapper id + :type protocol_mapper_id: str + :return: Keycloak server Response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "scope-id": client_scope_id, + "protocol-mapper-id": protocol_mapper_id, + } + + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_update_mapper_in_client_scope(self, client_scope_id, protocol_mapper_id, payload): + """Update an existing protocol mapper in a client scope. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_protocol_mappers_resource + + :param client_scope_id: The id of the client scope + :type client_scope_id: str + :param protocol_mapper_id: The id of the protocol mapper which exists in the client scope + and should to be updated + :type protocol_mapper_id: str + :param payload: ProtocolMapperRepresentation + :type payload: dict + :return: Keycloak server Response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "scope-id": client_scope_id, + "protocol-mapper-id": protocol_mapper_id, + } + + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_SCOPES_MAPPERS.format(**params_path), + data=json.dumps(payload), + ) + + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_get_default_default_client_scopes(self): + """Get default default client scopes. + + Return list of default default client scopes + + :return: Keycloak server response + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_delete_default_default_client_scope(self, scope_id): + """Delete default default client scope. + + :param scope_id: default default client scope id + :type scope_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": scope_id} + data_raw = self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_add_default_default_client_scope(self, scope_id): + """Add default default client scope. + + :param scope_id: default default client scope id + :type scope_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": scope_id} + payload = {"realm": self.connection.realm_name, "clientScopeId": scope_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_DEFAULT_DEFAULT_CLIENT_SCOPE.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_get_default_optional_client_scopes(self): + """Get default optional client scopes. + + Return list of default optional client scopes + + :return: Keycloak server response + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_delete_default_optional_client_scope(self, scope_id): + """Delete default optional client scope. + + :param scope_id: default optional client scope id + :type scope_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": scope_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_add_default_optional_client_scope(self, scope_id): + """Add default optional client scope. + + :param scope_id: default optional client scope id + :type scope_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": scope_id} + payload = {"realm": self.connection.realm_name, "clientScopeId": scope_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_DEFAULT_OPTIONAL_CLIENT_SCOPE.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_get_mappers_from_client(self, client_id): + """List of all client mappers. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_protocolmapperrepresentation + + :param client_id: Client id + :type client_id: str + :returns: KeycloakServerResponse (list of ProtocolMapperRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path) + ) + + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[200]) + + async def a_add_mapper_to_client(self, client_id, payload): + """Add a mapper to a client. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_create_mapper + + :param client_id: The id of the client + :type client_id: str + :param payload: ProtocolMapperRepresentation + :type payload: dict + :return: Keycloak server Response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPERS.format(**params_path), + data=json.dumps(payload), + ) + + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + + async def a_update_client_mapper(self, client_id, mapper_id, payload): + """Update client mapper. + + :param client_id: The id of the client + :type client_id: str + :param mapper_id: The id of the mapper to be deleted + :type mapper_id: str + :param payload: ProtocolMapperRepresentation + :type payload: dict + :return: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "protocol-mapper-id": mapper_id, + } + + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path), + data=json.dumps(payload), + ) + + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_remove_client_mapper(self, client_id, client_mapper_id): + """Remove a mapper from the client. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_protocol_mappers_resource + + :param client_id: The id of the client + :type client_id: str + :param client_mapper_id: The id of the mapper to be deleted + :type client_mapper_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "protocol-mapper-id": client_mapper_id, + } + + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_CLIENT_PROTOCOL_MAPPER.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_generate_client_secrets(self, client_id): + """Generate a new secret for the client. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_regeneratesecret + + :param client_id: id of client (not client-id) + :type client_id: str + :return: Keycloak server response (ClientRepresentation) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_SECRETS.format(**params_path), data=None + ) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_get_client_secrets(self, client_id): + """Get representation of the client secrets. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getclientsecret + + :param client_id: id of client (not client-id) + :type client_id: str + :return: Keycloak server response (ClientRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_SECRETS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_components(self, query=None): + """Get components. + + Return a list of components, filtered according to query parameters + + ComponentRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_componentrepresentation + + :param query: Query parameters (optional) + :type query: dict + :return: components list + :rtype: list + """ + query = query or dict() + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_COMPONENTS.format(**params_path), data=None, **query + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_create_component(self, payload): + """Create a new component. + + ComponentRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_componentrepresentation + + :param payload: ComponentRepresentation + :type payload: dict + :return: Component id + :rtype: str + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_COMPONENTS.format(**params_path), data=json.dumps(payload) + ) + raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + _last_slash_idx = data_raw.headers["Location"].rindex("/") + return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203 + + async def a_get_component(self, component_id): + """Get representation of the component. + + :param component_id: Component id + + ComponentRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_componentrepresentation + + :param component_id: Id of the component + :type component_id: str + :return: ComponentRepresentation + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "component-id": component_id} + data_raw = await self.connection.a_raw_get(urls_patterns.URL_ADMIN_COMPONENT.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_update_component(self, component_id, payload): + """Update the component. + + :param component_id: Component id + :type component_id: str + :param payload: ComponentRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_componentrepresentation + :type payload: dict + :return: Http response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "component-id": component_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_COMPONENT.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_delete_component(self, component_id): + """Delete the component. + + :param component_id: Component id + :type component_id: str + :return: Http response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "component-id": component_id} + data_raw = self.connection.raw_delete( + urls_patterns.URL_ADMIN_COMPONENT.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) + + async def a_get_keys(self): + """Get keys. + + Return a list of keys, filtered according to query parameters + + KeysMetadataRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_key_resource + + :return: keys list + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_KEYS.format(**params_path), data=None + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_admin_events(self, query=None): + """Get Administrative events. + + Return a list of events, filtered according to query parameters + + AdminEvents Representation array + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getevents + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_get_adminrealmsrealmadmin_events + + :param query: Additional query parameters + :type query: dict + :return: events list + :rtype: list + """ + query = query or dict() + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_ADMIN_EVENTS.format(**params_path), data=None, **query + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_events(self, query=None): + """Get events. + + Return a list of events, filtered according to query parameters + + EventRepresentation array + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_eventrepresentation + + :param query: Additional query parameters + :type query: dict + :return: events list + :rtype: list + """ + query = query or dict() + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_USER_EVENTS.format(**params_path), data=None, **query + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_set_events(self, payload): + """Set realm events configuration. + + RealmEventsConfigRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_realmeventsconfigrepresentation + + :param payload: Payload object for the events configuration + :type payload: dict + :return: Http response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_EVENTS_CONFIG.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[204]) + + async def a_get_client_all_sessions(self, client_id): + """Get sessions associated with the client. + + UserSessionRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_usersessionrepresentation + + :param client_id: id of client + :type client_id: str + :return: UserSessionRepresentation + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_ALL_SESSIONS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_sessions_stats(self): + """Get current session count for all clients with active sessions. + + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_getclientsessionstats + + :return: Dict of clients and session count + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_SESSION_STATS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_management_permissions(self, client_id): + """Get management permissions for a client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_update_client_management_permissions(self, payload, client_id): + """Update management permissions for a client. + + ManagementPermissionReference + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_managementpermissionreference + + Payload example:: + + payload={ + "enabled": true + } + + :param payload: ManagementPermissionReference + :type payload: dict + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_MANAGEMENT_PERMISSIONS.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[200]) + + async def a_get_client_authz_policy_scopes(self, client_id, policy_id): + """Get scopes for a given policy. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param policy_id: No Document + :type policy_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "policy-id": policy_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY_SCOPES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_authz_policy_resources(self, client_id, policy_id): + """Get resources for a given policy. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param policy_id: No Document + :type policy_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "policy-id": policy_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_POLICY_RESOURCES.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_client_authz_scope_permission(self, client_id, scope_id): + """Get permissions for a given scope. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param scope_id: No Document + :type scope_id: str + :return: Keycloak server response + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "scope-id": scope_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_create_client_authz_scope_permission(self, payload, client_id): + """Create permissions for a authz scope. + + Payload example:: + + payload={ + "name": "My Permission Name", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "resources": [some_resource_id], + "scopes": [some_scope_id], + "policies": [some_policy_id], + } + + :param payload: No Document + :type payload: dict + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_ADD_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + + async def a_update_client_authz_scope_permission(self, payload, client_id, scope_id): + """Update permissions for a given scope. + + Payload example:: + + payload={ + "id": scope_id, + "name": "My Permission Name", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "resources": [some_resource_id], + "scopes": [some_scope_id], + "policies": [some_policy_id], + } + + :param payload: No Document + :type payload: dict + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :param scope_id: No Document + :type scope_id: str + :return: Keycloak server response + :rtype: bytes + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "scope-id": scope_id, + } + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_PERMISSION.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPutError, expected_codes=[201]) + + async def a_get_client_authz_client_policies(self, client_id): + """Get policies for a given client. + + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response (RoleRepresentation) + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[200]) + + async def a_create_client_authz_client_policy(self, payload, client_id): + """Create a new policy for a given client. + + Payload example:: + + payload={ + "type": "client", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "name": "My Policy", + "clients": [other_client_id], + } + + :param payload: No Document + :type payload: dict + :param client_id: id in ClientRepresentation + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + :type client_id: str + :return: Keycloak server response (RoleRepresentation) + :rtype: bytes + """ + params_path = {"realm-name": self.connection.realm_name, "id": client_id} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_AUTHZ_CLIENT_POLICY.format(**params_path), + data=json.dumps(payload), + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) + + async def a_get_composite_client_roles_of_group(self, client_id, group_id, brief_representation=True): + """Get the composite client roles of the given group for the given client. + + :param client_id: id of the client. + :type client_id: str + :param group_id: id of the group. + :type group_id: str + :param brief_representation: whether to omit attributes in the response + :type brief_representation: bool + :return: the composite client roles of the group (list of RoleRepresentation). + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": group_id, + "client-id": client_id, + } + params = {"briefRepresentation": brief_representation} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_GROUPS_CLIENT_ROLES_COMPOSITE.format(**params_path), **params + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_get_role_client_level_children(self, client_id, role_id): + """Get the child roles of which the given composite client role is composed of. + + :param client_id: id of the client. + :type client_id: str + :param role_id: id of the role. + :type role_id: str + :return: the child roles (list of RoleRepresentation). + :rtype: list + """ + params_path = { + "realm-name": self.connection.realm_name, + "role-id": role_id, + "client-id": client_id, + } + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_CLIENT_ROLE_CHILDREN.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_upload_certificate(self, client_id, certcont): + """Upload a new certificate for the client. + + :param client_id: id of the client. + :type client_id: str + :param certcont: the content of the certificate. + :type certcont: str + :return: dictionary {"certificate": ""}, + where is the content of the uploaded certificate. + :rtype: dict + """ + params_path = { + "realm-name": self.connection.realm_name, + "id": client_id, + "attr": "jwt.credential", + } + m = MultipartEncoder(fields={"keystoreFormat": "Certificate PEM", "file": certcont}) + new_headers = copy.deepcopy(self.connection.headers) + new_headers["Content-Type"] = m.content_type + self.connection.headers = new_headers + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLIENT_CERT_UPLOAD.format(**params_path), + data=m, + headers=new_headers, + ) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_get_required_action_by_alias(self, action_alias): + """Get a required action by its alias. + + :param action_alias: the alias of the required action. + :type action_alias: str + :return: the required action (RequiredActionProviderRepresentation). + :rtype: dict + """ + actions = await self.a_get_required_actions() + for a in actions: + if a["alias"] == action_alias: + return a + return None + + async def a_get_required_actions(self): + """Get the required actions for the realms. + + :return: the required actions (list of RequiredActionProviderRepresentation). + :rtype: list + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_REQUIRED_ACTIONS.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_update_required_action(self, action_alias, payload): + """Update a required action. + + :param action_alias: the action alias. + :type action_alias: str + :param payload: the new required action (RequiredActionProviderRepresentation). + :type payload: dict + :return: empty dictionary. + :rtype: dict + """ + if not isinstance(payload, str): + payload = json.dumps(payload) + params_path = {"realm-name": self.connection.realm_name, "action-alias": action_alias} + data_raw = await self.connection.a_raw_put( + urls_patterns.URL_ADMIN_REQUIRED_ACTIONS_ALIAS.format(**params_path), data=payload + ) + return raise_error_from_response(data_raw, KeycloakPutError) + + async def a_get_bruteforce_detection_status(self, user_id): + """Get bruteforce detection status for user. + + :param user_id: User id + :type user_id: str + :return: Bruteforce status. + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_get( + urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_clear_bruteforce_attempts_for_user(self, user_id): + """Clear bruteforce attempts for user. + + :param user_id: User id + :type user_id: str + :return: empty dictionary. + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name, "id": user_id} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError) + + async def a_clear_all_bruteforce_attempts(self): + """Clear bruteforce attempts for all users in realm. + + :return: empty dictionary. + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_delete( + urls_patterns.URL_ADMIN_ATTACK_DETECTION.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError) + + async def a_clear_keys_cache(self): + """Clear keys cache. + + :return: empty dictionary. + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLEAR_KEYS_CACHE.format(**params_path), data="" + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_clear_realm_cache(self): + """Clear realm cache. + + :return: empty dictionary. + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLEAR_REALM_CACHE.format(**params_path), data="" + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_clear_user_cache(self): + """Clear user cache. + + :return: empty dictionary. + :rtype: dict + """ + params_path = {"realm-name": self.connection.realm_name} + data_raw = await self.connection.a_raw_post( + urls_patterns.URL_ADMIN_CLEAR_USER_CACHE.format(**params_path), data="" + ) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 0c14709..49abbc9 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -783,3 +783,563 @@ class KeycloakOpenID: URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError) + + async def a_well_known(self): + """Get the well_known object. + + The most important endpoint to understand is the well-known configuration + endpoint. It lists endpoints and other configuration options relevant to + the OpenID Connect implementation in Keycloak. + + :returns: It lists endpoints and other configuration options relevant + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + data_raw = await self.connection.a_raw_get(URL_WELL_KNOWN.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_auth_url(self, redirect_uri, scope="email", state=""): + """Get authorization URL endpoint. + + :param redirect_uri: Redirect url to receive oauth code + :type redirect_uri: str + :param scope: Scope of authorization request, split with the blank space + :type scope: str + :param state: State will be returned to the redirect_uri + :type state: str + :returns: Authorization URL Full Build + :rtype: str + """ + params_path = { + "authorization-endpoint": await self.a_well_known()["authorization_endpoint"], + "client-id": self.client_id, + "redirect-uri": redirect_uri, + "scope": scope, + "state": state, + } + return URL_AUTH.format(**params_path) + + async def a_token( + self, + username="", + password="", + grant_type=["password"], + code="", + redirect_uri="", + totp=None, + scope="openid", + **extra + ): + """Retrieve user token. + + The token endpoint is used to obtain tokens. Tokens can either be obtained by + exchanging an authorization code or by supplying credentials directly depending on + what flow is used. The token endpoint is also used to obtain new access tokens + when they expire. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + :param username: Username + :type username: str + :param password: Password + :type password: str + :param grant_type: Grant type + :type grant_type: str + :param code: Code + :type code: str + :param redirect_uri: Redirect URI + :type redirect_uri: str + :param totp: Time-based one-time password + :type totp: int + :param scope: Scope, defaults to openid + :type scope: str + :param extra: Additional extra arguments + :type extra: dict + :returns: Keycloak token + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = { + "username": username, + "password": password, + "client_id": self.client_id, + "grant_type": grant_type, + "code": code, + "redirect_uri": redirect_uri, + "scope": scope, + } + if extra: + payload.update(extra) + + if totp: + payload["totp"] = totp + + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_refresh_token(self, refresh_token, grant_type=["refresh_token"]): + """Refresh the user token. + + The token endpoint is used to obtain tokens. Tokens can either be obtained by + exchanging an authorization code or by supplying credentials directly depending on + what flow is used. The token endpoint is also used to obtain new access tokens + when they expire. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + :param refresh_token: Refresh token from Keycloak + :type refresh_token: str + :param grant_type: Grant type + :type grant_type: str + :returns: New token + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = { + "client_id": self.client_id, + "grant_type": grant_type, + "refresh_token": refresh_token, + } + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_exchange_token( + self, + token: str, + audience: Optional[str] = None, + subject: Optional[str] = None, + subject_token_type: Optional[str] = None, + subject_issuer: Optional[str] = None, + requested_issuer: Optional[str] = None, + requested_token_type: str = "urn:ietf:params:oauth:token-type:refresh_token", + scope: str = "openid", + ) -> dict: + """Exchange user token. + + Use a token to obtain an entirely different token. See + https://www.keycloak.org/docs/latest/securing_apps/index.html#_token-exchange + + :param token: Access token + :type token: str + :param audience: Audience + :type audience: str + :param subject: Subject + :type subject: str + :param subject_token_type: Token Type specification + :type subject_token_type: Optional[str] + :param subject_issuer: Issuer + :type subject_issuer: Optional[str] + :param requested_issuer: Issuer + :type requested_issuer: Optional[str] + :param requested_token_type: Token type specification + :type requested_token_type: str + :param scope: Scope, defaults to openid + :type scope: str + :returns: Exchanged token + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = { + "grant_type": ["urn:ietf:params:oauth:grant-type:token-exchange"], + "client_id": self.client_id, + "subject_token": token, + "subject_token_type": subject_token_type, + "subject_issuer": subject_issuer, + "requested_token_type": requested_token_type, + "audience": audience, + "requested_subject": subject, + "requested_issuer": requested_issuer, + "scope": scope, + } + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_userinfo(self, token): + """Get the user info object. + + The userinfo endpoint returns standard claims about the authenticated user, + and is protected by a bearer token. + + http://openid.net/specs/openid-connect-core-1_0.html#UserInfo + + :param token: Access token + :type token: str + :returns: Userinfo object + :rtype: dict + """ + self.connection.add_param_headers("Authorization", "Bearer " + token) + params_path = {"realm-name": self.realm_name} + data_raw = await self.connection.a_raw_get(URL_USERINFO.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_logout(self, refresh_token): + """Log out the authenticated user. + + :param refresh_token: Refresh token from Keycloak + :type refresh_token: str + :returns: Keycloak server response + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = {"client_id": self.client_id, "refresh_token": refresh_token} + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_LOGOUT.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_certs(self): + """Get certificates. + + The certificate endpoint returns the public keys enabled by the realm, encoded as a + JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled + for verifying tokens. + + https://tools.ietf.org/html/rfc7517 + + :returns: Certificates + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + data_raw = await self.connection.a_raw_get(URL_CERTS.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_public_key(self): + """Retrieve the public key. + + The public key is exposed by the realm page directly. + + :returns: The public key + :rtype: str + """ + params_path = {"realm-name": self.realm_name} + data_raw = await self.connection.a_raw_get(URL_REALM.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError)["public_key"] + + async def a_entitlement(self, token, resource_server_id): + """Get entitlements from the token. + + Client applications can use a specific endpoint to obtain a special security token + called a requesting party token (RPT). This token consists of all the entitlements + (or permissions) for a user as a result of the evaluation of the permissions and + authorization policies associated with the resources being requested. With an RPT, + client applications can gain access to protected resources at the resource server. + + :param token: Access token + :type token: str + :param resource_server_id: Resource server ID + :type resource_server_id: str + :returns: Entitlements + :rtype: dict + """ + self.connection.add_param_headers("Authorization", "Bearer " + token) + params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id} + data_raw = await self.connection.a_raw_get(URL_ENTITLEMENT.format(**params_path)) + + if data_raw.status_code == 404 or data_raw.status_code == 405: + return raise_error_from_response(data_raw, KeycloakDeprecationError) + + return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover + + async def a_introspect(self, token, rpt=None, token_type_hint=None): + """Introspect the user token. + + The introspection endpoint is used to retrieve the active state of a token. + It is can only be invoked by confidential clients. + + https://tools.ietf.org/html/rfc7662 + + :param token: Access token + :type token: str + :param rpt: Requesting party token + :type rpt: str + :param token_type_hint: Token type hint + :type token_type_hint: str + + :returns: Token info + :rtype: dict + :raises KeycloakRPTNotFound: In case of RPT not specified + """ + params_path = {"realm-name": self.realm_name} + payload = {"client_id": self.client_id, "token": token} + + if token_type_hint == "requesting_party_token": + if rpt: + payload.update({"token": rpt, "token_type_hint": token_type_hint}) + self.connection.add_param_headers("Authorization", "Bearer " + token) + else: + raise KeycloakRPTNotFound("Can't found RPT.") + + payload = self._add_secret_key(payload) + + data_raw = await self.connection.a_raw_post(URL_INTROSPECT.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_decode_token(self, token, validate: bool = True, **kwargs): + """Decode user token. + + A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data + structure that represents a cryptographic key. This specification + also defines a JWK Set JSON data structure that represents a set of + JWKs. Cryptographic algorithms and identifiers for use with this + specification are described in the separate JSON Web Algorithms (JWA) + specification and IANA registries established by that specification. + + https://tools.ietf.org/html/rfc7517 + + :param token: Keycloak token + :type token: str + :param validate: Determines whether the token should be validated with the public key. + Defaults to True. + :type validate: bool + :param kwargs: Additional keyword arguments for jwcrypto's JWT object + :type kwargs: dict + :returns: Decoded token + :rtype: dict + """ + if validate: + if "key" not in kwargs: + key = ( + "-----BEGIN PUBLIC KEY-----\n" + + self.public_key() + + "\n-----END PUBLIC KEY-----" + ) + key = jwk.JWK.from_pem(key.encode("utf-8")) + kwargs["key"] = key + + full_jwt = jwt.JWT(jwt=token, **kwargs) + return jwt.json_decode(full_jwt.claims) + else: + full_jwt = jwt.JWT(jwt=token, **kwargs) + full_jwt.token.objects["valid"] = True + return json.loads(full_jwt.token.payload.decode("utf-8")) + + async def a_load_authorization_config(self, path): + """Load Keycloak settings (authorization). + + :param path: settings file (json) + :type path: str + """ + with open(path, "r") as fp: + authorization_json = json.load(fp) + + self.authorization.load_config(authorization_json) + + async def a_get_policies(self, token, method_token_info="introspect", **kwargs): + """Get policies by user token. + + :param token: User token + :type token: str + :param method_token_info: Method for token info decoding + :type method_token_info: str + :param kwargs: Additional keyword arguments + :type kwargs: dict + :return: Policies + :rtype: dict + :raises KeycloakAuthorizationConfigError: In case of bad authorization configuration + :raises KeycloakInvalidTokenError: In case of bad token + """ + if not self.authorization.policies: + raise KeycloakAuthorizationConfigError( + "Keycloak settings not found. Load Authorization Keycloak settings." + ) + + token_info = self._token_info(token, method_token_info, **kwargs) + + if method_token_info == "introspect" and not token_info["active"]: + raise KeycloakInvalidTokenError("Token expired or invalid.") + + user_resources = token_info["resource_access"].get(self.client_id) + + if not user_resources: + return None + + policies = [] + + for policy_name, policy in self.authorization.policies.items(): + for role in user_resources["roles"]: + if self._build_name_role(role) in policy.roles: + policies.append(policy) + + return list(set(policies)) + + async def a_get_permissions(self, token, method_token_info="introspect", **kwargs): + """Get permission by user token. + + :param token: user token + :type token: str + :param method_token_info: Decode token method + :type method_token_info: str + :param kwargs: parameters for decode + :type kwargs: dict + :returns: permissions list + :rtype: list + :raises KeycloakAuthorizationConfigError: In case of bad authorization configuration + :raises KeycloakInvalidTokenError: In case of bad token + """ + if not self.authorization.policies: + raise KeycloakAuthorizationConfigError( + "Keycloak settings not found. Load Authorization Keycloak settings." + ) + + token_info = self._token_info(token, method_token_info, **kwargs) + + if method_token_info == "introspect" and not token_info["active"]: + raise KeycloakInvalidTokenError("Token expired or invalid.") + + user_resources = token_info["resource_access"].get(self.client_id) + + if not user_resources: + return None + + permissions = [] + + for policy_name, policy in self.authorization.policies.items(): + for role in user_resources["roles"]: + if self._build_name_role(role) in policy.roles: + permissions += policy.permissions + + return list(set(permissions)) + + async def a_uma_permissions(self, token, permissions=""): + """Get UMA permissions by user token with requested permissions. + + The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be + invoked by confidential clients. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + :param token: user token + :type token: str + :param permissions: list of uma permissions list(resource:scope) requested by the user + :type permissions: str + :returns: Keycloak server response + :rtype: dict + """ + permission = build_permission_param(permissions) + + params_path = {"realm-name": self.realm_name} + payload = { + "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", + "permission": permission, + "response_mode": "permissions", + "audience": self.client_id, + } + + self.connection.add_param_headers("Authorization", "Bearer " + token) + data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_has_uma_access(self, token, permissions): + """Determine whether user has uma permissions with specified user token. + + :param token: user token + :type token: str + :param permissions: list of uma permissions (resource:scope) + :type permissions: str + :return: Authentication status + :rtype: AuthStatus + :raises KeycloakAuthenticationError: In case of failed authentication + :raises KeycloakPostError: In case of failed request to Keycloak + """ + needed = build_permission_param(permissions) + try: + granted = await self.a_uma_permissions(token, permissions) + except (KeycloakPostError, KeycloakAuthenticationError) as e: + if e.response_code == 403: # pragma: no cover + return AuthStatus( + is_logged_in=True, is_authorized=False, missing_permissions=needed + ) + elif e.response_code == 401: + return AuthStatus( + is_logged_in=False, is_authorized=False, missing_permissions=needed + ) + raise + + for resource_struct in granted: + resource = resource_struct["rsname"] + scopes = resource_struct.get("scopes", None) + if not scopes: + needed.discard(resource) + continue + for scope in scopes: # pragma: no cover + needed.discard("{}#{}".format(resource, scope)) + + return AuthStatus( + is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed + ) + + async def a_register_client(self, token: str, payload: dict): + """Create a client. + + ClientRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + + :param token: Initial access token + :type token: str + :param payload: ClientRepresentation + :type payload: dict + :return: Client Representation + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + self.connection.add_param_headers("Authorization", "Bearer " + token) + self.connection.add_param_headers("Content-Type", "application/json") + data_raw = await self.connection.a_raw_post( + URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_device(self): + """Get device authorization grant. + + The device endpoint is used to obtain a user code verification and user authentication. + The response contains a device_code, user_code, verification_uri, + verification_uri_complete, expires_in (lifetime in seconds for device_code + and user_code), and polling interval. + Users can either follow the verification_uri and enter the user_code or + follow the verification_uri_complete. + After authenticating with valid credentials, users can obtain tokens using the + "urn:ietf:params:oauth:grant-type:device_code" grant_type and the device_code. + + https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow + https://github.com/keycloak/keycloak-community/blob/main/design/oauth2-device-authorization-grant.md#how-to-try-it + + :returns: Device Authorization Response + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = {"client_id": self.client_id} + + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_DEVICE.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_update_client(self, token: str, client_id: str, payload: dict): + """Update a client. + + ClientRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + + :param token: registration access token + :type token: str + :param client_id: Keycloak client id + :type client_id: str + :param payload: ClientRepresentation + :type payload: dict + :return: Client Representation + :rtype: dict + """ + params_path = {"realm-name": self.realm_name, "client-id": client_id} + self.connection.add_param_headers("Authorization", "Bearer " + token) + self.connection.add_param_headers("Content-Type", "application/json") + + # Keycloak complains if the clientId is not set in the payload + if "clientId" not in payload: + payload["clientId"] = client_id + + data_raw = await self.connection.a_raw_put( + URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError)