You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
208 lines
5.6 KiB
208 lines
5.6 KiB
"""Middleware to handle authentication."""
|
|
import base64
|
|
import binascii
|
|
from enum import Enum
|
|
from functools import wraps
|
|
from typing import Optional, Callable, Any
|
|
|
|
from flask import request, Response, g, json
|
|
from werkzeug.datastructures import Authorization
|
|
from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes
|
|
|
|
from corvus.api.model import APIMessage
|
|
from corvus.service import (
|
|
authentication_service,
|
|
user_service,
|
|
user_token_service
|
|
)
|
|
from corvus.service.role_service import ROLES, Role
|
|
from corvus.service import transformation_service
|
|
|
|
|
|
class Auth(Enum):
|
|
"""Authentication scheme definitions."""
|
|
|
|
TOKEN = 'TOKEN'
|
|
BASIC = 'BASIC'
|
|
NONE = 'NONE'
|
|
|
|
|
|
def authenticate_with_password(
|
|
name: Optional[str], password: Optional[str]) -> bool:
|
|
"""
|
|
Authenticate a username and a password.
|
|
|
|
:param name:
|
|
:param password:
|
|
:return:
|
|
"""
|
|
if name is None or password is None:
|
|
return False
|
|
user = user_service.find_by_name(name)
|
|
if user is not None \
|
|
and authentication_service.is_valid_password(user, password):
|
|
g.user = user
|
|
return True
|
|
return False
|
|
|
|
|
|
def authenticate_with_token(name: Optional[str], token: Optional[str]) -> bool:
|
|
"""
|
|
Authenticate a username and a token.
|
|
|
|
:param name:
|
|
:param token:
|
|
:return:
|
|
"""
|
|
if name is None or token is None:
|
|
return False
|
|
user = user_service.find_by_name(name)
|
|
if user is not None:
|
|
user_token = user_token_service.find_by_user_and_token(user, token)
|
|
if user is not None \
|
|
and user_token_service.is_valid_token(user_token):
|
|
g.user = user
|
|
g.user_token = user_token
|
|
return True
|
|
return False
|
|
|
|
|
|
def authentication_failed(auth_type: str) -> Response:
|
|
"""
|
|
Return a correct response for failed authentication.
|
|
|
|
:param auth_type:
|
|
:return:
|
|
"""
|
|
return Response(
|
|
status=401,
|
|
headers={
|
|
'WWW-Authenticate': '%s realm="Login Required"' % auth_type
|
|
})
|
|
|
|
|
|
def authorization_failed(required_role: str) -> Response:
|
|
"""Return a correct response for failed authorization."""
|
|
return Response(
|
|
status=401,
|
|
response=json.dumps({
|
|
'message': '{} role not present'.format(required_role)
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
|
|
def parse_token_header(
|
|
header_value: Optional[str]) -> Optional[Authorization]:
|
|
"""
|
|
Parse the Authorization: Token header for the username and token.
|
|
|
|
:param header_value:
|
|
:return:
|
|
"""
|
|
if not header_value:
|
|
return None
|
|
auth_info = wsgi_to_bytes(header_value)
|
|
try:
|
|
username, token = base64.b64decode(auth_info).split(b':', 1)
|
|
except binascii.Error:
|
|
return None
|
|
return Authorization('token', {'username': bytes_to_wsgi(username),
|
|
'password': bytes_to_wsgi(token)})
|
|
|
|
|
|
def require_basic_auth(func: Callable) -> Callable:
|
|
"""
|
|
Decorate require inline basic auth.
|
|
|
|
:param func:
|
|
:return:
|
|
"""
|
|
@wraps(func)
|
|
def decorate(*args: list, **kwargs: dict) -> Any:
|
|
"""
|
|
Authenticate with a password.
|
|
|
|
:param args:
|
|
:param kwargs:
|
|
:return:
|
|
"""
|
|
auth = request.authorization
|
|
if auth and authenticate_with_password(auth.username, auth.password):
|
|
return func(*args, **kwargs)
|
|
return authentication_failed('Basic')
|
|
|
|
return decorate
|
|
|
|
|
|
def require_token_auth(func: Callable) -> Callable:
|
|
"""
|
|
Decorate require inline token auth.
|
|
|
|
:param func:
|
|
:return:
|
|
"""
|
|
@wraps(func)
|
|
def decorate(*args: list, **kwargs: dict) -> Any:
|
|
"""
|
|
Authenticate with a token.
|
|
|
|
:param args:
|
|
:param kwargs:
|
|
:return:
|
|
"""
|
|
token = parse_token_header(
|
|
request.headers.get('X-Auth-Token'))
|
|
if token and authenticate_with_token(token.username, token.password):
|
|
return func(*args, **kwargs)
|
|
return authentication_failed('Token')
|
|
|
|
return decorate
|
|
|
|
|
|
def require_role(required_role: Role) -> Callable:
|
|
"""
|
|
Decorate require a user role.
|
|
|
|
:param required_role:
|
|
:return:
|
|
"""
|
|
def required_role_decorator(func: Callable) -> Callable:
|
|
"""Decorate the function."""
|
|
@wraps(func)
|
|
def decorate(*args: list, **kwargs: dict) -> Any:
|
|
"""Require a user role."""
|
|
if g.user.role in ROLES.find_roles_in_hierarchy(required_role):
|
|
return func(*args, **kwargs)
|
|
return authorization_failed(required_role.value)
|
|
return decorate
|
|
return required_role_decorator
|
|
|
|
|
|
def require(required_auth: Auth, required_role: Role) -> Callable:
|
|
"""
|
|
Decorate require Auth and Role.
|
|
|
|
:param required_auth:
|
|
:param required_role:
|
|
:return:
|
|
"""
|
|
def require_decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def decorate(*args: list, **kwargs: dict) -> Any:
|
|
decorated = require_role(required_role)(func)
|
|
if required_auth == Auth.BASIC:
|
|
decorated = require_basic_auth(decorated)
|
|
elif required_auth == Auth.TOKEN:
|
|
decorated = require_token_auth(decorated)
|
|
else:
|
|
return Response(
|
|
response=transformation_service.serialize_model(
|
|
APIMessage(
|
|
message="Unexpected Server Error",
|
|
success=False
|
|
)),
|
|
status=500)
|
|
return decorated(*args, **kwargs)
|
|
return decorate
|
|
return require_decorator
|