Browse Source
server: initial commit with simple API for login, login_bump, logout
merge-requests/1/head
server: initial commit with simple API for login, login_bump, logout
merge-requests/1/head
Drew Short
7 years ago
7 changed files with 192 additions and 0 deletions
-
1server/.env
-
31server/atheneum/__init__.py
-
74server/atheneum/api.py
-
73server/atheneum/authentication.py
-
3server/atheneum/default_settings.py
-
0server/atheneum/model/User.py
-
10server/setup.py
@ -0,0 +1 @@ |
|||
FLASK_APP=atheneum |
@ -0,0 +1,31 @@ |
|||
import os |
|||
|
|||
from flask import Flask |
|||
|
|||
from atheneum.api import api_blueprint |
|||
|
|||
|
|||
def create_app(test_config=None): |
|||
app = Flask(__name__, instance_relative_config=True) |
|||
app.config.from_mapping( |
|||
SECRET_KEY='dev', |
|||
SQLALCHEMY_DATABASE_URI='sqlite:////tmp/test.db' |
|||
# SQLALCHEMY_DATABASE_URI=os.path.join(app.instance_path, 'atheneum.db') |
|||
) |
|||
|
|||
if test_config is None: |
|||
app.config.from_object('atheneum.default_settings') |
|||
app.config.from_pyfile('config.py', silent=True) |
|||
if (os.getenv('ATHENEUM_SERVER_SETTINGS', None)): |
|||
app.config.from_envvar('ATHENEUM_SERVER_SETTINGS') |
|||
else: |
|||
app.config.from_pyfile(test_config) |
|||
|
|||
try: |
|||
os.makedirs(app.instance_path) |
|||
except OSError: |
|||
pass |
|||
|
|||
app.register_blueprint(api_blueprint) |
|||
|
|||
return app |
@ -0,0 +1,74 @@ |
|||
from functools import wraps |
|||
from typing import Any, Callable, NamedTuple |
|||
|
|||
from flask import Blueprint, jsonify, Response, session |
|||
|
|||
from atheneum.authentication import ( |
|||
generate_token, |
|||
require_basic_auth, |
|||
require_token_auth |
|||
) |
|||
|
|||
api_blueprint = Blueprint(name='api', import_name=__name__, url_prefix='/api') |
|||
|
|||
|
|||
class APIMessage(NamedTuple): |
|||
payload: Any |
|||
status: int |
|||
|
|||
|
|||
def return_json(func: Callable) -> Callable: |
|||
""" |
|||
if a Response object is not returned, jsonify the result and return it |
|||
:param func: |
|||
:return: |
|||
""" |
|||
|
|||
@wraps(func) |
|||
def decorate(*args, **kwargs): |
|||
result = func(*args, **kwargs) |
|||
if isinstance(result, Response): |
|||
return result |
|||
if isinstance(result, APIMessage): |
|||
response = jsonify(result.payload) |
|||
response.status = result.status |
|||
return jsonify(result) |
|||
|
|||
return decorate |
|||
|
|||
|
|||
@api_blueprint.route('/login', methods=['POST']) |
|||
@return_json |
|||
@require_basic_auth |
|||
def login() -> APIMessage: |
|||
""" |
|||
Get a token for continued authentication |
|||
:return: A login token for continued authentication |
|||
""" |
|||
token = generate_token() |
|||
response = APIMessage(token, 200) |
|||
return response |
|||
|
|||
|
|||
@api_blueprint.route('/login/bump', methods=['POST']) |
|||
@return_json |
|||
@require_token_auth |
|||
def login_bump() -> APIMessage: |
|||
""" |
|||
Update the user last seen timestamp |
|||
:return: A time stamp for the bumped login |
|||
""" |
|||
response = APIMessage(None, 200) |
|||
return response |
|||
|
|||
|
|||
@api_blueprint.route('/logout', methods=['POST']) |
|||
@return_json |
|||
@require_token_auth |
|||
def logout() -> APIMessage: |
|||
""" |
|||
logout and delete a token |
|||
:return: |
|||
""" |
|||
session.pop('user') |
|||
return APIMessage(None, 200) |
@ -0,0 +1,73 @@ |
|||
import base64 |
|||
import uuid |
|||
from functools import wraps |
|||
from typing import Optional, Callable |
|||
|
|||
from flask import request, Response, session |
|||
from werkzeug.datastructures import Authorization |
|||
from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes |
|||
|
|||
|
|||
def authenticate_with_password(username: str, password: str) -> bool: |
|||
session['user'] = None |
|||
return True |
|||
|
|||
|
|||
def authenticate_with_token(username: str, token: str) -> bool: |
|||
session['user'] = None |
|||
return True |
|||
|
|||
|
|||
def authentication_failed(auth_type: str) -> Response: |
|||
return Response( |
|||
status=401, |
|||
headers={ |
|||
'WWW-Authenticate': '%s realm="Login Required"' % auth_type |
|||
}) |
|||
|
|||
|
|||
def parse_token_authorization_header(header_value) -> Optional[Authorization]: |
|||
if not header_value: |
|||
return |
|||
value = wsgi_to_bytes(header_value) |
|||
try: |
|||
auth_type, auth_info = value.split(None, 1) |
|||
auth_type = auth_type.lower() |
|||
except ValueError: |
|||
return |
|||
if auth_type == b'token': |
|||
try: |
|||
username, token = base64.b64decode(auth_info).split(b':', 1) |
|||
except Exception: |
|||
return |
|||
return Authorization('token', {'username': bytes_to_wsgi(username), |
|||
'password': bytes_to_wsgi(token)}) |
|||
|
|||
|
|||
def require_basic_auth(func: Callable) -> Callable: |
|||
@wraps(func) |
|||
def decorate(*args, **kwargs): |
|||
auth = request.authorization |
|||
if auth and authenticate_with_password(auth.username, auth.password): |
|||
return func(*args, **kwargs) |
|||
else: |
|||
return authentication_failed('Basic') |
|||
|
|||
return decorate |
|||
|
|||
|
|||
def require_token_auth(func: Callable) -> Callable: |
|||
@wraps(func) |
|||
def decorate(*args, **kwargs): |
|||
token = parse_token_authorization_header( |
|||
request.headers.get('WWW-Authenticate', None)) |
|||
if token and authenticate_with_token(token.username, token.password): |
|||
return func(*args, **kwargs) |
|||
else: |
|||
return authentication_failed('Token') |
|||
|
|||
return decorate |
|||
|
|||
|
|||
def generate_token() -> uuid.UUID: |
|||
return uuid.uuid4() |
@ -0,0 +1,3 @@ |
|||
DEBUG = True |
|||
SECRET_KEY = b'\xb4\x89\x0f\x0f\xe5\x88\x97\xfe\x8d<\x0b@d\xe9\xa5\x87%' \ |
|||
b'\xc6\xf0@l1\xe3\x90g\xfaA.?u=s' # CHANGE ME IN REAL CONFIG |
@ -0,0 +1,10 @@ |
|||
from setuptools import setup |
|||
|
|||
setup( |
|||
name='atheneum', |
|||
packages=['atheneum'], |
|||
include_package_data=True, |
|||
install_requires=[ |
|||
'flask', |
|||
], |
|||
) |
Write
Preview
Loading…
Cancel
Save
Reference in new issue