Browse Source
Major quality of life changes
Major quality of life changes
* Refactored code and modules for consistency/readability * Updated primary migration script * Impletmented rudimentary User and UserToken services * Started work on adding testing * Broke API into blueprints, starting with authentication * Added a simple cli manage.py utility * Added a Dockerfile to create a runnable Atheneum instancemerge-requests/1/head
Drew Short
7 years ago
23 changed files with 598 additions and 215 deletions
-
4.dockerignore
-
5.gitignore
-
23Dockerfile
-
3server/Pipfile
-
84server/Pipfile.lock
-
61server/atheneum/__init__.py
-
71server/atheneum/api.py
-
1server/atheneum/api/__init__.py
-
45server/atheneum/api/authentication_api.py
-
25server/atheneum/api/decorators.py
-
6server/atheneum/api/model.py
-
73server/atheneum/authentication.py
-
2server/atheneum/default_settings.py
-
2server/atheneum/model/__init__.py
-
15server/atheneum/model/user.py
-
42server/atheneum/model/user_model.py
-
0server/atheneum/service/__init__.py
-
74server/atheneum/service/authentication_service.py
-
49server/atheneum/service/user_service.py
-
10server/entrypoint.sh
-
124server/manage.py
-
38server/migrations/versions/7160f2b96a1c_.py
-
56server/migrations/versions/96442b147e22_.py
@ -0,0 +1,4 @@ |
|||
server/instance/ |
|||
server/setup.py |
|||
server/test/ |
|||
.admin_credentials |
@ -0,0 +1,5 @@ |
|||
instance/ |
|||
.idea |
|||
.admin_credentials |
|||
*__pycache__/ |
|||
.pytest_cache/ |
@ -0,0 +1,23 @@ |
|||
FROM python:3.6-slim-stretch |
|||
MAINTAINER Drew Short <warrick@sothr.com> |
|||
|
|||
ENV ATHENEUM_APP_DIRECTORY /opt/atheneum |
|||
ENV ATHENEUM_CONFIG_DIRECTORY /srv/atheneum/config |
|||
ENV ATHENEUM_DATA_DIRECTORY /srv/atheneum/data |
|||
|
|||
RUN mkdir -p ${ATHENEUM_APP_DIRECTORY} \ |
|||
&& mkdir -p ${ATHENEUM_CONFIG_DIRECTORY} \ |
|||
&& mkdir -p ${ATHENEUM_DATA_DIRECTORY} \ |
|||
&& pip install pipenv gunicorn |
|||
|
|||
VOLUME ${ATHENEUM_CONFIG_DIRECTORY} |
|||
VOLUME ${ATHENEUM_DATA_DIRECTORY} |
|||
|
|||
COPY ./server/ ${ATHENEUM_APP_DIRECTORY}/ |
|||
|
|||
RUN cd ${ATHENEUM_APP_DIRECTORY} \ |
|||
&& pipenv install --system --deploy --ignore-pipfile |
|||
|
|||
WORKDIR ${ATHENEUM_APP_DIRECTORY} |
|||
|
|||
CMD ./entrypoint.sh |
@ -1,43 +1,82 @@ |
|||
import os |
|||
from logging.config import dictConfig |
|||
|
|||
from flask import Flask |
|||
from flask_migrate import Migrate |
|||
from flask_migrate import Migrate, upgrade |
|||
from flask_sqlalchemy import SQLAlchemy |
|||
|
|||
from atheneum import utility |
|||
|
|||
db: SQLAlchemy = SQLAlchemy() |
|||
|
|||
from atheneum.api import api_blueprint |
|||
dictConfig({ |
|||
'version': 1, |
|||
'formatters': {'default': { |
|||
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s', |
|||
}}, |
|||
'handlers': {'wsgi': { |
|||
'class': 'logging.StreamHandler', |
|||
'stream': 'ext://flask.logging.wsgi_errors_stream', |
|||
'formatter': 'default' |
|||
}}, |
|||
'root': { |
|||
'level': 'INFO', |
|||
'handlers': ['wsgi'] |
|||
} |
|||
}) |
|||
|
|||
|
|||
def create_app(test_config=None): |
|||
app = Flask(__name__, instance_relative_config=True) |
|||
app.logger.debug('Creating Atheneum Server') |
|||
|
|||
data_directory = os.getenv('ATHENEUM_DATA_DIRECTORY', '/tmp') |
|||
app.logger.debug('Atheneum Data Directory: %s', data_directory) |
|||
|
|||
app.config.from_mapping( |
|||
SECRET_KEY='dev', |
|||
SQLALCHEMY_DATABASE_URI='sqlite:////tmp/atheneum-test.db' |
|||
# SQLALCHEMY_DATABASE_URI=os.path.join(app.instance_path, 'atheneum.db') |
|||
SQLALCHEMY_DATABASE_URI='sqlite:///{}/atheneum.db' |
|||
.format(data_directory) |
|||
) |
|||
|
|||
if test_config is None: |
|||
app.logger.debug('Loading configurations') |
|||
app.config.from_object('atheneum.default_settings') |
|||
app.config.from_pyfile('config.py', silent=True) |
|||
if os.getenv('ATHENEUM_SERVER_SETTINGS', None): |
|||
app.config.from_envvar('ATHENEUM_SERVER_SETTINGS') |
|||
if os.getenv('ATHENEUM_SETTINGS', None): |
|||
app.config.from_envvar('ATHENEUM_SETTINGS') |
|||
else: |
|||
app.config.from_pyfile(test_config) |
|||
app.logger.debug('Loading test configuration') |
|||
app.config.from_object(test_config) |
|||
|
|||
try: |
|||
os.makedirs(app.instance_path) |
|||
except OSError: |
|||
pass |
|||
|
|||
app.register_blueprint(api_blueprint) |
|||
app.json_encoder = utility.CustomJSONEncoder |
|||
|
|||
app.logger.debug('Initializing Application') |
|||
db.init_app(app) |
|||
migrate = Migrate(app, db) |
|||
app.logger.debug('Registering Database Models') |
|||
Migrate(app, db) |
|||
|
|||
return app |
|||
|
|||
|
|||
def register_blueprints(app): |
|||
from atheneum.api import auth_blueprint |
|||
app.register_blueprint(auth_blueprint) |
|||
|
|||
|
|||
app = create_app() |
|||
register_blueprints(app) |
|||
|
|||
|
|||
def init_db(): |
|||
"""Clear existing data and create new tables.""" |
|||
upgrade('migrations') |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
app = create_app() |
|||
app.run() |
|||
app.run() |
@ -1,71 +0,0 @@ |
|||
from functools import wraps |
|||
from typing import Any, Callable, NamedTuple |
|||
|
|||
from flask import Blueprint, jsonify, Response, session |
|||
|
|||
from atheneum.authentication import ( |
|||
generate_token, |
|||
require_basic_auth, |
|||
require_token_auth |
|||
) |
|||
|
|||
api_blueprint = Blueprint(name='api', import_name=__name__, url_prefix='/api') |
|||
|
|||
|
|||
class APIResponse(NamedTuple): |
|||
payload: Any |
|||
status: int |
|||
|
|||
|
|||
def return_json(func: Callable) -> Callable: |
|||
""" |
|||
If an Response object is not returned, jsonify the result and return it |
|||
:param func: |
|||
:return: |
|||
""" |
|||
|
|||
@wraps(func) |
|||
def decorate(*args, **kwargs): |
|||
result = func(*args, **kwargs) |
|||
if isinstance(result, Response): |
|||
return result |
|||
if isinstance(result, APIResponse): |
|||
return jsonify(result.payload), result.status |
|||
return jsonify(result) |
|||
|
|||
return decorate |
|||
|
|||
|
|||
@api_blueprint.route('/login', methods=['POST']) |
|||
@return_json |
|||
@require_basic_auth |
|||
def login() -> APIResponse: |
|||
""" |
|||
Get a token for continued authentication |
|||
:return: A login token for continued authentication |
|||
""" |
|||
token = generate_token() |
|||
return APIResponse({'token': token}, 200) |
|||
|
|||
|
|||
@api_blueprint.route('/login/bump', methods=['POST']) |
|||
@return_json |
|||
@require_token_auth |
|||
def login_bump() -> APIResponse: |
|||
""" |
|||
Update the user last seen timestamp |
|||
:return: A time stamp for the bumped login |
|||
""" |
|||
return APIResponse(None, 200) |
|||
|
|||
|
|||
@api_blueprint.route('/logout', methods=['POST']) |
|||
@return_json |
|||
@require_token_auth |
|||
def logout() -> APIResponse: |
|||
""" |
|||
logout and delete a token |
|||
:return: |
|||
""" |
|||
session.pop('user') |
|||
return APIResponse(None, 200) |
@ -0,0 +1 @@ |
|||
from atheneum.api.authentication_api import auth_blueprint |
@ -0,0 +1,45 @@ |
|||
from flask import Blueprint, g |
|||
|
|||
from atheneum.api.decorators import return_json |
|||
from atheneum.api.model import APIResponse |
|||
from atheneum.middleware import authentication_middleware |
|||
from atheneum.service import user_token_service, authentication_service |
|||
|
|||
auth_blueprint = Blueprint( |
|||
name='auth', import_name=__name__, url_prefix='/auth') |
|||
|
|||
|
|||
@auth_blueprint.route('/login', methods=['POST']) |
|||
@return_json |
|||
@authentication_middleware.require_basic_auth |
|||
def login() -> APIResponse: |
|||
""" |
|||
Get a token for continued authentication |
|||
:return: A login token for continued authentication |
|||
""" |
|||
user_token = user_token_service.create(g.user) |
|||
return APIResponse({'token': user_token.token}, 200) |
|||
|
|||
|
|||
@auth_blueprint.route('/bump', methods=['POST']) |
|||
@return_json |
|||
@authentication_middleware.require_token_auth |
|||
def login_bump() -> APIResponse: |
|||
""" |
|||
Update the user last seen timestamp |
|||
:return: A time stamp for the bumped login |
|||
""" |
|||
authentication_service.bump_login(g.user) |
|||
return APIResponse(g.user.last_login_time, 200) |
|||
|
|||
|
|||
@auth_blueprint.route('/logout', methods=['POST']) |
|||
@return_json |
|||
@authentication_middleware.require_token_auth |
|||
def logout() -> APIResponse: |
|||
""" |
|||
logout and delete a token |
|||
:return: |
|||
""" |
|||
authentication_service.logout(g.user_token) |
|||
return APIResponse(None, 200) |
@ -0,0 +1,25 @@ |
|||
from functools import wraps |
|||
from typing import Callable |
|||
|
|||
from flask import jsonify, Response |
|||
|
|||
from atheneum.api.model import APIResponse |
|||
|
|||
|
|||
def return_json(func: Callable) -> Callable: |
|||
""" |
|||
If an Response object is not returned, jsonify the result and return it |
|||
:param func: |
|||
:return: |
|||
""" |
|||
|
|||
@wraps(func) |
|||
def decorate(*args, **kwargs): |
|||
result = func(*args, **kwargs) |
|||
if isinstance(result, Response): |
|||
return result |
|||
if isinstance(result, APIResponse): |
|||
return jsonify(result.payload), result.status |
|||
return jsonify(result) |
|||
|
|||
return decorate |
@ -0,0 +1,6 @@ |
|||
from typing import Any, NamedTuple |
|||
|
|||
|
|||
class APIResponse(NamedTuple): |
|||
payload: Any |
|||
status: int |
@ -1,73 +0,0 @@ |
|||
import base64 |
|||
import uuid |
|||
from functools import wraps |
|||
from typing import Optional, Callable |
|||
|
|||
from flask import request, Response, session |
|||
from werkzeug.datastructures import Authorization |
|||
from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes |
|||
|
|||
|
|||
def authenticate_with_password(username: str, password: str) -> bool: |
|||
session['user'] = None |
|||
return True |
|||
|
|||
|
|||
def authenticate_with_token(username: str, token: str) -> bool: |
|||
session['user'] = None |
|||
return True |
|||
|
|||
|
|||
def authentication_failed(auth_type: str) -> Response: |
|||
return Response( |
|||
status=401, |
|||
headers={ |
|||
'WWW-Authenticate': '%s realm="Login Required"' % auth_type |
|||
}) |
|||
|
|||
|
|||
def parse_token_authorization_header(header_value) -> Optional[Authorization]: |
|||
if not header_value: |
|||
return |
|||
value = wsgi_to_bytes(header_value) |
|||
try: |
|||
auth_type, auth_info = value.split(None, 1) |
|||
auth_type = auth_type.lower() |
|||
except ValueError: |
|||
return |
|||
if auth_type == b'token': |
|||
try: |
|||
username, token = base64.b64decode(auth_info).split(b':', 1) |
|||
except Exception: |
|||
return |
|||
return Authorization('token', {'username': bytes_to_wsgi(username), |
|||
'password': bytes_to_wsgi(token)}) |
|||
|
|||
|
|||
def require_basic_auth(func: Callable) -> Callable: |
|||
@wraps(func) |
|||
def decorate(*args, **kwargs): |
|||
auth = request.authorization |
|||
if auth and authenticate_with_password(auth.username, auth.password): |
|||
return func(*args, **kwargs) |
|||
else: |
|||
return authentication_failed('Basic') |
|||
|
|||
return decorate |
|||
|
|||
|
|||
def require_token_auth(func: Callable) -> Callable: |
|||
@wraps(func) |
|||
def decorate(*args, **kwargs): |
|||
token = parse_token_authorization_header( |
|||
request.headers.get('WWW-Authenticate', None)) |
|||
if token and authenticate_with_token(token.username, token.password): |
|||
return func(*args, **kwargs) |
|||
else: |
|||
return authentication_failed('Token') |
|||
|
|||
return decorate |
|||
|
|||
|
|||
def generate_token() -> uuid.UUID: |
|||
return uuid.uuid4() |
@ -1,4 +1,4 @@ |
|||
DEBUG = True |
|||
DEBUG = False |
|||
SECRET_KEY = b'\xb4\x89\x0f\x0f\xe5\x88\x97\xfe\x8d<\x0b@d\xe9\xa5\x87%' \ |
|||
b'\xc6\xf0@l1\xe3\x90g\xfaA.?u=s' # CHANGE ME IN REAL CONFIG |
|||
SQLALCHEMY_TRACK_MODIFICATIONS=False |
@ -1 +1 @@ |
|||
from atheneum.model.user import User |
|||
from atheneum.model.user_model import User, UserToken |
@ -1,15 +0,0 @@ |
|||
from atheneum import db |
|||
|
|||
|
|||
class User(db.Model): |
|||
__tablename__ = 'user' |
|||
|
|||
id = db.Column(db.Integer, primary_key=True) |
|||
name = db.Column(db.Unicode(60), unique=True, nullable=False) |
|||
password_hash = db.Column('password_hash', db.Unicode(128), nullable=False) |
|||
password_dblt = db.Column('password_dblt', db.Unicode(32)) |
|||
password_revision = db.Column( |
|||
'password_revision', db.SmallInteger, default=0, nullable=False) |
|||
creation_time = db.Column('creation_time', db.DateTime, nullable=False) |
|||
last_login_time = db.Column('last_login_time', db.DateTime) |
|||
version = db.Column('version', db.Integer, default=1, nullable=False) |
@ -0,0 +1,42 @@ |
|||
from atheneum import db |
|||
|
|||
|
|||
class User(db.Model): |
|||
__tablename__ = 'user' |
|||
|
|||
ROLE_USER = 'USER' |
|||
ROLE_ADMIN = 'ADMIN' |
|||
|
|||
id = db.Column(db.Integer, primary_key=True) |
|||
name = db.Column(db.Unicode(60), unique=True, nullable=False) |
|||
role = db.Column( |
|||
'role', |
|||
db.Unicode(32), |
|||
nullable=False, |
|||
default=ROLE_USER, ) |
|||
password_hash = db.Column('password_hash', db.Unicode(128), nullable=False) |
|||
password_revision = db.Column( |
|||
'password_revision', db.SmallInteger, default=0, nullable=False) |
|||
creation_time = db.Column('creation_time', db.DateTime, nullable=False) |
|||
last_login_time = db.Column('last_login_time', db.DateTime) |
|||
version = db.Column('version', db.Integer, default=1, nullable=False) |
|||
|
|||
|
|||
class UserToken(db.Model): |
|||
__tablename__ = 'user_token' |
|||
|
|||
user_token_id = db.Column('id', db.Integer, primary_key=True) |
|||
user_id = db.Column( |
|||
'user_id', |
|||
db.Integer, |
|||
db.ForeignKey('user.id', ondelete='CASCADE'), |
|||
nullable=False, |
|||
index=True) |
|||
token = db.Column('token', db.Unicode(36), nullable=False) |
|||
note = db.Column('note', db.Unicode(128), nullable=True) |
|||
enabled = db.Column('enabled', db.Boolean, nullable=False, default=True) |
|||
expiration_time = db.Column('expiration_time', db.DateTime, nullable=True) |
|||
creation_time = db.Column('creation_time', db.DateTime, nullable=False) |
|||
last_edit_time = db.Column('last_edit_time', db.DateTime) |
|||
last_usage_time = db.Column('last_usage_time', db.DateTime) |
|||
version = db.Column('version', db.Integer, default=1, nullable=False) |
@ -0,0 +1,74 @@ |
|||
import uuid |
|||
from datetime import datetime |
|||
from typing import Optional, Tuple |
|||
|
|||
from nacl import pwhash |
|||
from nacl.exceptions import InvalidkeyError |
|||
|
|||
from atheneum.model import User, UserToken |
|||
from atheneum.service import user_service, user_token_service |
|||
|
|||
|
|||
def generate_token() -> uuid.UUID: |
|||
return uuid.uuid4() |
|||
|
|||
|
|||
def get_password_hash(password: str) -> Tuple[str, int]: |
|||
""" |
|||
Retrieve argon2id password hash. |
|||
|
|||
:param password: plaintext password to convert |
|||
:return: Tuple[password_hash, password_revision] |
|||
""" |
|||
return pwhash.argon2id.str(password.encode('utf8')).decode('utf8'), 1 |
|||
|
|||
|
|||
def is_valid_password(user: User, password: str) -> bool: |
|||
assert user |
|||
|
|||
try: |
|||
return pwhash.verify( |
|||
user.password_hash.encode('utf8'), password.encode('utf8')) |
|||
except InvalidkeyError: |
|||
pass |
|||
return False |
|||
|
|||
|
|||
def is_valid_token(user_token: Optional[UserToken]) -> bool: |
|||
""" |
|||
Token must be enabled and if it has an expiration, it must be |
|||
greater than now. |
|||
|
|||
:param user_token: |
|||
:return: |
|||
""" |
|||
if user_token is None: |
|||
return False |
|||
if not user_token.enabled: |
|||
return False |
|||
if (user_token.expiration_time is not None |
|||
and user_token.expiration_time < datetime.utcnow()): |
|||
return False |
|||
return True |
|||
|
|||
|
|||
def bump_login(user: Optional[User]): |
|||
""" |
|||
Update the last login time for the user |
|||
|
|||
:param user: |
|||
:return: |
|||
""" |
|||
if user is not None: |
|||
user_service.update_last_login_time(user) |
|||
|
|||
|
|||
def logout(user_token: Optional[UserToken] = None): |
|||
""" |
|||
Remove a user_token associated with a client session |
|||
|
|||
:param user_token: |
|||
:return: |
|||
""" |
|||
if user_token is not None: |
|||
user_token_service.delete(user_token) |
@ -0,0 +1,49 @@ |
|||
from datetime import datetime |
|||
from typing import Optional |
|||
|
|||
from atheneum import app, db |
|||
from atheneum.model import User |
|||
from atheneum.service import authentication_service |
|||
|
|||
|
|||
def register(name: str, password: str, role: str) -> User: |
|||
password_hash, password_revision = authentication_service.get_password_hash( |
|||
password) |
|||
|
|||
new_user = User( |
|||
name=name, |
|||
role=role, |
|||
password_hash=password_hash, |
|||
password_revision=password_revision, |
|||
creation_time=datetime.now(), |
|||
version=0) |
|||
db.session.add(new_user) |
|||
db.session.commit() |
|||
|
|||
app.logger.info('Registered new user: %s with role: %s', name, role) |
|||
return new_user |
|||
|
|||
|
|||
def delete(user: User) -> bool: |
|||
existing_user = db.session.delete(user) |
|||
if existing_user is None: |
|||
db.session.commit() |
|||
return True |
|||
return False |
|||
|
|||
|
|||
def update_last_login_time(user: User): |
|||
user.last_login_time = datetime.now() |
|||
db.session.commit() |
|||
|
|||
|
|||
def update_password(user: User, password: str): |
|||
password_hash, password_revision = authentication_service.get_password_hash( |
|||
password) |
|||
user.password_hash = password_hash |
|||
user.password_revision = password_revision |
|||
db.session.commit() |
|||
|
|||
|
|||
def find_by_name(name: str) -> Optional[User]: |
|||
return User.query.filter_by(name=name).first() |
@ -0,0 +1,10 @@ |
|||
#!/usr/bin/env bash |
|||
|
|||
# Migrate the Database |
|||
FLASK_APP=atheneum:app flask db upgrade |
|||
|
|||
# Make sure an administrator is registered |
|||
python manage.py user register-admin |
|||
|
|||
# Start the application |
|||
gunicorn -b 0.0.0.0:8080 atheneum:app |
@ -0,0 +1,124 @@ |
|||
import logging |
|||
import random |
|||
import string |
|||
from typing import Optional |
|||
from os import path |
|||
|
|||
import click |
|||
from click import Context |
|||
|
|||
from atheneum import app |
|||
from atheneum.model import User |
|||
from atheneum.service import user_service |
|||
|
|||
logging.basicConfig() |
|||
|
|||
|
|||
@click.group() |
|||
def main(): |
|||
pass |
|||
|
|||
|
|||
@click.group(name='user') |
|||
def user_command_group(): |
|||
pass |
|||
|
|||
|
|||
@click.command(name='delete') |
|||
@click.argument('name') |
|||
def delete_user(name: str): |
|||
logging.info('Deleting user with name \'%s\'', name) |
|||
existing_user = User.query.filter_by(name=name).first() |
|||
if existing_user is not None: |
|||
successful = user_service.delete(existing_user) |
|||
if successful: |
|||
logging.warning('Deleted user with name \'%s\'', name) |
|||
else: |
|||
logging.error('Failed to delete user with name \'%s\'', name) |
|||
else: |
|||
logging.warning('User with name \'%s\' doesn\'t exist', name) |
|||
|
|||
|
|||
@click.command('register') |
|||
@click.argument('name') |
|||
@click.argument('password', required=False) |
|||
@click.option('--role', |
|||
default=User.ROLE_USER, |
|||
envvar='ROLE', |
|||
help='Role to assign to the user. default=[USER]') |
|||
def register_user( |
|||
name: str, |
|||
role: str, |
|||
password: Optional[str] = None): |
|||
logging.info('Registering user with name \'%s\'', name) |
|||
existing_user = User.query.filter_by(name=name).first() |
|||
if existing_user is None: |
|||
user_password = password if password else ''.join( |
|||
random.choices(string.ascii_letters + string.digits, k=24)) |
|||
new_user = user_service.register(name, user_password, role) |
|||
logging.warning( |
|||
'Created new user: \'%s\' with password \'%s\' and role %s', |
|||
new_user.name, |
|||
user_password, |
|||
new_user.role) |
|||
else: |
|||
logging.warning('User \'%s\' already exists. Did you mean to update?', |
|||
name) |
|||
|
|||
|
|||
@click.command(name='register-admin') |
|||
@click.pass_context |
|||
def register_admin_user(ctx: Context): |
|||
admin_users = User.query.filter_by(role=User.ROLE_ADMIN).all() |
|||
if len(admin_users) == 0: |
|||
name = 'atheneum_administrator' |
|||
password = ''.join( |
|||
random.choices(string.ascii_letters + string.digits, k=32)) |
|||
ctx.invoke( |
|||
register_user, |
|||
name=name, |
|||
role=User.ROLE_ADMIN, |
|||
password=password) |
|||
admin_credential_file = '.admin_credentials' |
|||
with open(admin_credential_file, 'w') as f: |
|||
f.write('{}:{}'.format(name, password)) |
|||
logging.info( |
|||
'These credentials can also be retrieved from {}'.format( |
|||
path.abspath(admin_credential_file))) |
|||
|
|||
|
|||
@click.command(name='reset-password') |
|||
@click.argument('name') |
|||
@click.argument('password', required=False) |
|||
def reset_user_password(name: str, password: Optional[str] = None): |
|||
logging.info('Resetting user password for \'%s\'', name) |
|||
existing_user = User.query.filter_by(name=name).first() |
|||
if existing_user is not None: |
|||
user_password = password if password else ''.join( |
|||
random.choices(string.ascii_letters + string.digits, k=24)) |
|||
user_service.update_password(existing_user, user_password) |
|||
logging.warning( |
|||
'Updated user: \'%s\' with password \'%s\'', |
|||
name, |
|||
user_password) |
|||
else: |
|||
logging.warning('User with name \'%s\' doesn\'t exist', name) |
|||
|
|||
|
|||
@click.command(name='list') |
|||
def list_users(): |
|||
all_users = User.query.all() |
|||
[click.echo(user.name) for user in all_users] |
|||
|
|||
|
|||
main.add_command(user_command_group) |
|||
user_command_group.add_command(register_user) |
|||
user_command_group.add_command(register_admin_user) |
|||
user_command_group.add_command(delete_user) |
|||
user_command_group.add_command(reset_user_password) |
|||
user_command_group.add_command(list_users) |
|||
|
|||
if __name__ == '__main__': |
|||
logging.debug('Managing: %s', app.name) |
|||
with app.app_context(): |
|||
main() |
@ -1,38 +0,0 @@ |
|||
"""empty message |
|||
|
|||
Revision ID: 7160f2b96a1c |
|||
Revises: |
|||
Create Date: 2018-05-15 23:39:48.110843 |
|||
|
|||
""" |
|||
import sqlalchemy as sa |
|||
from alembic import op |
|||
|
|||
# revision identifiers, used by Alembic. |
|||
revision = '7160f2b96a1c' |
|||
down_revision = None |
|||
branch_labels = None |
|||
depends_on = None |
|||
|
|||
|
|||
def upgrade(): |
|||
# ### commands auto generated by Alembic - please adjust! ### |
|||
op.create_table('user', |
|||
sa.Column('id', sa.Integer(), nullable=False), |
|||
sa.Column('name', sa.Unicode(length=60), nullable=False), |
|||
sa.Column('password_hash', sa.Unicode(length=128), nullable=False), |
|||
sa.Column('password_dblt', sa.Unicode(length=32), nullable=True), |
|||
sa.Column('password_revision', sa.SmallInteger(), nullable=False), |
|||
sa.Column('creation_time', sa.DateTime(), nullable=False), |
|||
sa.Column('last_login_time', sa.DateTime(), nullable=True), |
|||
sa.Column('version', sa.Integer(), nullable=False), |
|||
sa.PrimaryKeyConstraint('id'), |
|||
sa.UniqueConstraint('name') |
|||
) |
|||
# ### end Alembic commands ### |
|||
|
|||
|
|||
def downgrade(): |
|||
# ### commands auto generated by Alembic - please adjust! ### |
|||
op.drop_table('user') |
|||
# ### end Alembic commands ### |
@ -0,0 +1,56 @@ |
|||
"""Initial User DB Migration |
|||
Revision ID: 96442b147e22 |
|||
Revises: |
|||
Create Date: 2018-07-03 14:22:42.833390 |
|||
|
|||
""" |
|||
import sqlalchemy as sa |
|||
from alembic import op |
|||
|
|||
# revision identifiers, used by Alembic. |
|||
revision = '96442b147e22' |
|||
down_revision = None |
|||
branch_labels = None |
|||
depends_on = None |
|||
|
|||
|
|||
def upgrade(): |
|||
op.create_table('user', |
|||
sa.Column('id', sa.Integer(), nullable=False), |
|||
sa.Column('name', sa.Unicode(length=60), nullable=False), |
|||
sa.Column('role', sa.Unicode(length=32), nullable=False), |
|||
sa.Column( |
|||
'password_hash', |
|||
sa.Unicode(length=128), |
|||
nullable=False), |
|||
sa.Column( |
|||
'password_revision', sa.SmallInteger(), nullable=False), |
|||
sa.Column('creation_time', sa.DateTime(), nullable=False), |
|||
sa.Column('last_login_time', sa.DateTime(), nullable=True), |
|||
sa.Column('version', sa.Integer(), nullable=False), |
|||
sa.PrimaryKeyConstraint('id'), |
|||
sa.UniqueConstraint('name')) |
|||
|
|||
op.create_table('user_token', |
|||
sa.Column('id', sa.Integer(), nullable=False), |
|||
sa.Column('user_id', sa.Integer(), nullable=False), |
|||
sa.Column('token', sa.Unicode(length=36), nullable=False), |
|||
sa.Column('note', sa.Unicode(length=128), nullable=True), |
|||
sa.Column('enabled', sa.Boolean(), nullable=False), |
|||
sa.Column('expiration_time', sa.DateTime(), nullable=True), |
|||
sa.Column('creation_time', sa.DateTime(), nullable=False), |
|||
sa.Column('last_edit_time', sa.DateTime(), nullable=True), |
|||
sa.Column('last_usage_time', sa.DateTime(), nullable=True), |
|||
sa.Column('version', sa.Integer(), nullable=False), |
|||
sa.ForeignKeyConstraint( |
|||
['user_id'], ['user.id'], ondelete='CASCADE'), |
|||
sa.PrimaryKeyConstraint('id')) |
|||
|
|||
op.create_index(op.f('ix_user_token_user_id'), 'user_token', ['user_id'], |
|||
unique=False) |
|||
|
|||
|
|||
def downgrade(): |
|||
op.drop_index(op.f('ix_user_token_user_id'), table_name='user_token') |
|||
op.drop_table('user_token') |
|||
op.drop_table('user') |
Write
Preview
Loading…
Cancel
Save
Reference in new issue