|
|
@ -5650,3 +5650,349 @@ async def test_a_update_required_action(admin: KeycloakAdmin, realm: str): |
|
|
|
newra = await admin.a_get_required_action_by_alias("UPDATE_PASSWORD") |
|
|
|
assert old != newra |
|
|
|
assert newra["enabled"] is False |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_get_composite_client_roles_of_group( |
|
|
|
admin: KeycloakAdmin, realm: str, client: str, group: str, composite_client_role: str |
|
|
|
): |
|
|
|
"""Test get composite client roles of group. |
|
|
|
|
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
:param client: Keycloak client |
|
|
|
:type client: str |
|
|
|
:param group: Keycloak group |
|
|
|
:type group: str |
|
|
|
:param composite_client_role: Composite client role |
|
|
|
:type composite_client_role: str |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
role = await admin.a_get_client_role(client, composite_client_role) |
|
|
|
await admin.a_assign_group_client_roles(group_id=group, client_id=client, roles=[role]) |
|
|
|
result = await admin.a_get_composite_client_roles_of_group(client, group) |
|
|
|
assert role["id"] in [x["id"] for x in result] |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_get_role_client_level_children( |
|
|
|
admin: KeycloakAdmin, realm: str, client: str, composite_client_role: str, client_role: str |
|
|
|
): |
|
|
|
"""Test get children of composite client role. |
|
|
|
|
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
:param client: Keycloak client |
|
|
|
:type client: str |
|
|
|
:param composite_client_role: Composite client role |
|
|
|
:type composite_client_role: str |
|
|
|
:param client_role: Client role |
|
|
|
:type client_role: str |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
child = await admin.a_get_client_role(client, client_role) |
|
|
|
parent = await admin.a_get_client_role(client, composite_client_role) |
|
|
|
res = await admin.a_get_role_client_level_children(client, parent["id"]) |
|
|
|
assert child["id"] in [x["id"] for x in res] |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfsigned_cert: tuple): |
|
|
|
"""Test upload certificate. |
|
|
|
|
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
:param client: Keycloak client |
|
|
|
:type client: str |
|
|
|
:param selfsigned_cert: Selfsigned certificates |
|
|
|
:type selfsigned_cert: tuple |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
cert, _ = selfsigned_cert |
|
|
|
cert = cert.decode("utf-8").strip() |
|
|
|
admin.upload_certificate(client, cert) |
|
|
|
cl = await admin.a_get_client(client) |
|
|
|
assert cl["attributes"]["jwt.credential.certificate"] == "".join(cert.splitlines()[1:-1]) |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_get_bruteforce_status_for_user( |
|
|
|
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str |
|
|
|
): |
|
|
|
"""Test users. |
|
|
|
|
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": True}) |
|
|
|
res = await admin.a_get_realm(realm_name=realm) |
|
|
|
assert res["bruteForceProtected"] is True |
|
|
|
|
|
|
|
# Test login user with wrong credentials |
|
|
|
try: |
|
|
|
oid.token(username=username, password="wrongpassword") |
|
|
|
except KeycloakAuthenticationError: |
|
|
|
pass |
|
|
|
|
|
|
|
user_id = await admin.a_get_user_id(username) |
|
|
|
bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) |
|
|
|
|
|
|
|
assert bruteforce_status["numFailures"] == 1 |
|
|
|
|
|
|
|
# Cleanup |
|
|
|
res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": False}) |
|
|
|
res = await admin.a_get_realm(realm_name=realm) |
|
|
|
assert res["bruteForceProtected"] is False |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_clear_bruteforce_attempts_for_user( |
|
|
|
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str |
|
|
|
): |
|
|
|
"""Test users. |
|
|
|
|
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": True}) |
|
|
|
res = await admin.a_get_realm(realm_name=realm) |
|
|
|
assert res["bruteForceProtected"] is True |
|
|
|
|
|
|
|
# Test login user with wrong credentials |
|
|
|
try: |
|
|
|
oid.token(username=username, password="wrongpassword") |
|
|
|
except KeycloakAuthenticationError: |
|
|
|
pass |
|
|
|
|
|
|
|
user_id = await admin.a_get_user_id(username) |
|
|
|
bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) |
|
|
|
assert bruteforce_status["numFailures"] == 1 |
|
|
|
|
|
|
|
res = await admin.a_clear_bruteforce_attempts_for_user(user_id) |
|
|
|
bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) |
|
|
|
assert bruteforce_status["numFailures"] == 0 |
|
|
|
|
|
|
|
# Cleanup |
|
|
|
res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": False}) |
|
|
|
res = await admin.a_get_realm(realm_name=realm) |
|
|
|
assert res["bruteForceProtected"] is False |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_clear_bruteforce_attempts_for_all_users( |
|
|
|
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str |
|
|
|
): |
|
|
|
"""Test users. |
|
|
|
|
|
|
|
:param admin: Keycloak Admin client |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
:param realm: Keycloak realm |
|
|
|
:type realm: str |
|
|
|
""" |
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
# Turn on bruteforce protection |
|
|
|
res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": True}) |
|
|
|
res = await admin.a_get_realm(realm_name=realm) |
|
|
|
assert res["bruteForceProtected"] is True |
|
|
|
|
|
|
|
# Test login user with wrong credentials |
|
|
|
try: |
|
|
|
oid.token(username=username, password="wrongpassword") |
|
|
|
except KeycloakAuthenticationError: |
|
|
|
pass |
|
|
|
|
|
|
|
user_id = await admin.a_get_user_id(username) |
|
|
|
bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) |
|
|
|
assert bruteforce_status["numFailures"] == 1 |
|
|
|
|
|
|
|
res = await admin.a_clear_all_bruteforce_attempts() |
|
|
|
bruteforce_status = await admin.a_get_bruteforce_detection_status(user_id) |
|
|
|
assert bruteforce_status["numFailures"] == 0 |
|
|
|
|
|
|
|
# Cleanup |
|
|
|
res = await admin.a_update_realm(realm_name=realm, payload={"bruteForceProtected": False}) |
|
|
|
res = await admin.a_get_realm(realm_name=realm) |
|
|
|
assert res["bruteForceProtected"] is False |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_default_realm_role_present(realm: str, admin: KeycloakAdmin) -> None: |
|
|
|
"""Test that the default realm role is present in a brand new realm. |
|
|
|
|
|
|
|
:param realm: Realm name |
|
|
|
:type realm: str |
|
|
|
:param admin: Keycloak admin |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
assert f"default-roles-{realm}" in [x["name"] for x in admin.get_realm_roles()] |
|
|
|
assert ( |
|
|
|
len([x["name"] for x in await admin.a_get_realm_roles() if x["name"] == f"default-roles-{realm}"]) |
|
|
|
== 1 |
|
|
|
) |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_get_default_realm_role_id(realm: str, admin: KeycloakAdmin) -> None: |
|
|
|
"""Test getter for the ID of the default realm role. |
|
|
|
|
|
|
|
:param realm: Realm name |
|
|
|
:type realm: str |
|
|
|
:param admin: Keycloak admin |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
assert ( |
|
|
|
await admin.a_get_default_realm_role_id() |
|
|
|
== [x["id"] for x in await admin.a_get_realm_roles() if x["name"] == f"default-roles-{realm}"][0] |
|
|
|
) |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_realm_default_roles(admin: KeycloakAdmin, realm: str) -> None: |
|
|
|
"""Test getting, adding and deleting default realm roles. |
|
|
|
|
|
|
|
:param realm: Realm name |
|
|
|
:type realm: str |
|
|
|
:param admin: Keycloak admin |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
# Test listing all default realm roles |
|
|
|
roles = await admin.a_get_realm_default_roles() |
|
|
|
assert len(roles) == 2 |
|
|
|
assert {x["name"] for x in roles} == {"offline_access", "uma_authorization"} |
|
|
|
|
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
await admin.a_change_current_realm("doesnotexist") |
|
|
|
await admin.a_get_realm_default_roles() |
|
|
|
assert err.match('404: b\'{"error":"Realm not found.".*}\'') |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
|
|
|
|
# Test removing a default realm role |
|
|
|
res = await admin.a_remove_realm_default_roles(payload=[roles[0]]) |
|
|
|
assert res == {} |
|
|
|
assert roles[0] not in await admin.a_get_realm_default_roles() |
|
|
|
assert len(await admin.a_get_realm_default_roles()) == 1 |
|
|
|
|
|
|
|
with pytest.raises(KeycloakDeleteError) as err: |
|
|
|
await admin.a_remove_realm_default_roles(payload=[{"id": "bad id"}]) |
|
|
|
assert err.match('404: b\'{"error":"Could not find composite role".*}\'') |
|
|
|
|
|
|
|
# Test adding a default realm role |
|
|
|
res = await admin.a_add_realm_default_roles(payload=[roles[0]]) |
|
|
|
assert res == {} |
|
|
|
assert roles[0] in await admin.a_get_realm_default_roles() |
|
|
|
assert len(await admin.a_get_realm_default_roles()) == 2 |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPostError) as err: |
|
|
|
await admin.a_add_realm_default_roles(payload=[{"id": "bad id"}]) |
|
|
|
assert err.match('404: b\'{"error":"Could not find composite role".*}\'') |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_clear_keys_cache(realm: str, admin: KeycloakAdmin) -> None: |
|
|
|
"""Test clearing the keys cache. |
|
|
|
|
|
|
|
:param realm: Realm name |
|
|
|
:type realm: str |
|
|
|
:param admin: Keycloak admin |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
res = await admin.a_clear_keys_cache() |
|
|
|
assert res == {} |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_clear_realm_cache(realm: str, admin: KeycloakAdmin) -> None: |
|
|
|
"""Test clearing the realm cache. |
|
|
|
|
|
|
|
:param realm: Realm name |
|
|
|
:type realm: str |
|
|
|
:param admin: Keycloak admin |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
res = await admin.a_clear_realm_cache() |
|
|
|
assert res == {} |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
def test_clear_user_cache(realm: str, admin: KeycloakAdmin) -> None: |
|
|
|
"""Test clearing the user cache. |
|
|
|
|
|
|
|
:param realm: Realm name |
|
|
|
:type realm: str |
|
|
|
:param admin: Keycloak admin |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
await admin.a_change_current_realm(realm) |
|
|
|
res = await admin.a_clear_user_cache() |
|
|
|
assert res == {} |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_initial_access_token( |
|
|
|
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
) -> None: |
|
|
|
"""Test initial access token and client creation. |
|
|
|
|
|
|
|
:param admin: Keycloak admin |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials |
|
|
|
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str] |
|
|
|
""" |
|
|
|
res = await admin.a_create_initial_access_token(2, 3) |
|
|
|
assert "token" in res |
|
|
|
assert res["count"] == 2 |
|
|
|
assert res["expiration"] == 3 |
|
|
|
|
|
|
|
oid, username, password = oid_with_credentials |
|
|
|
|
|
|
|
client = str(uuid.uuid4()) |
|
|
|
secret = str(uuid.uuid4()) |
|
|
|
|
|
|
|
res = oid.register_client( |
|
|
|
token=res["token"], |
|
|
|
payload={ |
|
|
|
"name": "DynamicRegisteredClient", |
|
|
|
"clientId": client, |
|
|
|
"enabled": True, |
|
|
|
"publicClient": False, |
|
|
|
"protocol": "openid-connect", |
|
|
|
"secret": secret, |
|
|
|
"clientAuthenticatorType": "client-secret", |
|
|
|
}, |
|
|
|
) |
|
|
|
assert res["clientId"] == client |
|
|
|
|
|
|
|
new_secret = str(uuid.uuid4()) |
|
|
|
res = await oid.a_update_client(res["registrationAccessToken"], client, payload={"secret": new_secret}) |
|
|
|
assert res["secret"] == new_secret |
|
|
|
|
|
|
|
@pytest.mark.asyncio |
|
|
|
async def test_a_refresh_token(admin: KeycloakAdmin): |
|
|
|
"""Test refresh token on connection even if it is expired. |
|
|
|
|
|
|
|
:param admin: Keycloak admin |
|
|
|
:type admin: KeycloakAdmin |
|
|
|
""" |
|
|
|
assert admin.connection.token is not None |
|
|
|
await admin.a_user_logout(await admin.a_get_user_id(admin.connection.username)) |
|
|
|
admin.connection.refresh_token() |