|
|
@ -581,6 +581,20 @@ class KeycloakOpenID: |
|
|
|
) |
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
|
|
|
|
def _verify_token(self, token, key, **kwargs): |
|
|
|
# keep the function free of IO |
|
|
|
# this way it can be used by `decode_token` and `a_decode_token` |
|
|
|
if key is not None: |
|
|
|
leeway = kwargs.pop("leeway", 60) |
|
|
|
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|
|
|
full_jwt.leeway = leeway |
|
|
|
full_jwt.validate(key) |
|
|
|
return jwt.json_decode(full_jwt.claims) |
|
|
|
else: |
|
|
|
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|
|
|
full_jwt.token.objects["valid"] = True |
|
|
|
return json.loads(full_jwt.token.payload.decode("utf-8")) |
|
|
|
|
|
|
|
def decode_token(self, token, validate: bool = True, **kwargs): |
|
|
|
"""Decode user token. |
|
|
|
|
|
|
@ -603,26 +617,12 @@ class KeycloakOpenID: |
|
|
|
:returns: Decoded token |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
if validate: |
|
|
|
if "key" not in kwargs: |
|
|
|
key = ( |
|
|
|
"-----BEGIN PUBLIC KEY-----\n" |
|
|
|
+ self.public_key() |
|
|
|
+ "\n-----END PUBLIC KEY-----" |
|
|
|
) |
|
|
|
key = jwk.JWK.from_pem(key.encode("utf-8")) |
|
|
|
kwargs["key"] = key |
|
|
|
key = kwargs.pop("key", None) |
|
|
|
if validate and key is None: |
|
|
|
key = "-----BEGIN PUBLIC KEY-----\n" + self.public_key() + "\n-----END PUBLIC KEY-----" |
|
|
|
key = jwk.JWK.from_pem(key.encode("utf-8")) |
|
|
|
|
|
|
|
key = kwargs.pop("key") |
|
|
|
leeway = kwargs.pop("leeway", 60) |
|
|
|
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|
|
|
full_jwt.leeway = leeway |
|
|
|
full_jwt.validate(key) |
|
|
|
return jwt.json_decode(full_jwt.claims) |
|
|
|
else: |
|
|
|
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|
|
|
full_jwt.token.objects["valid"] = True |
|
|
|
return json.loads(full_jwt.token.payload.decode("utf-8")) |
|
|
|
return self._verify_token(token, key, **kwargs) |
|
|
|
|
|
|
|
def load_authorization_config(self, path): |
|
|
|
"""Load Keycloak settings (authorization). |
|
|
@ -1254,22 +1254,16 @@ class KeycloakOpenID: |
|
|
|
:returns: Decoded token |
|
|
|
:rtype: dict |
|
|
|
""" |
|
|
|
if validate: |
|
|
|
if "key" not in kwargs: |
|
|
|
key = ( |
|
|
|
"-----BEGIN PUBLIC KEY-----\n" |
|
|
|
+ await self.a_public_key() |
|
|
|
+ "\n-----END PUBLIC KEY-----" |
|
|
|
) |
|
|
|
key = jwk.JWK.from_pem(key.encode("utf-8")) |
|
|
|
kwargs["key"] = key |
|
|
|
key = kwargs.pop("key", None) |
|
|
|
if validate and key is None: |
|
|
|
key = ( |
|
|
|
"-----BEGIN PUBLIC KEY-----\n" |
|
|
|
+ await self.a_public_key() |
|
|
|
+ "\n-----END PUBLIC KEY-----" |
|
|
|
) |
|
|
|
key = jwk.JWK.from_pem(key.encode("utf-8")) |
|
|
|
|
|
|
|
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|
|
|
return jwt.json_decode(full_jwt.claims) |
|
|
|
else: |
|
|
|
full_jwt = jwt.JWT(jwt=token, **kwargs) |
|
|
|
full_jwt.token.objects["valid"] = True |
|
|
|
return json.loads(full_jwt.token.payload.decode("utf-8")) |
|
|
|
return self._verify_token(token, key, **kwargs) |
|
|
|
|
|
|
|
async def a_load_authorization_config(self, path): |
|
|
|
"""Load Keycloak settings (authorization) asynchronously. |
|
|
|