import base64 import logging import random import string from os import path from typing import Optional import click from click import Context from corvus import corvus from corvus.model import User from corvus.service import user_service from corvus.service.role_service import ROLE_LIST logging.basicConfig() ENCODING = 'utf-8' @click.group() def main(): pass @click.group(name='user') def user_command_group(): pass @click.group(name='base64') def base64_command_group(): pass @click.command(name='delete') @click.argument('name') def delete_user(name: str): logging.info('Deleting user with name \'%s\'', name) existing_user = User.query.filter_by(name=name).first() if existing_user is not None: successful = user_service.delete(existing_user) if successful: logging.warning('Deleted user with name \'%s\'', name) else: logging.error('Failed to delete user with name \'%s\'', name) else: logging.warning('User with name \'%s\' doesn\'t exist', name) @click.command('register') @click.argument('name') @click.argument('password', required=False) @click.option('--role', default=User.ROLE_USER, envvar='ROLE', help='Role to assign to the user. ' + 'default=[USER] acceptable values = [' + ','.join( sorted(set(map(lambda rt: str(rt.data), ROLE_LIST)))) + ']') def register_user( name: str, role: str, password: Optional[str] = None): logging.info('Registering user with name \'%s\'', name) existing_user = User.query.filter_by(name=name).first() if existing_user is None: user_password = password if password else ''.join( random.choices(string.ascii_letters + string.digits, k=24)) new_user = user_service.register(name, user_password, role, False) logging.warning( 'Created new user: \'%s\' with password \'%s\' and role %s', new_user.name, user_password, new_user.role) else: logging.warning('User \'%s\' already exists. Did you mean to update?', name) @click.command(name='register-admin') @click.pass_context def register_admin_user(ctx: Context): admin_users = User.query.filter_by(role=User.ROLE_ADMIN).all() if len(admin_users) == 0: name = 'corvus_administrator' password = ''.join( random.choices(string.ascii_letters + string.digits, k=32)) ctx.invoke( register_user, name=name, role=User.ROLE_ADMIN, password=password) admin_credential_file = '.admin_credentials' with open(admin_credential_file, 'w') as f: f.write('{}:{}'.format(name, password)) logging.info( 'These credentials can also be retrieved from {}'.format( path.abspath(admin_credential_file))) @click.command(name='reset-password') @click.argument('name') @click.argument('password', required=False) def reset_user_password(name: str, password: Optional[str] = None): logging.info('Resetting user password for \'%s\'', name) existing_user = User.query.filter_by(name=name).first() if existing_user is not None: user_password = password if password else ''.join( random.choices(string.ascii_letters + string.digits, k=24)) user_service.update_password(existing_user, user_password) logging.warning( 'Updated user: \'%s\' with password \'%s\'', name, user_password) else: logging.warning('User with name \'%s\' doesn\'t exist', name) @click.command(name='list') def list_users(): all_users = User.query.all() [click.echo(user.name) for user in all_users] @click.command(name='encode') @click.argument('text') def convert_to_base64(text: str): encoded_text = base64.standard_b64encode(text.encode(ENCODING)).decode(ENCODING) logging.info('Encoded base64: \'%s\'', encoded_text) @click.command(name='decode') @click.argument('text') def convert_from_base64(text: str): decoded_text = base64.standard_b64decode(text.encode(ENCODING)).decode(ENCODING) logging.info('Decoded base64: \'%s\'', decoded_text) main.add_command(base64_command_group) base64_command_group.add_command(convert_to_base64) base64_command_group.add_command(convert_from_base64) main.add_command(user_command_group) user_command_group.add_command(register_user) user_command_group.add_command(register_admin_user) user_command_group.add_command(delete_user) user_command_group.add_command(reset_user_password) user_command_group.add_command(list_users) if __name__ == '__main__': logging.debug('Managing: %s', corvus.name) with corvus.app_context(): main()