Browse Source

fix: initializing KeycloakAdmin without server_url

According to the project readme, we could initialize a KeycloakAdmin object with a KeycloakOpenIDConnection object without other arguments but, server_url is required.

I made server_url optional and wrote a test for it.
pull/439/head
Armin Shoughi 2 years ago
parent
commit
09cf503415
  1. 48
      src/keycloak/keycloak_admin.py
  2. 83
      tests/test_keycloak_admin.py

48
src/keycloak/keycloak_admin.py

@ -87,21 +87,21 @@ class KeycloakAdmin:
_connection = None _connection = None
def __init__( def __init__(
self,
server_url,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
verify=True,
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
auto_refresh_token=None,
timeout=60,
connection: Optional[KeycloakOpenIDConnection] = None,
self,
server_url=None,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
verify=True,
client_secret_key=None,
custom_headers=None,
user_realm_name=None,
auto_refresh_token=None,
timeout=60,
connection: Optional[KeycloakOpenIDConnection] = None,
): ):
"""Init method. """Init method.
@ -815,7 +815,7 @@ class KeycloakAdmin:
) )
raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
_last_slash_idx = data_raw.headers["Location"].rindex("/") _last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203
def users_count(self, query=None): def users_count(self, query=None):
"""Count users. """Count users.
@ -1118,7 +1118,7 @@ class KeycloakAdmin:
return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204]) return raise_error_from_response(data_raw, KeycloakDeleteError, expected_codes=[204])
def send_update_account( def send_update_account(
self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None
self, user_id, payload, client_id=None, lifespan=None, redirect_uri=None
): ):
"""Send an update account email to the user. """Send an update account email to the user.
@ -1339,7 +1339,7 @@ class KeycloakAdmin:
) )
try: try:
_last_slash_idx = data_raw.headers["Location"].rindex("/") _last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203
except KeyError: except KeyError:
return return
@ -1965,7 +1965,7 @@ class KeycloakAdmin:
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
) )
_last_slash_idx = data_raw.headers["Location"].rindex("/") _last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203
def update_client(self, client_id, payload): def update_client(self, client_id, payload):
"""Update a client. """Update a client.
@ -2218,7 +2218,7 @@ class KeycloakAdmin:
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
) )
_last_slash_idx = data_raw.headers["Location"].rindex("/") _last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203
def add_composite_client_roles_to_role(self, client_role_id, role_name, roles): def add_composite_client_roles_to_role(self, client_role_id, role_name, roles):
"""Add composite roles to client role. """Add composite roles to client role.
@ -2385,7 +2385,7 @@ class KeycloakAdmin:
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
) )
_last_slash_idx = data_raw.headers["Location"].rindex("/") _last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203
def get_realm_role(self, role_name): def get_realm_role(self, role_name):
"""Get realm role by role name. """Get realm role by role name.
@ -2851,7 +2851,7 @@ class KeycloakAdmin:
) )
def _get_client_roles_of_user( def _get_client_roles_of_user(
self, client_level_role_mapping_url, user_id, client_id, **params
self, client_level_role_mapping_url, user_id, client_id, **params
): ):
"""Get client roles of a single user helper. """Get client roles of a single user helper.
@ -3281,7 +3281,7 @@ class KeycloakAdmin:
data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists data_raw, KeycloakPostError, expected_codes=[201], skip_exists=skip_exists
) )
_last_slash_idx = data_raw.headers["Location"].rindex("/") _last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203
def update_client_scope(self, client_scope_id, payload): def update_client_scope(self, client_scope_id, payload):
"""Update a client scope. """Update a client scope.
@ -3650,7 +3650,7 @@ class KeycloakAdmin:
) )
raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201]) raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[201])
_last_slash_idx = data_raw.headers["Location"].rindex("/") _last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :] # noqa: E203
return data_raw.headers["Location"][_last_slash_idx + 1:] # noqa: E203
def get_component(self, component_id): def get_component(self, component_id):
"""Get representation of the component. """Get representation of the component.

83
tests/test_keycloak_admin.py

@ -19,6 +19,8 @@ from keycloak.exceptions import (
KeycloakPutError, KeycloakPutError,
) )
from src.keycloak import KeycloakOpenIDConnection
def test_keycloak_version(): def test_keycloak_version():
"""Test version.""" """Test version."""
@ -101,15 +103,26 @@ def test_keycloak_admin_init(env):
admin.delete_realm(realm_name="authz") admin.delete_realm(realm_name="authz")
assert ( assert (
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=None,
password=None,
client_secret_key=None,
custom_headers={"custom": "header"},
).token
is None
KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=None,
password=None,
client_secret_key=None,
custom_headers={"custom": "header"},
).token
is None
)
keycloak_connection = KeycloakOpenIDConnection(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
username=env.KEYCLOAK_ADMIN,
password=env.KEYCLOAK_ADMIN_PASSWORD,
realm_name="master",
client_id="admin-cli",
verify=True
) )
keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
assert keycloak_admin.token
def test_realms(admin: KeycloakAdmin): def test_realms(admin: KeycloakAdmin):
@ -960,8 +973,8 @@ def test_clients(admin: KeycloakAdmin, realm: str):
) )
assert res assert res
assert ( assert (
admin.get_client_secrets(client_id=admin.get_client_id(client_id="test-confidential"))
== res
admin.get_client_secrets(client_id=admin.get_client_id(client_id="test-confidential"))
== res
) )
@ -1136,12 +1149,12 @@ def test_realm_roles(admin: KeycloakAdmin, realm: str):
], ],
) )
def test_role_attributes( def test_role_attributes(
admin: KeycloakAdmin,
realm: str,
client: str,
arg_brief_repr: dict,
includes_attributes: bool,
testcase: str,
admin: KeycloakAdmin,
realm: str,
client: str,
arg_brief_repr: dict,
includes_attributes: bool,
testcase: str,
): ):
"""Test getting role attributes for bulk calls. """Test getting role attributes for bulk calls.
@ -1489,8 +1502,8 @@ def test_client_roles(admin: KeycloakAdmin, client: str):
) )
assert res == dict() assert res == dict()
assert ( assert (
len(admin.get_client_role_members(client_id=client, role_name="client-role-test-update"))
== 1
len(admin.get_client_role_members(client_id=client, role_name="client-role-test-update"))
== 1
) )
roles = admin.get_client_roles_of_user(user_id=user_id, client_id=client) roles = admin.get_client_roles_of_user(user_id=user_id, client_id=client)
@ -1545,8 +1558,8 @@ def test_client_roles(admin: KeycloakAdmin, client: str):
) )
assert res == dict() assert res == dict()
assert ( assert (
len(admin.get_client_role_groups(client_id=client, role_name="client-role-test-update"))
== 1
len(admin.get_client_role_groups(client_id=client, role_name="client-role-test-update"))
== 1
) )
assert len(admin.get_group_client_roles(group_id=group_id, client_id=client)) == 1 assert len(admin.get_group_client_roles(group_id=group_id, client_id=client)) == 1
@ -2025,8 +2038,8 @@ def test_client_scopes(admin: KeycloakAdmin, realm: str):
) )
assert res_update == dict() assert res_update == dict()
assert ( assert (
admin.get_mappers_from_client_scope(client_scope_id=res)[0]["config"]["user.attribute"]
== "test"
admin.get_mappers_from_client_scope(client_scope_id=res)[0]["config"]["user.attribute"]
== "test"
) )
# Test delete mapper # Test delete mapper
@ -2119,7 +2132,7 @@ def test_components(admin: KeycloakAdmin, realm: str):
"name": "Test Component", "name": "Test Component",
"providerId": "max-clients", "providerId": "max-clients",
"providerType": "org.keycloak.services.clientregistration." "providerType": "org.keycloak.services.clientregistration."
+ "policy.ClientRegistrationPolicy",
+ "policy.ClientRegistrationPolicy",
"config": {"max-clients": ["1000"]}, "config": {"max-clients": ["1000"]},
} }
) )
@ -2238,8 +2251,8 @@ def test_auto_refresh(admin_frozen: KeycloakAdmin, realm: str):
assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00") assert admin.connection.expires_at < datetime_parser.parse("2023-02-25 10:25:00")
admin.connection.token = None admin.connection.token = None
assert ( assert (
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
== dict()
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"})
== dict()
) )
assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00") assert admin.connection.expires_at > datetime_parser.parse("2023-02-25 10:25:00")
@ -2310,7 +2323,7 @@ def test_update_required_action(admin: KeycloakAdmin, realm: str):
def test_get_composite_client_roles_of_group( def test_get_composite_client_roles_of_group(
admin: KeycloakAdmin, realm: str, client: str, group: str, composite_client_role: str
admin: KeycloakAdmin, realm: str, client: str, group: str, composite_client_role: str
): ):
"""Test get composite client roles of group. """Test get composite client roles of group.
@ -2333,7 +2346,7 @@ def test_get_composite_client_roles_of_group(
def test_get_role_client_level_children( def test_get_role_client_level_children(
admin: KeycloakAdmin, realm: str, client: str, composite_client_role: str, client_role: str
admin: KeycloakAdmin, realm: str, client: str, composite_client_role: str, client_role: str
): ):
"""Test get children of composite client role. """Test get children of composite client role.
@ -2376,7 +2389,7 @@ def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfs
def test_get_bruteforce_status_for_user( def test_get_bruteforce_status_for_user(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
): ):
"""Test users. """Test users.
@ -2413,7 +2426,7 @@ def test_get_bruteforce_status_for_user(
def test_clear_bruteforce_attempts_for_user( def test_clear_bruteforce_attempts_for_user(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
): ):
"""Test users. """Test users.
@ -2453,7 +2466,7 @@ def test_clear_bruteforce_attempts_for_user(
def test_clear_bruteforce_attempts_for_all_users( def test_clear_bruteforce_attempts_for_all_users(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
): ):
"""Test users. """Test users.
@ -2503,8 +2516,8 @@ def test_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> None:
admin.realm_name = realm admin.realm_name = realm
assert f"default-roles-{realm}" in [x["name"] for x in admin.get_realm_roles()] assert f"default-roles-{realm}" in [x["name"] for x in admin.get_realm_roles()]
assert ( assert (
len([x["name"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"])
== 1
len([x["name"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"])
== 1
) )
@ -2518,8 +2531,8 @@ def test_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> None:
""" """
admin.realm_name = realm admin.realm_name = realm
assert ( assert (
admin.get_default_realm_role_id()
== [x["id"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"][0]
admin.get_default_realm_role_id()
== [x["id"] for x in admin.get_realm_roles() if x["name"] == f"default-roles-{realm}"][0]
) )
@ -2605,7 +2618,7 @@ def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None:
def test_initial_access_token( def test_initial_access_token(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str]
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str]
) -> None: ) -> None:
"""Test initial access token and client creation. """Test initial access token and client creation.

Loading…
Cancel
Save