"""Service to handle user_token operations."""
import uuid
from datetime import datetime, timezone
from typing import Optional, Dict, Callable, Any

from flask_sqlalchemy import Pagination
from iso8601 import iso8601, ParseError

from corvus.db import db
from corvus.model import User, UserToken
from corvus.service.transformation_service import (
    BaseTransformer,
    register_transformer
)
from corvus import errors


def _parse_datetime(value: str) -> datetime:
    """
    Parse an ISO 8601 string into a datetime.

    Shared by every datetime deserializer so that they all report
    unparseable input the same way.

    :param value: The ISO 8601 formatted string to parse
    :return: The datetime produced by iso8601.parse_date
    :raises errors.ValidationError: If the value cannot be parsed
    """
    try:
        return iso8601.parse_date(value)
    except ParseError as error:
        raise errors.ValidationError(
            'Cannot parse datetime from %s' % value) from error


@register_transformer
class UserTokenTransformer(BaseTransformer):
    """Serialize and deserialize the UserToken model."""

    type = UserToken

    def _deserializers(
            self) -> Dict[str, Callable[[UserToken, Any], None]]:
        """Define the fields and the accompanying deserializer factory."""
        return {
            'token': self.deserialize_token,
            'note': self.deserialize_note,
            'enabled': self.deserialize_enabled,
            'expirationTime': self.deserialize_expiration_time,
            'creationTime': self.deserialize_creation_time,
            'lastUsageTime': self.deserialize_last_usage_time,
            'version': self.deserialize_version
        }

    def _serializers(self) -> Dict[str, Callable[[], Any]]:
        """Define the fields and the accompanying serializer factory."""
        return {
            'token': self.serialize_token,
            'note': self.serialize_note,
            'enabled': self.serialize_enabled,
            'expirationTime': self.serialize_expiration_time,
            'creationTime': self.serialize_creation_time,
            'lastUsageTime': self.serialize_last_usage_time,
            # Computed on the fly, so there is no matching deserializer.
            'isValid': self.serialize_is_valid,
            'version': self.serialize_version
        }

    def serialize_token(self) -> str:
        """User token."""
        return self.model.token

    @staticmethod
    def deserialize_token(model: UserToken, token: str) -> None:
        """User token."""
        model.token = token

    def serialize_note(self) -> Optional[str]:
        """User token note."""
        return self.model.note

    @staticmethod
    def deserialize_note(model: UserToken, note: str) -> None:
        """User token note."""
        model.note = note

    def serialize_enabled(self) -> bool:
        """User token enabled."""
        return self.model.enabled

    @staticmethod
    def deserialize_enabled(model: UserToken, enabled: bool) -> None:
        """User token enabled."""
        model.enabled = enabled

    def serialize_expiration_time(self) -> Optional[datetime]:
        """User token expiration time."""
        return self.model.expiration_time

    @staticmethod
    def deserialize_expiration_time(
            model: UserToken, expiration_time: str) -> None:
        """
        User token expiration time.

        :param model: The UserToken to update
        :param expiration_time: ISO 8601 formatted string
        :raises errors.ValidationError: If the string cannot be parsed
        """
        model.expiration_time = _parse_datetime(expiration_time)

    def serialize_creation_time(self) -> datetime:
        """User token creation time."""
        return self.model.creation_time

    @staticmethod
    def deserialize_creation_time(
            model: UserToken, creation_time: str) -> None:
        """
        User token creation time.

        :param model: The UserToken to update
        :param creation_time: ISO 8601 formatted string
        :raises errors.ValidationError: If the string cannot be parsed
        """
        model.creation_time = _parse_datetime(creation_time)

    def serialize_last_usage_time(self) -> datetime:
        """User token last usage time."""
        return self.model.last_usage_time

    @staticmethod
    def deserialize_last_usage_time(
            model: UserToken, last_usage_time: str) -> None:
        """
        User token last usage time.

        Reports bad input as a ValidationError, the same way the other
        datetime deserializers do, instead of leaking a raw ParseError.

        :param model: The UserToken to update
        :param last_usage_time: ISO 8601 formatted string
        :raises errors.ValidationError: If the string cannot be parsed
        """
        model.last_usage_time = _parse_datetime(last_usage_time)

    def serialize_is_valid(self) -> bool:
        """User token is_valid computed value."""
        return is_valid_token(self.model)

    def serialize_version(self) -> int:
        """User token version."""
        return self.model.version

    @staticmethod
    def deserialize_version(model: UserToken, version: int) -> None:
        """User token version."""
        model.version = version


def generate_token() -> uuid.UUID:
    """
    Generate a unique token.

    :return: A random (version 4) UUID
    """
    return uuid.uuid4()


def create(
        user: User,
        note: Optional[str] = None,
        enabled: bool = True,
        expiration_time: Optional[datetime] = None) -> UserToken:
    """
    Create and save a UserToken.

    :param user: The User object to bind the token to
    :param note: An optional field to store additional information about a
    token
    :param enabled: A boolean to indicate whether a token can be considered
    eligible for authentication
    :param expiration_time: An optional argument to determine when the token
    becomes invalid as a means of authentication. Defaults to None, which
    means no expiration
    :return: The newly created and committed UserToken
    """
    token = generate_token()
    user_token = UserToken(
        user_id=user.id,
        token=str(token),
        note=note,
        enabled=enabled,
        # NOTE(review): this is local time while is_valid_token compares
        # expirations against UTC - confirm whether UTC is intended here.
        creation_time=datetime.now(),
        expiration_time=expiration_time,
        version=0)

    db.session.add(user_token)
    db.session.commit()

    return user_token


def is_valid_token(user_token: Optional[UserToken]) -> bool:
    """
    Validate a token.

    Token must be enabled and if it has an expiration, it must be greater
    than now.

    :param user_token: The token to check, or None
    :return: True if the token may be used, False otherwise
    """
    if user_token is None or not user_token.enabled:
        return False

    expiration_time = user_token.expiration_time
    if expiration_time is None:
        # No expiration set means the token never expires.
        return True

    # Freshly deserialized values are timezone-aware (iso8601.parse_date),
    # and Python refuses to compare aware and naive datetimes, so choose a
    # "now" of the same kind as the stored value.
    if expiration_time.utcoffset() is not None:
        now = datetime.now(timezone.utc)
    else:
        now = datetime.utcnow()

    return not expiration_time < now


def delete(user_token: UserToken) -> bool:
    """
    Delete a user_token.

    :param user_token: The token to remove
    :return: True once the deletion has been committed
    """
    # Session.delete returns None, so there is no result to inspect; any
    # failure surfaces as an exception from delete or commit.
    db.session.delete(user_token)
    db.session.commit()
    return True


def find_by_user_and_token(user: User, token: str) -> Optional[UserToken]:
    """
    Lookup a user_token by user and token string.

    :param user: The user the token must belong to
    :param token: The token string to look for
    :return: The first matching UserToken, or None if there is no match
    """
    return UserToken.query.filter_by(user_id=user.id, token=token).first()


def find_by_user(user: User,
                 page: int,
                 per_page: int = 20,
                 max_per_page: int = 100) -> Pagination:
    """
    Find all tokens for a user.

    :param user: The user to find tokens for
    :param page: The page to request
    :param per_page: The number to get per page
    :param max_per_page: The upper bound passed to paginate for per_page
    :return: The Pagination object for the requested page
    """
    return UserToken.query.filter_by(user_id=user.id).paginate(
        page=page,
        per_page=per_page,
        error_out=True,
        max_per_page=max_per_page)