Browse Source

Merge pull request #406 from JeronimoMendes/master

feat: init KeycloakAdmin with token
pull/409/head v2.10.0
Richard Nemeth 2 years ago
committed by GitHub
parent
commit
9f16441fa5
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 40
      src/keycloak/keycloak_admin.py
  2. 9
      tests/test_keycloak_admin.py

40
src/keycloak/keycloak_admin.py

@ -54,6 +54,8 @@ class KeycloakAdmin:
:type username: str
:param password: admin password
:type password: str
:param token: access and refresh tokens
:type token: dict
:param totp: Time based OTP
:type totp: str
:param realm_name: realm name
@ -88,7 +90,6 @@ class KeycloakAdmin:
_client_secret_key = None
_auto_refresh_token = None
_connection = None
_token = None
_custom_headers = None
_user_realm_name = None
@ -97,6 +98,7 @@ class KeycloakAdmin:
server_url,
username=None,
password=None,
token=None,
totp=None,
realm_name="master",
client_id="admin-cli",
@ -115,6 +117,8 @@ class KeycloakAdmin:
:type username: str
:param password: admin password
:type password: str
:param token: access and refresh tokens
:type token: dict
:param totp: Time based OTP
:type totp: str
:param realm_name: realm name
@ -139,6 +143,7 @@ class KeycloakAdmin:
self.server_url = server_url
self.username = username
self.password = password
self.token = token
self.totp = totp
self.realm_name = realm_name
self.client_id = client_id
@ -149,9 +154,26 @@ class KeycloakAdmin:
self.custom_headers = custom_headers
self.timeout = timeout
# Get token Admin
if self.token is None:
self.get_token()
headers = (
{
"Authorization": "Bearer " + self.token.get("access_token"),
"Content-Type": "application/json",
}
if self.token is not None
else {}
)
if self.custom_headers is not None:
# merge custom headers to main headers
headers.update(self.custom_headers)
self.connection = ConnectionManager(
base_url=self.server_url, headers=headers, timeout=60, verify=self.verify
)
@property
def server_url(self):
"""Get server url.
@ -3372,22 +3394,8 @@ class KeycloakAdmin:
self.token = self.keycloak_openid.token(
self.username, self.password, grant_type=grant_type, totp=self.totp
)
headers = {
"Authorization": "Bearer " + self.token.get("access_token"),
"Content-Type": "application/json",
}
else:
self.token = None
headers = {}
if self.custom_headers is not None:
# merge custom headers to main headers
headers.update(self.custom_headers)
self.connection = ConnectionManager(
base_url=self.server_url, headers=headers, timeout=60, verify=self.verify
)
def refresh_token(self):
"""Refresh the token.

9
tests/test_keycloak_admin.py

@ -90,6 +90,15 @@ def test_keycloak_admin_init(env):
)
assert admin.token
token = admin.token
admin = KeycloakAdmin(
server_url=f"http://{env.KEYCLOAK_HOST}:{env.KEYCLOAK_PORT}",
token=token,
realm_name=None,
user_realm_name=None,
)
assert admin.token == token
admin.create_realm(payload={"realm": "authz", "enabled": True})
admin.realm_name = "authz"
admin.create_client(

Loading…
Cancel
Save