From 43fc990ddcea66a2b90c5c0ea7c324920e3d15f6 Mon Sep 17 00:00:00 2001 From: Drew Short Date: Wed, 4 Jul 2018 19:04:10 -0500 Subject: [PATCH] Adding some simple tests for the login/bump/logout features --- server/.admin_credentials.swp | Bin 0 -> 12288 bytes server/atheneum/__init__.py | 1 + server/atheneum/api/authentication_api.py | 2 +- server/atheneum/middleware/__init__.py | 0 .../middleware/authentication_middleware.py | 84 ++++++++++++ server/atheneum/service/user_token_service.py | 52 ++++++++ server/test_settings.py | 4 + server/tests/conftest.py | 122 ++++++++++++++++++ 8 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 server/.admin_credentials.swp create mode 100644 server/atheneum/middleware/__init__.py create mode 100644 server/atheneum/middleware/authentication_middleware.py create mode 100644 server/atheneum/service/user_token_service.py create mode 100644 server/test_settings.py create mode 100644 server/tests/conftest.py diff --git a/server/.admin_credentials.swp b/server/.admin_credentials.swp new file mode 100644 index 0000000000000000000000000000000000000000..fe31a45cb9779e2de5b8ad6f5bc25bde133dd92d GIT binary patch literal 12288 zcmeI&Jxc>I7{KwT;_3_f1*-17RuM!&ur7)e+TnC?D7jwXPVS{9sZa;q{Um;qy8CI| zyp%f1b+q_@AcQ1O0?%&?Nz}f*?ln(Z9jRhlrP}H0R)y8*pQ>cPTS`n z?C&*qcf#ejl8FEU2q1s}0tg_000Jv7;KmJkt*TgCRMPmRU-?2Q009ILKmY**5I_I{ z1Q0*~fqyKZVv+YPk&mkR_y3>!zaP*19Dx7=2q1s}0tg_000IagfB*u2MnEn8(hui< n=u_*Aa-})dSJUwF(H;+a-K?)}Cf(^EG%k&!{&N{;`JH?N^7cDj literal 0 HcmV?d00001 diff --git a/server/atheneum/__init__.py b/server/atheneum/__init__.py index 996a056..48e6e57 100644 --- a/server/atheneum/__init__.py +++ b/server/atheneum/__init__.py @@ -56,6 +56,7 @@ def create_app(test_config=None): app.json_encoder = utility.CustomJSONEncoder + app.logger.debug('Initializing Application') db.init_app(app) app.logger.debug('Registering Database Models') diff --git a/server/atheneum/api/authentication_api.py b/server/atheneum/api/authentication_api.py index 4fbe872..e2143c6 100644 --- a/server/atheneum/api/authentication_api.py +++ b/server/atheneum/api/authentication_api.py @@ -30,7 +30,7 @@ def login_bump() -> APIResponse: :return: A time stamp for the bumped login """ authentication_service.bump_login(g.user) - return APIResponse(g.user.last_login_time, 200) + return APIResponse({'last_login_time': g.user.last_login_time}, 200) @auth_blueprint.route('/logout', methods=['POST']) diff --git a/server/atheneum/middleware/__init__.py b/server/atheneum/middleware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/atheneum/middleware/authentication_middleware.py b/server/atheneum/middleware/authentication_middleware.py new file mode 100644 index 0000000..afd5d8a --- /dev/null +++ b/server/atheneum/middleware/authentication_middleware.py @@ -0,0 +1,84 @@ +import base64 +from functools import wraps +from typing import Optional, Callable + +from flask import request, Response, g +from werkzeug.datastructures import Authorization +from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes + +from atheneum.service import ( + authentication_service, + user_service, + user_token_service +) + + +def authenticate_with_password(name: str, password: str) -> bool: + user = user_service.find_by_name(name) + if user is not None \ + and authentication_service.is_valid_password(user, password): + g.user = user + return True + return False + + +def authenticate_with_token(name: str, token: str) -> bool: + user = user_service.find_by_name(name) + user_token = user_token_service.find_by_user_and_token(user, token) + if user is not None \ + and authentication_service.is_valid_token(user_token): + g.user = user + g.user_token = user_token + return True + return False + + +def authentication_failed(auth_type: str) -> Response: + return Response( + status=401, + headers={ + 'WWW-Authenticate': '%s realm="Login Required"' % auth_type + }) + + +def parse_token_authorization_header(header_value) -> Optional[Authorization]: + if not header_value: + return + value = wsgi_to_bytes(header_value) + try: + auth_type, auth_info = value.split(None, 1) + auth_type = auth_type.lower() + except ValueError: + return + if auth_type == b'token': + try: + username, token = base64.b64decode(auth_info).split(b':', 1) + except Exception: + return + return Authorization('token', {'username': bytes_to_wsgi(username), + 'password': bytes_to_wsgi(token)}) + + +def require_basic_auth(func: Callable) -> Callable: + @wraps(func) + def decorate(*args, **kwargs): + auth = request.authorization + if auth and authenticate_with_password(auth.username, auth.password): + return func(*args, **kwargs) + else: + return authentication_failed('Basic') + + return decorate + + +def require_token_auth(func: Callable) -> Callable: + @wraps(func) + def decorate(*args, **kwargs): + token = parse_token_authorization_header( + request.headers.get('Authorization', None)) + if token and authenticate_with_token(token.username, token.password): + return func(*args, **kwargs) + else: + return authentication_failed('Bearer') + + return decorate diff --git a/server/atheneum/service/user_token_service.py b/server/atheneum/service/user_token_service.py new file mode 100644 index 0000000..c98a765 --- /dev/null +++ b/server/atheneum/service/user_token_service.py @@ -0,0 +1,52 @@ +from datetime import datetime +from typing import Optional + +from atheneum import db +from atheneum.model import User, UserToken +from atheneum.service import authentication_service + + +def create( + user: User, + note: Optional[str] = None, + enabled: bool = True, + expiration_time: Optional[datetime] = None) -> UserToken: + """ + Create and save a UserToken + + :param user: The User object to bind the token to + :param note: An optional field to store additional information about a token + :param enabled: A boolean to indicate whether a token can be considered + eligible for authentication + :param expiration_time: An optional argument to determine when the token + becomes invalid as a means of authentication. Defaults to None, which means + no expiration + :return: + """ + token = authentication_service.generate_token() + user_token = UserToken( + user_id=user.id, + token=token.__str__(), + note=note, + enabled=enabled, + creation_time=datetime.now(), + expiration_time=expiration_time, + version=0) + + db.session.add(user_token) + db.session.commit() + + return user_token + + +def delete(user_token: UserToken) -> bool: + existing_user_token = db.session.delete(user_token) + if existing_user_token is None: + db.session.commit() + return True + return False + + +def find_by_user_and_token(user: User, token: str) -> Optional[UserToken]: + return UserToken.query.filter_by(user_id=user.id, token=token).first() + diff --git a/server/test_settings.py b/server/test_settings.py new file mode 100644 index 0000000..62276b8 --- /dev/null +++ b/server/test_settings.py @@ -0,0 +1,4 @@ +DEBUG = False +SECRET_KEY = b'\xb4\x89\x0f\x0f\xe5\x88\x97\xfe\x8d<\x0b@d\xe9\xa5\x87%' \ + b'\xc6\xf0@l1\xe3\x90g\xfaA.?u=s' # CHANGE ME IN REAL CONFIG +SQLALCHEMY_TRACK_MODIFICATIONS=False diff --git a/server/tests/conftest.py b/server/tests/conftest.py new file mode 100644 index 0000000..34565da --- /dev/null +++ b/server/tests/conftest.py @@ -0,0 +1,122 @@ +import base64 +import os +import random +import string +import tempfile +from typing import Tuple, Any + +import pytest +from werkzeug.test import Client + +from atheneum import create_app, init_db, register_blueprints +from atheneum.model import User +from atheneum.service import user_service + + +def add_test_user() -> Tuple[str, str]: + test_username = 'test_' + ''.join( + random.choices(string.ascii_letters + string.digits, k=17)).strip() + test_password = ''.join( + random.choices(string.ascii_letters + string.digits, k=32)).strip() + user_service.register(test_username, test_password, User.ROLE_ADMIN) + return test_username, test_password + + +@pytest.fixture +def app(): + """Create and configure a new app instance for each test.""" + # create a temporary file to isolate the database for each test + db_fd, db_path = tempfile.mkstemp() + # create the app with common test config + app = create_app({ + 'TESTING': True, + 'DATABASE': db_path, + }) + register_blueprints(app) + + # create the database and load test data + with app.app_context(): + init_db() + test_username, test_password = add_test_user() + app.config['test_username'] = test_username + app.config['test_password'] = test_password + # get_db().executescript(_data_sql) + + yield app + + # close and remove the temporary database + os.close(db_fd) + os.unlink(db_path) + + +@pytest.fixture +def client(app): + """A test client for the app.""" + return app.test_client() + + +@pytest.fixture +def runner(app): + """A test runner for the app's Click commands.""" + return app.test_cli_runner() + + +class AuthActions(object): + def __init__(self, client: Client, username: str = "", password: str = ""): + self._client = client + self.username: str = username + self.password: str = password + self.token: str = "" + + def configure(self, username, password) -> Any: + self.username = username + self.password = password + return self + + def login(self): + auth_header = self.get_authorization_header_basic() + result = self._client.post( + '/auth/login', + headers={ + auth_header[0]: auth_header[1] + } + ) + self.token = result.json['token'] + return result + + def bump(self): + auth_header = self.get_authorization_header_token() + return self._client.post( + '/auth/bump', + headers={ + auth_header[0]: auth_header[1] + } + ) + + def logout(self): + auth_header = self.get_authorization_header_token() + return self._client.post( + '/auth/logout', + headers={ + auth_header[0]: auth_header[1] + } + ) + + def get_authorization_header_basic(self) -> Tuple[str, str]: + credentials = base64.b64encode( + '{}:{}'.format(self.username, self.password).encode('utf8')) \ + .decode('utf8').strip() + return 'Authorization', 'Basic {}'.format(credentials) + + def get_authorization_header_token(self) -> Tuple[str, str]: + credentials = base64.b64encode( + '{}:{}'.format(self.username, self.token).encode('utf8')) \ + .decode('utf8').strip() + return 'Authorization', 'Token {}'.format(credentials) + + +@pytest.fixture +def auth(client: Client): + return AuthActions(client, + client.application.config.get('test_username'), + client.application.config.get('test_password'))