A multipurpose python flask API server and administration SPA
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

196 lines
5.6 KiB

"""Service to handle user_token operations."""
import uuid
from datetime import datetime
from typing import Optional, Dict, Callable, Any, List
from iso8601 import iso8601, ParseError
from corvus.db import db
from corvus.model import User, UserToken
from corvus.service.transformation_service import (
BaseTransformer,
register_transformer
)
from corvus import errors
@register_transformer
class UserTokenTransformer(BaseTransformer):
"""Serialize User model."""
type = UserToken
def _deserializers(
self) -> Dict[str, Callable[[UserToken, Any], None]]:
"""Define the fields and the accompanying serializer factory."""
return {
'token': self.deserialize_token,
'note': self.deserialize_note,
'enabled': self.deserialize_enabled,
'expirationTime': self.deserialize_expiration_time,
'creationTime': self.deserialize_creation_time,
'lastUsageTime': self.deserialize_last_usage_time,
'version': self.deserialize_version
}
def _serializers(self) -> Dict[str, Callable[[], Any]]:
"""Define the fields and the accompanying serializer factory."""
return {
'token': self.serialize_token,
'note': self.serialize_note,
'enabled': self.serialize_enabled,
'expirationTime': self.serialize_expiration_time,
'creationTime': self.serialize_creation_time,
'lastUsageTime': self.serialize_last_usage_time,
'version': self.serialize_version
}
def serialize_token(self) -> str:
"""User token."""
return self.model.token
@staticmethod
def deserialize_token(model: UserToken, token: str) -> None:
"""User token."""
model.token = token
def serialize_note(self) -> str:
"""User token note."""
return self.model.note
@staticmethod
def deserialize_note(model: UserToken, note: str) -> None:
"""User token note."""
model.note = note
def serialize_enabled(self) -> bool:
"""User token enabled."""
return self.model.enabled
@staticmethod
def deserialize_enabled(model: UserToken, enabled: bool) -> None:
"""User token enabled."""
model.enabled = enabled
def serialize_expiration_time(self) -> datetime:
"""User token expiration time."""
return self.model.expiration_time
@staticmethod
def deserialize_expiration_time(
model: UserToken, expiration_time: datetime) -> None:
"""User token expiration time."""
try:
model.expiration_time = iso8601.parse_date(expiration_time)
except ParseError:
raise errors.ValidationError('Cannot parse datetime from %s' % expiration_time)
def serialize_creation_time(self) -> datetime:
"""User token creation time."""
return self.model.creation_time
@staticmethod
def deserialize_creation_time(
model: UserToken, creation_time: datetime) -> None:
"""User token creation time."""
try:
model.creation_time = iso8601.parse_date(creation_time)
except ParseError:
raise errors.ValidationError('Cannot parse datetime from %s' % creation_time)
def serialize_last_usage_time(self) -> datetime:
"""User token last usage time."""
return self.model.last_usage_time
@staticmethod
def deserialize_last_usage_time(
model: UserToken, last_usage_time: datetime) -> None:
"""User token last usage time."""
model.last_usage_time = iso8601.parse_date(last_usage_time)
def serialize_version(self) -> int:
"""User token version."""
return self.model.version
@staticmethod
def deserialize_version(model: UserToken, version: int) -> None:
"""User token version."""
model.version = version
def generate_token() -> uuid.UUID:
"""
Generate a unique token.
:return:
"""
return uuid.uuid4()
def create(
user: User,
note: Optional[str] = None,
enabled: bool = True,
expiration_time: Optional[datetime] = None) -> UserToken:
"""
Create and save a UserToken.
:param user: The User object to bind the token to
:param note: An optional field to store additional information about a
token
:param enabled: A boolean to indicate whether a token can be considered
eligible for authentication
:param expiration_time: An optional argument to determine when the token
becomes invalid as a means of authentication. Defaults to None, which means
no expiration
:return:
"""
token = generate_token()
user_token = UserToken(
user_id=user.id,
token=token.__str__(),
note=note,
enabled=enabled,
creation_time=datetime.now(),
expiration_time=expiration_time,
version=0)
db.session.add(user_token)
db.session.commit()
return user_token
def delete(user_token: UserToken) -> bool:
"""
Delete a user_token.
:param user_token:
:return:
"""
existing_user_token = db.session.delete(user_token)
if existing_user_token is None:
db.session.commit()
return True
return False
def find_by_user_and_token(user: User, token: str) -> Optional[UserToken]:
"""
Lookup a user_token by user and token string.
:param user:
:param token:
:return:
"""
return UserToken.query.filter_by(user_id=user.id, token=token).first()
def find_by_user(user: User) -> List[UserToken]:
"""
Find all tokens for a user
:param user:
:return:
"""
return UserToken.query.filter_by(user_id=user.id)