A multipurpose python flask API server and administration SPA
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
5.2 KiB

  1. """Middleware to handle authentication."""
  2. import base64
  3. import binascii
  4. from enum import Enum
  5. from functools import wraps
  6. from typing import Optional, Callable, Any
  7. from flask import request, Response, g, json
  8. from werkzeug.datastructures import Authorization
  9. from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes
  10. from corvus.api.model import APIMessage
  11. from corvus.service import (
  12. authentication_service,
  13. user_service,
  14. user_token_service
  15. )
  16. from corvus.service.role_service import ROLES, Role
  17. class Auth(Enum):
  18. """Authentication scheme definitions."""
  19. TOKEN = 'TOKEN'
  20. BASIC = 'BASIC'
  21. NONE = 'NONE'
  22. def authenticate_with_password(name: str, password: str) -> bool:
  23. """
  24. Authenticate a username and a password.
  25. :param name:
  26. :param password:
  27. :return:
  28. """
  29. user = user_service.find_by_name(name)
  30. if user is not None \
  31. and authentication_service.is_valid_password(user, password):
  32. g.user = user
  33. return True
  34. return False
  35. def authenticate_with_token(name: str, token: str) -> bool:
  36. """
  37. Authenticate a username and a token.
  38. :param name:
  39. :param token:
  40. :return:
  41. """
  42. user = user_service.find_by_name(name)
  43. if user is not None:
  44. user_token = user_token_service.find_by_user_and_token(user, token)
  45. if user is not None \
  46. and user_token_service.is_valid_token(user_token):
  47. g.user = user
  48. g.user_token = user_token
  49. return True
  50. return False
  51. def authentication_failed(auth_type: str) -> Response:
  52. """
  53. Return a correct response for failed authentication.
  54. :param auth_type:
  55. :return:
  56. """
  57. return Response(
  58. status=401,
  59. headers={
  60. 'WWW-Authenticate': '%s realm="Login Required"' % auth_type
  61. })
  62. def authorization_failed(required_role: str) -> Response:
  63. """Return a correct response for failed authorization."""
  64. return Response(
  65. status=401,
  66. response=json.dumps({
  67. 'message': '{} role not present'.format(required_role)
  68. }),
  69. content_type='application/json'
  70. )
  71. def parse_token_header(
  72. header_value: str) -> Optional[Authorization]:
  73. """
  74. Parse the Authorization: Token header for the username and token.
  75. :param header_value:
  76. :return:
  77. """
  78. if not header_value:
  79. return None
  80. auth_info = wsgi_to_bytes(header_value)
  81. try:
  82. username, token = base64.b64decode(auth_info).split(b':', 1)
  83. except binascii.Error:
  84. return None
  85. return Authorization('token', {'username': bytes_to_wsgi(username),
  86. 'password': bytes_to_wsgi(token)})
  87. return None
  88. def require_basic_auth(func: Callable) -> Callable:
  89. """
  90. Decorate require inline basic auth.
  91. :param func:
  92. :return:
  93. """
  94. @wraps(func)
  95. def decorate(*args: list, **kwargs: dict) -> Any:
  96. """
  97. Authenticate with a password.
  98. :param args:
  99. :param kwargs:
  100. :return:
  101. """
  102. auth = request.authorization
  103. if auth and authenticate_with_password(auth.username, auth.password):
  104. return func(*args, **kwargs)
  105. return authentication_failed('Basic')
  106. return decorate
  107. def require_token_auth(func: Callable) -> Callable:
  108. """
  109. Decorate require inline token auth.
  110. :param func:
  111. :return:
  112. """
  113. @wraps(func)
  114. def decorate(*args: list, **kwargs: dict) -> Any:
  115. """
  116. Authenticate with a token.
  117. :param args:
  118. :param kwargs:
  119. :return:
  120. """
  121. token = parse_token_header(
  122. request.headers.get('X-Auth-Token', None))
  123. if token and authenticate_with_token(token.username, token.password):
  124. return func(*args, **kwargs)
  125. return authentication_failed('Token')
  126. return decorate
  127. def require_role(required_role: Role) -> Callable:
  128. """
  129. Decorate require a user role.
  130. :param required_role:
  131. :return:
  132. """
  133. def required_role_decorator(func: Callable) -> Callable:
  134. """Decorate the function."""
  135. @wraps(func)
  136. def decorate(*args: list, **kwargs: dict) -> Any:
  137. """Require a user role."""
  138. if g.user.role in ROLES.find_roles_in_hierarchy(required_role):
  139. return func(*args, **kwargs)
  140. return authorization_failed(required_role.value)
  141. return decorate
  142. return required_role_decorator
  143. def require(required_auth: Auth, required_role: Role) -> Callable:
  144. """
  145. Decorate require Auth and Role
  146. :param required_auth:
  147. :param required_role:
  148. :return:
  149. """
  150. def require_decorator(func: Callable) -> Callable:
  151. @wraps(func)
  152. def decorate(*args: list, **kwargs: dict) -> Any:
  153. f = require_role(required_role)(func)
  154. if required_auth == Auth.BASIC:
  155. f = require_basic_auth(f)
  156. elif required_auth == Auth.TOKEN:
  157. f = require_token_auth(f)
  158. else:
  159. Response(
  160. response=APIMessage(
  161. message="Unexpected Server Error",
  162. success=False
  163. ),
  164. status=500)
  165. return f(*args, **kwargs)
  166. return decorate
  167. return require_decorator