Browse Source

feat: Update dynamic client using registration access token (#491)

* fix: remove internal use of deprecated methods

* fix: missing keycloak_openid during refresh (#431)

Error occurs when token is set so refresh_token is called before call
to get_token.

* feat: Add client update method

* fix: keycloak 22.0 dropped http challenge support

* fix: keycloak 22 changes default authenticator providers
pull/508/head v3.5.0
mklassen 6 months ago
committed by GitHub
parent
commit
45116bc02b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      src/keycloak/keycloak_admin.py
  2. 30
      src/keycloak/keycloak_openid.py
  3. 45
      src/keycloak/openid_connection.py
  4. 35
      tests/test_keycloak_admin.py

12
src/keycloak/keycloak_admin.py

@ -1314,7 +1314,9 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "path": path}
data_raw = self.raw_get(urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path))
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_GROUP_BY_PATH.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
def create_group(self, payload, parent=None, skip_exists=False):
@ -1775,7 +1777,7 @@ class KeycloakAdmin:
"""
params_path = {"realm-name": self.realm_name, "id": client_id}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLIENT_AUTHZ_SCOPE_BASED_PERMISSION.format(**params_path),
data=json.dumps(payload),
)
@ -4377,7 +4379,7 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_KEYS_CACHE.format(**params_path), data=""
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
@ -4389,7 +4391,7 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_REALM_CACHE.format(**params_path), data=""
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])
@ -4401,7 +4403,7 @@ class KeycloakAdmin:
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.raw_post(
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_CLEAR_USER_CACHE.format(**params_path), data=""
)
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204])

30
src/keycloak/keycloak_openid.py

@ -41,6 +41,7 @@ from .exceptions import (
KeycloakGetError,
KeycloakInvalidTokenError,
KeycloakPostError,
KeycloakPutError,
KeycloakRPTNotFound,
raise_error_from_response,
)
@ -49,6 +50,7 @@ from .urls_patterns import (
URL_AUTH,
URL_CERTS,
URL_CLIENT_REGISTRATION,
URL_CLIENT_UPDATE,
URL_ENTITLEMENT,
URL_INTROSPECT,
URL_LOGOUT,
@ -711,3 +713,31 @@ class KeycloakOpenID:
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPostError)
def update_client(self, token: str, client_id: str, payload: dict):
"""Update a client.
ClientRepresentation:
https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_clientrepresentation
:param token: registration access token
:type token: str
:param client_id: Keycloak client id
:type client_id: str
:param payload: ClientRepresentation
:type payload: dict
:return: Client Representation
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "client-id": client_id}
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
# Keycloak complains if the clientId is not set in the payload
if "clientId" not in payload:
payload["clientId"] = client_id
data_raw = self.connection.raw_put(
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload)
)
return raise_error_from_response(data_raw, KeycloakPutError)

45
src/keycloak/openid_connection.py

@ -54,6 +54,7 @@ class KeycloakOpenIDConnection(ConnectionManager):
_custom_headers = None
_user_realm_name = None
_expires_at = None
_keycloak_openid = None
def __init__(
self,
@ -275,27 +276,39 @@ class KeycloakOpenIDConnection(ConnectionManager):
# merge custom headers to main headers
self.headers.update(self.custom_headers)
@property
def keycloak_openid(self) -> KeycloakOpenID:
"""Get the KeycloakOpenID object.
The KeycloakOpenID is used to refresh tokens
:returns: KeycloakOpenID
:rtype: KeycloakOpenID
"""
if self._keycloak_openid is None:
if self.user_realm_name:
token_realm_name = self.user_realm_name
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
self._keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=token_realm_name,
verify=self.verify,
client_secret_key=self.client_secret_key,
timeout=self.timeout,
)
return self._keycloak_openid
def get_token(self):
"""Get admin token.
The admin token is then set in the `token` attribute.
"""
if self.user_realm_name:
token_realm_name = self.user_realm_name
elif self.realm_name:
token_realm_name = self.realm_name
else:
token_realm_name = "master"
self.keycloak_openid = KeycloakOpenID(
server_url=self.server_url,
client_id=self.client_id,
realm_name=token_realm_name,
verify=self.verify,
client_secret_key=self.client_secret_key,
timeout=self.timeout,
)
grant_type = []
if self.client_secret_key:
grant_type.append("client_credentials")

35
tests/test_keycloak_admin.py

@ -548,6 +548,7 @@ def test_server_info(admin: KeycloakAdmin):
"passwordPolicies",
"enums",
"cryptoInfo",
"features",
}
), info.keys()
@ -1875,7 +1876,19 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
admin.realm_name = realm
res = admin.get_authentication_flows()
assert len(res) == 8, res
default_flows = len(res)
assert {x["alias"] for x in res}.issubset(
{
"reset credentials",
"browser",
"registration",
"http challenge",
"docker auth",
"direct grant",
"first broker login",
"clients",
}
)
assert set(res[0].keys()) == {
"alias",
"authenticationExecutions",
@ -1885,16 +1898,6 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
"providerId",
"topLevel",
}
assert {x["alias"] for x in res} == {
"reset credentials",
"browser",
"http challenge",
"registration",
"docker auth",
"direct grant",
"first broker login",
"clients",
}
with pytest.raises(KeycloakGetError) as err:
admin.get_authentication_flow_for_id(flow_id="bad")
@ -1910,7 +1913,7 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str):
res = admin.copy_authentication_flow(payload={"newName": "test-browser"}, flow_alias="browser")
assert res == b"", res
assert len(admin.get_authentication_flows()) == 9
assert len(admin.get_authentication_flows()) == (default_flows + 1)
# Test create
res = admin.create_authentication_flow(
@ -2029,7 +2032,7 @@ def test_authentication_configs(admin: KeycloakAdmin, realm: str):
# Test list of auth providers
res = admin.get_authenticator_providers()
assert len(res) == 38
assert len(res) > 1
res = admin.get_authenticator_provider_config_description(provider_id="auth-cookie")
assert res == {
@ -2754,7 +2757,7 @@ def test_initial_access_token(
res = oid.register_client(
token=res["token"],
payload={
"name": client,
"name": "DynamicRegisteredClient",
"clientId": client,
"enabled": True,
"publicClient": False,
@ -2764,3 +2767,7 @@ def test_initial_access_token(
},
)
assert res["clientId"] == client
new_secret = str(uuid.uuid4())
res = oid.update_client(res["registrationAccessToken"], client, payload={"secret": new_secret})
assert res["secret"] == new_secret
Loading…
Cancel
Save