Browse Source

Adding some simple tests for the login/bump/logout features

merge-requests/1/head
Drew Short 7 years ago
parent
commit
43fc990ddc
  1. BIN
      server/.admin_credentials.swp
  2. 1
      server/atheneum/__init__.py
  3. 2
      server/atheneum/api/authentication_api.py
  4. 0
      server/atheneum/middleware/__init__.py
  5. 84
      server/atheneum/middleware/authentication_middleware.py
  6. 52
      server/atheneum/service/user_token_service.py
  7. 4
      server/test_settings.py
  8. 122
      server/tests/conftest.py

BIN
server/.admin_credentials.swp

1
server/atheneum/__init__.py

@ -56,6 +56,7 @@ def create_app(test_config=None):
app.json_encoder = utility.CustomJSONEncoder
app.logger.debug('Initializing Application')
db.init_app(app)
app.logger.debug('Registering Database Models')

2
server/atheneum/api/authentication_api.py

@ -30,7 +30,7 @@ def login_bump() -> APIResponse:
:return: A time stamp for the bumped login
"""
authentication_service.bump_login(g.user)
return APIResponse(g.user.last_login_time, 200)
return APIResponse({'last_login_time': g.user.last_login_time}, 200)
@auth_blueprint.route('/logout', methods=['POST'])

0
server/atheneum/middleware/__init__.py

84
server/atheneum/middleware/authentication_middleware.py

@ -0,0 +1,84 @@
import base64
from functools import wraps
from typing import Optional, Callable
from flask import request, Response, g
from werkzeug.datastructures import Authorization
from werkzeug.http import bytes_to_wsgi, wsgi_to_bytes
from atheneum.service import (
authentication_service,
user_service,
user_token_service
)
def authenticate_with_password(name: str, password: str) -> bool:
user = user_service.find_by_name(name)
if user is not None \
and authentication_service.is_valid_password(user, password):
g.user = user
return True
return False
def authenticate_with_token(name: str, token: str) -> bool:
user = user_service.find_by_name(name)
user_token = user_token_service.find_by_user_and_token(user, token)
if user is not None \
and authentication_service.is_valid_token(user_token):
g.user = user
g.user_token = user_token
return True
return False
def authentication_failed(auth_type: str) -> Response:
return Response(
status=401,
headers={
'WWW-Authenticate': '%s realm="Login Required"' % auth_type
})
def parse_token_authorization_header(header_value) -> Optional[Authorization]:
if not header_value:
return
value = wsgi_to_bytes(header_value)
try:
auth_type, auth_info = value.split(None, 1)
auth_type = auth_type.lower()
except ValueError:
return
if auth_type == b'token':
try:
username, token = base64.b64decode(auth_info).split(b':', 1)
except Exception:
return
return Authorization('token', {'username': bytes_to_wsgi(username),
'password': bytes_to_wsgi(token)})
def require_basic_auth(func: Callable) -> Callable:
@wraps(func)
def decorate(*args, **kwargs):
auth = request.authorization
if auth and authenticate_with_password(auth.username, auth.password):
return func(*args, **kwargs)
else:
return authentication_failed('Basic')
return decorate
def require_token_auth(func: Callable) -> Callable:
@wraps(func)
def decorate(*args, **kwargs):
token = parse_token_authorization_header(
request.headers.get('Authorization', None))
if token and authenticate_with_token(token.username, token.password):
return func(*args, **kwargs)
else:
return authentication_failed('Bearer')
return decorate

52
server/atheneum/service/user_token_service.py

@ -0,0 +1,52 @@
from datetime import datetime
from typing import Optional
from atheneum import db
from atheneum.model import User, UserToken
from atheneum.service import authentication_service
def create(
user: User,
note: Optional[str] = None,
enabled: bool = True,
expiration_time: Optional[datetime] = None) -> UserToken:
"""
Create and save a UserToken
:param user: The User object to bind the token to
:param note: An optional field to store additional information about a token
:param enabled: A boolean to indicate whether a token can be considered
eligible for authentication
:param expiration_time: An optional argument to determine when the token
becomes invalid as a means of authentication. Defaults to None, which means
no expiration
:return:
"""
token = authentication_service.generate_token()
user_token = UserToken(
user_id=user.id,
token=token.__str__(),
note=note,
enabled=enabled,
creation_time=datetime.now(),
expiration_time=expiration_time,
version=0)
db.session.add(user_token)
db.session.commit()
return user_token
def delete(user_token: UserToken) -> bool:
existing_user_token = db.session.delete(user_token)
if existing_user_token is None:
db.session.commit()
return True
return False
def find_by_user_and_token(user: User, token: str) -> Optional[UserToken]:
return UserToken.query.filter_by(user_id=user.id, token=token).first()

4
server/test_settings.py

@ -0,0 +1,4 @@
DEBUG = False
SECRET_KEY = b'\xb4\x89\x0f\x0f\xe5\x88\x97\xfe\x8d<\x0b@d\xe9\xa5\x87%' \
b'\xc6\xf0@l1\xe3\x90g\xfaA.?u=s' # CHANGE ME IN REAL CONFIG
SQLALCHEMY_TRACK_MODIFICATIONS=False

122
server/tests/conftest.py

@ -0,0 +1,122 @@
import base64
import os
import random
import string
import tempfile
from typing import Tuple, Any
import pytest
from werkzeug.test import Client
from atheneum import create_app, init_db, register_blueprints
from atheneum.model import User
from atheneum.service import user_service
def add_test_user() -> Tuple[str, str]:
test_username = 'test_' + ''.join(
random.choices(string.ascii_letters + string.digits, k=17)).strip()
test_password = ''.join(
random.choices(string.ascii_letters + string.digits, k=32)).strip()
user_service.register(test_username, test_password, User.ROLE_ADMIN)
return test_username, test_password
@pytest.fixture
def app():
"""Create and configure a new app instance for each test."""
# create a temporary file to isolate the database for each test
db_fd, db_path = tempfile.mkstemp()
# create the app with common test config
app = create_app({
'TESTING': True,
'DATABASE': db_path,
})
register_blueprints(app)
# create the database and load test data
with app.app_context():
init_db()
test_username, test_password = add_test_user()
app.config['test_username'] = test_username
app.config['test_password'] = test_password
# get_db().executescript(_data_sql)
yield app
# close and remove the temporary database
os.close(db_fd)
os.unlink(db_path)
@pytest.fixture
def client(app):
"""A test client for the app."""
return app.test_client()
@pytest.fixture
def runner(app):
"""A test runner for the app's Click commands."""
return app.test_cli_runner()
class AuthActions(object):
def __init__(self, client: Client, username: str = "", password: str = ""):
self._client = client
self.username: str = username
self.password: str = password
self.token: str = ""
def configure(self, username, password) -> Any:
self.username = username
self.password = password
return self
def login(self):
auth_header = self.get_authorization_header_basic()
result = self._client.post(
'/auth/login',
headers={
auth_header[0]: auth_header[1]
}
)
self.token = result.json['token']
return result
def bump(self):
auth_header = self.get_authorization_header_token()
return self._client.post(
'/auth/bump',
headers={
auth_header[0]: auth_header[1]
}
)
def logout(self):
auth_header = self.get_authorization_header_token()
return self._client.post(
'/auth/logout',
headers={
auth_header[0]: auth_header[1]
}
)
def get_authorization_header_basic(self) -> Tuple[str, str]:
credentials = base64.b64encode(
'{}:{}'.format(self.username, self.password).encode('utf8')) \
.decode('utf8').strip()
return 'Authorization', 'Basic {}'.format(credentials)
def get_authorization_header_token(self) -> Tuple[str, str]:
credentials = base64.b64encode(
'{}:{}'.format(self.username, self.token).encode('utf8')) \
.decode('utf8').strip()
return 'Authorization', 'Token {}'.format(credentials)
@pytest.fixture
def auth(client: Client):
return AuthActions(client,
client.application.config.get('test_username'),
client.application.config.get('test_password'))
Loading…
Cancel
Save