"""Service to handle user operations.""" import logging import random import string from datetime import datetime from typing import Optional, Dict, Callable, Any, Tuple from flask_sqlalchemy import Pagination from iso8601 import iso8601 from server import errors from server.db import db from server.model import User from server.service import role_service from server.service.authentication_service import validate_password_strength from server.service.transformation_service import ( BaseTransformer, register_transformer ) from server.service.validation_service import ( BaseValidator, register_validator ) from server.utility import authentication_utility LOGGER = logging.getLogger(__name__) @register_transformer class UserTransformer(BaseTransformer): """Serialize User model.""" type = User def _deserializers( self) -> Dict[str, Callable[[User, Any], None]]: """Define the fields and the accompanying deserializer factory.""" return { 'name': self.deserialize_name, 'creationTime': self.deserialize_creation_time, 'lastLoginTime': self.deserialize_last_login_time, 'version': self.deserialize_version, 'role': self.deserialize_role, } def _serializers(self) -> Dict[str, Callable[[], Any]]: """Define the fields and the accompanying serializer factory.""" return { 'name': self.serialize_name, 'creationTime': self.serialize_creation_time, 'lastLoginTime': self.serialize_last_login_time, 'version': self.serialize_version, 'role': self.serialize_role, } def serialize_name(self) -> str: """User name.""" return self.model.name @staticmethod def deserialize_name(model: User, name: str) -> None: """User name.""" model.name = name def serialize_creation_time(self) -> datetime: """User creation time.""" return self.model.creation_time @staticmethod def deserialize_creation_time( model: User, creation_time: datetime) -> None: """User creation time.""" model.creation_time = iso8601.parse_date(creation_time) def serialize_last_login_time(self) -> datetime: """User last login time.""" return self.model.last_login_time @staticmethod def deserialize_last_login_time( model: User, last_login_time: datetime) -> None: """User last login time.""" model.last_login_time = iso8601.parse_date(last_login_time) def serialize_version(self) -> int: """User version.""" return self.model.version @staticmethod def deserialize_version(model: User, version: int) -> None: """User version.""" model.version = version def serialize_role(self) -> str: """User role.""" return self.model.role.value @staticmethod def deserialize_role(model: User, role_value: str) -> None: """User role.""" model.role = role_service.Role(role_value) @register_validator class UserValidator(BaseValidator): """Validate User model.""" type = User def _validators( self) -> Dict[str, Callable[[Any], Tuple[bool, str]]]: return { 'id': self.no_validation, 'name': self.validate_name, 'role': self.validate_role, 'password_hash': self.no_validation, 'password_revision': self.no_validation, 'creation_time': self.no_validation, 'last_login_time': self.no_validation, 'version': self.validate_version } def validate_name(self, new_name: Any) -> Tuple[bool, str]: """ Name changes are only allowed to be performed by an Admin. :param new_name: :return: """ validation_result = (self.request_user.role == role_service.Role.ADMIN or new_name is None) if validation_result: return validation_result, '' return (validation_result, 'Names can only be changed by an administrator') def validate_role(self, new_role: Any) -> Tuple[bool, str]: """ Roles can only be increased to the level of the request_user. :param new_role: :return: """ acceptable_roles = role_service.ROLES.find_children_roles( self.request_user.role) role = new_role if new_role is not None else self.model.role if role in acceptable_roles: return True, '' return False, 'Role escalation is not permitted' def get_users( page: int, per_page: int = 20, max_per_page: int = 100) -> Pagination: """ Page through users in the system. :param page: The page to request :param per_page: The number per page :param max_per_page: :return: """ return User.query.paginate(page, per_page, True, max_per_page) def find_by_name(name: str) -> Optional[User]: """ Find a user by name. :param name: :return: """ return User.query.filter_by(name=name).first() def register( name: str, password: Optional[str], role: Optional[str], validate_password: bool = True) -> User: """ Register a new user. :param name: Desired user name. Must be unique and not already registered :param password: Password to be hashed and stored for the user :param role: Role to assign the user [ROLE_USER, ROLE_ADMIN] :param validate_password: Perform password validation :return: """ if validate_password and password is not None: validate_password_strength(password) password = password if password is not None else ''.join( random.choices(string.ascii_letters + string.digits, k=32)) role = role if role is not None else User.ROLE_USER if find_by_name(name=name) is not None: raise errors.ValidationError('User name is already taken.') pw_hash, pw_revision = authentication_utility.get_password_hash(password) new_user = User( name=name, role=role, password_hash=pw_hash, password_revision=pw_revision, creation_time=datetime.now(), version=0) db.session.add(new_user) db.session.commit() LOGGER.info('Registered new user: %s with role: %s', name, role) return new_user def delete(user: User) -> bool: """ Delete a user. :param user: :return: """ existing_user = db.session.delete(user) if existing_user is None: db.session.commit() return True return False def update_last_login_time(user: User) -> None: """ Bump the last login time for the user. :param user: :return: """ if user is not None: user.last_login_time = datetime.now() db.session.commit() def update_password(user: User, password: str) -> None: """ Change the user password. :param user: :param password: :return: """ pw_hash, pw_revision = authentication_utility.get_password_hash( password) user.password_hash = pw_hash user.password_revision = pw_revision db.session.commit()