Browse Source

Refactor: Address mock issues, mypy complaints, and pycodestyle issues

merge-requests/1/head
Drew Short 5 years ago
parent
commit
8c1d420c35
  1. 4
      server/corvus/api/authentication_api.py
  2. 6
      server/corvus/api/model.py
  3. 3
      server/corvus/db.py
  4. 2
      server/corvus/middleware/authentication_middleware.py
  5. 23
      server/corvus/service/patch_service.py
  6. 18
      server/corvus/service/transformation_service.py
  7. 2
      server/corvus/service/user_token_service.py
  8. 15
      server/corvus/service/validation_service.py
  9. 4
      server/tests/conftest.py
  10. 112
      server/tests/middleware/test_authentication_middleware.py
  11. 7
      server/tests/service/test_patch_service.py

4
server/corvus/api/authentication_api.py

@ -36,7 +36,7 @@ def login() -> APIResponse:
@AUTH_BLUEPRINT.route('/bump', methods=['POST'])
@return_json
@authentication_middleware.require(
required_auth=Auth.BASIC, required_role=Role.USER)
required_auth=Auth.TOKEN, required_role=Role.USER)
def login_bump() -> APIResponse:
"""
Update the user last seen timestamp.
@ -50,7 +50,7 @@ def login_bump() -> APIResponse:
@AUTH_BLUEPRINT.route('/logout', methods=['POST'])
@return_json
@authentication_middleware.require(
required_auth=Auth.BASIC, required_role=Role.USER)
required_auth=Auth.TOKEN, required_role=Role.USER)
def logout() -> APIResponse:
"""
Logout and delete a token.

6
server/corvus/api/model.py

@ -1,9 +1,9 @@
"""Model definitions for the api module."""
from typing import Any, List, Optional, Dict, Type
from typing import Any, List, Optional, Dict
from flask_sqlalchemy import Pagination
from corvus import db
from corvus.db import db_model
# pylint: disable=too-few-public-methods
@ -61,7 +61,7 @@ class APIPage(BaseAPIMessage):
page: int,
total_count: int,
last_page: int,
items: List[Type[db.Model]]) -> None:
items: List[db_model]) -> None:
"""Construct and APIPage."""
self.page = page
self.count = len(items)

3
server/corvus/db.py

@ -1,8 +1,11 @@
"""Database configuration and methods."""
from flask_migrate import upgrade
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.ext.declarative import DeclarativeMeta
db: SQLAlchemy = SQLAlchemy()
db_model: DeclarativeMeta = db.Model
def init_db() -> None:

2
server/corvus/middleware/authentication_middleware.py

@ -181,7 +181,7 @@ def require_role(required_role: Role) -> Callable:
def require(required_auth: Auth, required_role: Role) -> Callable:
"""
Decorate require Auth and Role
Decorate require Auth and Role.
:param required_auth:
:param required_role:

23
server/corvus/service/patch_service.py

@ -1,8 +1,8 @@
"""Patching support for db.Model objects."""
from typing import Type, Set, Optional, Any, Dict
from typing import Set, Optional, Any, Dict
from corvus import db
from corvus import errors
from corvus.db import db_model
from corvus.model import User
from corvus.service import transformation_service
from corvus.service import validation_service
@ -16,11 +16,11 @@ def get_patch_fields(patch_json: Dict[str, Any]) -> Set[str]:
def perform_patch(request_user: User,
original_model: Type[db.Model],
patch_model: Type[db.Model],
original_model: db_model,
patch_model: db_model,
model_attributes: Set[str],
patched_fields: Optional[Set[str]]) \
-> Type[db.Model]:
-> db_model:
"""
Patch changed attributes onto original model.
@ -38,7 +38,6 @@ def perform_patch(request_user: User,
if model_validation.success:
for attribute, value in change_set.items():
setattr(original_model, attribute, value)
db.session.commit()
else:
raise errors.ValidationError(
'Restricted attributes modified. Invalid Patch Set.')
@ -46,11 +45,11 @@ def perform_patch(request_user: User,
def versioning_aware_patch(request_user: User,
original_model: Type[db.Model],
patch_model: Type[db.Model],
original_model: db_model,
patch_model: db_model,
model_attributes: Set[str],
patched_fields: Optional[Set[str]]) \
-> Type[db.Model]:
-> db_model:
"""
Account for version numbers in the model.
@ -80,9 +79,9 @@ def versioning_aware_patch(request_user: User,
def patch(
request_user: User,
original_model: Type[db.Model],
patch_model: Type[db.Model],
patched_fields: Optional[Set[str]] = None) -> Type[db.Model]:
original_model: db_model,
patch_model: db_model,
patched_fields: Optional[Set[str]] = None) -> db_model:
"""
Patch the original model with the patch model data.

18
server/corvus/service/transformation_service.py

@ -4,7 +4,7 @@ import re
from typing import Dict, Callable, Any, List, Optional, Type
from corvus import errors
from corvus.db import db
from corvus.db import db_model
LOGGER = logging.getLogger(__name__)
@ -12,10 +12,9 @@ LOGGER = logging.getLogger(__name__)
class BaseTransformer:
"""Base Model serializer."""
type: Type[db.Model]
model: Type[db.Model]
type: db_model
def __init__(self, model: Type[db.Model]) -> None:
def __init__(self, model: db_model) -> None:
"""Initialize the base serializer."""
self.model = model
@ -54,7 +53,8 @@ class BaseTransformer:
if value is not None:
factory(self.model, value)
except KeyError as key_error:
LOGGER.error('Unable to transform field: %s %s', key, key_error)
LOGGER.error(
'Unable to transform field: %s %s', key, key_error)
return self.model
def _serializers(self) -> Dict[str, Callable[[], Any]]:
@ -62,7 +62,7 @@ class BaseTransformer:
raise NotImplementedError()
def _deserializers(
self) -> Dict[str, Callable[[db.Model, Any], None]]:
self) -> Dict[str, Callable[[db_model, Any], None]]:
"""Field definitions."""
raise NotImplementedError()
@ -88,7 +88,7 @@ def register_transformer(
return model_serializer
def serialize_model(model_obj: db.Model,
def serialize_model(model_obj: db_model,
options: Optional[List[str]] = None) -> Any:
"""Lookup a Model and hand off to the serializer."""
try:
@ -100,9 +100,9 @@ def serialize_model(model_obj: db.Model,
def deserialize_model(
model_type: Type[db.Model],
model_type: db_model,
json_model_object: dict,
options: Optional[List[str]] = None) -> db.Model:
options: Optional[List[str]] = None) -> db_model:
"""Lookup a Model and hand it off to the deserializer."""
try:
transformer = _model_transformers[model_type.__name__]

2
server/corvus/service/user_token_service.py

@ -217,7 +217,7 @@ def find_by_user_and_token(user: User, token: str) -> Optional[UserToken]:
def find_by_user(user: User, page: int, per_page: int = 20,
max_per_page: int = 100) -> Pagination:
"""
Find all tokens for a user
Find all tokens for a user.
:param user: The user to find tokens for
:param page: The page to request

15
server/corvus/service/validation_service.py

@ -4,13 +4,14 @@ from typing import Type, Dict, Callable, Any, Set, Optional, Tuple
from sqlalchemy import orm
from corvus import db, errors
from corvus import errors
from corvus.db import db_model
from corvus.model import User
_changable_attribute_names: Dict[str, Set[str]] = {}
def get_changable_attribute_names(model: Type[db.Model]) -> Set[str]:
def get_changable_attribute_names(model: db_model) -> Set[str]:
"""
Retrieve columns from a SQLAlchemy model.
@ -30,8 +31,8 @@ def get_changable_attribute_names(model: Type[db.Model]) -> Set[str]:
return model_attributes
def determine_change_set(original_model: Type[db.Model],
update_model: Type[db.Model],
def determine_change_set(original_model: db_model,
update_model: db_model,
model_attributes: Set[str],
options: Optional[Set[str]]) -> Dict[str, Any]:
"""
@ -88,9 +89,9 @@ def get_change_set_value(
class BaseValidator:
"""Base Model validator."""
type: Type[db.Model]
type: db_model
def __init__(self, request_user: User, model: Type[db.Model]) -> None:
def __init__(self, request_user: User, model: db_model) -> None:
"""Initialize the base validator."""
self.request_user = request_user
self._fields: Set[str] = get_changable_attribute_names(model)
@ -158,7 +159,7 @@ def register_validator(
def validate_model(request_user: User,
model_obj: db.Model,
model_obj: db_model,
change_set: Optional[Dict[str, Any]] = None) \
-> ModelValidationResult:
"""Lookup a Model and hand off to the validator."""

4
server/tests/conftest.py

@ -26,7 +26,7 @@ def add_test_user() -> Tuple[str, str]:
@pytest.fixture
def app() -> Generator[Flask]:
def app() -> Generator[Flask, Any, Any]:
"""Create and configure a new corvus_app instance for each test."""
# create a temporary file to isolate the database for each test
db_fd, db_path = tempfile.mkstemp(suffix='.db')
@ -120,7 +120,7 @@ class AuthActions(object):
credentials = base64.b64encode(
'{}:{}'.format(self.username, self.token).encode('utf8')) \
.decode('utf8').strip()
return 'Authorization', 'Token {}'.format(credentials)
return 'X-Auth-Token', '{}'.format(credentials)
@pytest.fixture

112
server/tests/middleware/test_authentication_middleware.py

@ -10,15 +10,15 @@ middleware_module = 'corvus.middleware.authentication_middleware'
@patch(middleware_module + '.authentication_service.is_valid_password')
@patch(middleware_module + '.user_service.find_by_name')
def test_authenticate_with_password_happy_path(
mock_user_service: MagicMock,
mock_authentication_service: MagicMock,
mock_find_by_name: MagicMock,
mock_is_valid_password: MagicMock,
mock_g: MagicMock):
mock_g.user = Mock()
mock_user_service.return_value = Mock()
mock_authentication_service.return_value = True
mock_find_by_name.return_value = Mock()
mock_is_valid_password.return_value = True
assert authenticate_with_password('test', 'test')
mock_user_service.assert_called_once()
mock_authentication_service.assert_called_once()
mock_find_by_name.assert_called_once()
mock_is_valid_password.assert_called_once()
mock_g.user.assert_not_called()
@ -26,15 +26,15 @@ def test_authenticate_with_password_happy_path(
@patch(middleware_module + '.authentication_service.is_valid_password')
@patch(middleware_module + '.user_service.find_by_name')
def test_authenticate_with_password_no_user(
mock_user_service: MagicMock,
mock_authentication_service: MagicMock,
mock_find_by_name: MagicMock,
mock_is_valid_password: MagicMock,
mock_g: MagicMock):
mock_g.user = Mock()
mock_user_service.return_value = None
mock_authentication_service.return_value = True
mock_find_by_name.return_value = None
mock_is_valid_password.return_value = True
assert not authenticate_with_password('test', 'test')
mock_user_service.assert_called_once()
mock_authentication_service.assert_not_called()
mock_find_by_name.assert_called_once()
mock_is_valid_password.assert_not_called()
mock_g.user.assert_not_called()
@ -42,91 +42,91 @@ def test_authenticate_with_password_no_user(
@patch(middleware_module + '.authentication_service.is_valid_password')
@patch(middleware_module + '.user_service.find_by_name')
def test_authenticate_with_password_invalid_password(
mock_user_service: MagicMock,
mock_authentication_service: MagicMock,
mock_find_by_name: MagicMock,
mock_is_valid_password: MagicMock,
mock_g: MagicMock):
mock_g.user = Mock()
mock_user_service.return_value = Mock()
mock_authentication_service.return_value = False
mock_find_by_name.return_value = Mock()
mock_is_valid_password.return_value = False
assert not authenticate_with_password('test', 'test')
mock_user_service.assert_called_once()
mock_authentication_service.assert_called_once()
mock_find_by_name.assert_called_once()
mock_is_valid_password.assert_called_once()
mock_g.user.assert_not_called()
@patch(middleware_module + '.g')
@patch(middleware_module + '.authentication_service.is_valid_token')
@patch(middleware_module + '.user_token_service.is_valid_token')
@patch(middleware_module + '.user_token_service.find_by_user_and_token')
@patch(middleware_module + '.user_service.find_by_name')
def test_authenticate_with_token_happy_path(
mock_user_service: MagicMock,
mock_user_token_service: MagicMock,
mock_authentication_service: MagicMock,
mock_find_by_name: MagicMock,
mock_find_by_user_and_token: MagicMock,
mock_is_valid_token: MagicMock,
mock_g: MagicMock):
mock_g.user = Mock()
mock_user_service.return_value = Mock()
mock_user_token_service.return_value = Mock()
mock_authentication_service.return_value = True
mock_find_by_name.return_value = Mock()
mock_find_by_user_and_token.return_value = Mock()
mock_is_valid_token.return_value = True
assert authenticate_with_token('test', 'test')
mock_user_service.assert_called_once()
mock_user_token_service.assert_called_once()
mock_authentication_service.assert_called_once()
mock_find_by_name.assert_called_once()
mock_find_by_user_and_token.assert_called_once()
mock_is_valid_token.assert_called_once()
mock_g.user.assert_not_called()
@patch(middleware_module + '.g')
@patch(middleware_module + '.authentication_service.is_valid_token')
@patch(middleware_module + '.user_token_service.is_valid_token')
@patch(middleware_module + '.user_token_service.find_by_user_and_token')
@patch(middleware_module + '.user_service.find_by_name')
def test_authenticate_with_token_no_user(
mock_user_service: MagicMock,
mock_user_token_service: MagicMock,
mock_authentication_service: MagicMock,
mock_find_by_name: MagicMock,
mock_find_by_user_and_token: MagicMock,
mock_is_valid_token: MagicMock,
mock_g: MagicMock):
mock_g.user = Mock()
mock_user_service.return_value = None
mock_find_by_name.return_value = None
assert not authenticate_with_token('test', 'test')
mock_user_service.assert_called_once()
mock_user_token_service.assert_not_called()
mock_authentication_service.assert_not_called()
mock_find_by_name.assert_called_once()
mock_find_by_user_and_token.assert_not_called()
mock_is_valid_token.assert_not_called()
mock_g.user.assert_not_called()
@patch(middleware_module + '.g')
@patch(middleware_module + '.authentication_service.is_valid_token')
@patch(middleware_module + '.user_token_service.is_valid_token')
@patch(middleware_module + '.user_token_service.find_by_user_and_token')
@patch(middleware_module + '.user_service.find_by_name')
def test_authenticate_with_token_no_user_token(
mock_user_service: MagicMock,
mock_user_token_service: MagicMock,
mock_authentication_service: MagicMock,
mock_find_by_name: MagicMock,
mock_find_by_user_and_token: MagicMock,
mock_is_valid_token: MagicMock,
mock_g: MagicMock):
mock_g.user = Mock()
mock_user_service.return_value = Mock()
mock_user_token_service.return_value = None
mock_authentication_service.return_value = False
mock_find_by_name.return_value = Mock()
mock_find_by_user_and_token.return_value = None
mock_is_valid_token.return_value = False
assert not authenticate_with_token('test', 'test')
mock_user_service.assert_called_once()
mock_user_token_service.assert_called_once()
mock_authentication_service.assert_called_once()
mock_find_by_name.assert_called_once()
mock_find_by_user_and_token.assert_called_once()
mock_is_valid_token.assert_called_once()
mock_g.user.assert_not_called()
@patch(middleware_module + '.g')
@patch(middleware_module + '.authentication_service.is_valid_token')
@patch(middleware_module + '.user_token_service.is_valid_token')
@patch(middleware_module + '.user_token_service.find_by_user_and_token')
@patch(middleware_module + '.user_service.find_by_name')
def test_authenticate_with_token_invalid_token(
mock_user_service: MagicMock,
mock_user_token_service: MagicMock,
mock_authentication_service: MagicMock,
mock_find_by_name: MagicMock,
mock_find_by_user_and_token: MagicMock,
mock_is_valid_token: MagicMock,
mock_g: MagicMock):
mock_g.user = Mock()
mock_user_service.return_value = Mock()
mock_user_token_service.return_value = Mock()
mock_authentication_service.return_value = False
mock_find_by_name.return_value = Mock()
mock_find_by_user_and_token.return_value = Mock()
mock_is_valid_token.return_value = False
assert not authenticate_with_token('test', 'test')
mock_user_service.assert_called_once()
mock_user_token_service.assert_called_once()
mock_authentication_service.assert_called_once()
mock_find_by_name.assert_called_once()
mock_find_by_user_and_token.assert_called_once()
mock_is_valid_token.assert_called_once()
mock_g.user.assert_not_called()

7
server/tests/service/test_patch_service.py

@ -7,12 +7,8 @@ from corvus import errors
from corvus.model import UserToken, User
from corvus.service import patch_service, role_service
service_module = 'corvus.service.patch_service'
@patch(service_module + '.db.session.commit')
def test_patch_models(
mock_db_session_commit: MagicMock):
def test_patch_models():
request_user = User()
request_user.role = role_service.Role.ADMIN
@ -29,7 +25,6 @@ def test_patch_models(
patched_user = patch_service.patch(request_user, user, user_patch)
assert patched_user.version > 1
assert patched_user.last_login_time == user_patch.last_login_time
mock_db_session_commit.assert_called_once()
def test_patch_of_different_types():

Loading…
Cancel
Save