You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
155 lines
4.8 KiB
155 lines
4.8 KiB
import base64
import logging
import os
import random
import secrets
import string
from os import path
from typing import Optional

import click
from click import Context

from server import server
from server.model import User
from server.service import user_service
from server.service.role_service import ROLE_LIST
|
|
|
|
# Set up the root logger with logging's defaults; no level, format or
# handler overrides are passed here.
logging.basicConfig()
|
|
|
|
|
|
# Text encoding used by the base64 commands when converting between str and
# bytes.
ENCODING = 'utf-8'
|
|
|
|
|
|
@click.group()
def main():
    """Management CLI for the server: user accounts and base64 helpers."""
    # The docstring above doubles as the text click prints for `--help`.
    # The group itself does no work; sub-commands are attached at module
    # level via `main.add_command(...)`.
|
|
|
|
|
|
@click.group(name='user')
def user_command_group():
    """Register, delete, list and reset the passwords of user accounts."""
    # The docstring above doubles as the text click prints for
    # `user --help`. The group itself does no work; its sub-commands are
    # attached at module level via `user_command_group.add_command(...)`.
|
|
|
|
|
|
@click.group(name='base64')
def base64_command_group():
    """Encode text to base64 or decode base64 back to text."""
    # The docstring above doubles as the text click prints for
    # `base64 --help`. The group itself does no work; its sub-commands are
    # attached at module level via `base64_command_group.add_command(...)`.
|
|
|
|
|
|
@click.command(name='delete')
@click.argument('name')
def delete_user(name: str) -> None:
    """Delete the user called NAME.

    Nothing is changed when no user with that name exists.
    """
    logging.info('Deleting user with name \'%s\'', name)
    existing_user = User.query.filter_by(name=name).first()

    # Guard clause: nothing to delete for an unknown name.
    if existing_user is None:
        logging.warning('User with name \'%s\' doesn\'t exist', name)
        return

    # user_service.delete signals the outcome through a truthy/falsy return.
    if user_service.delete(existing_user):
        logging.warning('Deleted user with name \'%s\'', name)
    else:
        logging.error('Failed to delete user with name \'%s\'', name)
|
|
|
|
|
|
@click.command('register')
@click.argument('name')
@click.argument('password', required=False)
@click.option('--role',
              default=User.ROLE_USER,
              envvar='ROLE',
              help='Role to assign to the user. '
                   + 'default=[USER] acceptable values = ['
                   + ','.join(sorted({str(rt.data) for rt in ROLE_LIST}))
                   + ']')
def register_user(
        name: str,
        role: str,
        password: Optional[str] = None) -> None:
    """Register a new user called NAME.

    When PASSWORD is omitted or empty, a random 24-character password is
    generated and reported in the log output.
    """
    logging.info('Registering user with name \'%s\'', name)
    existing_user = User.query.filter_by(name=name).first()

    # Guard clause: never overwrite an existing account.
    if existing_user is not None:
        logging.warning('User \'%s\' already exists. Did you mean to update?',
                        name)
        return

    if password:
        user_password = password
    else:
        # Passwords are security-sensitive, so draw from the `secrets`
        # CSPRNG rather than the predictable `random` generator.
        alphabet = string.ascii_letters + string.digits
        user_password = ''.join(secrets.choice(alphabet) for _ in range(24))

    # The trailing False is forwarded as-is; its meaning is defined by
    # user_service.register, which is not visible in this file.
    new_user = user_service.register(name, user_password, role, False)

    # The password is logged on purpose: this is the only place where a
    # generated password is reported to the operator.
    logging.warning(
        'Created new user: \'%s\' with password \'%s\' and role %s',
        new_user.name,
        user_password,
        new_user.role)
|
|
|
|
|
|
@click.command(name='register-admin')
@click.pass_context
def register_admin_user(ctx: Context) -> None:
    """Create the 'server_administrator' account when no admin exists yet.

    A random 32-character password is generated and the credentials are
    written as 'name:password' to the file '.admin_credentials' in the
    current working directory.
    """
    # Nothing to do when at least one administrator is already registered.
    if User.query.filter_by(role=User.ROLE_ADMIN).all():
        return

    name = 'server_administrator'

    # register_user refuses to touch an existing account. Without this check
    # the credentials file below would be written with a password that was
    # never actually set.
    if User.query.filter_by(name=name).first() is not None:
        logging.error(
            'User \'%s\' already exists without the admin role; '
            'no admin account was created',
            name)
        return

    # Passwords are security-sensitive, so draw from the `secrets` CSPRNG
    # rather than the predictable `random` generator.
    alphabet = string.ascii_letters + string.digits
    password = ''.join(secrets.choice(alphabet) for _ in range(32))

    ctx.invoke(
        register_user,
        name=name,
        role=User.ROLE_ADMIN,
        password=password)

    admin_credential_file = '.admin_credentials'
    # Create the file with owner-only permissions (0o600) because it holds a
    # plaintext password. The mode only applies when the file is newly
    # created; an existing file keeps its current permissions.
    fd = os.open(
        admin_credential_file,
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        0o600)
    with os.fdopen(fd, 'w') as f:
        f.write('{}:{}'.format(name, password))

    logging.info(
        'These credentials can also be retrieved from %s',
        path.abspath(admin_credential_file))
|
|
|
|
|
|
@click.command(name='reset-password')
@click.argument('name')
@click.argument('password', required=False)
def reset_user_password(name: str, password: Optional[str] = None) -> None:
    """Set a new password for the user called NAME.

    When PASSWORD is omitted or empty, a random 24-character password is
    generated and reported in the log output.
    """
    logging.info('Resetting user password for \'%s\'', name)
    existing_user = User.query.filter_by(name=name).first()

    # Guard clause: there is nothing to update for an unknown name.
    if existing_user is None:
        logging.warning('User with name \'%s\' doesn\'t exist', name)
        return

    if password:
        user_password = password
    else:
        # Passwords are security-sensitive, so draw from the `secrets`
        # CSPRNG rather than the predictable `random` generator.
        alphabet = string.ascii_letters + string.digits
        user_password = ''.join(secrets.choice(alphabet) for _ in range(24))

    user_service.update_password(existing_user, user_password)

    # The password is logged on purpose: this is the only place where a
    # generated password is reported to the operator.
    logging.warning(
        'Updated user: \'%s\' with password \'%s\'',
        name,
        user_password)
|
|
|
|
|
|
@click.command(name='list')
def list_users() -> None:
    """Print the name of every registered user, one per line."""
    # A plain loop: the echo is a side effect, so there is no reason to
    # build (and discard) a list of None values with a comprehension.
    for user in User.query.all():
        click.echo(user.name)
|
|
|
|
|
|
@click.command(name='encode')
@click.argument('text')
def convert_to_base64(text: str) -> None:
    """Print TEXT encoded as standard base64."""
    # str -> bytes -> base64 bytes -> str, using ENCODING for both
    # conversions.
    encoded_text = base64.standard_b64encode(
        text.encode(ENCODING)).decode(ENCODING)
    logging.info('Encoded base64: \'%s\'', encoded_text)

    # The encoded value is the whole point of the command, so also write it
    # to stdout. An INFO log record alone may be filtered out depending on
    # the configured log level. The `list` command uses click.echo the
    # same way.
    click.echo(encoded_text)
|
|
|
|
|
|
@click.command(name='decode')
@click.argument('text')
def convert_from_base64(text: str) -> None:
    """Print the text obtained by decoding base64-encoded TEXT."""
    try:
        # str -> bytes -> decoded bytes -> str, using ENCODING for both
        # conversions.
        decoded_text = base64.standard_b64decode(
            text.encode(ENCODING)).decode(ENCODING)
    except ValueError as err:
        # binascii.Error (malformed base64) and UnicodeDecodeError (payload
        # is not valid text in ENCODING) are both ValueError subclasses.
        # Report them as a normal CLI error instead of a traceback.
        raise click.ClickException(
            'Cannot decode \'{}\': {}'.format(text, err)) from err

    logging.info('Decoded base64: \'%s\'', decoded_text)

    # The decoded value is the whole point of the command, so also write it
    # to stdout. An INFO log record alone may be filtered out depending on
    # the configured log level. The `list` command uses click.echo the
    # same way.
    click.echo(decoded_text)
|
|
|
|
|
|
# Wire the command tree: each group hangs off the root `main` group and each
# command hangs off its group.
main.add_command(base64_command_group)
for base64_command in (convert_to_base64, convert_from_base64):
    base64_command_group.add_command(base64_command)

main.add_command(user_command_group)
for user_command in (
        register_user,
        register_admin_user,
        delete_user,
        reset_user_password,
        list_users):
    user_command_group.add_command(user_command)

if __name__ == '__main__':
    logging.debug('Managing: %s', server.name)
    # Run the CLI inside the server's application context. Presumably this
    # is what lets `User.query` reach the database; confirm against the
    # `server` package.
    with server.app_context():
        main()
|