An ebook/comic library service and web client
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

84 lines
2.5 KiB

import base64
from functools import wraps
from typing import Optional, Callable
from flask import request, Response, g
from werkzeug.datastructures import Authorization
from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes
from atheneum.service import (
authentication_service,
user_service,
user_token_service
)
def authenticate_with_password(name: str, password: str) -> bool:
    """Authenticate a user by name and password.

    On success the matching user is stored on ``flask.g`` as ``g.user``.

    :param name: user name to look up through ``user_service``
    :param password: password checked by ``authentication_service``
    :return: ``True`` when the user exists and the password is accepted,
        ``False`` otherwise
    """
    account = user_service.find_by_name(name)
    if account is None:
        return False
    if not authentication_service.is_valid_password(account, password):
        return False
    # Expose the authenticated user to the rest of the request.
    g.user = account
    return True
def authenticate_with_token(name: str, token: str) -> bool:
    """Authenticate a user by name and access token.

    On success the user and the matched token are stored on ``flask.g``
    as ``g.user`` and ``g.user_token``.

    :param name: user name to look up through ``user_service``
    :param token: token value looked up for that user through
        ``user_token_service``
    :return: ``True`` when the user exists and
        ``authentication_service.is_valid_token`` accepts the token found
        for them, ``False`` otherwise
    """
    user = user_service.find_by_name(name)
    if user is None:
        # Unknown user: bail out before the token lookup so that ``None``
        # is never handed to ``find_by_user_and_token``.
        return False
    user_token = user_token_service.find_by_user_and_token(user, token)
    if not authentication_service.is_valid_token(user_token):
        return False
    g.user = user
    g.user_token = user_token
    return True
def authentication_failed(auth_type: str) -> Response:
    """Build the response returned when authentication fails.

    :param auth_type: authentication scheme name placed at the start of
        the ``WWW-Authenticate`` challenge
    :return: a 401 response carrying the ``WWW-Authenticate`` header
    """
    challenge = '%s realm="Login Required"' % auth_type
    return Response(status=401, headers={'WWW-Authenticate': challenge})
def parse_token_authorization_header(header_value) -> Optional[Authorization]:
    """Parse a ``Token`` style ``Authorization`` header value.

    The expected form is ``<scheme> <credentials>`` where the scheme is
    ``token`` (matched case-insensitively) and the credentials are the
    base64 encoding of ``username:token``.

    :param header_value: raw header value; a falsy value means the header
        is absent
    :return: an ``Authorization`` object of type ``'token'`` whose
        ``username`` and ``password`` entries hold the decoded user name
        and token, or ``None`` when the header is missing, uses another
        scheme, or cannot be decoded
    """
    if not header_value:
        return None
    value = wsgi_to_bytes(header_value)
    try:
        auth_type, auth_info = value.split(None, 1)
    except ValueError:
        # No whitespace-separated credentials part after the scheme.
        return None
    if auth_type.lower() != b'token':
        return None
    try:
        # Invalid base64 raises binascii.Error, a ValueError subclass; a
        # payload without ':' fails the two-name unpacking with ValueError.
        username, token = base64.b64decode(auth_info).split(b':', 1)
    except ValueError:
        return None
    return Authorization('token', {'username': bytes_to_wsgi(username),
                                   'password': bytes_to_wsgi(token)})
def require_basic_auth(func: Callable) -> Callable:
    """Decorate a view so that it requires valid password credentials.

    The credentials are read from ``request.authorization`` and checked
    with ``authenticate_with_password``. When they are missing or
    rejected, the ``authentication_failed('Basic')`` response is returned
    instead of calling the view.

    :param func: the view function to protect
    :return: the wrapped view function
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        credentials = request.authorization
        if not credentials:
            return authentication_failed('Basic')
        accepted = authenticate_with_password(
            credentials.username, credentials.password)
        if not accepted:
            return authentication_failed('Basic')
        return func(*args, **kwargs)
    return wrapper
def require_token_auth(func: Callable) -> Callable:
    """Decorate a view so that it requires valid token credentials.

    The ``Authorization`` request header is parsed with
    ``parse_token_authorization_header`` and the result is checked with
    ``authenticate_with_token``. When the header is missing, unparsable
    or rejected, the ``authentication_failed('Bearer')`` response is
    returned instead of calling the view.

    :param func: the view function to protect
    :return: the wrapped view function
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        raw_header = request.headers.get('Authorization', None)
        credentials = parse_token_authorization_header(raw_header)
        if not credentials:
            # NOTE(review): the challenge names 'Bearer' while the parser
            # only accepts the 'token' scheme - confirm which is intended.
            return authentication_failed('Bearer')
        accepted = authenticate_with_token(
            credentials.username, credentials.password)
        if not accepted:
            return authentication_failed('Bearer')
        return func(*args, **kwargs)
    return wrapper