Browse Source

fix: preserve original bearer

pull/566/head
Richard Nemeth 11 months ago
parent
commit
7da17707a5
No known key found for this signature in database GPG Key ID: 21C39470DF3DEC39
  1. 84
      src/keycloak/keycloak_openid.py

84
src/keycloak/keycloak_openid.py

@ -431,9 +431,15 @@ class KeycloakOpenID:
:returns: Userinfo object
:rtype: dict
"""
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path))
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakGetError)
def logout(self, refresh_token):
@ -494,9 +500,15 @@ class KeycloakOpenID:
:returns: Entitlements
:rtype: dict
"""
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path))
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
if data_raw.status_code == 404 or data_raw.status_code == 405:
return raise_error_from_response(data_raw, KeycloakDeprecationError)
@ -525,16 +537,26 @@ class KeycloakOpenID:
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "token": token}
bearer_changed = False
orig_bearer = None
if token_type_hint == "requesting_party_token":
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
bearer_changed = True
else:
raise KeycloakRPTNotFound("Can't found RPT.")
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
if bearer_changed:
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def decode_token(self, token, validate: bool = True, **kwargs):
@ -688,6 +710,7 @@ class KeycloakOpenID:
"audience": self.client_id,
}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
@ -697,6 +720,11 @@ class KeycloakOpenID:
if content_type
else self.connection.del_param_headers("Content-Type")
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def has_uma_access(self, token, permissions):
@ -752,11 +780,16 @@ class KeycloakOpenID:
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
data_raw = self.connection.raw_post(
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
def device(self):
@ -800,8 +833,8 @@ class KeycloakOpenID:
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "client-id": client_id}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
# Keycloak complains if the clientId is not set in the payload
if "clientId" not in payload:
@ -810,6 +843,11 @@ class KeycloakOpenID:
data_raw = self.connection.raw_put(
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload)
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPutError)
async def a_well_known(self):
@ -1019,9 +1057,15 @@ class KeycloakOpenID:
:returns: Userinfo object
:rtype: dict
"""
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name}
data_raw = await self.connection.a_raw_get(URL_USERINFO.format(**params_path))
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakGetError)
async def a_logout(self, refresh_token):
@ -1082,9 +1126,15 @@ class KeycloakOpenID:
:returns: Entitlements
:rtype: dict
"""
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = await self.connection.a_raw_get(URL_ENTITLEMENT.format(**params_path))
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
if data_raw.status_code == 404 or data_raw.status_code == 405:
return raise_error_from_response(data_raw, KeycloakDeprecationError)
@ -1113,10 +1163,14 @@ class KeycloakOpenID:
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "token": token}
orig_bearer = None
bearer_changed = False
if token_type_hint == "requesting_party_token":
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
bearer_changed = True
else:
raise KeycloakRPTNotFound("Can't found RPT.")
@ -1125,6 +1179,12 @@ class KeycloakOpenID:
data_raw = await self.connection.a_raw_post(
URL_INTROSPECT.format(**params_path), data=payload
)
if bearer_changed:
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_decode_token(self, token, validate: bool = True, **kwargs):
@ -1278,6 +1338,7 @@ class KeycloakOpenID:
"audience": self.client_id,
}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
content_type = self.connection.headers.get("Content-Type")
self.connection.add_param_headers("Content-Type", "application/x-www-form-urlencoded")
@ -1287,6 +1348,11 @@ class KeycloakOpenID:
if content_type
else self.connection.del_param_headers("Content-Type")
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_has_uma_access(self, token, permissions):
@ -1342,11 +1408,16 @@ class KeycloakOpenID:
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
data_raw = await self.connection.a_raw_post(
URL_CLIENT_REGISTRATION.format(**params_path), data=json.dumps(payload)
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPostError)
async def a_device(self):
@ -1390,8 +1461,8 @@ class KeycloakOpenID:
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "client-id": client_id}
orig_bearer = self.connection.headers.get("Authorization")
self.connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Content-Type", "application/json")
# Keycloak complains if the clientId is not set in the payload
if "clientId" not in payload:
@ -1400,4 +1471,9 @@ class KeycloakOpenID:
data_raw = await self.connection.a_raw_put(
URL_CLIENT_UPDATE.format(**params_path), data=json.dumps(payload)
)
(
self.connection.add_param_headers("Authorization", orig_bearer)
if orig_bearer is not None
else self.connection.del_param_headers("Authorization")
)
return raise_error_from_response(data_raw, KeycloakPutError)
Loading…
Cancel
Save