You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

419 lines
12 KiB

  1. # -*- coding: utf-8 -*-
  2. #
  3. # The MIT License (MIT)
  4. #
  5. # Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
  6. #
  7. # Permission is hereby granted, free of charge, to any person obtaining a copy of
  8. # this software and associated documentation files (the "Software"), to deal in
  9. # the Software without restriction, including without limitation the rights to
  10. # use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  11. # the Software, and to permit persons to whom the Software is furnished to do so,
  12. # subject to the following conditions:
  13. #
  14. # The above copyright notice and this permission notice shall be included in all
  15. # copies or substantial portions of the Software.
  16. #
  17. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  19. # FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  20. # COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  21. # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  22. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23. """Keycloak OpenID Connection Manager module.
  24. The module contains mainly the implementation of KeycloakOpenIDConnection class.
  25. This is an extension of the ConnectionManager class, and handles the automatic refresh
  26. of openid tokens when required.
  27. """
  28. from datetime import datetime, timedelta
  29. from .connection import ConnectionManager
  30. from .exceptions import KeycloakPostError
  31. from .keycloak_openid import KeycloakOpenID
  32. class KeycloakOpenIDConnection(ConnectionManager):
  33. """A class to help with OpenID connections which can auto refresh tokens.
  34. :param object: _description_
  35. :type object: _type_
  36. """
  37. _server_url = None
  38. _username = None
  39. _password = None
  40. _totp = None
  41. _realm_name = None
  42. _client_id = None
  43. _verify = None
  44. _client_secret_key = None
  45. _connection = None
  46. _custom_headers = None
  47. _user_realm_name = None
  48. _expires_at = None
  49. _keycloak_openid = None
  50. def __init__(
  51. self,
  52. server_url,
  53. username=None,
  54. password=None,
  55. token=None,
  56. totp=None,
  57. realm_name="master",
  58. client_id="admin-cli",
  59. verify=True,
  60. client_secret_key=None,
  61. custom_headers=None,
  62. user_realm_name=None,
  63. timeout=60,
  64. ):
  65. """Init method.
  66. :param server_url: Keycloak server url
  67. :type server_url: str
  68. :param username: admin username
  69. :type username: str
  70. :param password: admin password
  71. :type password: str
  72. :param token: access and refresh tokens
  73. :type token: dict
  74. :param totp: Time based OTP
  75. :type totp: str
  76. :param realm_name: realm name
  77. :type realm_name: str
  78. :param client_id: client id
  79. :type client_id: str
  80. :param verify: Boolean value to enable or disable certificate validation or a string containing a path to a CA bundle to use
  81. :type verify: Union[bool,str]
  82. :param client_secret_key: client secret key
  83. (optional, required only for access type confidential)
  84. :type client_secret_key: str
  85. :param custom_headers: dict of custom header to pass to each HTML request
  86. :type custom_headers: dict
  87. :param user_realm_name: The realm name of the user, if different from realm_name
  88. :type user_realm_name: str
  89. :param timeout: connection timeout in seconds
  90. :type timeout: int
  91. """
  92. # token is renewed when it hits 90% of its lifetime. This is to account for any possible
  93. # clock skew.
  94. self.token_lifetime_fraction = 0.9
  95. self.server_url = server_url
  96. self.username = username
  97. self.password = password
  98. self.token = token
  99. self.totp = totp
  100. self.realm_name = realm_name
  101. self.client_id = client_id
  102. self.verify = verify
  103. self.client_secret_key = client_secret_key
  104. self.user_realm_name = user_realm_name
  105. self.timeout = timeout
  106. if self.token is None:
  107. self.get_token()
  108. self.headers = (
  109. {
  110. "Authorization": "Bearer " + self.token.get("access_token"),
  111. "Content-Type": "application/json",
  112. }
  113. if self.token is not None
  114. else {}
  115. )
  116. self.custom_headers = custom_headers
  117. super().__init__(
  118. base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify
  119. )
  120. @property
  121. def server_url(self):
  122. """Get server url.
  123. :returns: Keycloak server url
  124. :rtype: str
  125. """
  126. return self.base_url
  127. @server_url.setter
  128. def server_url(self, value):
  129. self.base_url = value
  130. @property
  131. def realm_name(self):
  132. """Get realm name.
  133. :returns: Realm name
  134. :rtype: str
  135. """
  136. return self._realm_name
  137. @realm_name.setter
  138. def realm_name(self, value):
  139. self._realm_name = value
  140. @property
  141. def client_id(self):
  142. """Get client id.
  143. :returns: Client id
  144. :rtype: str
  145. """
  146. return self._client_id
  147. @client_id.setter
  148. def client_id(self, value):
  149. self._client_id = value
  150. @property
  151. def client_secret_key(self):
  152. """Get client secret key.
  153. :returns: Client secret key
  154. :rtype: str
  155. """
  156. return self._client_secret_key
  157. @client_secret_key.setter
  158. def client_secret_key(self, value):
  159. self._client_secret_key = value
  160. @property
  161. def username(self):
  162. """Get username.
  163. :returns: Admin username
  164. :rtype: str
  165. """
  166. return self._username
  167. @username.setter
  168. def username(self, value):
  169. self._username = value
  170. @property
  171. def password(self):
  172. """Get password.
  173. :returns: Admin password
  174. :rtype: str
  175. """
  176. return self._password
  177. @password.setter
  178. def password(self, value):
  179. self._password = value
  180. @property
  181. def totp(self):
  182. """Get totp.
  183. :returns: TOTP
  184. :rtype: str
  185. """
  186. return self._totp
  187. @totp.setter
  188. def totp(self, value):
  189. self._totp = value
  190. @property
  191. def token(self):
  192. """Get token.
  193. :returns: Access and refresh token
  194. :rtype: dict
  195. """
  196. return self._token
  197. @token.setter
  198. def token(self, value):
  199. self._token = value
  200. self._expires_at = datetime.now() + timedelta(
  201. seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
  202. )
  203. @property
  204. def expires_at(self):
  205. """Get token expiry time.
  206. :returns: Datetime at which the current token will expire
  207. :rtype: datetime
  208. """
  209. return self._expires_at
  210. @property
  211. def user_realm_name(self):
  212. """Get user realm name.
  213. :returns: User realm name
  214. :rtype: str
  215. """
  216. return self._user_realm_name
  217. @user_realm_name.setter
  218. def user_realm_name(self, value):
  219. self._user_realm_name = value
  220. @property
  221. def custom_headers(self):
  222. """Get custom headers.
  223. :returns: Custom headers
  224. :rtype: dict
  225. """
  226. return self._custom_headers
  227. @custom_headers.setter
  228. def custom_headers(self, value):
  229. self._custom_headers = value
  230. if self.custom_headers is not None:
  231. # merge custom headers to main headers
  232. self.headers.update(self.custom_headers)
  233. @property
  234. def keycloak_openid(self) -> KeycloakOpenID:
  235. """Get the KeycloakOpenID object.
  236. The KeycloakOpenID is used to refresh tokens
  237. :returns: KeycloakOpenID
  238. :rtype: KeycloakOpenID
  239. """
  240. if self._keycloak_openid is None:
  241. if self.user_realm_name:
  242. token_realm_name = self.user_realm_name
  243. elif self.realm_name:
  244. token_realm_name = self.realm_name
  245. else:
  246. token_realm_name = "master"
  247. self._keycloak_openid = KeycloakOpenID(
  248. server_url=self.server_url,
  249. client_id=self.client_id,
  250. realm_name=token_realm_name,
  251. verify=self.verify,
  252. client_secret_key=self.client_secret_key,
  253. timeout=self.timeout,
  254. )
  255. return self._keycloak_openid
  256. def get_token(self):
  257. """Get admin token.
  258. The admin token is then set in the `token` attribute.
  259. """
  260. grant_type = []
  261. if self.client_secret_key:
  262. grant_type.append("client_credentials")
  263. elif self.username and self.password:
  264. grant_type.append("password")
  265. if grant_type:
  266. self.token = self.keycloak_openid.token(
  267. self.username, self.password, grant_type=grant_type, totp=self.totp
  268. )
  269. else:
  270. self.token = None
  271. def refresh_token(self):
  272. """Refresh the token.
  273. :raises KeycloakPostError: In case the refresh token request failed.
  274. """
  275. refresh_token = self.token.get("refresh_token", None) if self.token else None
  276. if refresh_token is None:
  277. self.get_token()
  278. else:
  279. try:
  280. self.token = self.keycloak_openid.refresh_token(refresh_token)
  281. except KeycloakPostError as e:
  282. list_errors = [
  283. b"Refresh token expired",
  284. b"Token is not active",
  285. b"Session not active",
  286. ]
  287. if e.response_code == 400 and any(err in e.response_body for err in list_errors):
  288. self.get_token()
  289. else:
  290. raise
  291. self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token"))
  292. def _refresh_if_required(self):
  293. if datetime.now() >= self.expires_at:
  294. self.refresh_token()
  295. def raw_get(self, *args, **kwargs):
  296. """Call connection.raw_get.
  297. If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
  298. and try *get* once more.
  299. :param args: Additional arguments
  300. :type args: tuple
  301. :param kwargs: Additional keyword arguments
  302. :type kwargs: dict
  303. :returns: Response
  304. :rtype: Response
  305. """
  306. self._refresh_if_required()
  307. r = super().raw_get(*args, **kwargs)
  308. return r
  309. def raw_post(self, *args, **kwargs):
  310. """Call connection.raw_post.
  311. If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
  312. and try *post* once more.
  313. :param args: Additional arguments
  314. :type args: tuple
  315. :param kwargs: Additional keyword arguments
  316. :type kwargs: dict
  317. :returns: Response
  318. :rtype: Response
  319. """
  320. self._refresh_if_required()
  321. r = super().raw_post(*args, **kwargs)
  322. return r
  323. def raw_put(self, *args, **kwargs):
  324. """Call connection.raw_put.
  325. If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
  326. and try *put* once more.
  327. :param args: Additional arguments
  328. :type args: tuple
  329. :param kwargs: Additional keyword arguments
  330. :type kwargs: dict
  331. :returns: Response
  332. :rtype: Response
  333. """
  334. self._refresh_if_required()
  335. r = super().raw_put(*args, **kwargs)
  336. return r
  337. def raw_delete(self, *args, **kwargs):
  338. """Call connection.raw_delete.
  339. If auto_refresh is set for *delete* and *access_token* is expired,
  340. it will refresh the token and try *delete* once more.
  341. :param args: Additional arguments
  342. :type args: tuple
  343. :param kwargs: Additional keyword arguments
  344. :type kwargs: dict
  345. :returns: Response
  346. :rtype: Response
  347. """
  348. self._refresh_if_required()
  349. r = super().raw_delete(*args, **kwargs)
  350. return r