|
|
@ -4,6 +4,7 @@ import keycloak |
|
|
|
from keycloak import KeycloakAdmin |
|
|
|
from keycloak.connection import ConnectionManager |
|
|
|
from keycloak.exceptions import ( |
|
|
|
KeycloakAuthenticationError, |
|
|
|
KeycloakDeleteError, |
|
|
|
KeycloakGetError, |
|
|
|
KeycloakPostError, |
|
|
@ -54,6 +55,59 @@ def test_keycloak_admin_init(env): |
|
|
|
assert admin.auto_refresh_token == list(), admin.auto_refresh_token |
|
|
|
assert admin.user_realm_name is None, admin.user_realm_name |
|
|
|
assert admin.custom_headers is None, admin.custom_headers |
|
|
|
assert admin.token |
|
|
|
|
|
|
|
admin = KeycloakAdmin( |
|
|
|
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", |
|
|
|
username=env.KEYCLOAK_ADMIN, |
|
|
|
password=env.KEYCLOAK_ADMIN_PASSWORD, |
|
|
|
realm_name=None, |
|
|
|
user_realm_name="master", |
|
|
|
) |
|
|
|
assert admin.token |
|
|
|
admin = KeycloakAdmin( |
|
|
|
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", |
|
|
|
username=env.KEYCLOAK_ADMIN, |
|
|
|
password=env.KEYCLOAK_ADMIN_PASSWORD, |
|
|
|
realm_name=None, |
|
|
|
user_realm_name=None, |
|
|
|
) |
|
|
|
assert admin.token |
|
|
|
|
|
|
|
admin.create_realm(payload={"realm": "authz", "enabled": True}) |
|
|
|
admin.realm_name = "authz" |
|
|
|
admin.create_client( |
|
|
|
payload={ |
|
|
|
"name": "authz-client", |
|
|
|
"clientId": "authz-client", |
|
|
|
"authorizationServicesEnabled": True, |
|
|
|
"serviceAccountsEnabled": True, |
|
|
|
"clientAuthenticatorType": "client-secret", |
|
|
|
"directAccessGrantsEnabled": False, |
|
|
|
"enabled": True, |
|
|
|
"implicitFlowEnabled": False, |
|
|
|
"publicClient": False, |
|
|
|
} |
|
|
|
) |
|
|
|
secret = admin.generate_client_secrets(client_id=admin.get_client_id("authz-client")) |
|
|
|
assert KeycloakAdmin( |
|
|
|
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", |
|
|
|
user_realm_name="authz", |
|
|
|
client_id="authz-client", |
|
|
|
client_secret_key=secret["value"], |
|
|
|
).token |
|
|
|
admin.delete_realm(realm_name="authz") |
|
|
|
|
|
|
|
assert ( |
|
|
|
KeycloakAdmin( |
|
|
|
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}", |
|
|
|
username=None, |
|
|
|
password=None, |
|
|
|
client_secret_key=None, |
|
|
|
custom_headers={"custom": "header"}, |
|
|
|
).token |
|
|
|
is None |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def test_realms(admin: KeycloakAdmin): |
|
|
@ -573,6 +627,47 @@ def test_clients(admin: KeycloakAdmin, realm: str): |
|
|
|
admin.update_client(client_id="does-not-exist", payload={"name": "test-client-change"}) |
|
|
|
assert err.match('404: b\'{"error":"Could not find client"}\'') |
|
|
|
|
|
|
|
# Test client mappers |
|
|
|
res = admin.get_mappers_from_client(client_id=client_id) |
|
|
|
assert len(res) == 0 |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPostError) as err: |
|
|
|
admin.add_mapper_to_client(client_id="does-not-exist", payload=dict()) |
|
|
|
assert err.match('404: b\'{"error":"Could not find client"}\'') |
|
|
|
|
|
|
|
res = admin.add_mapper_to_client( |
|
|
|
client_id=client_id, |
|
|
|
payload={ |
|
|
|
"name": "test-mapper", |
|
|
|
"protocol": "openid-connect", |
|
|
|
"protocolMapper": "oidc-usermodel-attribute-mapper", |
|
|
|
}, |
|
|
|
) |
|
|
|
assert res == b"" |
|
|
|
assert len(admin.get_mappers_from_client(client_id=client_id)) == 1 |
|
|
|
|
|
|
|
mapper = admin.get_mappers_from_client(client_id=client_id)[0] |
|
|
|
with pytest.raises(KeycloakPutError) as err: |
|
|
|
admin.update_client_mapper(client_id=client_id, mapper_id="does-not-exist", payload=dict()) |
|
|
|
assert err.match('404: b\'{"error":"Model not found"}\'') |
|
|
|
mapper["config"]["user.attribute"] = "test" |
|
|
|
res = admin.update_client_mapper(client_id=client_id, mapper_id=mapper["id"], payload=mapper) |
|
|
|
assert res == dict() |
|
|
|
|
|
|
|
res = admin.remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"]) |
|
|
|
assert res == dict() |
|
|
|
with pytest.raises(KeycloakDeleteError) as err: |
|
|
|
admin.remove_client_mapper(client_id=client_id, client_mapper_id=mapper["id"]) |
|
|
|
assert err.match('404: b\'{"error":"Model not found"}\'') |
|
|
|
|
|
|
|
# Test client sessions |
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
admin.get_client_all_sessions(client_id="does-not-exist") |
|
|
|
assert err.match('404: b\'{"error":"Could not find client"}\'') |
|
|
|
|
|
|
|
assert admin.get_client_all_sessions(client_id=client_id) == list() |
|
|
|
assert admin.get_client_sessions_stats() == list() |
|
|
|
|
|
|
|
# Test authz |
|
|
|
auth_client_id = admin.create_client( |
|
|
|
payload={ |
|
|
@ -703,6 +798,42 @@ def test_clients(admin: KeycloakAdmin, realm: str): |
|
|
|
admin.delete_client(client_id=auth_client_id) |
|
|
|
assert err.match('404: b\'{"error":"Could not find client"}\'') |
|
|
|
|
|
|
|
# Test client credentials |
|
|
|
admin.create_client( |
|
|
|
payload={ |
|
|
|
"name": "test-confidential", |
|
|
|
"enabled": True, |
|
|
|
"protocol": "openid-connect", |
|
|
|
"publicClient": False, |
|
|
|
"redirectUris": ["http://localhost/*"], |
|
|
|
"webOrigins": ["+"], |
|
|
|
"clientId": "test-confidential", |
|
|
|
"secret": "test-secret", |
|
|
|
"clientAuthenticatorType": "client-secret", |
|
|
|
} |
|
|
|
) |
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
admin.get_client_secrets(client_id="does-not-exist") |
|
|
|
assert err.match('404: b\'{"error":"Could not find client"}\'') |
|
|
|
|
|
|
|
secrets = admin.get_client_secrets( |
|
|
|
client_id=admin.get_client_id(client_name="test-confidential") |
|
|
|
) |
|
|
|
assert secrets == {"type": "secret", "value": "test-secret"} |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPostError) as err: |
|
|
|
admin.generate_client_secrets(client_id="does-not-exist") |
|
|
|
assert err.match('404: b\'{"error":"Could not find client"}\'') |
|
|
|
|
|
|
|
res = admin.generate_client_secrets( |
|
|
|
client_id=admin.get_client_id(client_name="test-confidential") |
|
|
|
) |
|
|
|
assert res |
|
|
|
assert ( |
|
|
|
admin.get_client_secrets(client_id=admin.get_client_id(client_name="test-confidential")) |
|
|
|
== res |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def test_realm_roles(admin: KeycloakAdmin, realm: str): |
|
|
|
admin.realm_name = realm |
|
|
@ -1382,3 +1513,166 @@ def test_client_scopes(admin: KeycloakAdmin, realm: str): |
|
|
|
with pytest.raises(KeycloakDeleteError) as err: |
|
|
|
admin.delete_client_scope(client_scope_id=res) |
|
|
|
assert err.match('404: b\'{"error":"Could not find client scope"}\'') |
|
|
|
|
|
|
|
|
|
|
|
def test_components(admin: KeycloakAdmin, realm: str): |
|
|
|
admin.realm_name = realm |
|
|
|
|
|
|
|
# Test get components |
|
|
|
res = admin.get_components() |
|
|
|
assert len(res) == 12 |
|
|
|
|
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
admin.get_component(component_id="does-not-exist") |
|
|
|
assert err.match('404: b\'{"error":"Could not find component"}\'') |
|
|
|
|
|
|
|
res_get = admin.get_component(component_id=res[0]["id"]) |
|
|
|
assert res_get == res[0] |
|
|
|
|
|
|
|
# Test create component |
|
|
|
with pytest.raises(KeycloakPostError) as err: |
|
|
|
admin.create_component(payload={"bad": "dict"}) |
|
|
|
assert err.match('400: b\'{"error":"Unrecognized field') |
|
|
|
|
|
|
|
res = admin.create_component( |
|
|
|
payload={ |
|
|
|
"name": "Test Component", |
|
|
|
"providerId": "max-clients", |
|
|
|
"providerType": "org.keycloak.services.clientregistration." |
|
|
|
+ "policy.ClientRegistrationPolicy", |
|
|
|
"config": {"max-clients": ["1000"]}, |
|
|
|
} |
|
|
|
) |
|
|
|
assert res |
|
|
|
assert admin.get_component(component_id=res)["name"] == "Test Component" |
|
|
|
|
|
|
|
# Test update component |
|
|
|
component = admin.get_component(component_id=res) |
|
|
|
component["name"] = "Test Component Update" |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPutError) as err: |
|
|
|
admin.update_component(component_id="does-not-exist", payload=dict()) |
|
|
|
assert err.match('404: b\'{"error":"Could not find component"}\'') |
|
|
|
res_upd = admin.update_component(component_id=res, payload=component) |
|
|
|
assert res_upd == dict() |
|
|
|
assert admin.get_component(component_id=res)["name"] == "Test Component Update" |
|
|
|
|
|
|
|
# Test delete component |
|
|
|
res_del = admin.delete_component(component_id=res) |
|
|
|
assert res_del == dict() |
|
|
|
with pytest.raises(KeycloakDeleteError) as err: |
|
|
|
admin.delete_component(component_id=res) |
|
|
|
assert err.match('404: b\'{"error":"Could not find component"}\'') |
|
|
|
|
|
|
|
|
|
|
|
def test_keys(admin: KeycloakAdmin, realm: str): |
|
|
|
admin.realm_name = realm |
|
|
|
assert set(admin.get_keys()["active"].keys()) == {"AES", "HS256", "RS256", "RSA-OAEP"} |
|
|
|
assert {k["algorithm"] for k in admin.get_keys()["keys"]} == { |
|
|
|
"HS256", |
|
|
|
"RSA-OAEP", |
|
|
|
"AES", |
|
|
|
"RS256", |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
def test_events(admin: KeycloakAdmin, realm: str): |
|
|
|
admin.realm_name = realm |
|
|
|
|
|
|
|
events = admin.get_events() |
|
|
|
assert events == list() |
|
|
|
|
|
|
|
with pytest.raises(KeycloakPutError) as err: |
|
|
|
admin.set_events(payload={"bad": "conf"}) |
|
|
|
assert err.match('400: b\'{"error":"Unrecognized field') |
|
|
|
|
|
|
|
res = admin.set_events(payload={"adminEventsDetailsEnabled": True, "adminEventsEnabled": True}) |
|
|
|
assert res == dict() |
|
|
|
|
|
|
|
admin.create_client(payload={"name": "test", "clientId": "test"}) |
|
|
|
|
|
|
|
events = admin.get_events() |
|
|
|
assert events == list() |
|
|
|
|
|
|
|
|
|
|
|
def test_auto_refresh(admin: KeycloakAdmin, realm: str): |
|
|
|
# Test get refresh |
|
|
|
admin.auto_refresh_token = list() |
|
|
|
admin.connection = ConnectionManager( |
|
|
|
base_url=admin.server_url, |
|
|
|
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, |
|
|
|
timeout=60, |
|
|
|
verify=admin.verify, |
|
|
|
) |
|
|
|
|
|
|
|
with pytest.raises(KeycloakAuthenticationError) as err: |
|
|
|
admin.get_realm(realm_name=realm) |
|
|
|
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') |
|
|
|
|
|
|
|
admin.auto_refresh_token = ["get"] |
|
|
|
del admin.token["refresh_token"] |
|
|
|
assert admin.get_realm(realm_name=realm) |
|
|
|
|
|
|
|
# Test bad refresh token |
|
|
|
admin.connection = ConnectionManager( |
|
|
|
base_url=admin.server_url, |
|
|
|
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, |
|
|
|
timeout=60, |
|
|
|
verify=admin.verify, |
|
|
|
) |
|
|
|
admin.token["refresh_token"] = "bad" |
|
|
|
with pytest.raises(KeycloakGetError) as err: |
|
|
|
admin.get_realm(realm_name="test-refresh") |
|
|
|
assert err.match( |
|
|
|
'400: b\'{"error":"invalid_grant","error_description":"Invalid refresh token"}\'' |
|
|
|
) |
|
|
|
admin.realm_name = "master" |
|
|
|
admin.get_token() |
|
|
|
admin.realm_name = realm |
|
|
|
|
|
|
|
# Test post refresh |
|
|
|
admin.connection = ConnectionManager( |
|
|
|
base_url=admin.server_url, |
|
|
|
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, |
|
|
|
timeout=60, |
|
|
|
verify=admin.verify, |
|
|
|
) |
|
|
|
with pytest.raises(KeycloakAuthenticationError) as err: |
|
|
|
admin.create_realm(payload={"realm": "test-refresh"}) |
|
|
|
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') |
|
|
|
|
|
|
|
admin.auto_refresh_token = ["get", "post"] |
|
|
|
admin.realm_name = "master" |
|
|
|
admin.user_logout(user_id=admin.get_user_id(username=admin.username)) |
|
|
|
assert admin.create_realm(payload={"realm": "test-refresh"}) == b"" |
|
|
|
admin.realm_name = realm |
|
|
|
|
|
|
|
# Test update refresh |
|
|
|
admin.connection = ConnectionManager( |
|
|
|
base_url=admin.server_url, |
|
|
|
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, |
|
|
|
timeout=60, |
|
|
|
verify=admin.verify, |
|
|
|
) |
|
|
|
with pytest.raises(KeycloakAuthenticationError) as err: |
|
|
|
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) |
|
|
|
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') |
|
|
|
|
|
|
|
admin.auto_refresh_token = ["get", "post", "put"] |
|
|
|
assert ( |
|
|
|
admin.update_realm(realm_name="test-refresh", payload={"accountTheme": "test"}) == dict() |
|
|
|
) |
|
|
|
|
|
|
|
# Test delete refresh |
|
|
|
admin.connection = ConnectionManager( |
|
|
|
base_url=admin.server_url, |
|
|
|
headers={"Authorization": "Bearer bad", "Content-Type": "application/json"}, |
|
|
|
timeout=60, |
|
|
|
verify=admin.verify, |
|
|
|
) |
|
|
|
with pytest.raises(KeycloakAuthenticationError) as err: |
|
|
|
admin.delete_realm(realm_name="test-refresh") |
|
|
|
assert err.match('401: b\'{"error":"HTTP 401 Unauthorized"}\'') |
|
|
|
|
|
|
|
admin.auto_refresh_token = ["get", "post", "put", "delete"] |
|
|
|
assert admin.delete_realm(realm_name="test-refresh") == dict() |