|
|
|
@ -57,6 +57,7 @@ def test_keycloak_admin_init(env: KeycloakTestEnv) -> None: |
|
|
|
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}", |
|
|
|
username=env.keycloak_admin, |
|
|
|
password=env.keycloak_admin_password, |
|
|
|
pool_maxsize=5, |
|
|
|
) |
|
|
|
assert admin.connection.server_url == f"http://{env.keycloak_host}:{env.keycloak_port}", ( |
|
|
|
admin.connection.server_url |
|
|
|
@ -72,6 +73,7 @@ def test_keycloak_admin_init(env: KeycloakTestEnv) -> None: |
|
|
|
assert admin.connection.token is None, admin.connection.token |
|
|
|
assert admin.connection.user_realm_name is None, admin.connection.user_realm_name |
|
|
|
assert admin.connection.custom_headers is None, admin.connection.custom_headers |
|
|
|
assert admin.connection.pool_maxsize == 5, admin.connection.pool_maxsize |
|
|
|
|
|
|
|
admin = KeycloakAdmin( |
|
|
|
server_url=f"http://{env.keycloak_host}:{env.keycloak_port}", |
|
|
|
@ -1432,6 +1434,41 @@ def test_clients(admin: KeycloakAdmin, realm: str) -> None: |
|
|
|
UNKOWN_ERROR_REGEX, |
|
|
|
) |
|
|
|
|
|
|
|
# Test import authz |
|
|
|
authz_config = admin.get_client_authz_settings(client_id=auth_client_id) |
|
|
|
|
|
|
|
authz_config["resources"] = [{"name": "test-import-resource"}] |
|
|
|
authz_config["policies"] = [ |
|
|
|
{ |
|
|
|
"name": "test-import-policy", |
|
|
|
"type": "time", |
|
|
|
"config": {"hourEnd": "18", "hour": "9"}, |
|
|
|
} |
|
|
|
] |
|
|
|
admin.import_client_authz_config(client_id=auth_client_id, payload=authz_config) |
|
|
|
exported = admin.get_client_authz_settings(client_id=auth_client_id) |
|
|
|
assert ( |
|
|
|
len( |
|
|
|
[ |
|
|
|
resource |
|
|
|
for resource in exported["resources"] |
|
|
|
if resource["name"] == "test-import-resource" |
|
|
|
] |
|
|
|
) |
|
|
|
== 1 |
|
|
|
) |
|
|
|
|
|
|
|
assert ( |
|
|
|
len( |
|
|
|
[ |
|
|
|
resource |
|
|
|
for resource in exported["policies"] |
|
|
|
if resource["name"] == "test-import-policy" |
|
|
|
] |
|
|
|
) |
|
|
|
== 1 |
|
|
|
) |
|
|
|
|
|
|
|
# Test delete client |
|
|
|
res = admin.delete_client(client_id=auth_client_id) |
|
|
|
assert res == {}, res |
|
|
|
@ -2620,7 +2657,7 @@ def test_auth_flows(admin: KeycloakAdmin, realm: str) -> None: |
|
|
|
|
|
|
|
# Test flow executions |
|
|
|
res = admin.get_authentication_flow_executions(flow_alias="browser") |
|
|
|
assert len(res) in [8, 12, 14], res |
|
|
|
assert len(res) in [8, 12, 14, 15], res |
|
|
|
|
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
admin.get_authentication_flow_executions(flow_alias="bad") |
|
|
|
@ -2763,7 +2800,7 @@ def test_authentication_configs(admin: KeycloakAdmin, realm: str) -> None: |
|
|
|
|
|
|
|
# Test list of auth providers |
|
|
|
res = admin.get_authenticator_providers() |
|
|
|
assert len(res) <= 41 |
|
|
|
assert len(res) <= 42 |
|
|
|
|
|
|
|
res = admin.get_authenticator_provider_config_description(provider_id="auth-cookie") |
|
|
|
assert res == { |
|
|
|
@ -3285,6 +3322,48 @@ def test_get_role_client_level_children( |
|
|
|
assert child["id"] in [x["id"] for x in res] |
|
|
|
|
|
|
|
|
|
|
|
def test_get_role_composites_by_id( |
|
|
|
admin: KeycloakAdmin, |
|
|
|
realm: str, |
|
|
|
client: str, |
|
|
|
composite_client_role: str, |
|
|
|
client_role: str, |
|
|
|
) -> None: |
|
|
|
""" |
|
|
|
Test get role's children by role ID. |
|
|
|
|
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
:param client: Keycloak client |
|
|
|
:type client: str |
|
|
|
:param composite_client_role: Composite client role |
|
|
|
:type composite_client_role: str |
|
|
|
:param client_role: Client role |
|
|
|
:type client_role: str |
|
|
|
""" |
|
|
|
admin.change_current_realm(realm) |
|
|
|
|
|
|
|
parent_role = admin.get_client_role(client, composite_client_role) |
|
|
|
child_role = admin.get_client_role(client, client_role) |
|
|
|
|
|
|
|
composites = admin.get_role_composites_by_id(parent_role["id"]) |
|
|
|
assert len(composites) > 0 |
|
|
|
assert child_role["id"] in [x["id"] for x in composites] |
|
|
|
|
|
|
|
composites_paginated = admin.get_role_composites_by_id( |
|
|
|
parent_role["id"], query={"first": 0, "max": 10} |
|
|
|
) |
|
|
|
assert len(composites_paginated) > 0 |
|
|
|
assert child_role["id"] in [x["id"] for x in composites_paginated] |
|
|
|
|
|
|
|
composites_searched = admin.get_role_composites_by_id( |
|
|
|
parent_role["id"], query={"search": client_role[:3]} |
|
|
|
) |
|
|
|
assert len(composites_searched) > 0 |
|
|
|
|
|
|
|
|
|
|
|
def test_upload_certificate( |
|
|
|
admin: KeycloakAdmin, |
|
|
|
realm: str, |
|
|
|
@ -3326,7 +3405,7 @@ def test_get_bruteforce_status_for_user( |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
oid, username, _ = oid_with_credentials |
|
|
|
admin.change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
@ -3364,7 +3443,7 @@ def test_clear_bruteforce_attempts_for_user( |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
oid, username, _ = oid_with_credentials |
|
|
|
admin.change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
@ -3405,7 +3484,7 @@ def test_clear_bruteforce_attempts_for_all_users( |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
oid, username, _ = oid_with_credentials |
|
|
|
admin.change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
@ -3566,7 +3645,7 @@ def test_initial_access_token( |
|
|
|
assert res["count"] == 2 |
|
|
|
assert res["expiration"] == 3 |
|
|
|
|
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
oid, _, _ = oid_with_credentials |
|
|
|
|
|
|
|
client = str(uuid.uuid4()) |
|
|
|
secret = str(uuid.uuid4()) |
|
|
|
@ -4998,6 +5077,41 @@ async def test_a_clients(admin: KeycloakAdmin, realm: str) -> None: |
|
|
|
UNKOWN_ERROR_REGEX, |
|
|
|
) |
|
|
|
|
|
|
|
# Test async import authz |
|
|
|
authz_config = await admin.a_get_client_authz_settings(client_id=auth_client_id) |
|
|
|
|
|
|
|
authz_config["resources"] = [{"name": "test-import-resource"}] |
|
|
|
authz_config["policies"] = [ |
|
|
|
{ |
|
|
|
"name": "test-import-policy", |
|
|
|
"type": "time", |
|
|
|
"config": {"hourEnd": "18", "hour": "9"}, |
|
|
|
} |
|
|
|
] |
|
|
|
await admin.a_import_client_authz_config(client_id=auth_client_id, payload=authz_config) |
|
|
|
exported = await admin.a_get_client_authz_settings(client_id=auth_client_id) |
|
|
|
assert ( |
|
|
|
len( |
|
|
|
[ |
|
|
|
resource |
|
|
|
for resource in exported["resources"] |
|
|
|
if resource["name"] == "test-import-resource" |
|
|
|
] |
|
|
|
) |
|
|
|
== 1 |
|
|
|
) |
|
|
|
|
|
|
|
assert ( |
|
|
|
len( |
|
|
|
[ |
|
|
|
resource |
|
|
|
for resource in exported["policies"] |
|
|
|
if resource["name"] == "test-import-policy" |
|
|
|
] |
|
|
|
) |
|
|
|
== 1 |
|
|
|
) |
|
|
|
|
|
|
|
# Test delete client |
|
|
|
res = await admin.a_delete_client(client_id=auth_client_id) |
|
|
|
assert res == {}, res |
|
|
|
@ -6326,7 +6440,7 @@ async def test_a_auth_flows(admin: KeycloakAdmin, realm: str) -> None: |
|
|
|
|
|
|
|
# Test flow executions |
|
|
|
res = await admin.a_get_authentication_flow_executions(flow_alias="browser") |
|
|
|
assert len(res) in [8, 12, 14], res |
|
|
|
assert len(res) in [8, 12, 14, 15], res |
|
|
|
|
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
await admin.a_get_authentication_flow_executions(flow_alias="bad") |
|
|
|
@ -6474,7 +6588,7 @@ async def test_a_authentication_configs(admin: KeycloakAdmin, realm: str) -> Non |
|
|
|
|
|
|
|
# Test list of auth providers |
|
|
|
res = await admin.a_get_authenticator_providers() |
|
|
|
assert len(res) <= 41 |
|
|
|
assert len(res) <= 42 |
|
|
|
|
|
|
|
res = await admin.a_get_authenticator_provider_config_description(provider_id="auth-cookie") |
|
|
|
assert res == { |
|
|
|
@ -7017,6 +7131,49 @@ async def test_a_get_role_client_level_children( |
|
|
|
assert child["id"] in [x["id"] for x in res] |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_get_role_composites_by_id( |
|
|
|
admin: KeycloakAdmin, |
|
|
|
realm: str, |
|
|
|
client: str, |
|
|
|
composite_client_role: str, |
|
|
|
client_role: str, |
|
|
|
) -> None: |
|
|
|
""" |
|
|
|
Test get all composite roles by role id asynchronously. |
|
|
|
|
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
:param client: Keycloak client |
|
|
|
:type client: str |
|
|
|
:param composite_client_role: Composite client role |
|
|
|
:type composite_client_role: str |
|
|
|
:param client_role: Client role |
|
|
|
:type client_role: str |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
parent_role = await admin.a_get_client_role(client, composite_client_role) |
|
|
|
child_role = await admin.a_get_client_role(client, client_role) |
|
|
|
|
|
|
|
composites = await admin.a_get_role_composites_by_id(parent_role["id"]) |
|
|
|
assert len(composites) > 0 |
|
|
|
assert child_role["id"] in [x["id"] for x in composites] |
|
|
|
|
|
|
|
composites_paginated = await admin.a_get_role_composites_by_id( |
|
|
|
parent_role["id"], query={"first": 0, "max": 10} |
|
|
|
) |
|
|
|
assert len(composites_paginated) > 0 |
|
|
|
assert child_role["id"] in [x["id"] for x in composites_paginated] |
|
|
|
|
|
|
|
composites_searched = await admin.a_get_role_composites_by_id( |
|
|
|
parent_role["id"], query={"search": client_role[:3]} |
|
|
|
) |
|
|
|
assert len(composites_searched) > 0 |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_upload_certificate( |
|
|
|
admin: KeycloakAdmin, |
|
|
|
@ -7060,7 +7217,7 @@ async def test_a_get_bruteforce_status_for_user( |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
oid, username, _ = oid_with_credentials |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
@ -7099,7 +7256,7 @@ async def test_a_clear_bruteforce_attempts_for_user( |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
oid, username, _ = oid_with_credentials |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
@ -7141,7 +7298,7 @@ async def test_a_clear_bruteforce_attempts_for_all_users( |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
oid, username, _ = oid_with_credentials |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
@ -7315,7 +7472,7 @@ async def test_a_initial_access_token( |
|
|
|
assert res["count"] == 2 |
|
|
|
assert res["expiration"] == 3 |
|
|
|
|
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
oid, _, _ = oid_with_credentials |
|
|
|
|
|
|
|
client = str(uuid.uuid4()) |
|
|
|
secret = str(uuid.uuid4()) |
|
|
|
|