Browse Source

feat: orgs

feat/orgs
Richard Nemeth 2 weeks ago
parent
commit
5aff8f3f5e
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 585
      poetry.lock
  2. 123
      src/keycloak/keycloak_admin.py
  3. 5
      src/keycloak/urls_patterns.py
  4. 18
      tests/conftest.py
  5. 53
      tests/test_keycloak_admin.py

585
poetry.lock
File diff suppressed because it is too large
View File

123
src/keycloak/keycloak_admin.py

@ -5383,6 +5383,129 @@ class KeycloakAdmin:
expected_codes=[HTTP_NO_CONTENT],
)
def get_organizations(self, query: dict | None = None) -> list:
"""
Get organizations.
:param query: Query parameters (optional)
:type query: dict
:return: organizations list
:rtype: list
"""
params_path = {"realm-name": self.connection.realm_name}
query = query or {}
url = urls_patterns.URL_ADMIN_ORGANIZATIONS.format(**params_path)
if "first" in query or "max" in query:
return self.__fetch_paginated(url, query)
return self.__fetch_all(url, query)
def get_organization(self, organization_id: str) -> dict:
"""
Get representation of the organization.
:param organization_id: Organization id
:type organization_id: str
:return: OrganizationRepresentation
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "org-id": organization_id}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_ORGANIZATIONS_SINGLE.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
def create_organization(self, payload: dict) -> str:
"""
Create a new organization.
:param payload: OrganizationRepresentation
:type payload: dict
:return: Organization id
:rtype: str
"""
params_path = {"realm-name": self.connection.realm_name}
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_ORGANIZATIONS.format(**params_path),
data=json.dumps(payload),
)
raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[HTTP_CREATED])
_last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :]
def update_organization(self, organization_id: str, payload: dict) -> dict:
"""
Update the organization.
:param organization_id: Organization id
:type organization_id: str
:param payload: OrganizationRepresentation
:type payload: dict
:return: Http response
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "org-id": organization_id}
data_raw = self.connection.raw_put(
urls_patterns.URL_ADMIN_ORGANIZATIONS_SINGLE.format(**params_path),
data=json.dumps(payload),
)
return raise_error_from_response(
data_raw, KeycloakPutError, expected_codes=[HTTP_NO_CONTENT]
)
def delete_organization(self, organization_id: str) -> dict:
"""
Delete the organization.
:param organization_id: Organization id
:type organization_id: str
:return: Http response
:rtype: dict
"""
params_path = {"realm-name": self.connection.realm_name, "org-id": organization_id}
data_raw = self.connection.raw_delete(
urls_patterns.URL_ADMIN_ORGANIZATIONS_SINGLE.format(**params_path)
)
return raise_error_from_response(
data_raw, KeycloakDeleteError, expected_codes=[HTTP_NO_CONTENT]
)
def get_organizations_identity_providers(self, organization_id: str) -> list:
"""
Get identity providers for the organization.
:param organization_id: Organization id
:type organization_id: str
:return: Identity providers list
:rtype: list
"""
params_path = {"realm-name": self.connection.realm_name, "org-id": organization_id}
data_raw = self.connection.raw_get(
urls_patterns.URL_ADMIN_ORGANIZATIONS_IDP.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
def add_identity_provider_to_organization(self, organization_id: str, payload: dict) -> str:
"""
Create a new identity provider for the organization.
:param organization_id: Organization id
:type organization_id: str
:param payload: Identity provider representation
:type payload: dict
:return: Identity provider id
:rtype: str
"""
params_path = {"realm-name": self.connection.realm_name, "org-id": organization_id}
data_raw = self.connection.raw_post(
urls_patterns.URL_ADMIN_ORGANIZATIONS_IDP.format(**params_path),
data=json.dumps(payload),
)
raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[HTTP_CREATED])
_last_slash_idx = data_raw.headers["Location"].rindex("/")
return data_raw.headers["Location"][_last_slash_idx + 1 :]
# async functions start
async def a___fetch_all(self, url: str, query: dict | None = None) -> list:
"""

5
src/keycloak/urls_patterns.py

@ -227,6 +227,11 @@ URL_ADMIN_CLEAR_KEYS_CACHE = URL_ADMIN_REALM + "/clear-keys-cache"
URL_ADMIN_CLEAR_REALM_CACHE = URL_ADMIN_REALM + "/clear-realm-cache"
URL_ADMIN_CLEAR_USER_CACHE = URL_ADMIN_REALM + "/clear-user-cache"
URL_ADMIN_ORGANIZATIONS = URL_ADMIN_REALM + "/organizations"
URL_ADMIN_ORGANIZATIONS_SINGLE = URL_ADMIN_ORGANIZATIONS + "/{org-id}"
URL_ADMIN_ORGANIZATIONS_IDP = URL_ADMIN_ORGANIZATIONS + "/{org-id}/identity-providers"
URL_ADMIN_ORGANIZATIONS_IDP_SINGLE = URL_ADMIN_ORGANIZATIONS_IDP + "{idp-alias}"
# UMA URLS
URL_UMA_WELL_KNOWN = URL_WELL_KNOWN_BASE + "/uma2-configuration"

18
tests/conftest.py

@ -447,6 +447,24 @@ def realm(admin: KeycloakAdmin) -> Generator[str, None, None]:
admin.delete_realm(realm_name=realm_name)
@pytest.fixture
def realm_org(admin: KeycloakAdmin) -> Generator[str, None, None]:
"""
Fixture for a new random realm with Organizations enabled.
:param admin: Keycloak admin
:type admin: KeycloakAdmin
:yields: Keycloak realm
:rtype: str
"""
realm_name = str(uuid.uuid4())
admin.create_realm(
payload={"realm": realm_name, "enabled": True, "organizationsEnabled": True},
)
yield realm_name
admin.delete_realm(realm_name=realm_name)
@pytest.fixture
def user(admin: KeycloakAdmin, realm: str) -> Generator[str, None, None]:
"""

53
tests/test_keycloak_admin.py

@ -2821,6 +2821,59 @@ def test_client_scopes(admin: KeycloakAdmin, realm: str) -> None:
assert err.match(NO_CLIENT_SCOPE_REGEX)
def test_organizations(admin: KeycloakAdmin, realm_org: str) -> None:
"""
Test organizations.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param realm: Keycloak realm
:type realm: str
"""
admin.change_current_realm(realm_org)
res = admin.get_organizations()
assert len(res) == 0
# Create
org_id = admin.create_organization(
payload={
"name": "test-org",
"enabled": True,
"domains": [{"name": "test.com", "verified": True}],
}
)
res = admin.get_organizations(query={"max": 2})
assert len(res) == 1
res = admin.get_organization(organization_id=org_id)
assert res["name"] == "test-org"
# Update
res["enabled"] = False
res["name"] = "test-org-update"
res = admin.update_organization(organization_id=org_id, payload=res)
assert res == {}
res = admin.get_organization(organization_id=org_id)
assert res["name"] == "test-org-update"
assert not res["enabled"]
# Get IdPs
res = admin.get_organizations_identity_providers(organization_id=org_id)
assert res == []
# Create IdP
res = admin.create_idp(payload={"alias": "test-idp", "providerId": "oidc"})
res = admin.add_identity_provider_to_organization(
organization_id=org_id, payload={"alias": "test-idp"}
)
assert res == ""
# Delete
res = admin.delete_organization(organization_id=org_id)
assert res == {}
assert len(admin.get_organizations()) == 0
def test_components(admin: KeycloakAdmin, realm: str) -> None:
"""
Test components.

Loading…
Cancel
Save