|
@ -229,7 +229,7 @@ class KeycloakOpenID: |
|
|
|
|
|
|
|
|
payload = self._add_secret_key(payload) |
|
|
payload = self._add_secret_key(payload) |
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
|
|
|
|
|
def refresh_token(self, refresh_token, grant_type=["refresh_token"]): |
|
|
def refresh_token(self, refresh_token, grant_type=["refresh_token"]): |
|
|
""" |
|
|
""" |
|
@ -252,7 +252,7 @@ class KeycloakOpenID: |
|
|
} |
|
|
} |
|
|
payload = self._add_secret_key(payload) |
|
|
payload = self._add_secret_key(payload) |
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
|
|
|
|
|
def exchange_token(self, token: str, client_id: str, audience: str, subject: str) -> dict: |
|
|
def exchange_token(self, token: str, client_id: str, audience: str, subject: str) -> dict: |
|
|
""" |
|
|
""" |
|
@ -276,7 +276,7 @@ class KeycloakOpenID: |
|
|
} |
|
|
} |
|
|
payload = self._add_secret_key(payload) |
|
|
payload = self._add_secret_key(payload) |
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path), data=payload) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
|
|
|
|
|
def userinfo(self, token): |
|
|
def userinfo(self, token): |
|
|
""" |
|
|
""" |
|
@ -288,12 +288,9 @@ class KeycloakOpenID: |
|
|
:param token: |
|
|
:param token: |
|
|
:return: |
|
|
:return: |
|
|
""" |
|
|
""" |
|
|
|
|
|
|
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
self.connection.add_param_headers("Authorization", "Bearer " + token) |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
|
|
|
|
|
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path)) |
|
|
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path)) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
def logout(self, refresh_token): |
|
|
def logout(self, refresh_token): |
|
@ -304,11 +301,9 @@ class KeycloakOpenID: |
|
|
""" |
|
|
""" |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
payload = {"client_id": self.client_id, "refresh_token": refresh_token} |
|
|
payload = {"client_id": self.client_id, "refresh_token": refresh_token} |
|
|
|
|
|
|
|
|
payload = self._add_secret_key(payload) |
|
|
payload = self._add_secret_key(payload) |
|
|
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload) |
|
|
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path), data=payload) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError, expected_codes=[204]) |
|
|
|
|
|
|
|
|
def certs(self): |
|
|
def certs(self): |
|
|
""" |
|
|
""" |
|
@ -367,7 +362,6 @@ class KeycloakOpenID: |
|
|
:return: |
|
|
:return: |
|
|
""" |
|
|
""" |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name} |
|
|
|
|
|
|
|
|
payload = {"client_id": self.client_id, "token": token} |
|
|
payload = {"client_id": self.client_id, "token": token} |
|
|
|
|
|
|
|
|
if token_type_hint == "requesting_party_token": |
|
|
if token_type_hint == "requesting_party_token": |
|
@ -380,8 +374,7 @@ class KeycloakOpenID: |
|
|
payload = self._add_secret_key(payload) |
|
|
payload = self._add_secret_key(payload) |
|
|
|
|
|
|
|
|
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload) |
|
|
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakGetError) |
|
|
|
|
|
|
|
|
return raise_error_from_response(data_raw, KeycloakPostError) |
|
|
|
|
|
|
|
|
def decode_token(self, token, key, algorithms=["RS256"], **kwargs): |
|
|
def decode_token(self, token, key, algorithms=["RS256"], **kwargs): |
|
|
""" |
|
|
""" |
|
@ -399,7 +392,6 @@ class KeycloakOpenID: |
|
|
:param algorithms: |
|
|
:param algorithms: |
|
|
:return: |
|
|
:return: |
|
|
""" |
|
|
""" |
|
|
|
|
|
|
|
|
return jwt.decode(token, key, algorithms=algorithms, audience=self.client_id, **kwargs) |
|
|
return jwt.decode(token, key, algorithms=algorithms, audience=self.client_id, **kwargs) |
|
|
|
|
|
|
|
|
def load_authorization_config(self, path): |
|
|
def load_authorization_config(self, path): |
|
@ -409,10 +401,10 @@ class KeycloakOpenID: |
|
|
:param path: settings file (json) |
|
|
:param path: settings file (json) |
|
|
:return: |
|
|
:return: |
|
|
""" |
|
|
""" |
|
|
authorization_file = open(path, "r") |
|
|
|
|
|
authorization_json = json.loads(authorization_file.read()) |
|
|
|
|
|
|
|
|
with open(path, "r") as fp: |
|
|
|
|
|
authorization_json = json.load(fp) |
|
|
|
|
|
|
|
|
self.authorization.load_config(authorization_json) |
|
|
self.authorization.load_config(authorization_json) |
|
|
authorization_file.close() |
|
|
|
|
|
|
|
|
|
|
|
def get_policies(self, token, method_token_info="introspect", **kwargs): |
|
|
def get_policies(self, token, method_token_info="introspect", **kwargs): |
|
|
""" |
|
|
""" |
|
@ -421,7 +413,6 @@ class KeycloakOpenID: |
|
|
:param token: user token |
|
|
:param token: user token |
|
|
:return: policies list |
|
|
:return: policies list |
|
|
""" |
|
|
""" |
|
|
|
|
|
|
|
|
if not self.authorization.policies: |
|
|
if not self.authorization.policies: |
|
|
raise KeycloakAuthorizationConfigError( |
|
|
raise KeycloakAuthorizationConfigError( |
|
|
"Keycloak settings not found. Load Authorization Keycloak settings." |
|
|
"Keycloak settings not found. Load Authorization Keycloak settings." |
|
@ -455,7 +446,6 @@ class KeycloakOpenID: |
|
|
:param kwargs: parameters for decode |
|
|
:param kwargs: parameters for decode |
|
|
:return: permissions list |
|
|
:return: permissions list |
|
|
""" |
|
|
""" |
|
|
|
|
|
|
|
|
if not self.authorization.policies: |
|
|
if not self.authorization.policies: |
|
|
raise KeycloakAuthorizationConfigError( |
|
|
raise KeycloakAuthorizationConfigError( |
|
|
"Keycloak settings not found. Load Authorization Keycloak settings." |
|
|
"Keycloak settings not found. Load Authorization Keycloak settings." |
|
@ -493,7 +483,6 @@ class KeycloakOpenID: |
|
|
:param permissions: list of uma permissions list(resource:scope) requested by the user |
|
|
:param permissions: list of uma permissions list(resource:scope) requested by the user |
|
|
:return: permissions list |
|
|
:return: permissions list |
|
|
""" |
|
|
""" |
|
|
|
|
|
|
|
|
permission = build_permission_param(permissions) |
|
|
permission = build_permission_param(permissions) |
|
|
|
|
|
|
|
|
params_path = {"realm-name": self.realm_name} |
|
|
params_path = {"realm-name": self.realm_name} |
|
|