You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

420 lines
12 KiB

  1. # -*- coding: utf-8 -*-
  2. #
  3. # The MIT License (MIT)
  4. #
  5. # Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
  6. #
  7. # Permission is hereby granted, free of charge, to any person obtaining a copy of
  8. # this software and associated documentation files (the "Software"), to deal in
  9. # the Software without restriction, including without limitation the rights to
  10. # use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  11. # the Software, and to permit persons to whom the Software is furnished to do so,
  12. # subject to the following conditions:
  13. #
  14. # The above copyright notice and this permission notice shall be included in all
  15. # copies or substantial portions of the Software.
  16. #
  17. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  19. # FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  20. # COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  21. # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  22. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23. """Keycloak OpenID Connection Manager module.
  24. The module mainly contains the implementation of the KeycloakOpenIDConnection class.
  25. This is an extension of the ConnectionManager class, and handles the automatic refresh
  26. of openid tokens when required.
  27. """
  28. from datetime import datetime, timedelta
  29. from .connection import ConnectionManager
  30. from .exceptions import KeycloakPostError
  31. from .keycloak_openid import KeycloakOpenID
  32. class KeycloakOpenIDConnection(ConnectionManager):
  33. """A class to help with OpenID connections which can auto refresh tokens.
  34. :param object: _description_
  35. :type object: _type_
  36. """
  37. _server_url = None
  38. _username = None
  39. _password = None
  40. _totp = None
  41. _realm_name = None
  42. _client_id = None
  43. _verify = None
  44. _client_secret_key = None
  45. _connection = None
  46. _custom_headers = None
  47. _user_realm_name = None
  48. _expires_at = None
  49. _keycloak_openid = None
  50. def __init__(
  51. self,
  52. server_url,
  53. username=None,
  54. password=None,
  55. token=None,
  56. totp=None,
  57. realm_name="master",
  58. client_id="admin-cli",
  59. verify=True,
  60. client_secret_key=None,
  61. custom_headers=None,
  62. user_realm_name=None,
  63. timeout=60,
  64. ):
  65. """Init method.
  66. :param server_url: Keycloak server url
  67. :type server_url: str
  68. :param username: admin username
  69. :type username: str
  70. :param password: admin password
  71. :type password: str
  72. :param token: access and refresh tokens
  73. :type token: dict
  74. :param totp: Time based OTP
  75. :type totp: str
  76. :param realm_name: realm name
  77. :type realm_name: str
  78. :param client_id: client id
  79. :type client_id: str
  80. :param verify: Boolean value to enable or disable certificate validation or a string
  81. containing a path to a CA bundle to use
  82. :type verify: Union[bool,str]
  83. :param client_secret_key: client secret key
  84. (optional, required only for access type confidential)
  85. :type client_secret_key: str
  86. :param custom_headers: dict of custom header to pass to each HTML request
  87. :type custom_headers: dict
  88. :param user_realm_name: The realm name of the user, if different from realm_name
  89. :type user_realm_name: str
  90. :param timeout: connection timeout in seconds
  91. :type timeout: int
  92. """
  93. # token is renewed when it hits 90% of its lifetime. This is to account for any possible
  94. # clock skew.
  95. self.token_lifetime_fraction = 0.9
  96. self.server_url = server_url
  97. self.username = username
  98. self.password = password
  99. self.token = token
  100. self.totp = totp
  101. self.realm_name = realm_name
  102. self.client_id = client_id
  103. self.verify = verify
  104. self.client_secret_key = client_secret_key
  105. self.user_realm_name = user_realm_name
  106. self.timeout = timeout
  107. self.headers = {}
  108. self.custom_headers = custom_headers
  109. if self.token is None:
  110. self.get_token()
  111. if self.token is not None:
  112. self.headers = {
  113. **self.headers,
  114. "Authorization": "Bearer " + self.token.get("access_token"),
  115. "Content-Type": "application/json",
  116. }
  117. super().__init__(
  118. base_url=self.server_url, headers=self.headers, timeout=60, verify=self.verify
  119. )
  120. @property
  121. def server_url(self):
  122. """Get server url.
  123. :returns: Keycloak server url
  124. :rtype: str
  125. """
  126. return self.base_url
  127. @server_url.setter
  128. def server_url(self, value):
  129. self.base_url = value
  130. @property
  131. def realm_name(self):
  132. """Get realm name.
  133. :returns: Realm name
  134. :rtype: str
  135. """
  136. return self._realm_name
  137. @realm_name.setter
  138. def realm_name(self, value):
  139. self._realm_name = value
  140. @property
  141. def client_id(self):
  142. """Get client id.
  143. :returns: Client id
  144. :rtype: str
  145. """
  146. return self._client_id
  147. @client_id.setter
  148. def client_id(self, value):
  149. self._client_id = value
  150. @property
  151. def client_secret_key(self):
  152. """Get client secret key.
  153. :returns: Client secret key
  154. :rtype: str
  155. """
  156. return self._client_secret_key
  157. @client_secret_key.setter
  158. def client_secret_key(self, value):
  159. self._client_secret_key = value
  160. @property
  161. def username(self):
  162. """Get username.
  163. :returns: Admin username
  164. :rtype: str
  165. """
  166. return self._username
  167. @username.setter
  168. def username(self, value):
  169. self._username = value
  170. @property
  171. def password(self):
  172. """Get password.
  173. :returns: Admin password
  174. :rtype: str
  175. """
  176. return self._password
  177. @password.setter
  178. def password(self, value):
  179. self._password = value
  180. @property
  181. def totp(self):
  182. """Get totp.
  183. :returns: TOTP
  184. :rtype: str
  185. """
  186. return self._totp
  187. @totp.setter
  188. def totp(self, value):
  189. self._totp = value
  190. @property
  191. def token(self):
  192. """Get token.
  193. :returns: Access and refresh token
  194. :rtype: dict
  195. """
  196. return self._token
  197. @token.setter
  198. def token(self, value):
  199. self._token = value
  200. self._expires_at = datetime.now() + timedelta(
  201. seconds=int(self.token_lifetime_fraction * self.token["expires_in"] if value else 0)
  202. )
  203. @property
  204. def expires_at(self):
  205. """Get token expiry time.
  206. :returns: Datetime at which the current token will expire
  207. :rtype: datetime
  208. """
  209. return self._expires_at
  210. @property
  211. def user_realm_name(self):
  212. """Get user realm name.
  213. :returns: User realm name
  214. :rtype: str
  215. """
  216. return self._user_realm_name
  217. @user_realm_name.setter
  218. def user_realm_name(self, value):
  219. self._user_realm_name = value
  220. @property
  221. def custom_headers(self):
  222. """Get custom headers.
  223. :returns: Custom headers
  224. :rtype: dict
  225. """
  226. return self._custom_headers
  227. @custom_headers.setter
  228. def custom_headers(self, value):
  229. self._custom_headers = value
  230. if self.custom_headers is not None:
  231. # merge custom headers to main headers
  232. self.headers.update(self.custom_headers)
  233. @property
  234. def keycloak_openid(self) -> KeycloakOpenID:
  235. """Get the KeycloakOpenID object.
  236. The KeycloakOpenID is used to refresh tokens
  237. :returns: KeycloakOpenID
  238. :rtype: KeycloakOpenID
  239. """
  240. if self._keycloak_openid is None:
  241. if self.user_realm_name:
  242. token_realm_name = self.user_realm_name
  243. elif self.realm_name:
  244. token_realm_name = self.realm_name
  245. else:
  246. token_realm_name = "master"
  247. self._keycloak_openid = KeycloakOpenID(
  248. server_url=self.server_url,
  249. client_id=self.client_id,
  250. realm_name=token_realm_name,
  251. verify=self.verify,
  252. client_secret_key=self.client_secret_key,
  253. timeout=self.timeout,
  254. custom_headers=self.custom_headers,
  255. )
  256. return self._keycloak_openid
  257. def get_token(self):
  258. """Get admin token.
  259. The admin token is then set in the `token` attribute.
  260. """
  261. grant_type = []
  262. if self.username and self.password:
  263. grant_type.append("password")
  264. elif self.client_secret_key:
  265. grant_type.append("client_credentials")
  266. if grant_type:
  267. self.token = self.keycloak_openid.token(
  268. self.username, self.password, grant_type=grant_type, totp=self.totp
  269. )
  270. else:
  271. self.token = None
  272. def refresh_token(self):
  273. """Refresh the token.
  274. :raises KeycloakPostError: In case the refresh token request failed.
  275. """
  276. refresh_token = self.token.get("refresh_token", None) if self.token else None
  277. if refresh_token is None:
  278. self.get_token()
  279. else:
  280. try:
  281. self.token = self.keycloak_openid.refresh_token(refresh_token)
  282. except KeycloakPostError as e:
  283. list_errors = [
  284. b"Refresh token expired",
  285. b"Token is not active",
  286. b"Session not active",
  287. ]
  288. if e.response_code == 400 and any(err in e.response_body for err in list_errors):
  289. self.get_token()
  290. else:
  291. raise
  292. self.add_param_headers("Authorization", "Bearer " + self.token.get("access_token"))
  293. def _refresh_if_required(self):
  294. if datetime.now() >= self.expires_at:
  295. self.refresh_token()
  296. def raw_get(self, *args, **kwargs):
  297. """Call connection.raw_get.
  298. If auto_refresh is set for *get* and *access_token* is expired, it will refresh the token
  299. and try *get* once more.
  300. :param args: Additional arguments
  301. :type args: tuple
  302. :param kwargs: Additional keyword arguments
  303. :type kwargs: dict
  304. :returns: Response
  305. :rtype: Response
  306. """
  307. self._refresh_if_required()
  308. r = super().raw_get(*args, **kwargs)
  309. return r
  310. def raw_post(self, *args, **kwargs):
  311. """Call connection.raw_post.
  312. If auto_refresh is set for *post* and *access_token* is expired, it will refresh the token
  313. and try *post* once more.
  314. :param args: Additional arguments
  315. :type args: tuple
  316. :param kwargs: Additional keyword arguments
  317. :type kwargs: dict
  318. :returns: Response
  319. :rtype: Response
  320. """
  321. self._refresh_if_required()
  322. r = super().raw_post(*args, **kwargs)
  323. return r
  324. def raw_put(self, *args, **kwargs):
  325. """Call connection.raw_put.
  326. If auto_refresh is set for *put* and *access_token* is expired, it will refresh the token
  327. and try *put* once more.
  328. :param args: Additional arguments
  329. :type args: tuple
  330. :param kwargs: Additional keyword arguments
  331. :type kwargs: dict
  332. :returns: Response
  333. :rtype: Response
  334. """
  335. self._refresh_if_required()
  336. r = super().raw_put(*args, **kwargs)
  337. return r
  338. def raw_delete(self, *args, **kwargs):
  339. """Call connection.raw_delete.
  340. If auto_refresh is set for *delete* and *access_token* is expired,
  341. it will refresh the token and try *delete* once more.
  342. :param args: Additional arguments
  343. :type args: tuple
  344. :param kwargs: Additional keyword arguments
  345. :type kwargs: dict
  346. :returns: Response
  347. :rtype: Response
  348. """
  349. self._refresh_if_required()
  350. r = super().raw_delete(*args, **kwargs)
  351. return r