A multipurpose Python Flask API server and administration SPA.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

139 lines
3.5 KiB

  1. """Middleware to handle authentication."""
  2. import base64
  3. from functools import wraps
  4. from typing import Optional, Callable, Any
  5. import binascii
  6. from flask import request, Response, g
  7. from werkzeug.datastructures import Authorization
  8. from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes
  9. from corvus.service import (
  10. authentication_service,
  11. user_service,
  12. user_token_service
  13. )
  14. def authenticate_with_password(name: str, password: str) -> bool:
  15. """
  16. Authenticate a username and a password.
  17. :param name:
  18. :param password:
  19. :return:
  20. """
  21. user = user_service.find_by_name(name)
  22. if user is not None \
  23. and authentication_service.is_valid_password(user, password):
  24. g.user = user
  25. return True
  26. return False
  27. def authenticate_with_token(name: str, token: str) -> bool:
  28. """
  29. Authenticate a username and a token.
  30. :param name:
  31. :param token:
  32. :return:
  33. """
  34. user = user_service.find_by_name(name)
  35. if user is not None:
  36. user_token = user_token_service.find_by_user_and_token(user, token)
  37. if user is not None \
  38. and authentication_service.is_valid_token(user_token):
  39. g.user = user
  40. g.user_token = user_token
  41. return True
  42. return False
  43. def authentication_failed(auth_type: str) -> Response:
  44. """
  45. Return a correct response for failed authentication.
  46. :param auth_type:
  47. :return:
  48. """
  49. return Response(
  50. status=401,
  51. headers={
  52. 'WWW-Authenticate': '%s realm="Login Required"' % auth_type
  53. })
  54. def parse_token_header(
  55. header_value: str) -> Optional[Authorization]:
  56. """
  57. Parse the Authorization: Token header for the username and token.
  58. :param header_value:
  59. :return:
  60. """
  61. if not header_value:
  62. return None
  63. value = wsgi_to_bytes(header_value)
  64. try:
  65. auth_type, auth_info = value.split(None, 1)
  66. auth_type = auth_type.lower()
  67. except ValueError:
  68. return None
  69. if auth_type == b'token':
  70. try:
  71. username, token = base64.b64decode(auth_info).split(b':', 1)
  72. except binascii.Error:
  73. return None
  74. return Authorization('token', {'username': bytes_to_wsgi(username),
  75. 'password': bytes_to_wsgi(token)})
  76. return None
  77. def require_basic_auth(func: Callable) -> Callable:
  78. """
  79. Decorate require inline basic auth.
  80. :param func:
  81. :return:
  82. """
  83. @wraps(func)
  84. def decorate(*args: list, **kwargs: dict) -> Any:
  85. """
  86. Authenticate with a password.
  87. :param args:
  88. :param kwargs:
  89. :return:
  90. """
  91. auth = request.authorization
  92. if auth and authenticate_with_password(auth.username, auth.password):
  93. return func(*args, **kwargs)
  94. return authentication_failed('Basic')
  95. return decorate
  96. def require_token_auth(func: Callable) -> Callable:
  97. """
  98. Decorate require inline token auth.
  99. :param func:
  100. :return:
  101. """
  102. @wraps(func)
  103. def decorate(*args: list, **kwargs: dict) -> Any:
  104. """
  105. Authenticate with a token.
  106. :param args:
  107. :param kwargs:
  108. :return:
  109. """
  110. token = parse_token_header(
  111. request.headers.get('Authorization', None))
  112. if token and authenticate_with_token(token.username, token.password):
  113. return func(*args, **kwargs)
  114. return authentication_failed('Bearer')
  115. return decorate