diff --git a/src/keycloak/connection.py b/src/keycloak/connection.py index 09db614..3c63551 100644 --- a/src/keycloak/connection.py +++ b/src/keycloak/connection.py @@ -24,11 +24,6 @@ from __future__ import annotations -try: - from urllib.parse import urljoin -except ImportError: # pragma: no cover - from urlparse import urljoin # pyright: ignore[reportMissingImports] - from typing import Any import httpx @@ -167,6 +162,13 @@ class ConnectionManager: def base_url(self, value: str | None) -> None: self._base_url = value + def _build_url(self, path: str) -> str: + """Join the base_url and path, and handle trailing slashes.""" + if not self.base_url or path.startswith(("http://", "https://")): + return path + + return f"{self.base_url.rstrip('/')}/{path.lstrip('/')}" + @property def timeout(self) -> int | None: """ @@ -334,7 +336,7 @@ class ConnectionManager: raise AttributeError(msg) try: return self._s.get( - urljoin(self.base_url, path), + self._build_url(path), params=kwargs, headers=self.headers, timeout=self.timeout, @@ -364,7 +366,7 @@ class ConnectionManager: raise AttributeError(msg) try: return self._s.post( - urljoin(self.base_url, path), + self._build_url(path), params=kwargs, data=data, headers=self.headers, @@ -396,7 +398,7 @@ class ConnectionManager: try: return self._s.put( - urljoin(self.base_url, path), + self._build_url(path), params=kwargs, data=data, headers=self.headers, @@ -428,7 +430,7 @@ class ConnectionManager: try: return self._s.delete( - urljoin(self.base_url, path), + self._build_url(path), params=kwargs, data=data or {}, headers=self.headers, @@ -458,7 +460,7 @@ class ConnectionManager: try: return await self.async_s.get( - urljoin(self.base_url, path), + self._build_url(path), params=self._filter_query_params(kwargs), headers=self.headers, timeout=self.timeout, @@ -493,7 +495,7 @@ class ConnectionManager: try: return await self.async_s.request( method="POST", - url=urljoin(self.base_url, path), + url=self._build_url(path), params=self._filter_query_params(kwargs), **self._prepare_httpx_request_content(data), headers=self.headers, @@ -528,7 +530,7 @@ class ConnectionManager: try: return await self.async_s.put( - urljoin(self.base_url, path), + self._build_url(path), params=self._filter_query_params(kwargs), **self._prepare_httpx_request_content(data), headers=self.headers, @@ -564,7 +566,7 @@ class ConnectionManager: try: return await self.async_s.request( method="DELETE", - url=urljoin(self.base_url, path), + url=self._build_url(path), **self._prepare_httpx_request_content(data or {}), params=self._filter_query_params(kwargs), headers=self.headers, diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 16e2a8c..3fc4808 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -2386,7 +2386,7 @@ class KeycloakAdmin: :return: Keycloak server response (GroupRepresentation) :rtype: dict """ - params_path = {"realm-name": self.connection.realm_name, "path": path} + params_path = {"realm-name": self.connection.realm_name, "path": path.lstrip("/")} data_raw = self.connection.raw_get( urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path), ) @@ -9276,7 +9276,7 @@ class KeycloakAdmin: :return: Keycloak server response (GroupRepresentation) :rtype: dict """ - params_path = {"realm-name": self.connection.realm_name, "path": path} + params_path = {"realm-name": self.connection.realm_name, "path": path.lstrip("/")} data_raw = await self.connection.a_raw_get( urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path), ) diff --git a/tests/test_connection.py b/tests/test_connection.py index 0563bb6..e4d51bf 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -95,3 +95,28 @@ def test_counter_part() -> None: continue assert async_method[2:] in sync_methods + + +def test_build_url() -> None: + """Test URL building and sub-path preservation.""" + # Scenario 1: Base URL WITHOUT a trailing slash + cm = ConnectionManager(base_url="http://test.test/auth") + + assert cm._build_url("realms/master") == "http://test.test/auth/realms/master" + assert cm._build_url("/realms/master") == "http://test.test/auth/realms/master" + + # Scenario 2: Base URL WITH a trailing slash + cm_slashed = ConnectionManager(base_url="http://test.test/auth/") + + assert cm_slashed._build_url("realms/master") == "http://test.test/auth/realms/master" + assert cm_slashed._build_url("/realms/master") == "http://test.test/auth/realms/master" + + # Scenario 3: Path is already an absolute URL + assert cm._build_url("http://absolute.test/realms") == "http://absolute.test/realms" + assert cm._build_url("https://absolute.test/realms") == "https://absolute.test/realms" + + # Scenario 4: Empty base URL + cm_empty = ConnectionManager(base_url="") + + assert cm_empty._build_url("realms/master") == "realms/master" + assert cm_empty._build_url("/realms/master") == "/realms/master" diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index 680b1bc..d3f814f 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -139,6 +139,35 @@ def test_auth_url(env: KeycloakTestEnv, oid: KeycloakOpenID) -> None: ) +@pytest.mark.parametrize("trailing_slash", [True, False]) +def test_openid_subpath_request_normalization(env: KeycloakTestEnv, trailing_slash: bool) -> None: + """ + Test that KeycloakOpenID builds correct request URLs when a sub-path is present. + + :param env: Environment fixture + :type env: KeycloakTestEnv + :param trailing_slash: Indicator of trailing slash in server URL + :type trailing_slash: bool + """ + host_port = f"http://{env.keycloak_host}:{env.keycloak_port}/auth" + server_url = f"{host_port}/" if trailing_slash else host_port + + oid = KeycloakOpenID( + server_url=server_url, + realm_name="master", + client_id="admin-cli", + ) + + with mock.patch.object(oid.connection._s, "get") as mock_get: + mock_get.return_value = mock.Mock(status_code=200, json=lambda: {"issuer": "test"}) + + oid.well_known() + assert ( + mock_get.call_args[0][0] + == f"http://{env.keycloak_host}:{env.keycloak_port}/auth/realms/master/.well-known/openid-configuration" + ) + + def test_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None: """ Test the token method. @@ -685,6 +714,35 @@ async def test_a_auth_url(env: KeycloakTestEnv, oid: KeycloakOpenID) -> None: ) +@pytest.mark.asyncio +@pytest.mark.parametrize("trailing_slash", [True, False]) +async def test_a_openid_subpath_request_normalization( + env: KeycloakTestEnv, trailing_slash: bool +) -> None: + """ + Test that KeycloakOpenID builds correct request URLs when a sub-path is present asynchronously. + + :param env: Environment fixture + :type env: KeycloakTestEnv + :param trailing_slash: Indicator of trailing slash in server URL + :type trailing_slash: bool + """ + host_port = f"http://{env.keycloak_host}:{env.keycloak_port}/auth" + server_url = f"{host_port}/" if trailing_slash else host_port + + oid = KeycloakOpenID(server_url=server_url, realm_name="master", client_id="admin-cli") + + with mock.patch.object(oid.connection.async_s, "get", new_callable=mock.AsyncMock) as mock_get: + mock_get.return_value = mock.Mock(status_code=200, json=lambda: {"issuer": "test"}) + + await oid.a_well_known() + + expected = f"http://{env.keycloak_host}:{env.keycloak_port}/auth/realms/master/.well-known/openid-configuration" + + mock_get.assert_called() + assert str(mock_get.call_args[0][0]) == expected + + @pytest.mark.asyncio async def test_a_token(oid_with_credentials: tuple[KeycloakOpenID, str, str]) -> None: """