An ebook/comic library service and web client
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

177 lines
4.7 KiB

  1. """Service to handle user operations."""
  2. import logging
  3. import random
  4. import string
  5. from datetime import datetime
  6. from typing import Optional, Dict, Callable, Any
  7. from atheneum import errors
  8. from atheneum.db import db
  9. from atheneum.model import User
  10. from atheneum.service.transformation_service import (
  11. BaseTransformer,
  12. register_transformer
  13. )
  14. from atheneum.utility import authentication_utility
  15. LOGGER = logging.getLogger(__name__)
  16. class UserTransformer(BaseTransformer):
  17. """Serialize User model."""
  18. type = User
  19. def _deserializers(
  20. self) -> Dict[str, Callable[[User, Any], None]]:
  21. """Define the fields and the accompanying deserializer factory."""
  22. return {
  23. 'name': self.deserialize_name,
  24. 'creationTime': self.deserialize_creation_time,
  25. 'lastLoginTime': self.deserialize_last_login_time,
  26. 'version': self.deserialize_version,
  27. 'role': self.deserialize_role,
  28. }
  29. def _serializers(self) -> Dict[str, Callable[[], Any]]:
  30. """Define the fields and the accompanying serializer factory."""
  31. return {
  32. 'name': self.serialize_name,
  33. 'creationTime': self.serialize_creation_time,
  34. 'lastLoginTime': self.serialize_last_login_time,
  35. 'version': self.serialize_version,
  36. 'role': self.serialize_role,
  37. }
  38. def serialize_name(self) -> str:
  39. """User name."""
  40. return self.model.name
  41. @staticmethod
  42. def deserialize_name(model: User, name: str) -> None:
  43. """User name."""
  44. model.name = name
  45. def serialize_creation_time(self) -> datetime:
  46. """User creation time."""
  47. return self.model.creation_time
  48. @staticmethod
  49. def deserialize_creation_time(
  50. model: User, creation_time: datetime) -> None:
  51. """User creation time."""
  52. model.creation_time = creation_time
  53. def serialize_last_login_time(self) -> datetime:
  54. """User last login time."""
  55. return self.model.last_login_time
  56. @staticmethod
  57. def deserialize_last_login_time(
  58. model: User, last_login_time: datetime) -> None:
  59. """User last login time."""
  60. model.last_login_time = last_login_time
  61. def serialize_version(self) -> int:
  62. """User version."""
  63. return self.model.version
  64. @staticmethod
  65. def deserialize_version(model: User, version: int) -> None:
  66. """User version."""
  67. model.version = version
  68. def serialize_role(self) -> str:
  69. """User role."""
  70. return self.model.role
  71. @staticmethod
  72. def deserialize_role(model: User, role: str) -> None:
  73. """User role."""
  74. model.role = role
  75. register_transformer(User.__name__, UserTransformer)
  76. def find_by_name(name: str) -> Optional[User]:
  77. """
  78. Find a user by name.
  79. :param name:
  80. :return:
  81. """
  82. return User.query.filter_by(name=name).first()
  83. def register(name: str, password: Optional[str], role: Optional[str]) -> User:
  84. """
  85. Register a new user.
  86. :param name: Desired user name. Must be unique and not already registered
  87. :param password: Password to be hashed and stored for the user
  88. :param role: Role to assign the user [ROLE_USER, ROLE_ADMIN]
  89. :return:
  90. """
  91. password = password if password is not None else ''.join(
  92. random.choices(string.ascii_letters + string.digits, k=32))
  93. role = role if role is not None else User.ROLE_USER
  94. if find_by_name(name=name) is not None:
  95. raise errors.ValidationError('User name is already taken.')
  96. pw_hash, pw_revision = authentication_utility.get_password_hash(password)
  97. new_user = User(
  98. name=name,
  99. role=role,
  100. password_hash=pw_hash,
  101. password_revision=pw_revision,
  102. creation_time=datetime.now(),
  103. version=0)
  104. db.session.add(new_user)
  105. db.session.commit()
  106. LOGGER.info('Registered new user: %s with role: %s', name, role)
  107. return new_user
  108. def delete(user: User) -> bool:
  109. """
  110. Delete a user.
  111. :param user:
  112. :return:
  113. """
  114. existing_user = db.session.delete(user)
  115. if existing_user is None:
  116. db.session.commit()
  117. return True
  118. return False
  119. def update_last_login_time(user: User) -> None:
  120. """
  121. Bump the last login time for the user.
  122. :param user:
  123. :return:
  124. """
  125. if user is not None:
  126. user.last_login_time = datetime.now()
  127. db.session.commit()
  128. def update_password(user: User, password: str) -> None:
  129. """
  130. Change the user password.
  131. :param user:
  132. :param password:
  133. :return:
  134. """
  135. pw_hash, pw_revision = authentication_utility.get_password_hash(
  136. password)
  137. user.password_hash = pw_hash
  138. user.password_revision = pw_revision
  139. db.session.commit()