diff --git a/docs/source/modules/openid_client.rst b/docs/source/modules/openid_client.rst index c3c0c90..3e21b78 100644 --- a/docs/source/modules/openid_client.rst +++ b/docs/source/modules/openid_client.rst @@ -145,3 +145,35 @@ Get auth status for a specific resource and scope by token token = keycloak_openid.token("user", "password") auth_status = keycloak_openid.has_uma_access(token['access_token'], "Resource#Scope") + +PKCE Authorization Flow Example +---------------------------------------------- + +.. code-block:: python + + from keycloak import KeycloakOpenID + from keycloak.pkce_utils import generate_code_verifier, generate_code_challenge + + # Configure client + keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/", + client_id="example_client", + realm_name="example_realm") + + # Generate PKCE values + code_verifier = generate_code_verifier() + code_challenge, code_challenge_method = generate_code_challenge(code_verifier) + + # Get Code With Oauth Authorization Request (PKCE) + auth_url = keycloak_openid.auth_url( + redirect_uri="your_call_back_url", + scope="email", + state="your_state_info", + code_challenge=code_challenge, + code_challenge_method=code_challenge_method) + + # Get Access Token With Code (PKCE) + access_token = keycloak_openid.token( + grant_type='authorization_code', + code='the_code_you_get_from_auth_url_callback', + redirect_uri="your_call_back_url", + code_verifier=code_verifier) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 8e603b6..72993d7 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -282,6 +282,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", + code_challenge: str | None = None, + code_challenge_method: str | None = None, ) -> str: """ Get authorization URL endpoint. @@ -294,6 +296,10 @@ class KeycloakOpenID: :type state: str :param nonce: Associates a Client session with an ID Token to mitigate replay attacks :type nonce: str + :param code_challenge: PKCE code challenge + :type code_challenge: str + :param code_challenge_method: PKCE code challenge method + :type code_challenge_method: str :returns: Authorization URL Full Build :rtype: str """ @@ -305,7 +311,12 @@ class KeycloakOpenID: "state": state, "nonce": nonce, } - return URL_AUTH.format(**params_path) + url = URL_AUTH.format(**params_path) + if code_challenge: + url += f"&code_challenge={code_challenge}" + if code_challenge_method: + url += f"&code_challenge_method={code_challenge_method}" + return url def token( self, @@ -316,6 +327,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", + code_verifier: str | None = None, **extra: dict, ) -> dict: """ @@ -342,6 +354,8 @@ class KeycloakOpenID: :type totp: int :param scope: Scope, defaults to openid :type scope: str + :param code_verifier: PKCE code verifier + :type code_verifier: str :param extra: Additional extra arguments :type extra: dict :returns: Keycloak token @@ -357,6 +371,8 @@ class KeycloakOpenID: "redirect_uri": redirect_uri, "scope": scope, } + if code_verifier: + payload["code_verifier"] = code_verifier if extra: payload.update(extra) @@ -1028,6 +1044,8 @@ class KeycloakOpenID: scope: str = "email", state: str = "", nonce: str = "", + code_challenge: str | None = None, + code_challenge_method: str | None = None, ) -> str: """ Get authorization URL endpoint asynchronously. @@ -1040,6 +1058,10 @@ class KeycloakOpenID: :type state: str :param nonce: Associates a Client session with an ID Token to mitigate replay attacks :type nonce: str + :param code_challenge: PKCE code challenge + :type code_challenge: str + :param code_challenge_method: PKCE code challenge method + :type code_challenge_method: str :returns: Authorization URL Full Build :rtype: str """ @@ -1051,7 +1073,12 @@ class KeycloakOpenID: "state": state, "nonce": nonce, } - return URL_AUTH.format(**params_path) + url = URL_AUTH.format(**params_path) + if code_challenge: + url += f"&code_challenge={code_challenge}" + if code_challenge_method: + url += f"&code_challenge_method={code_challenge_method}" + return url async def a_token( self, @@ -1062,6 +1089,7 @@ class KeycloakOpenID: redirect_uri: str = "", totp: int | None = None, scope: str = "openid", + code_verifier: str | None = None, **extra: dict, ) -> dict: """ @@ -1088,6 +1116,8 @@ class KeycloakOpenID: :type totp: int :param scope: Scope, defaults to openid :type scope: str + :param code_verifier: PKCE code verifier + :type code_verifier: str :param extra: Additional extra arguments :type extra: dict :returns: Keycloak token @@ -1103,6 +1133,8 @@ class KeycloakOpenID: "redirect_uri": redirect_uri, "scope": scope, } + if code_verifier: + payload["code_verifier"] = code_verifier if extra: payload.update(extra) diff --git a/src/keycloak/pkce_utils.py b/src/keycloak/pkce_utils.py new file mode 100644 index 0000000..015bb3f --- /dev/null +++ b/src/keycloak/pkce_utils.py @@ -0,0 +1,55 @@ +# +# The MIT License (MIT) +# +# Copyright (C) 2017 Marcos Pereira +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +"""PKCE utility functions for code verifier and code challenge generation.""" + +import base64 +import hashlib +import os + + +def generate_code_verifier(length: int = 128) -> str: + """ + Generate a high-entropy cryptographic random string for PKCE code_verifier. + + RFC 7636 recommends a length between 43 and 128 characters. + """ + return base64.urlsafe_b64encode(os.urandom(length)).rstrip(b"=").decode("utf-8")[:length] + + +def generate_code_challenge(code_verifier: str, method: str = "S256") -> tuple[str, str]: + """ + Generate a code_challenge from the code_verifier using the specified method. + + Supported methods: "S256" (default), "plain". + Returns (code_challenge, code_challenge_method). + """ + if method == "S256": + code_challenge = ( + base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()) + .rstrip(b"=") + .decode("utf-8") + ) + return code_challenge, "S256" + if method == "plain": + return code_verifier, "plain" + error_msg = f"Unsupported PKCE method: {method}" + raise ValueError(error_msg) diff --git a/tests/test_pkce_flow.py b/tests/test_pkce_flow.py new file mode 100644 index 0000000..c52b465 --- /dev/null +++ b/tests/test_pkce_flow.py @@ -0,0 +1,53 @@ +"""Tests for PKCE flow: code verifier and code challenge handling.""" + +from unittest import mock + +from keycloak import KeycloakOpenID +from keycloak.pkce_utils import generate_code_challenge, generate_code_verifier + + +def test_pkce_auth_url_and_token(env: object) -> None: + """Test PKCE flow: auth_url includes code_challenge, token includes code_verifier.""" + oid = KeycloakOpenID( + server_url=f"http://{env.keycloak_host}:{env.keycloak_port}", + realm_name="master", + client_id="admin-cli", + ) + code_verifier = generate_code_verifier() + code_challenge, code_challenge_method = generate_code_challenge(code_verifier) + + # Build PKCE auth URL + url = oid.auth_url( + redirect_uri="http://test.test/*", + code_challenge=code_challenge, + code_challenge_method=code_challenge_method, + ) + assert f"code_challenge={code_challenge}" in url + assert f"code_challenge_method={code_challenge_method}" in url + + # Simulate token exchange with PKCE + # This part would require a real code from Keycloak, so we mock the response + with mock.patch.object( + oid, + "token", + return_value={ + "access_token": mock.ANY, + "refresh_token": mock.ANY, + "token_type": "Bearer", + }, + ) as mocked_token: + token = oid.token( + grant_type="authorization_code", + code="dummy_code", + redirect_uri="http://test.test/*", + code_verifier=code_verifier, + ) + mocked_token.assert_called_with( + grant_type="authorization_code", + code="dummy_code", + redirect_uri="http://test.test/*", + code_verifier=code_verifier, + ) + assert "access_token" in token + assert "refresh_token" in token + assert token["token_type"] == "Bearer" # noqa: S105