A multipurpose python flask API server and administration SPA
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

87 lines
2.6 KiB

  1. import base64
  2. from functools import wraps
  3. from typing import Optional, Callable, Any
  4. from flask import request, Response, g
  5. from werkzeug.datastructures import Authorization
  6. from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes
  7. from corvus.service import (
  8. authentication_service,
  9. user_service,
  10. user_token_service
  11. )
  12. def authenticate_with_password(name: str, password: str) -> bool:
  13. user = user_service.find_by_name(name)
  14. if user is not None \
  15. and authentication_service.is_valid_password(user, password):
  16. g.user = user
  17. return True
  18. return False
  19. def authenticate_with_token(name: str, token: str) -> bool:
  20. user = user_service.find_by_name(name)
  21. if user is not None:
  22. user_token = user_token_service.find_by_user_and_token(user, token)
  23. if user is not None \
  24. and authentication_service.is_valid_token(user_token):
  25. g.user = user
  26. g.user_token = user_token
  27. return True
  28. return False
  29. def authentication_failed(auth_type: str) -> Response:
  30. return Response(
  31. status=401,
  32. headers={
  33. 'WWW-Authenticate': '%s realm="Login Required"' % auth_type
  34. })
  35. def parse_token_authorization_header(
  36. header_value: str) -> Optional[Authorization]:
  37. if not header_value:
  38. return None
  39. value = wsgi_to_bytes(header_value)
  40. try:
  41. auth_type, auth_info = value.split(None, 1)
  42. auth_type = auth_type.lower()
  43. except ValueError:
  44. return None
  45. if auth_type == b'token':
  46. try:
  47. username, token = base64.b64decode(auth_info).split(b':', 1)
  48. except Exception:
  49. return None
  50. return Authorization('token', {'username': bytes_to_wsgi(username),
  51. 'password': bytes_to_wsgi(token)})
  52. return None
  53. def require_basic_auth(func: Callable) -> Callable:
  54. @wraps(func)
  55. def decorate(*args: list, **kwargs: dict) -> Any:
  56. auth = request.authorization
  57. if auth and authenticate_with_password(auth.username, auth.password):
  58. return func(*args, **kwargs)
  59. else:
  60. return authentication_failed('Basic')
  61. return decorate
  62. def require_token_auth(func: Callable) -> Callable:
  63. @wraps(func)
  64. def decorate(*args: list, **kwargs: dict) -> Any:
  65. token = parse_token_authorization_header(
  66. request.headers.get('Authorization', None))
  67. if token and authenticate_with_token(token.username, token.password):
  68. return func(*args, **kwargs)
  69. else:
  70. return authentication_failed('Bearer')
  71. return decorate