Browse Source

Refactor: Address some mypy complaints

merge-requests/1/head
Drew Short 5 years ago
parent
commit
2972b1d57c
  1. 6
      server/corvus/api/authentication_api.py
  2. 3
      server/corvus/errors.py
  3. 25
      server/corvus/middleware/authentication_middleware.py
  4. 1
      server/corvus/service/transformation_service.py
  5. 2
      server/corvus/service/user_token_service.py
  6. 4
      server/tests/conftest.py

6
server/corvus/api/authentication_api.py

@ -82,7 +82,7 @@ def get_tokens() -> APIResponse:
@return_json
@authentication_middleware.require(
required_auth=Auth.BASIC, required_role=Role.USER)
def create_token():
def create_token() -> APIResponse:
"""
Create a new token with optional parameters.
@ -104,7 +104,7 @@ def create_token():
@return_json
@authentication_middleware.require(
required_auth=Auth.BASIC, required_role=Role.USER)
def get_token(token: str):
def get_token(token: str) -> APIResponse:
"""
Retrieve a specific token for this user.
@ -121,7 +121,7 @@ def get_token(token: str):
@return_json
@authentication_middleware.require(
required_auth=Auth.BASIC, required_role=Role.USER)
def delete_token(token: str):
def delete_token(token: str) -> APIResponse:
"""
Delete a specific token for this user.

3
server/corvus/errors.py

@ -48,7 +48,8 @@ class ValidationError(ClientError):
def handle_corvus_404_error(exception: HTTPException) -> APIResponse:
"""Error handler for 404 Corvus errors."""
return APIResponse(
payload=APIMessage(False, 'Not Found'), status=exception.code)
payload=APIMessage(False, 'Not Found'),
status=exception.code if exception.code is not None else 404)
@return_json

25
server/corvus/middleware/authentication_middleware.py

@ -16,6 +16,7 @@ from corvus.service import (
user_token_service
)
from corvus.service.role_service import ROLES, Role
from corvus.service import transformation_service
class Auth(Enum):
@ -26,7 +27,8 @@ class Auth(Enum):
NONE = 'NONE'
def authenticate_with_password(name: str, password: str) -> bool:
def authenticate_with_password(
name: Optional[str], password: Optional[str]) -> bool:
"""
Authenticate a username and a password.
@ -34,6 +36,8 @@ def authenticate_with_password(name: str, password: str) -> bool:
:param password:
:return:
"""
if name is None or password is None:
return False
user = user_service.find_by_name(name)
if user is not None \
and authentication_service.is_valid_password(user, password):
@ -42,7 +46,7 @@ def authenticate_with_password(name: str, password: str) -> bool:
return False
def authenticate_with_token(name: str, token: str) -> bool:
def authenticate_with_token(name: Optional[str], token: Optional[str]) -> bool:
"""
Authenticate a username and a token.
@ -50,6 +54,8 @@ def authenticate_with_token(name: str, token: str) -> bool:
:param token:
:return:
"""
if name is None or token is None:
return False
user = user_service.find_by_name(name)
if user is not None:
user_token = user_token_service.find_by_user_and_token(user, token)
@ -87,7 +93,7 @@ def authorization_failed(required_role: str) -> Response:
def parse_token_header(
header_value: str) -> Optional[Authorization]:
header_value: Optional[str]) -> Optional[Authorization]:
"""
Parse the Authorization: Token header for the username and token.
@ -146,7 +152,7 @@ def require_token_auth(func: Callable) -> Callable:
:return:
"""
token = parse_token_header(
request.headers.get('X-Auth-Token', None))
request.headers.get('X-Auth-Token'))
if token and authenticate_with_token(token.username, token.password):
return func(*args, **kwargs)
return authentication_failed('Token')
@ -190,11 +196,12 @@ def require(required_auth: Auth, required_role: Role) -> Callable:
elif required_auth == Auth.TOKEN:
decorated = require_token_auth(decorated)
else:
Response(
response=APIMessage(
message="Unexpected Server Error",
success=False
),
return Response(
response=transformation_service.serialize_model(
APIMessage(
message="Unexpected Server Error",
success=False
)),
status=500)
return decorated(*args, **kwargs)
return decorate

1
server/corvus/service/transformation_service.py

@ -13,6 +13,7 @@ class BaseTransformer:
"""Base Model serializer."""
type: Type[db.Model]
model: Type[db.Model]
def __init__(self, model: Type[db.Model]) -> None:
"""Initialize the base serializer."""

2
server/corvus/service/user_token_service.py

@ -112,7 +112,7 @@ class UserTokenTransformer(BaseTransformer):
"""User token last usage time."""
model.last_usage_time = iso8601.parse_date(last_usage_time)
def serialize_is_valid(self):
def serialize_is_valid(self) -> bool:
"""User token is_valid computed value."""
return is_valid_token(self.model)

4
server/tests/conftest.py

@ -3,7 +3,7 @@ import os
import random
import string
import tempfile
from typing import Tuple, Any
from typing import Tuple, Any, Generator
import pytest
from flask import Flask
@ -26,7 +26,7 @@ def add_test_user() -> Tuple[str, str]:
@pytest.fixture
def app() -> Flask:
def app() -> Generator[Flask]:
"""Create and configure a new corvus_app instance for each test."""
# create a temporary file to isolate the database for each test
db_fd, db_path = tempfile.mkstemp(suffix='.db')

Loading…
Cancel
Save