From 4fa7119367987104c13376d685f31b566be9c873 Mon Sep 17 00:00:00 2001 From: Gabriel Rudloff Date: Fri, 19 Sep 2025 12:45:47 +0000 Subject: [PATCH] feat: support PKCE in authorization flow (RFC 7636) --- src/keycloak/keycloak_openid.py | 36 +++++++++++++++++++++++-- src/keycloak/pkce_utils.py | 48 +++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 src/keycloak/pkce_utils.py diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 60f85e6..abedc7c 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -282,6 +282,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", + code_challenge: str = None, + code_challenge_method: str = None, ) -> str: """ Get authorization URL endpoint. @@ -294,6 +296,10 @@ class KeycloakOpenID: :type state: str :param nonce: Associates a Client session with an ID Token to mitigate replay attacks :type nonce: str + :param code_challenge: PKCE code challenge + :type code_challenge: str + :param code_challenge_method: PKCE code challenge method + :type code_challenge_method: str :returns: Authorization URL Full Build :rtype: str """ @@ -305,7 +311,12 @@ class KeycloakOpenID: "state": state, "nonce": nonce, } - return URL_AUTH.format(**params_path) + url = URL_AUTH.format(**params_path) + if code_challenge: + url += f"&code_challenge={code_challenge}" + if code_challenge_method: + url += f"&code_challenge_method={code_challenge_method}" + return url def token( self, @@ -316,6 +327,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", + code_verifier: str = None, **extra: dict, ) -> dict: """ @@ -342,6 +354,8 @@ class KeycloakOpenID: :type totp: int :param scope: Scope, defaults to openid :type scope: str + :param code_verifier: PKCE code verifier + :type code_verifier: str :param extra: Additional extra arguments :type extra: dict :returns: Keycloak token @@ -357,6 +371,8 @@ class KeycloakOpenID: "redirect_uri": redirect_uri, "scope": scope, } + if code_verifier: + payload["code_verifier"] = code_verifier if extra: payload.update(extra) @@ -1028,6 +1044,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", + code_challenge: str = None, + code_challenge_method: str = None, ) -> str: """ Get authorization URL endpoint asynchronously. @@ -1040,6 +1058,10 @@ class KeycloakOpenID: :type state: str :param nonce: Associates a Client session with an ID Token to mitigate replay attacks :type nonce: str + :param code_challenge: PKCE code challenge + :type code_challenge: str + :param code_challenge_method: PKCE code challenge method + :type code_challenge_method: str :returns: Authorization URL Full Build :rtype: str """ @@ -1051,7 +1073,12 @@ class KeycloakOpenID: "state": state, "nonce": nonce, } - return URL_AUTH.format(**params_path) + url = URL_AUTH.format(**params_path) + if code_challenge: + url += f"&code_challenge={code_challenge}" + if code_challenge_method: + url += f"&code_challenge_method={code_challenge_method}" + return url async def a_token( self, @@ -1062,6 +1089,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", + code_verifier: str = None, **extra: dict, ) -> dict: """ @@ -1088,6 +1116,8 @@ class KeycloakOpenID: :type totp: int :param scope: Scope, defaults to openid :type scope: str + :param code_verifier: PKCE code verifier + :type code_verifier: str :param extra: Additional extra arguments :type extra: dict :returns: Keycloak token @@ -1103,6 +1133,8 @@ class KeycloakOpenID: "redirect_uri": redirect_uri, "scope": scope, } + if code_verifier: + payload["code_verifier"] = code_verifier if extra: payload.update(extra) diff --git a/src/keycloak/pkce_utils.py b/src/keycloak/pkce_utils.py new file mode 100644 index 0000000..2953bc6 --- /dev/null +++ b/src/keycloak/pkce_utils.py @@ -0,0 +1,48 @@ +# +# The MIT License (MIT) +# +# Copyright (C) 2017 Marcos Pereira +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +import base64 +import hashlib +import os +from typing import Tuple + + +def generate_code_verifier(length: int = 128) -> str: + """ + Generates a high-entropy cryptographic random string for PKCE code_verifier. + RFC 7636 recommends a length between 43 and 128 characters. + """ + return base64.urlsafe_b64encode(os.urandom(length)).rstrip(b"=").decode("utf-8")[:length] + +def generate_code_challenge(code_verifier: str, method: str = "S256") -> Tuple[str, str]: + """ + Generates a code_challenge from the code_verifier using the specified method. + Supported methods: "S256" (default), "plain" + Returns (code_challenge, code_challenge_method) + """ + if method == "S256": + code_challenge = base64.urlsafe_b64encode( + hashlib.sha256(code_verifier.encode("utf-8")).digest() + ).rstrip(b"=").decode("utf-8") + return code_challenge, "S256" + if method == "plain": + return code_verifier, "plain" + raise ValueError(f"Unsupported PKCE method: {method}")