An ebook/comic library service and web client
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

124 lines
3.9 KiB

import logging
import os
import random
import secrets
import string
from os import path
from typing import Optional

import click
from click import Context

from atheneum import atheneum
from atheneum.model import User
from atheneum.service import user_service
  11. logging.basicConfig()
  12. @click.group()
  13. def main():
  14. pass
  15. @click.group(name='user')
  16. def user_command_group():
  17. pass
  18. @click.command(name='delete')
  19. @click.argument('name')
  20. def delete_user(name: str):
  21. logging.info('Deleting user with name \'%s\'', name)
  22. existing_user = User.query.filter_by(name=name).first()
  23. if existing_user is not None:
  24. successful = user_service.delete(existing_user)
  25. if successful:
  26. logging.warning('Deleted user with name \'%s\'', name)
  27. else:
  28. logging.error('Failed to delete user with name \'%s\'', name)
  29. else:
  30. logging.warning('User with name \'%s\' doesn\'t exist', name)
  31. @click.command('register')
  32. @click.argument('name')
  33. @click.argument('password', required=False)
  34. @click.option('--role',
  35. default=User.ROLE_USER,
  36. envvar='ROLE',
  37. help='Role to assign to the user. default=[USER]')
  38. def register_user(
  39. name: str,
  40. role: str,
  41. password: Optional[str] = None):
  42. logging.info('Registering user with name \'%s\'', name)
  43. existing_user = User.query.filter_by(name=name).first()
  44. if existing_user is None:
  45. user_password = password if password else ''.join(
  46. random.choices(string.ascii_letters + string.digits, k=24))
  47. new_user = user_service.register(name, user_password, role)
  48. logging.warning(
  49. 'Created new user: \'%s\' with password \'%s\' and role %s',
  50. new_user.name,
  51. user_password,
  52. new_user.role)
  53. else:
  54. logging.warning('User \'%s\' already exists. Did you mean to update?',
  55. name)
  56. @click.command(name='register-admin')
  57. @click.pass_context
  58. def register_admin_user(ctx: Context):
  59. admin_users = User.query.filter_by(role=User.ROLE_ADMIN).all()
  60. if len(admin_users) == 0:
  61. name = 'atheneum_administrator'
  62. password = ''.join(
  63. random.choices(string.ascii_letters + string.digits, k=32))
  64. ctx.invoke(
  65. register_user,
  66. name=name,
  67. role=User.ROLE_ADMIN,
  68. password=password)
  69. admin_credential_file = '.admin_credentials'
  70. with open(admin_credential_file, 'w') as f:
  71. f.write('{}:{}'.format(name, password))
  72. logging.info(
  73. 'These credentials can also be retrieved from {}'.format(
  74. path.abspath(admin_credential_file)))
  75. @click.command(name='reset-password')
  76. @click.argument('name')
  77. @click.argument('password', required=False)
  78. def reset_user_password(name: str, password: Optional[str] = None):
  79. logging.info('Resetting user password for \'%s\'', name)
  80. existing_user = User.query.filter_by(name=name).first()
  81. if existing_user is not None:
  82. user_password = password if password else ''.join(
  83. random.choices(string.ascii_letters + string.digits, k=24))
  84. user_service.update_password(existing_user, user_password)
  85. logging.warning(
  86. 'Updated user: \'%s\' with password \'%s\'',
  87. name,
  88. user_password)
  89. else:
  90. logging.warning('User with name \'%s\' doesn\'t exist', name)
  91. @click.command(name='list')
  92. def list_users():
  93. all_users = User.query.all()
  94. [click.echo(user.name) for user in all_users]
  95. main.add_command(user_command_group)
  96. user_command_group.add_command(register_user)
  97. user_command_group.add_command(register_admin_user)
  98. user_command_group.add_command(delete_user)
  99. user_command_group.add_command(reset_user_password)
  100. user_command_group.add_command(list_users)
  101. if __name__ == '__main__':
  102. logging.debug('Managing: %s', atheneum.name)
  103. with atheneum.app_context():
  104. main()