|
|
@ -4,6 +4,8 @@ from inspect import iscoroutinefunction, signature |
|
|
|
from typing import Tuple |
|
|
|
from unittest import mock |
|
|
|
|
|
|
|
import jwcrypto.jwk |
|
|
|
import jwcrypto.jws |
|
|
|
import pytest |
|
|
|
|
|
|
|
from keycloak import KeycloakAdmin, KeycloakOpenID |
|
|
@ -317,6 +319,39 @@ def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token |
|
|
|
|
|
|
|
|
|
|
|
def test_decode_token_validate(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test decode token. |
|
|
|
|
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
token = oid.token(username=username, password=password) |
|
|
|
access_token = token["access_token"] |
|
|
|
decoded_access_token = oid.decode_token(token=access_token) |
|
|
|
|
|
|
|
key = oid.public_key() |
|
|
|
key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----" |
|
|
|
key = jwcrypto.jwk.JWK.from_pem(key.encode("utf-8")) |
|
|
|
|
|
|
|
invalid_access_token = access_token + "a" |
|
|
|
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): |
|
|
|
decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=True) |
|
|
|
|
|
|
|
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): |
|
|
|
decoded_invalid_access_token = oid.decode_token( |
|
|
|
token=invalid_access_token, validate=True, key=key |
|
|
|
) |
|
|
|
|
|
|
|
decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False) |
|
|
|
assert decoded_access_token == decoded_invalid_access_token |
|
|
|
|
|
|
|
decoded_invalid_access_token = oid.decode_token( |
|
|
|
token=invalid_access_token, validate=False, key=key |
|
|
|
) |
|
|
|
assert decoded_access_token == decoded_invalid_access_token |
|
|
|
|
|
|
|
|
|
|
|
def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test load authorization config. |
|
|
|
|
|
|
@ -765,7 +800,7 @@ async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test decode token. |
|
|
|
"""Test decode token asynchronously. |
|
|
|
|
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
@ -781,6 +816,44 @@ async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, s |
|
|
|
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_decode_token_validate(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): |
|
|
|
"""Test decode token asynchronously. |
|
|
|
|
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
token = await oid.a_token(username=username, password=password) |
|
|
|
access_token = token["access_token"] |
|
|
|
decoded_access_token = await oid.a_decode_token(token=access_token) |
|
|
|
|
|
|
|
key = await oid.a_public_key() |
|
|
|
key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----" |
|
|
|
key = jwcrypto.jwk.JWK.from_pem(key.encode("utf-8")) |
|
|
|
|
|
|
|
invalid_access_token = access_token + "a" |
|
|
|
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): |
|
|
|
decoded_invalid_access_token = await oid.a_decode_token( |
|
|
|
token=invalid_access_token, validate=True |
|
|
|
) |
|
|
|
|
|
|
|
with pytest.raises(jwcrypto.jws.InvalidJWSSignature): |
|
|
|
decoded_invalid_access_token = await oid.a_decode_token( |
|
|
|
token=invalid_access_token, validate=True, key=key |
|
|
|
) |
|
|
|
|
|
|
|
decoded_invalid_access_token = await oid.a_decode_token( |
|
|
|
token=invalid_access_token, validate=False |
|
|
|
) |
|
|
|
assert decoded_access_token == decoded_invalid_access_token |
|
|
|
|
|
|
|
decoded_invalid_access_token = await oid.a_decode_token( |
|
|
|
token=invalid_access_token, validate=False, key=key |
|
|
|
) |
|
|
|
assert decoded_access_token == decoded_invalid_access_token |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_load_authorization_config( |
|
|
|
oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] |
|
|
|