diff --git a/docs/source/modules/openid_client.rst b/docs/source/modules/openid_client.rst index a7ae009..e184033 100644 --- a/docs/source/modules/openid_client.rst +++ b/docs/source/modules/openid_client.rst @@ -146,3 +146,35 @@ Get auth status for a specific resource and scope by token token = keycloak_openid.token("user", "password") auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope") + +PKCE Authorization Flow Example +---------------------------------------------- + +.. code-block:: python + + from keycloak import KeycloakOpenID + from keycloak.pkce_utils import generate_code_verifier, generate_code_challenge + + # Configure client + keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/", + client_id="example_client", + realm_name="example_realm") + + # Generate PKCE values + code_verifier = generate_code_verifier() + code_challenge, code_challenge_method = generate_code_challenge(code_verifier) + + # Get Code With Oauth Authorization Request (PKCE) + auth_url = keycloak_openid.auth_url( + redirect_uri="your_call_back_url", + scope="email", + state="your_state_info", + code_challenge=code_challenge, + code_challenge_method=code_challenge_method) + + # Get Access Token With Code (PKCE) + access_token = keycloak_openid.token( + grant_type='authorization_code', + code='the_code_you_get_from_auth_url_callback', + redirect_uri="your_call_back_url", + code_verifier=code_verifier) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index a35d6ac..e4ef560 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -287,6 +287,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", + code_challenge: str | None = None, + code_challenge_method: str | None = None, ) -> str: """ Get authorization URL endpoint. @@ -299,6 +301,10 @@ class KeycloakOpenID: :type state: str :param nonce: Associates a Client session with an ID Token to mitigate replay attacks :type nonce: str + :param code_challenge: PKCE code challenge + :type code_challenge: str + :param code_challenge_method: PKCE code challenge method + :type code_challenge_method: str :returns: Authorization URL Full Build :rtype: str """ @@ -310,7 +316,12 @@ class KeycloakOpenID: "state": state, "nonce": nonce, } - return URL_AUTH.format(**params_path) + url = URL_AUTH.format(**params_path) + if code_challenge: + url += f"&code_challenge={code_challenge}" + if code_challenge_method: + url += f"&code_challenge_method={code_challenge_method}" + return url def token( self, @@ -321,6 +332,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", + code_verifier: str | None = None, **extra: dict, ) -> dict: """ @@ -347,6 +359,8 @@ class KeycloakOpenID: :type totp: int :param scope: Scope, defaults to openid :type scope: str + :param code_verifier: PKCE code verifier + :type code_verifier: str :param extra: Additional extra arguments :type extra: dict :returns: Keycloak token @@ -362,6 +376,8 @@ class KeycloakOpenID: "redirect_uri": redirect_uri, "scope": scope, } + if code_verifier: + payload["code_verifier"] = code_verifier if extra: payload.update(extra) @@ -1033,6 +1049,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", + code_challenge: str | None = None, + code_challenge_method: str | None = None, ) -> str: """ Get authorization URL endpoint asynchronously. @@ -1045,6 +1063,10 @@ class KeycloakOpenID: :type state: str :param nonce: Associates a Client session with an ID Token to mitigate replay attacks :type nonce: str + :param code_challenge: PKCE code challenge + :type code_challenge: str + :param code_challenge_method: PKCE code challenge method + :type code_challenge_method: str :returns: Authorization URL Full Build :rtype: str """ @@ -1056,7 +1078,12 @@ class KeycloakOpenID: "state": state, "nonce": nonce, } - return URL_AUTH.format(**params_path) + url = URL_AUTH.format(**params_path) + if code_challenge: + url += f"&code_challenge={code_challenge}" + if code_challenge_method: + url += f"&code_challenge_method={code_challenge_method}" + return url async def a_token( self, @@ -1067,6 +1094,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", + code_verifier: str | None = None, **extra: dict, ) -> dict: """ @@ -1093,6 +1121,8 @@ class KeycloakOpenID: :type totp: int :param scope: Scope, defaults to openid :type scope: str + :param code_verifier: PKCE code verifier + :type code_verifier: str :param extra: Additional extra arguments :type extra: dict :returns: Keycloak token @@ -1108,6 +1138,8 @@ class KeycloakOpenID: "redirect_uri": redirect_uri, "scope": scope, } + if code_verifier: + payload["code_verifier"] = code_verifier if extra: payload.update(extra) diff --git a/src/keycloak/pkce_utils.py b/src/keycloak/pkce_utils.py new file mode 100644 index 0000000..015bb3f --- /dev/null +++ b/src/keycloak/pkce_utils.py @@ -0,0 +1,55 @@ +# +# The MIT License (MIT) +# +# Copyright (C) 2017 Marcos Pereira +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +"""PKCE utility functions for code verifier and code challenge generation.""" + +import base64 +import hashlib +import os + + +def generate_code_verifier(length: int = 128) -> str: + """ + Generate a high-entropy cryptographic random string for PKCE code_verifier. + + RFC 7636 recommends a length between 43 and 128 characters. + """ + return base64.urlsafe_b64encode(os.urandom(length)).rstrip(b"=").decode("utf-8")[:length] + + +def generate_code_challenge(code_verifier: str, method: str = "S256") -> tuple[str, str]: + """ + Generate a code_challenge from the code_verifier using the specified method. + + Supported methods: "S256" (default), "plain". + Returns (code_challenge, code_challenge_method). + """ + if method == "S256": + code_challenge = ( + base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()) + .rstrip(b"=") + .decode("utf-8") + ) + return code_challenge, "S256" + if method == "plain": + return code_verifier, "plain" + error_msg = f"Unsupported PKCE method: {method}" + raise ValueError(error_msg) diff --git a/tests/test_pkce_flow.py b/tests/test_pkce_flow.py new file mode 100644 index 0000000..5e411fd --- /dev/null +++ b/tests/test_pkce_flow.py @@ -0,0 +1,77 @@ +"""Tests for PKCE flow: code verifier and code challenge handling.""" + +import os +import re +import urllib.parse + +import requests +from packaging.version import Version + +from keycloak import KeycloakAdmin, KeycloakOpenID +from keycloak.pkce_utils import generate_code_challenge, generate_code_verifier + + +def test_pkce_auth_url_and_token(env: object, admin: KeycloakAdmin) -> None: + """Test PKCE flow: auth_url includes code_challenge, token includes code_verifier.""" + if os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"] != "latest" and Version( + os.environ["KEYCLOAK_DOCKER_IMAGE_TAG"], + ) <= Version("22"): + return + + client_representation = { + "clientId": "pkce-test", + "enabled": True, + "publicClient": True, + "standardFlowEnabled": True, + "directAccessGrantsEnabled": False, + "serviceAccountsEnabled": False, + "implicitFlowEnabled": False, + "redirectUris": ["http://test.test/callback"], + "webOrigins": ["*"], + } + admin.create_client(client_representation) + + oid = KeycloakOpenID( + server_url=f"http://{env.keycloak_host}:{env.keycloak_port}", + realm_name="master", + client_id="pkce-test", + ) + code_verifier = generate_code_verifier() + code_challenge, code_challenge_method = generate_code_challenge(code_verifier) + + # Build PKCE auth URL + url = oid.auth_url( + redirect_uri="http://test.test/callback", + code_challenge=code_challenge, + code_challenge_method=code_challenge_method, + scope="openid%20email", + ) + assert f"code_challenge={code_challenge}" in url + assert f"code_challenge_method={code_challenge_method}" in url + + session = requests.Session() + resp = session.get(url, allow_redirects=False) + cookies = resp.cookies.get_dict() + assert resp.status_code == 200 + resp_url = re.findall(r"action=\"(.*)\" method", resp.text)[0] + resp = session.post( + resp_url, + data={"username": env.keycloak_admin, "password": env.keycloak_admin_password}, + allow_redirects=False, + cookies=cookies, + ) + assert resp.status_code == 302, resp.text + resp_code = urllib.parse.parse_qs(resp.headers["Location"])["code"][0] + + access_token = oid.token( + grant_type="authorization_code", + code=resp_code, + redirect_uri="http://test.test/callback", + code_verifier=code_verifier, + ) + info = oid.userinfo(access_token["access_token"]) + assert info["preferred_username"] == env.keycloak_admin + + # Cleanup + client_id = admin.get_client_id("pkce-test") + admin.delete_client(client_id)