|
|
@ -1,6 +1,6 @@ |
|
|
|
import base64 |
|
|
|
from functools import wraps |
|
|
|
from typing import Optional, Callable |
|
|
|
from typing import Optional, Callable, Any |
|
|
|
|
|
|
|
from flask import request, Response, g |
|
|
|
from werkzeug.datastructures import Authorization |
|
|
@ -24,12 +24,13 @@ def authenticate_with_password(name: str, password: str) -> bool: |
|
|
|
|
|
|
|
def authenticate_with_token(name: str, token: str) -> bool: |
|
|
|
user = user_service.find_by_name(name) |
|
|
|
user_token = user_token_service.find_by_user_and_token(user, token) |
|
|
|
if user is not None \ |
|
|
|
and authentication_service.is_valid_token(user_token): |
|
|
|
g.user = user |
|
|
|
g.user_token = user_token |
|
|
|
return True |
|
|
|
if user is not None: |
|
|
|
user_token = user_token_service.find_by_user_and_token(user, token) |
|
|
|
if user is not None \ |
|
|
|
and authentication_service.is_valid_token(user_token): |
|
|
|
g.user = user |
|
|
|
g.user_token = user_token |
|
|
|
return True |
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
@ -41,27 +42,29 @@ def authentication_failed(auth_type: str) -> Response: |
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
def parse_token_authorization_header(header_value) -> Optional[Authorization]: |
|
|
|
def parse_token_authorization_header( |
|
|
|
header_value: str) -> Optional[Authorization]: |
|
|
|
if not header_value: |
|
|
|
return |
|
|
|
return None |
|
|
|
value = wsgi_to_bytes(header_value) |
|
|
|
try: |
|
|
|
auth_type, auth_info = value.split(None, 1) |
|
|
|
auth_type = auth_type.lower() |
|
|
|
except ValueError: |
|
|
|
return |
|
|
|
return None |
|
|
|
if auth_type == b'token': |
|
|
|
try: |
|
|
|
username, token = base64.b64decode(auth_info).split(b':', 1) |
|
|
|
except Exception: |
|
|
|
return |
|
|
|
return None |
|
|
|
return Authorization('token', {'username': bytes_to_wsgi(username), |
|
|
|
'password': bytes_to_wsgi(token)}) |
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def require_basic_auth(func: Callable) -> Callable: |
|
|
|
@wraps(func) |
|
|
|
def decorate(*args, **kwargs): |
|
|
|
def decorate(*args: list, **kwargs: dict) -> Any: |
|
|
|
auth = request.authorization |
|
|
|
if auth and authenticate_with_password(auth.username, auth.password): |
|
|
|
return func(*args, **kwargs) |
|
|
@ -73,7 +76,7 @@ def require_basic_auth(func: Callable) -> Callable: |
|
|
|
|
|
|
|
def require_token_auth(func: Callable) -> Callable: |
|
|
|
@wraps(func) |
|
|
|
def decorate(*args, **kwargs): |
|
|
|
def decorate(*args: list, **kwargs: dict) -> Any: |
|
|
|
token = parse_token_authorization_header( |
|
|
|
request.headers.get('Authorization', None)) |
|
|
|
if token and authenticate_with_token(token.username, token.password): |
|
|
|