diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index ae9cd82..b1fe8ec 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -581,6 +581,20 @@ class KeycloakOpenID: ) return raise_error_from_response(data_raw, KeycloakPostError) + def _verify_token(self, token, key, **kwargs): + # keep the function free of IO + # this way it can be used by `decode_token` and `a_decode_token` + if key is not None: + leeway = kwargs.pop("leeway", 60) + full_jwt = jwt.JWT(jwt=token, **kwargs) + full_jwt.leeway = leeway + full_jwt.validate(key) + return jwt.json_decode(full_jwt.claims) + else: + full_jwt = jwt.JWT(jwt=token, **kwargs) + full_jwt.token.objects["valid"] = True + return json.loads(full_jwt.token.payload.decode("utf-8")) + def decode_token(self, token, validate: bool = True, **kwargs): """Decode user token. @@ -603,26 +617,12 @@ class KeycloakOpenID: :returns: Decoded token :rtype: dict """ - if validate: - if "key" not in kwargs: - key = ( - "-----BEGIN PUBLIC KEY-----\n" - + self.public_key() - + "\n-----END PUBLIC KEY-----" - ) - key = jwk.JWK.from_pem(key.encode("utf-8")) - kwargs["key"] = key + key = kwargs.pop("key", None) + if validate and key is None: + key = "-----BEGIN PUBLIC KEY-----\n" + self.public_key() + "\n-----END PUBLIC KEY-----" + key = jwk.JWK.from_pem(key.encode("utf-8")) - key = kwargs.pop("key") - leeway = kwargs.pop("leeway", 60) - full_jwt = jwt.JWT(jwt=token, **kwargs) - full_jwt.leeway = leeway - full_jwt.validate(key) - return jwt.json_decode(full_jwt.claims) - else: - full_jwt = jwt.JWT(jwt=token, **kwargs) - full_jwt.token.objects["valid"] = True - return json.loads(full_jwt.token.payload.decode("utf-8")) + return self._verify_token(token, key, **kwargs) def load_authorization_config(self, path): """Load Keycloak settings (authorization). @@ -1254,22 +1254,16 @@ class KeycloakOpenID: :returns: Decoded token :rtype: dict """ - if validate: - if "key" not in kwargs: - key = ( - "-----BEGIN PUBLIC KEY-----\n" - + await self.a_public_key() - + "\n-----END PUBLIC KEY-----" - ) - key = jwk.JWK.from_pem(key.encode("utf-8")) - kwargs["key"] = key + key = kwargs.pop("key", None) + if validate and key is None: + key = ( + "-----BEGIN PUBLIC KEY-----\n" + + await self.a_public_key() + + "\n-----END PUBLIC KEY-----" + ) + key = jwk.JWK.from_pem(key.encode("utf-8")) - full_jwt = jwt.JWT(jwt=token, **kwargs) - return jwt.json_decode(full_jwt.claims) - else: - full_jwt = jwt.JWT(jwt=token, **kwargs) - full_jwt.token.objects["valid"] = True - return json.loads(full_jwt.token.payload.decode("utf-8")) + return self._verify_token(token, key, **kwargs) async def a_load_authorization_config(self, path): """Load Keycloak settings (authorization) asynchronously.