Browse Source

refactor: refactored decode_token

complete refactor of the decode_token method in oid

BREAKING CHANGE: changes signatures significantly
pull/556/head
Richard Nemeth 8 months ago
parent
commit
4dc5b0ab70
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 6
      docs/source/modules/openid_client.rst
  2. 39
      src/keycloak/keycloak_openid.py
  3. 36
      tests/test_keycloak_openid.py

6
docs/source/modules/openid_client.rst

@ -116,9 +116,9 @@ Decode token
.. code-block:: python .. code-block:: python
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----"
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options)
token_info = keycloak_openid.decode_token(token['access_token'])
# Without validation
token_info = keycloak_openid.decode_token(token['access_token'], validate=False)
Get UMA-permissions by token Get UMA-permissions by token

39
src/keycloak/keycloak_openid.py

@ -212,7 +212,7 @@ class KeycloakOpenID:
:type token: str :type token: str
:param method_token_info: Token info method to use :param method_token_info: Token info method to use
:type method_token_info: str :type method_token_info: str
:param kwargs: Additional keyword arguments
:param kwargs: Additional keyword arguments passed to the decode_token method
:type kwargs: dict :type kwargs: dict
:returns: Token info :returns: Token info
:rtype: dict :rtype: dict
@ -516,7 +516,7 @@ class KeycloakOpenID:
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload) data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError) return raise_error_from_response(data_raw, KeycloakPostError)
def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
def decode_token(self, token, validate: bool = True, **kwargs):
"""Decode user token. """Decode user token.
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
@ -530,25 +530,30 @@ class KeycloakOpenID:
:param token: Keycloak token :param token: Keycloak token
:type token: str :type token: str
:param key: Decode key
:type key: str
:param algorithms: Algorithms to use for decoding
:type algorithms: list[str]
:param kwargs: Keyword arguments
:param validate: Determines whether the token should be validated with the public key.
Defaults to True.
:type validate: bool
:param kwargs: Additional keyword arguments for jwcrypto's JWT object
:type kwargs: dict :type kwargs: dict
:returns: Decoded token :returns: Decoded token
:rtype: dict :rtype: dict
""" """
# To keep the same API, we map the python-jose options to our claims for jwcrypto
# Per the jwcrypto dev, `exp` and `nbf` are always checked
options = kwargs.get("options", {})
check_claims = {}
if options.get("verify_aud") is True:
check_claims["aud"] = self.client_id
k = jwk.JWK.from_pem(key.encode("utf-8"))
full_jwt = jwt.JWT(jwt=token, key=k, algs=algorithms, check_claims=check_claims)
return jwt.json_decode(full_jwt.claims)
if validate:
if "key" not in kwargs:
key = (
"-----BEGIN PUBLIC KEY-----\n"
+ self.public_key()
+ "\n-----END PUBLIC KEY-----"
)
key = jwk.JWK.from_pem(key.encode("utf-8"))
kwargs["key"] = key
full_jwt = jwt.JWT(jwt=token, **kwargs)
return jwt.json_decode(full_jwt.claims)
else:
full_jwt = jwt.JWT(jwt=token, **kwargs)
full_jwt.token.objects["valid"] = True
return json.loads(full_jwt.token.payload.decode("utf-8"))
def load_authorization_config(self, path): def load_authorization_config(self, path):
"""Load Keycloak settings (authorization). """Load Keycloak settings (authorization).

36
tests/test_keycloak_openid.py

@ -307,15 +307,13 @@ def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
""" """
oid, username, password = oid_with_credentials oid, username, password = oid_with_credentials
token = oid.token(username=username, password=password) token = oid.token(username=username, password=password)
decoded_access_token = oid.decode_token(token=token["access_token"])
decoded_access_token_2 = oid.decode_token(token=token["access_token"], validate=False)
decoded_refresh_token = oid.decode_token(token=token["refresh_token"], validate=False)
assert (
oid.decode_token(
token=token["access_token"],
key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----",
options={"verify_aud": False},
)["preferred_username"]
== username
)
assert decoded_access_token == decoded_access_token_2
assert decoded_access_token["preferred_username"] == username, decoded_access_token
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token
def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
@ -354,20 +352,17 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str
oid.load_authorization_config(path="tests/data/authz_settings.json") oid.load_authorization_config(path="tests/data/authz_settings.json")
assert oid.get_policies(token=token["access_token"]) is None assert oid.get_policies(token=token["access_token"]) is None
key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
orig_client_id = oid.client_id orig_client_id = oid.client_id
oid.client_id = "account" oid.client_id = "account"
assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
assert oid.get_policies(token=token["access_token"], method_token_info="decode") == []
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
str(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
] == ["Policy: test (role)"] ] == ["Policy: test (role)"]
assert [ assert [
repr(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
repr(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
] == ["<Policy: test (role)>"] ] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id
@ -392,12 +387,9 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
oid.load_authorization_config(path="tests/data/authz_settings.json") oid.load_authorization_config(path="tests/data/authz_settings.json")
assert oid.get_permissions(token=token["access_token"]) is None assert oid.get_permissions(token=token["access_token"]) is None
key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
orig_client_id = oid.client_id orig_client_id = oid.client_id
oid.client_id = "account" oid.client_id = "account"
assert (
oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
)
assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == []
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile") policy.add_role(role="account/view-profile")
policy.add_permission( policy.add_permission(
@ -408,15 +400,11 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
oid.authorization.policies["test"] = policy oid.authorization.policies["test"] = policy
assert [ assert [
str(x) str(x)
for x in oid.get_permissions(
token=token["access_token"], method_token_info="decode", key=key
)
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
] == ["Permission: test-perm (resource)"] ] == ["Permission: test-perm (resource)"]
assert [ assert [
repr(x) repr(x)
for x in oid.get_permissions(
token=token["access_token"], method_token_info="decode", key=key
)
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
] == ["<Permission: test-perm (resource)>"] ] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id oid.client_id = orig_client_id

Loading…
Cancel
Save