You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

420 lines
12 KiB

  1. # -*- coding: utf-8 -*-
  2. #
  3. # The MIT License (MIT)
  4. #
  5. # Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
  6. #
  7. # Permission is hereby granted, free of charge, to any person obtaining a copy of
  8. # this software and associated documentation files (the "Software"), to deal in
  9. # the Software without restriction, including without limitation the rights to
  10. # use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  11. # the Software, and to permit persons to whom the Software is furnished to do so,
  12. # subject to the following conditions:
  13. #
  14. # The above copyright notice and this permission notice shall be included in all
  15. # copies or substantial portions of the Software.
  16. #
  17. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  19. # FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  20. # COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  21. # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  22. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23. """Keycloak OpenID Connection Manager module.
  24. The module contains mainly the implementation of KeycloakOpenIDConnection class.
  25. This is an extension of the ConnectionManager class, and handles the automatic refresh
  26. of openid tokens when required.
  27. """
  28. from datetime import datetime, timedelta
  29. from .connection import ConnectionManager
  30. from .exceptions import KeycloakPostError
  31. from .keycloak_openid import KeycloakOpenID
  32. class KeycloakOpenIDConnection(ConnectionManager):
  33. """A class to help with OpenID connections which can auto refresh tokens.
  34. :param object: _description_
  35. :type object: _type_
  36. """
  37. _server_url = None
  38. _username = None
  39. _password = None
  40. _totp = None
  41. _realm_name = None
  42. _client_id = None
  43. _verify = None
  44. _client_secret_key = None
  45. _connection = None
  46. _custom_headers = None
  47. _user_realm_name = None
  48. _expires_at = None
  49. _keycloak_openid = None
  50. def __init__(
  51. self,
  52. server_url,
  53. username=None,
  54. password=None,
  55. token=None,
  56. totp=None,
  57. realm_name="master",
  58. client_id="admin-cli",
  59. verify=True,
  60. client_secret_key=None,
  61. custom_headers=None,
  62. user_realm_name=None,
  63. timeout=60,
  64. ):
  65. """Init method.
  66. :param server_url: Keycloak server url
  67. :type server_url: str
  68. :param username: admin username
  69. :type username: str
  70. :param password: admin password
  71. :type password: str
  72. :param token: access and refresh tokens
  73. :type token: dict
  74. :param totp: Time based OTP
  75. :type totp: str
  76. :param realm_name: realm name
  77. :type realm_name: str
  78. :param client_id: client id
  79. :type client_id: str
  80. :param verify: Boolean value to enable or disable certificate validation or a string
  81. containing a path to a CA bundle to use
  82. :type verify: Union[bool,str]
  83. :param client_secret_key: client secret key
  84. (optional, required only for access type confidential)
  85. :type client_secret_key: str
  86. :param custom_headers: dict of custom header to pass to each HTML request
  87. :type custom_headers: dict
  88. :param user_realm_name: The realm name of the user, if different from realm_name
  89. :type user_realm_name: str
  90. :param timeout: connection timeout in seconds
  91. :type timeout: int
  92. """
  93. # token is renewed when it hits 90% of its lifetime. This is to account for any possible
  94. # clock skew.
  95. self.token_lifetime_fraction = 0.9
  96. self.server_url = server_url
  97. self.username = username
  98. self.password = password
  99. self.token = token
  100. self.totp = totp
  101. self.realm_name = realm_name
  102. self.client_id = client_id
  103. self.verify = verify
  104. self.client_secret_key = client_secret_key
  105. self.user_realm_name = user_realm_name
  106. self.timeout = timeout
  107. if self.token is None:
  108. self.get_token()
  109. self.headers = (
  110. {
  111. "Authorization": "Bearer " + self.token.get("access_token"),
  112. "Content-Type": "application/json",
  113. }
  114. if self.token is not None
  115. else {}
  116. )
  117. self.custom_headers = custom_headers
  118. super().__init__(
  119. base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify
  120. )
  121. @property
  122. def server_url(self):
  123. """Get server url.
  124. :returns: Keycloak server url
  125. :rtype: str
  126. """
  127. return self.base_url
  128. @server_url.setter
  129. def server_url(self, value):
  130. self.base_url = value
  131. @property
  132. def realm_name(self):
  133. """Get realm name.
  134. :returns: Realm name
  135. :rtype: str
  136. """
  137. return self._realm_name
  138. @realm_name.setter
  139. def realm_name(self, value):
  140. self._realm_name = value
  141. @property
  142. def client_id(self):
  143. """Get client id.
  144. :returns: Client id
  145. :rtype: str
  146. """
  147. return self._client_id
  148. @client_id.setter
  149. def client_id(self, value):
  150. self._client_id = value
  151. @property
  152. def client_secret_key(self):
  153. """Get client secret key.
  154. :returns: Client secret key
  155. :rtype: str
  156. """
  157. return self._client_secret_key
  158. @client_secret_key.setter
  159. def client_secret_key(self, value):
  160. self._client_secret_key = value
  161. @property
  162. def username(self):
  163. """Get username.
  164. :returns: Admin username
  165. :rtype: str
  166. """
  167. return self._username
  168. @username.setter
  169. def username(self, value):
  170. self._username = value
  171. @property
  172. def password(self):
  173. """Get password.
  174. :returns: Admin password
  175. :rtype: str
  176. """
  177. return self._password
  178. @password.setter
  179. def password(self, value):
  180. self._password = value
  181. @property
  182. def totp(self):
  183. """Get totp.
  184. :returns: TOTP
  185. :rtype: str
  186. """
  187. return self._totp
  188. @totp.setter
  189. def totp(self, value):
  190. self._totp = value
  191. @property
  192. def token(self):
  193. """Get token.
  194. :returns: Access and refresh token
  195. :rtype: dict
  196. """
  197. return self._token
  198. @token.setter
  199. def token(self, value):
  200. self._token = value
  201. self._expires_at = datetime.now() + timedelta(
  202. seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
  203. )
  204. @property
  205. def expires_at(self):
  206. """Get token expiry time.
  207. :returns: Datetime at which the current token will expire
  208. :rtype: datetime
  209. """
  210. return self._expires_at
  211. @property
  212. def user_realm_name(self):
  213. """Get user realm name.
  214. :returns: User realm name
  215. :rtype: str
  216. """
  217. return self._user_realm_name
  218. @user_realm_name.setter
  219. def user_realm_name(self, value):
  220. self._user_realm_name = value
  221. @property
  222. def custom_headers(self):
  223. """Get custom headers.
  224. :returns: Custom headers
  225. :rtype: dict
  226. """
  227. return self._custom_headers
  228. @custom_headers.setter
  229. def custom_headers(self, value):
  230. self._custom_headers = value
  231. if self.custom_headers is not None:
  232. # merge custom headers to main headers
  233. self.headers.update(self.custom_headers)
  234. @property
  235. def keycloak_openid(self) -> KeycloakOpenID:
  236. """Get the KeycloakOpenID object.
  237. The KeycloakOpenID is used to refresh tokens
  238. :returns: KeycloakOpenID
  239. :rtype: KeycloakOpenID
  240. """
  241. if self._keycloak_openid is None:
  242. if self.user_realm_name:
  243. token_realm_name = self.user_realm_name
  244. elif self.realm_name:
  245. token_realm_name = self.realm_name
  246. else:
  247. token_realm_name = "master"
  248. self._keycloak_openid = KeycloakOpenID(
  249. server_url=self.server_url,
  250. client_id=self.client_id,
  251. realm_name=token_realm_name,
  252. verify=self.verify,
  253. client_secret_key=self.client_secret_key,
  254. timeout=self.timeout,
  255. )
  256. return self._keycloak_openid
  257. def get_token(self):
  258. """Get admin token.
  259. The admin token is then set in the `token` attribute.
  260. """
  261. grant_type = []
  262. if self.username and self.password:
  263. grant_type.append("password")
  264. elif self.client_secret_key:
  265. grant_type.append("client_credentials")
  266. if grant_type:
  267. self.token = self.keycloak_openid.token(
  268. self.username, self.password, grant_type=grant_type, totp=self.totp
  269. )
  270. else:
  271. self.token = None
  272. def refresh_token(self):
  273. """Refresh the token.
  274. :raises KeycloakPostError: In case the refresh token request failed.
  275. """
  276. refresh_token = self.token.get("refresh_token", None) if self.token else None
  277. if refresh_token is None:
  278. self.get_token()
  279. else:
  280. try:
  281. self.token = self.keycloak_openid.refresh_token(refresh_token)
  282. except KeycloakPostError as e:
  283. list_errors = [
  284. b"Refresh token expired",
  285. b"Token is not active",
  286. b"Session not active",
  287. ]
  288. if e.response_code == 400 and any(err in e.response_body for err in list_errors):
  289. self.get_token()
  290. else:
  291. raise
  292. self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token"))
  293. def _refresh_if_required(self):
  294. if datetime.now() >= self.expires_at:
  295. self.refresh_token()
  296. def raw_get(self, *args, **kwargs):
  297. """Call connection.raw_get.
  298. If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
  299. and try *get* once more.
  300. :param args: Additional arguments
  301. :type args: tuple
  302. :param kwargs: Additional keyword arguments
  303. :type kwargs: dict
  304. :returns: Response
  305. :rtype: Response
  306. """
  307. self._refresh_if_required()
  308. r = super().raw_get(*args, **kwargs)
  309. return r
  310. def raw_post(self, *args, **kwargs):
  311. """Call connection.raw_post.
  312. If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
  313. and try *post* once more.
  314. :param args: Additional arguments
  315. :type args: tuple
  316. :param kwargs: Additional keyword arguments
  317. :type kwargs: dict
  318. :returns: Response
  319. :rtype: Response
  320. """
  321. self._refresh_if_required()
  322. r = super().raw_post(*args, **kwargs)
  323. return r
  324. def raw_put(self, *args, **kwargs):
  325. """Call connection.raw_put.
  326. If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
  327. and try *put* once more.
  328. :param args: Additional arguments
  329. :type args: tuple
  330. :param kwargs: Additional keyword arguments
  331. :type kwargs: dict
  332. :returns: Response
  333. :rtype: Response
  334. """
  335. self._refresh_if_required()
  336. r = super().raw_put(*args, **kwargs)
  337. return r
  338. def raw_delete(self, *args, **kwargs):
  339. """Call connection.raw_delete.
  340. If auto_refresh is set for *delete* and *access_token* is expired,
  341. it will refresh the token and try *delete* once more.
  342. :param args: Additional arguments
  343. :type args: tuple
  344. :param kwargs: Additional keyword arguments
  345. :type kwargs: dict
  346. :returns: Response
  347. :rtype: Response
  348. """
  349. self._refresh_if_required()
  350. r = super().raw_delete(*args, **kwargs)
  351. return r