diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 0c14709..733265f 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -605,7 +605,7 @@ class KeycloakOpenID: return list(set(policies)) def get_permissions(self, token, method_token_info="introspect", **kwargs): - """Get permission by user token. + """Get permission by user token . :param token: user token :type token: str @@ -620,7 +620,7 @@ class KeycloakOpenID: """ if not self.authorization.policies: raise KeycloakAuthorizationConfigError( - "Keycloak settings not found. Load Authorization Keycloak settings." + "Keycloak settings not found. Load Authorization Keycloak settings ." ) token_info = self._token_info(token, method_token_info, **kwargs) @@ -783,3 +783,563 @@ class KeycloakOpenID: URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload) ) return raise_error_from_response(data_raw, KeycloakPutError) + + async def a_well_known(self): + """Get the well_known object asynchronously. + + The most important endpoint to understand is the well-known configuration + endpoint. It lists endpoints and other configuration options relevant to + the OpenID Connect implementation in Keycloak. + + :returns: It lists endpoints and other configuration options relevant + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + data_raw = await self.connection.a_raw_get(URL_WELL_KNOWN.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_auth_url(self, redirect_uri, scope="email", state=""): + """Get authorization URL endpoint asynchronously. + + :param redirect_uri: Redirect url to receive oauth code + :type redirect_uri: str + :param scope: Scope of authorization request, split with the blank space + :type scope: str + :param state: State will be returned to the redirect_uri + :type state: str + :returns: Authorization URL Full Build + :rtype: str + """ + params_path = { + "authorization-endpoint": (await self.a_well_known())["authorization_endpoint"], + "client-id": self.client_id, + "redirect-uri": redirect_uri, + "scope": scope, + "state": state, + } + return URL_AUTH.format(**params_path) + + async def a_token( + self, + username="", + password="", + grant_type=["password"], + code="", + redirect_uri="", + totp=None, + scope="openid", + **extra + ): + """Retrieve user token asynchronously. + + The token endpoint is used to obtain tokens. Tokens can either be obtained by + exchanging an authorization code or by supplying credentials directly depending on + what flow is used. The token endpoint is also used to obtain new access tokens + when they expire. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + :param username: Username + :type username: str + :param password: Password + :type password: str + :param grant_type: Grant type + :type grant_type: str + :param code: Code + :type code: str + :param redirect_uri: Redirect URI + :type redirect_uri: str + :param totp: Time-based one-time password + :type totp: int + :param scope: Scope, defaults to openid + :type scope: str + :param extra: Additional extra arguments + :type extra: dict + :returns: Keycloak token + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = { + "username": username, + "password": password, + "client_id": self.client_id, + "grant_type": grant_type, + "code": code, + "redirect_uri": redirect_uri, + "scope": scope, + } + if extra: + payload.update(extra) + + if totp: + payload["totp"] = totp + + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_refresh_token(self, refresh_token, grant_type=["refresh_token"]): + """Refresh the user token asynchronously. + + The token endpoint is used to obtain tokens. Tokens can either be obtained by + exchanging an authorization code or by supplying credentials directly depending on + what flow is used. The token endpoint is also used to obtain new access tokens + when they expire. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + :param refresh_token: Refresh token from Keycloak + :type refresh_token: str + :param grant_type: Grant type + :type grant_type: str + :returns: New token + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = { + "client_id": self.client_id, + "grant_type": grant_type, + "refresh_token": refresh_token, + } + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_exchange_token( + self, + token: str, + audience: Optional[str] = None, + subject: Optional[str] = None, + subject_token_type: Optional[str] = None, + subject_issuer: Optional[str] = None, + requested_issuer: Optional[str] = None, + requested_token_type: str = "urn:ietf:params:oauth:token-type:refresh_token", + scope: str = "openid", + ) -> dict: + """Exchange user token asynchronously. + + Use a token to obtain an entirely different token. See + https://www.keycloak.org/docs/latest/securing_apps/index.html#_token-exchange + + :param token: Access token + :type token: str + :param audience: Audience + :type audience: str + :param subject: Subject + :type subject: str + :param subject_token_type: Token Type specification + :type subject_token_type: Optional[str] + :param subject_issuer: Issuer + :type subject_issuer: Optional[str] + :param requested_issuer: Issuer + :type requested_issuer: Optional[str] + :param requested_token_type: Token type specification + :type requested_token_type: str + :param scope: Scope, defaults to openid + :type scope: str + :returns: Exchanged token + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = { + "grant_type": ["urn:ietf:params:oauth:grant-type:token-exchange"], + "client_id": self.client_id, + "subject_token": token, + "subject_token_type": subject_token_type, + "subject_issuer": subject_issuer, + "requested_token_type": requested_token_type, + "audience": audience, + "requested_subject": subject, + "requested_issuer": requested_issuer, + "scope": scope, + } + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_userinfo(self, token): + """Get the user info object asynchronously. + + The userinfo endpoint returns standard claims about the authenticated user, + and is protected by a bearer token. + + http://openid.net/specs/openid-connect-core-1_0.html#UserInfo + + :param token: Access token + :type token: str + :returns: Userinfo object + :rtype: dict + """ + self.connection.add_param_headers("Authorization", "Bearer " + token) + params_path = {"realm-name": self.realm_name} + data_raw = await self.connection.a_raw_get(URL_USERINFO.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_logout(self, refresh_token): + """Log out the authenticated user asynchronously. + + :param refresh_token: Refresh token from Keycloak + :type refresh_token: str + :returns: Keycloak server response + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = {"client_id": self.client_id, "refresh_token": refresh_token} + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_LOGOUT.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) + + async def a_certs(self): + """Get certificates asynchronously. + + The certificate endpoint returns the public keys enabled by the realm, encoded as a + JSON Web Key (JWK). Depending on the realm settings there can be one or more keys enabled + for verifying tokens. + + https://tools.ietf.org/html/rfc7517 + + :returns: Certificates + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + data_raw = await self.connection.a_raw_get(URL_CERTS.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError) + + async def a_public_key(self): + """Retrieve the public key asynchronously. + + The public key is exposed by the realm page directly. + + :returns: The public key + :rtype: str + """ + params_path = {"realm-name": self.realm_name} + data_raw = await self.connection.a_raw_get(URL_REALM.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakGetError)["public_key"] + + async def a_entitlement(self, token, resource_server_id): + """Get entitlements from the token asynchronously. + + Client applications can use a specific endpoint to obtain a special security token + called a requesting party token (RPT). This token consists of all the entitlements + (or permissions) for a user as a result of the evaluation of the permissions and + authorization policies associated with the resources being requested. With an RPT, + client applications can gain access to protected resources at the resource server. + + :param token: Access token + :type token: str + :param resource_server_id: Resource server ID + :type resource_server_id: str + :returns: Entitlements + :rtype: dict + """ + self.connection.add_param_headers("Authorization", "Bearer " + token) + params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id} + data_raw = await self.connection.a_raw_get(URL_ENTITLEMENT.format(**params_path)) + + if data_raw.status_code == 404 or data_raw.status_code == 405: + return raise_error_from_response(data_raw, KeycloakDeprecationError) + + return raise_error_from_response(data_raw, KeycloakGetError) # pragma: no cover + + async def a_introspect(self, token, rpt=None, token_type_hint=None): + """Introspect the user token asynchronously. + + The introspection endpoint is used to retrieve the active state of a token. + It is can only be invoked by confidential clients. + + https://tools.ietf.org/html/rfc7662 + + :param token: Access token + :type token: str + :param rpt: Requesting party token + :type rpt: str + :param token_type_hint: Token type hint + :type token_type_hint: str + + :returns: Token info + :rtype: dict + :raises KeycloakRPTNotFound: In case of RPT not specified + """ + params_path = {"realm-name": self.realm_name} + payload = {"client_id": self.client_id, "token": token} + + if token_type_hint == "requesting_party_token": + if rpt: + payload.update({"token": rpt, "token_type_hint": token_type_hint}) + self.connection.add_param_headers("Authorization", "Bearer " + token) + else: + raise KeycloakRPTNotFound("Can't found RPT.") + + payload = self._add_secret_key(payload) + + data_raw = await self.connection.a_raw_post(URL_INTROSPECT.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_decode_token(self, token, validate: bool = True, **kwargs): + """Decode user token asynchronously. + + A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data + structure that represents a cryptographic key. This specification + also defines a JWK Set JSON data structure that represents a set of + JWKs. Cryptographic algorithms and identifiers for use with this + specification are described in the separate JSON Web Algorithms (JWA) + specification and IANA registries established by that specification. + + https://tools.ietf.org/html/rfc7517 + + :param token: Keycloak token + :type token: str + :param validate: Determines whether the token should be validated with the public key. + Defaults to True. + :type validate: bool + :param kwargs: Additional keyword arguments for jwcrypto's JWT object + :type kwargs: dict + :returns: Decoded token + :rtype: dict + """ + if validate: + if "key" not in kwargs: + key = ( + "-----BEGIN PUBLIC KEY-----\n" + + self.public_key() + + "\n-----END PUBLIC KEY-----" + ) + key = jwk.JWK.from_pem(key.encode("utf-8")) + kwargs["key"] = key + + full_jwt = jwt.JWT(jwt=token, **kwargs) + return jwt.json_decode(full_jwt.claims) + else: + full_jwt = jwt.JWT(jwt=token, **kwargs) + full_jwt.token.objects["valid"] = True + return json.loads(full_jwt.token.payload.decode("utf-8")) + + async def a_load_authorization_config(self, path): + """Load Keycloak settings (authorization) asynchronously. + + :param path: settings file (json) + :type path: str + """ + with open(path, "r") as fp: + authorization_json = json.load(fp) + + self.authorization.load_config(authorization_json) + + async def a_get_policies(self, token, method_token_info="introspect", **kwargs): + """Get policies by user token asynchronously. + + :param token: User token + :type token: str + :param method_token_info: Method for token info decoding + :type method_token_info: str + :param kwargs: Additional keyword arguments + :type kwargs: dict + :return: Policies + :rtype: dict + :raises KeycloakAuthorizationConfigError: In case of bad authorization configuration + :raises KeycloakInvalidTokenError: In case of bad token + """ + if not self.authorization.policies: + raise KeycloakAuthorizationConfigError( + "Keycloak settings not found. Load Authorization Keycloak settings." + ) + + token_info = self._token_info(token, method_token_info, **kwargs) + + if method_token_info == "introspect" and not token_info["active"]: + raise KeycloakInvalidTokenError("Token expired or invalid.") + + user_resources = token_info["resource_access"].get(self.client_id) + + if not user_resources: + return None + + policies = [] + + for policy_name, policy in self.authorization.policies.items(): + for role in user_resources["roles"]: + if self._build_name_role(role) in policy.roles: + policies.append(policy) + + return list(set(policies)) + + async def a_get_permissions(self, token, method_token_info="introspect", **kwargs): + """Get permission by user token asynchronously. + + :param token: user token + :type token: str + :param method_token_info: Decode token method + :type method_token_info: str + :param kwargs: parameters for decode + :type kwargs: dict + :returns: permissions list + :rtype: list + :raises KeycloakAuthorizationConfigError: In case of bad authorization configuration + :raises KeycloakInvalidTokenError: In case of bad token + """ + if not self.authorization.policies: + raise KeycloakAuthorizationConfigError( + "Keycloak settings not found. Load Authorization Keycloak settings." + ) + + token_info = self._token_info(token, method_token_info, **kwargs) + + if method_token_info == "introspect" and not token_info["active"]: + raise KeycloakInvalidTokenError("Token expired or invalid.") + + user_resources = token_info["resource_access"].get(self.client_id) + + if not user_resources: + return None + + permissions = [] + + for policy_name, policy in self.authorization.policies.items(): + for role in user_resources["roles"]: + if self._build_name_role(role) in policy.roles: + permissions += policy.permissions + + return list(set(permissions)) + + async def a_uma_permissions(self, token, permissions=""): + """Get UMA permissions by user token with requested permissions asynchronously. + + The token endpoint is used to retrieve UMA permissions from Keycloak. It can only be + invoked by confidential clients. + + http://openid.net/specs/openid-connect-core-1_0.html#TokenEndpoint + + :param token: user token + :type token: str + :param permissions: list of uma permissions list(resource:scope) requested by the user + :type permissions: str + :returns: Keycloak server response + :rtype: dict + """ + permission = build_permission_param(permissions) + + params_path = {"realm-name": self.realm_name} + payload = { + "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", + "permission": permission, + "response_mode": "permissions", + "audience": self.client_id, + } + + self.connection.add_param_headers("Authorization", "Bearer " + token) + data_raw = await self.connection.a_raw_post(URL_TOKEN.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_has_uma_access(self, token, permissions): + """Determine whether user has uma permissions with specified user token asynchronously. + + :param token: user token + :type token: str + :param permissions: list of uma permissions (resource:scope) + :type permissions: str + :return: Authentication status + :rtype: AuthStatus + :raises KeycloakAuthenticationError: In case of failed authentication + :raises KeycloakPostError: In case of failed request to Keycloak + """ + needed = build_permission_param(permissions) + try: + granted = await self.a_uma_permissions(token, permissions) + except (KeycloakPostError, KeycloakAuthenticationError) as e: + if e.response_code == 403: # pragma: no cover + return AuthStatus( + is_logged_in=True, is_authorized=False, missing_permissions=needed + ) + elif e.response_code == 401: + return AuthStatus( + is_logged_in=False, is_authorized=False, missing_permissions=needed + ) + raise + + for resource_struct in granted: + resource = resource_struct["rsname"] + scopes = resource_struct.get("scopes", None) + if not scopes: + needed.discard(resource) + continue + for scope in scopes: # pragma: no cover + needed.discard("{}#{}".format(resource, scope)) + + return AuthStatus( + is_logged_in=True, is_authorized=len(needed) == 0, missing_permissions=needed + ) + + async def a_register_client(self, token: str, payload: dict): + """Create a client asynchronously. + + ClientRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + + :param token: Initial access token + :type token: str + :param payload: ClientRepresentation + :type payload: dict + :return: Client Representation + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + self.connection.add_param_headers("Authorization", "Bearer " + token) + self.connection.add_param_headers("Content-Type", "application/json") + data_raw = await self.connection.a_raw_post( + URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_device(self): + """Get device authorization grant asynchronously. + + The device endpoint is used to obtain a user code verification and user authentication. + The response contains a device_code, user_code, verification_uri, + verification_uri_complete, expires_in (lifetime in seconds for device_code + and user_code), and polling interval. + Users can either follow the verification_uri and enter the user_code or + follow the verification_uri_complete. + After authenticating with valid credentials, users can obtain tokens using the + "urn:ietf:params:oauth:grant-type:device_code" grant_type and the device_code. + + https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow + https://github.com/keycloak/keycloak-community/blob/main/design/oauth2-device-authorization-grant.md#how-to-try-it + + :returns: Device Authorization Response + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + payload = {"client_id": self.client_id} + + payload = self._add_secret_key(payload) + data_raw = await self.connection.a_raw_post(URL_DEVICE.format(**params_path), data=payload) + return raise_error_from_response(data_raw, KeycloakPostError) + + async def a_update_client(self, token: str, client_id: str, payload: dict): + """Update a client asynchronously. + + ClientRepresentation: + https://www.keycloak.org/docs-api/24.0.2/rest-api/index.html#_clientrepresentation + + :param token: registration access token + :type token: str + :param client_id: Keycloak client id + :type client_id: str + :param payload: ClientRepresentation + :type payload: dict + :return: Client Representation + :rtype: dict + """ + params_path = {"realm-name": self.realm_name, "client-id": client_id} + self.connection.add_param_headers("Authorization", "Bearer " + token) + self.connection.add_param_headers("Content-Type", "application/json") + + # Keycloak complains if the clientId is not set in the payload + if "clientId" not in payload: + payload["clientId"] = client_id + + data_raw = await self.connection.a_raw_put( + URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload) + ) + return raise_error_from_response(data_raw, KeycloakPutError) diff --git a/src/keycloak/openid_connection.py b/src/keycloak/openid_connection.py index 583afcd..635586f 100644 --- a/src/keycloak/openid_connection.py +++ b/src/keycloak/openid_connection.py @@ -117,6 +117,7 @@ class KeycloakOpenIDConnection(ConnectionManager): self.headers = {} self.custom_headers = custom_headers + if self.token is None: self.get_token() @@ -418,3 +419,71 @@ class KeycloakOpenIDConnection(ConnectionManager): self._refresh_if_required() r = super().raw_delete(*args, **kwargs) return r + + async def a_raw_get(self, *args, **kwargs): + """Call connection.raw_get. + + If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token + and try *get* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = await super().a_raw_get(*args, **kwargs) + return r + + async def a_raw_post(self, *args, **kwargs): + """Call connection.raw_post. + + If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token + and try *post* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = await super().a_raw_post(*args, **kwargs) + return r + + async def a_raw_put(self, *args, **kwargs): + """Call connection.raw_put. + + If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token + and try *put* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = await super().a_raw_put(*args, **kwargs) + return r + + async def a_raw_delete(self, *args, **kwargs): + """Call connection.raw_delete. + + If auto_refresh is set for *delete* and *access_token* is expired, + it will refresh the token and try *delete* once more. + + :param args: Additional arguments + :type args: tuple + :param kwargs: Additional keyword arguments + :type kwargs: dict + :returns: Response + :rtype: Response + """ + self._refresh_if_required() + r = await super().a_raw_delete(*args, **kwargs) + return r