From 4fa7119367987104c13376d685f31b566be9c873 Mon Sep 17 00:00:00 2001 From: Gabriel Rudloff Date: Fri, 19 Sep 2025 12:45:47 +0000 Subject: [PATCH 1/4] feat: support PKCE in authorization flow (RFC 7636) --- src/keycloak/keycloak_openid.py | 36 +++++++++++++++++++++++-- src/keycloak/pkce_utils.py | 48 +++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 src/keycloak/pkce_utils.py diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 60f85e6..abedc7c 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -282,6 +282,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", + code_challenge: str = None, + code_challenge_method: str = None, ) -> str: """ Get authorization URL endpoint. @@ -294,6 +296,10 @@ class KeycloakOpenID: :type state: str :param nonce: Associates a Client session with an ID Token to mitigate replay attacks :type nonce: str + :param code_challenge: PKCE code challenge + :type code_challenge: str + :param code_challenge_method: PKCE code challenge method + :type code_challenge_method: str :returns: Authorization URL Full Build :rtype: str """ @@ -305,7 +311,12 @@ class KeycloakOpenID: "state": state, "nonce": nonce, } - return URL_AUTH.format(**params_path) + url = URL_AUTH.format(**params_path) + if code_challenge: + url += f"&code_challenge={code_challenge}" + if code_challenge_method: + url += f"&code_challenge_method={code_challenge_method}" + return url def token( self, @@ -316,6 +327,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", + code_verifier: str = None, **extra: dict, ) -> dict: """ @@ -342,6 +354,8 @@ class KeycloakOpenID: :type totp: int :param scope: Scope, defaults to openid :type scope: str + :param code_verifier: PKCE code verifier + :type code_verifier: str :param extra: Additional extra arguments :type extra: dict :returns: Keycloak token @@ -357,6 +371,8 @@ class KeycloakOpenID: "redirect_uri": redirect_uri, "scope": scope, } + if code_verifier: + payload["code_verifier"] = code_verifier if extra: payload.update(extra) @@ -1028,6 +1044,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", + code_challenge: str = None, + code_challenge_method: str = None, ) -> str: """ Get authorization URL endpoint asynchronously. @@ -1040,6 +1058,10 @@ class KeycloakOpenID: :type state: str :param nonce: Associates a Client session with an ID Token to mitigate replay attacks :type nonce: str + :param code_challenge: PKCE code challenge + :type code_challenge: str + :param code_challenge_method: PKCE code challenge method + :type code_challenge_method: str :returns: Authorization URL Full Build :rtype: str """ @@ -1051,7 +1073,12 @@ class KeycloakOpenID: "state": state, "nonce": nonce, } - return URL_AUTH.format(**params_path) + url = URL_AUTH.format(**params_path) + if code_challenge: + url += f"&code_challenge={code_challenge}" + if code_challenge_method: + url += f"&code_challenge_method={code_challenge_method}" + return url async def a_token( self, @@ -1062,6 +1089,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", + code_verifier: str = None, **extra: dict, ) -> dict: """ @@ -1088,6 +1116,8 @@ class KeycloakOpenID: :type totp: int :param scope: Scope, defaults to openid :type scope: str + :param code_verifier: PKCE code verifier + :type code_verifier: str :param extra: Additional extra arguments :type extra: dict :returns: Keycloak token @@ -1103,6 +1133,8 @@ class KeycloakOpenID: "redirect_uri": redirect_uri, "scope": scope, } + if code_verifier: + payload["code_verifier"] = code_verifier if extra: payload.update(extra) diff --git a/src/keycloak/pkce_utils.py b/src/keycloak/pkce_utils.py new file mode 100644 index 0000000..2953bc6 --- /dev/null +++ b/src/keycloak/pkce_utils.py @@ -0,0 +1,48 @@ +# +# The MIT License (MIT) +# +# Copyright (C) 2017 Marcos Pereira +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +import base64 +import hashlib +import os +from typing import Tuple + + +def generate_code_verifier(length: int = 128) -> str: + """ + Generates a high-entropy cryptographic random string for PKCE code_verifier. + RFC 7636 recommends a length between 43 and 128 characters. + """ + return base64.urlsafe_b64encode(os.urandom(length)).rstrip(b"=").decode("utf-8")[:length] + +def generate_code_challenge(code_verifier: str, method: str = "S256") -> Tuple[str, str]: + """ + Generates a code_challenge from the code_verifier using the specified method. + Supported methods: "S256" (default), "plain" + Returns (code_challenge, code_challenge_method) + """ + if method == "S256": + code_challenge = base64.urlsafe_b64encode( + hashlib.sha256(code_verifier.encode("utf-8")).digest() + ).rstrip(b"=").decode("utf-8") + return code_challenge, "S256" + if method == "plain": + return code_verifier, "plain" + raise ValueError(f"Unsupported PKCE method: {method}") From d1879b456bb826edce0e8f60e48b60c836709242 Mon Sep 17 00:00:00 2001 From: Gabriel Rudloff Date: Fri, 19 Sep 2025 12:46:04 +0000 Subject: [PATCH 2/4] test: add tests for PKCE flow --- tests/test_pkce_flow.py | 49 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 tests/test_pkce_flow.py diff --git a/tests/test_pkce_flow.py b/tests/test_pkce_flow.py new file mode 100644 index 0000000..3823f7f --- /dev/null +++ b/tests/test_pkce_flow.py @@ -0,0 +1,49 @@ +from unittest import mock + +from keycloak import KeycloakOpenID +from keycloak.pkce_utils import generate_code_challenge, generate_code_verifier + + +def test_pkce_auth_url_and_token(env): + """ + Test PKCE flow: auth_url includes code_challenge, token includes code_verifier. + """ + oid = KeycloakOpenID( + server_url=f"http://{env.keycloak_host}:{env.keycloak_port}", + realm_name="master", + client_id="admin-cli", + ) + code_verifier = generate_code_verifier() + code_challenge, code_challenge_method = generate_code_challenge(code_verifier) + + # Build PKCE auth URL + url = oid.auth_url( + redirect_uri="http://test.test/*", + code_challenge=code_challenge, + code_challenge_method=code_challenge_method, + ) + assert f"code_challenge={code_challenge}" in url + assert f"code_challenge_method={code_challenge_method}" in url + + # Simulate token exchange with PKCE + # This part would require a real code from Keycloak, so we mock the response + with mock.patch.object(oid, "token", return_value={ + "access_token": mock.ANY, + "refresh_token": mock.ANY, + "token_type": "Bearer", + }) as mocked_token: + token = oid.token( + grant_type="authorization_code", + code="dummy_code", + redirect_uri="http://test.test/*", + code_verifier=code_verifier, + ) + mocked_token.assert_called_with( + grant_type="authorization_code", + code="dummy_code", + redirect_uri="http://test.test/*", + code_verifier=code_verifier, + ) + assert "access_token" in token + assert "refresh_token" in token + assert token["token_type"] == "Bearer" From 9f0c76af1cad43cae6b79590f1b06b851e49be0f Mon Sep 17 00:00:00 2001 From: Gabriel Rudloff Date: Fri, 19 Sep 2025 12:46:10 +0000 Subject: [PATCH 3/4] docs: document PKCE usage in OpenID client --- docs/source/modules/openid_client.rst | 32 +++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/docs/source/modules/openid_client.rst b/docs/source/modules/openid_client.rst index c3c0c90..3e21b78 100644 --- a/docs/source/modules/openid_client.rst +++ b/docs/source/modules/openid_client.rst @@ -145,3 +145,35 @@ Get auth status for a specific resource and scope by token token = keycloak_openid.token("user", "password") auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope") + +PKCE Authorization Flow Example +---------------------------------------------- + +.. code-block:: python + + from keycloak import KeycloakOpenID + from keycloak.pkce_utils import generate_code_verifier, generate_code_challenge + + # Configure client + keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/", + client_id="example_client", + realm_name="example_realm") + + # Generate PKCE values + code_verifier = generate_code_verifier() + code_challenge, code_challenge_method = generate_code_challenge(code_verifier) + + # Get Code With Oauth Authorization Request (PKCE) + auth_url = keycloak_openid.auth_url( + redirect_uri="your_call_back_url", + scope="email", + state="your_state_info", + code_challenge=code_challenge, + code_challenge_method=code_challenge_method) + + # Get Access Token With Code (PKCE) + access_token = keycloak_openid.token( + grant_type='authorization_code', + code='the_code_you_get_from_auth_url_callback', + redirect_uri="your_call_back_url", + code_verifier=code_verifier) From a9409b8a36eed0e8ce70dab02aa559fad4c86a15 Mon Sep 17 00:00:00 2001 From: Gabriel Rudloff Date: Mon, 22 Sep 2025 07:19:31 +0000 Subject: [PATCH 4/4] chore: address linting and formatting issues --- src/keycloak/keycloak_openid.py | 12 ++++++------ src/keycloak/pkce_utils.py | 27 +++++++++++++++++---------- tests/test_pkce_flow.py | 24 ++++++++++++++---------- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index abedc7c..9f26bfd 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -282,8 +282,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", - code_challenge: str = None, - code_challenge_method: str = None, + code_challenge: str | None = None, + code_challenge_method: str | None = None, ) -> str: """ Get authorization URL endpoint. @@ -327,7 +327,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", - code_verifier: str = None, + code_verifier: str | None = None, **extra: dict, ) -> dict: """ @@ -1044,8 +1044,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", - code_challenge: str = None, - code_challenge_method: str = None, + code_challenge: str | None = None, + code_challenge_method: str | None = None, ) -> str: """ Get authorization URL endpoint asynchronously. @@ -1089,7 +1089,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", - code_verifier: str = None, + code_verifier: str | None = None, **extra: dict, ) -> dict: """ diff --git a/src/keycloak/pkce_utils.py b/src/keycloak/pkce_utils.py index 2953bc6..015bb3f 100644 --- a/src/keycloak/pkce_utils.py +++ b/src/keycloak/pkce_utils.py @@ -19,30 +19,37 @@ # COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +"""PKCE utility functions for code verifier and code challenge generation.""" + import base64 import hashlib import os -from typing import Tuple def generate_code_verifier(length: int = 128) -> str: """ - Generates a high-entropy cryptographic random string for PKCE code_verifier. + Generate a high-entropy cryptographic random string for PKCE code_verifier. + RFC 7636 recommends a length between 43 and 128 characters. """ return base64.urlsafe_b64encode(os.urandom(length)).rstrip(b"=").decode("utf-8")[:length] -def generate_code_challenge(code_verifier: str, method: str = "S256") -> Tuple[str, str]: + +def generate_code_challenge(code_verifier: str, method: str = "S256") -> tuple[str, str]: """ - Generates a code_challenge from the code_verifier using the specified method. - Supported methods: "S256" (default), "plain" - Returns (code_challenge, code_challenge_method) + Generate a code_challenge from the code_verifier using the specified method. + + Supported methods: "S256" (default), "plain". + Returns (code_challenge, code_challenge_method). """ if method == "S256": - code_challenge = base64.urlsafe_b64encode( - hashlib.sha256(code_verifier.encode("utf-8")).digest() - ).rstrip(b"=").decode("utf-8") + code_challenge = ( + base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()) + .rstrip(b"=") + .decode("utf-8") + ) return code_challenge, "S256" if method == "plain": return code_verifier, "plain" - raise ValueError(f"Unsupported PKCE method: {method}") + error_msg = f"Unsupported PKCE method: {method}" + raise ValueError(error_msg) diff --git a/tests/test_pkce_flow.py b/tests/test_pkce_flow.py index 3823f7f..c52b465 100644 --- a/tests/test_pkce_flow.py +++ b/tests/test_pkce_flow.py @@ -1,13 +1,13 @@ +"""Tests for PKCE flow: code verifier and code challenge handling.""" + from unittest import mock from keycloak import KeycloakOpenID from keycloak.pkce_utils import generate_code_challenge, generate_code_verifier -def test_pkce_auth_url_and_token(env): - """ - Test PKCE flow: auth_url includes code_challenge, token includes code_verifier. - """ +def test_pkce_auth_url_and_token(env: object) -> None: + """Test PKCE flow: auth_url includes code_challenge, token includes code_verifier.""" oid = KeycloakOpenID( server_url=f"http://{env.keycloak_host}:{env.keycloak_port}", realm_name="master", @@ -27,11 +27,15 @@ def test_pkce_auth_url_and_token(env): # Simulate token exchange with PKCE # This part would require a real code from Keycloak, so we mock the response - with mock.patch.object(oid, "token", return_value={ - "access_token": mock.ANY, - "refresh_token": mock.ANY, - "token_type": "Bearer", - }) as mocked_token: + with mock.patch.object( + oid, + "token", + return_value={ + "access_token": mock.ANY, + "refresh_token": mock.ANY, + "token_type": "Bearer", + }, + ) as mocked_token: token = oid.token( grant_type="authorization_code", code="dummy_code", @@ -46,4 +50,4 @@ def test_pkce_auth_url_and_token(env): ) assert "access_token" in token assert "refresh_token" in token - assert token["token_type"] == "Bearer" + assert token["token_type"] == "Bearer" # noqa: S105