Browse Source

Add a database schema, admin page, db commands, code creation form

* Added a simple database schema that is created at start
* Added database methods to CRUD registration codes and read registered users
* Added a table to the admin page with registration codes
* Added a form to create registration codes
* Added embedded forms to expire and delete registration codes
* Added endpoints to facilitate expiring and deleting registration codes
* Added models and helpers to facilitate storing and retrieving data
* Added a password safety validator on registration
master
Drew Short 5 years ago
parent
commit
45a8c1d86f
  1. 4
      .gitignore
  2. 90
      app.py
  3. 112
      db.py
  4. 34
      forms.py
  5. 43
      models.py
  6. 3
      requirements.txt
  7. 12
      schema.sql
  8. 42
      templates/admin.html
  9. 1
      templates/register.html

4
.gitignore

@ -1 +1,3 @@
.idea
.idea
*.db

90
app.py

@ -1,30 +1,56 @@
import logging
import os
import uuid
from urllib.parse import urlparse, urljoin
import flask
from flask import Flask, redirect, render_template, request
from flask import Flask, redirect, render_template, request, g
from flask_login import LoginManager, login_required, login_user, logout_user, UserMixin, \
current_user
from flask_wtf import CSRFProtect
from forms import RegistrationForm, LoginForm
from db import get_db, get_registration_codes, add_registration_code, \
expire_registration_code, delete_registration_code
from forms import RegistrationForm, LoginForm, RegistrationCodeForm, \
ExpireRegistrationCodeForm
app = Flask(__name__)
csrf = CSRFProtect()
login_manager = LoginManager()
app.config.update(dict(
ADMIN_TOKEN=os.getenv("ADMIN_TOKEN", uuid.uuid4().__str__()),
SECRET_KEY=os.getenv("SECRET_KEY", "changeme"),
WTF_CSRF_SECRET_KEY=os.getenv("CSRF_SECRET_KEY", "csrf_changeme")
))
log = logging.getLogger(__name__)
print("Admin Token: %s" % app.config.get("ADMIN_TOKEN"))
csrf = CSRFProtect(app)
def init_db(flask_app):
with flask_app.app_context():
db = get_db()
with flask_app.open_resource('schema.sql', mode='r') as f:
db.cursor().executescript(f.read())
log.info("Initialized DB")
db.commit()
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "admin_login"
def create_app():
flask_app = Flask(__name__)
flask_app.config.update(dict(
ADMIN_TOKEN=os.getenv("ADMIN_TOKEN", uuid.uuid4().__str__()),
SECRET_KEY=os.getenv("SECRET_KEY", "changeme"),
WTF_CSRF_SECRET_KEY=os.getenv("CSRF_SECRET_KEY", "csrf_changeme")
))
print("Admin Token: %s" % flask_app.config.get("ADMIN_TOKEN"))
csrf.init_app(flask_app)
login_manager.init_app(flask_app)
login_manager.login_view = "admin_login"
init_db(flask_app)
return flask_app
app = create_app()
class User(UserMixin):
@ -73,7 +99,36 @@ def registration():
@app.route('/admin')
@login_required
def admin_index():
return render_template('admin.html')
context = {
'add_registration_code_form': RegistrationCodeForm(),
'registration_codes': get_registration_codes()
}
return render_template('admin.html', **context)
@app.route('/admin/add_registration_code', methods=['POST'])
@login_required
def admin_add_registration_code():
form = RegistrationCodeForm()
if form.validate_on_submit():
expiration_time = form.expiration_time.data
max_usages = form.max_usages.data
add_registration_code(expiration_time, max_usages)
redirect('/admin')
return redirect('/admin')
@app.route('/admin/expire_registration_code', methods=['POST'])
@login_required
def admin_expire_registration_code():
form = ExpireRegistrationCodeForm()
if form.validate_on_submit():
if form.expire.data:
expire_registration_code(form.registration_code.data)
elif form.delete.data:
delete_registration_code(form.registration_code.data)
redirect('/admin')
return redirect('/admin')
@app.route('/admin/login', methods=('GET', 'POST'))
@ -107,5 +162,12 @@ def admin_logout():
return redirect('/')
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
if __name__ == '__main__':
app.run()

112
db.py

@ -1,11 +1,30 @@
import os
import sqlite3
import uuid
from datetime import datetime
from typing import Optional
from flask import g
from models import RegistrationCode, RegisteredUser
DATABASE = os.getenv("DATA_DIRECTORY", ".") + "/data.db"
REGISTRATION_CODE_INSERT_SQL = """INSERT INTO
registration_codes(code, creationTime, expirationTime, usages, maxUsages)
VALUES(?, ?, ?, ?, ?)"""
REGISTRATION_CODE_UPDATE_SQL = """UPDATE registration_codes
SET expirationTime = ?, usages = ?, maxUsages = ?
WHERE code = ?"""
REGISTERED_USER_INSERT_SQL = """INSERT INTO
registered_users(username, registeredTime)
VALUES(?, ?)
"""
def get_db():
db = getattr(g, '_database', None)
if db is None:
@ -13,8 +32,91 @@ def get_db():
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
def _query_db(query, args=(), one=False):
cur = get_db().execute(query, args)
rv = cur.fetchall()
cur.close()
return (rv[0] if rv else None) if one else rv
def get_registration_codes() -> [RegistrationCode]:
registration_codes = []
for registration_code in _query_db("SELECT * FROM registration_codes"):
registration_codes.append(RegistrationCode.from_db(registration_code))
return registration_codes
def get_registration_code(code: str) -> Optional[RegistrationCode]:
return RegistrationCode.from_db(
_query_db("SELECT * FROM registration_codes WHERE code = ?", [code], one=True)
)
def add_registration_code(
expiration_time: datetime = datetime.now(),
max_usages: int = 1) -> RegistrationCode:
code = RegistrationCode(
uuid.uuid4().__str__(), expiration_time=expiration_time, max_usages=max_usages)
db = get_db()
db.execute(
REGISTRATION_CODE_INSERT_SQL,
[code.code,
code.creation_time,
code.expiration_time,
code.usages,
code.max_usages]
)
db.commit()
return get_registration_code(code.code)
def update_registration_code(code: RegistrationCode) -> RegistrationCode:
db = get_db()
db.execute(REGISTRATION_CODE_UPDATE_SQL,
[code.expiration_time, code.usages, code.max_usages, code.code]
)
db.commit()
return get_registration_code(code.code)
def expire_registration_code(code: str):
db = get_db()
db.execute(
"UPDATE registration_codes SET expirationTime = ? WHERE code = ?",
[datetime.now(), code]
)
db.commit()
def delete_registration_code(code:str):
db = get_db()
db.execute(
"DELETE FROM registration_codes WHERE code = ?",
[code]
)
db.commit()
def get_registered_users() -> [RegisteredUser]:
registered_users = []
for registered_user in _query_db("SELECT * FROM registered_users"):
registered_users.append(RegisteredUser.from_db(registered_user))
return registered_users
def get_registered_user(username: str) -> Optional[RegisteredUser]:
return RegisteredUser.from_db(
_query_db("SELECT * FROM registered_users WHERE username = ?", username, one=True)
)
def add_registered_user(code: str, username: str) -> RegisteredUser:
registration_code = get_registration_code(code)
registration_code.usages += 1
update_registration_code(registration_code)
db = get_db()
db.execute(REGISTERED_USER_INSERT_SQL, [username, datetime.now()])
db.commit()
return get_registered_user(username)

34
forms.py

@ -1,14 +1,40 @@
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField
from wtforms.validators import DataRequired
from wtforms import StringField, PasswordField, DateField, IntegerField, SubmitField
from wtforms.validators import DataRequired, Length, EqualTo, InputRequired, \
ValidationError, NumberRange, Optional
import safe
def safe_password_validator(form: FlaskForm, field):
strength = safe.check(field.data, level=safe.MEDIUM)
if not strength.valid:
raise ValidationError("Password is not secure enough: %s" % strength.message)
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired()])
username = StringField('Username', validators=[DataRequired(), Length(min=3, max=30)])
password = PasswordField(
'Password',
validators=[
InputRequired(),
EqualTo('confirm', message='Passwords must match'),
safe_password_validator
])
confirm = PasswordField('Repeat Password')
registration_code = StringField('Registration Code', validators=[DataRequired()])
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
token = PasswordField('Token', validators=[DataRequired()])
class RegistrationCodeForm(FlaskForm):
expiration_time = DateField('Expiration Time', validators=[Optional()])
max_usages = IntegerField('Max Usages', validators=[NumberRange(min=1)])
class ExpireRegistrationCodeForm(FlaskForm):
registration_code = StringField('Registration Code')
expire = SubmitField(label='Expire')
delete = SubmitField(label='Delete')

43
models.py

@ -0,0 +1,43 @@
from datetime import datetime
from typing import Tuple
class RegistrationCode:
def __init__(self,
code: str,
creation_time: datetime = datetime.now(),
expiration_time: datetime = None,
usages: int = 0,
max_usages: int = 1):
self.code = code
self.creation_time = creation_time
self.expiration_time = expiration_time
self.usages = usages
self.max_usages = max_usages
@staticmethod
def from_db(db_registration_code: Tuple) -> 'RegistrationCode':
expiration_time = None if db_registration_code[2] is None else datetime.fromisoformat(db_registration_code[2])
return RegistrationCode(
db_registration_code[0],
datetime.fromisoformat(db_registration_code[1]),
expiration_time,
db_registration_code[3],
db_registration_code[4]
)
def is_expired(self):
return self.expiration_time is not None and self.expiration_time < datetime.now()
class RegisteredUser:
def __init__(self, username: str, registered_time: datetime = datetime.now()):
self.username = username
self.registered_time = registered_time
@staticmethod
def from_db(db_registered_user: Tuple) -> 'RegisteredUser':
return RegisteredUser(
db_registered_user[0],
datetime.fromisoformat(db_registered_user[1])
)

3
requirements.txt

@ -1,4 +1,5 @@
flask==1.0.2
flask-wtf==0.14
flask-login==0.4.1
requests==2.21.0
requests==2.21.0
safe

12
schema.sql

@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS registration_codes (
code VARCHAR(60) NOT NULL UNIQUE,
creationTime TIMESTAMP NOT NULL,
expirationTime TIMESTAMP,
usages INTEGER NOT NULL DEFAULT 0,
maxUsages Integer NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS registered_users (
username VARCHAR(30) NOT NULL UNIQUE ,
registeredTime TIMESTAMP NOT NULL
);

42
templates/admin.html

@ -5,5 +5,45 @@
{% endblock %}
{% block content %}
<p>ADMIN</p>
<div id="activeRegistrationCodes">
<table>
<thead>
<th>Registration Code</th>
<th>Creation Time</th>
<th>Expiration Time</th>
<th>Usages</th>
<th>Max Usages</th>
<th>Expire</th>
</thead>
<tbody>
{% for registration_code in registration_codes %}
<tr>
<td>{{ registration_code.code|tojson|safe }}</td>
<td>{{ registration_code.creation_time|tojson|safe }}</td>
<td>{{ registration_code.expiration_time|tojson|safe }}</td>
<td>{{ registration_code.usages|tojson|safe }}</td>
<td>{{ registration_code.max_usages|tojson|safe }}</td>
<td>
<form method="POST" action="/admin/expire_registration_code">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<input type="hidden" name="registration_code" value="{{ registration_code.code }}"/>
{% if not registration_code.is_expired() %}
<input type="submit" name="expire" value="Expire">
{% else %}
<input type="submit" name="delete" value="Delete">
{% endif %}
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<hr>
<form method="POST" action="/admin/add_registration_code">
{{ add_registration_code_form.csrf_token }}
{{ add_registration_code_form.expiration_time.label }} {{ add_registration_code_form.expiration_time() }}
{{ add_registration_code_form.max_usages.label }} {{ add_registration_code_form.max_usages(default="1", value="1") }}
<input type="submit" value="Add">
</form>
{% endblock %}

1
templates/register.html

@ -9,6 +9,7 @@
{{ form.csrf_token }}
{{ form.username.label }} {{ form.username(size=20) }}
{{ form.password.label }} {{ form.password() }}
{{ form.confirm.label }} {{ form.confirm() }}
{{ form.registration_code.label }} {{ form.registration_code() }}
<input type="submit" value="Go">
</form>
Loading…
Cancel
Save