Browse Source

Allow login to define created token properties

* Changed the header to be Authorization: Bearer <Token>
* Hedged the deserializer to return empty models on empty json
framework
Drew Short 6 years ago
parent
commit
49c712ab50
  1. 15
      server/atheneum/api/authentication_api.py
  2. 10
      server/atheneum/middleware/authentication_middleware.py
  3. 16
      server/atheneum/service/transformation_service.py
  4. 4
      server/documentation/api/authentication.rst
  5. 10
      server/documentation/api/user.rst
  6. 15
      server/tests/api/test_authentication_api.py
  7. 22
      server/tests/api/test_user_api.py
  8. 46
      server/tests/conftest.py

15
server/atheneum/api/authentication_api.py

@ -1,13 +1,15 @@
"""Authentication API blueprint and endpoint definitions.""" """Authentication API blueprint and endpoint definitions."""
from flask import Blueprint, g
from flask import Blueprint, g, request
from atheneum.api.decorators import return_json from atheneum.api.decorators import return_json
from atheneum.api.model import APIMessage, APIResponse from atheneum.api.model import APIMessage, APIResponse
from atheneum.middleware import authentication_middleware from atheneum.middleware import authentication_middleware
from atheneum.model import UserToken
from atheneum.service import ( from atheneum.service import (
user_token_service,
authentication_service, authentication_service,
user_service
transformation_service,
user_service,
user_token_service,
) )
AUTH_BLUEPRINT = Blueprint( AUTH_BLUEPRINT = Blueprint(
@ -23,7 +25,12 @@ def login() -> APIResponse:
:return: A login token for continued authentication :return: A login token for continued authentication
""" """
user_token = user_token_service.create(g.user)
new_token_options: UserToken = transformation_service.deserialize_model(
UserToken, request.json, ['note', 'expirationTime'])
user_token = user_token_service.create(
g.user,
note=new_token_options.note,
expiration_time=new_token_options.expiration_time)
return APIResponse(user_token, 200) return APIResponse(user_token, 200)

10
server/atheneum/middleware/authentication_middleware.py

@ -79,7 +79,7 @@ def authorization_failed(required_role: str) -> Response:
def parse_token_header( def parse_token_header(
header_value: str) -> Optional[Authorization]: header_value: str) -> Optional[Authorization]:
""" """
Parse the Authorization: Token header for the username and token.
Parse the Authorization: Bearer header for the username and token.
:param header_value: :param header_value:
:return: :return:
@ -92,13 +92,13 @@ def parse_token_header(
auth_type = auth_type.lower() auth_type = auth_type.lower()
except ValueError: except ValueError:
return None return None
if auth_type == b'token':
if auth_type == b'bearer':
try: try:
username, token = base64.b64decode(auth_info).split(b':', 1) username, token = base64.b64decode(auth_info).split(b':', 1)
except binascii.Error: except binascii.Error:
return None return None
return Authorization('token', {'username': bytes_to_wsgi(username),
'password': bytes_to_wsgi(token)})
return Authorization('bearer', {'username': bytes_to_wsgi(username),
'password': bytes_to_wsgi(token)})
return None return None
@ -146,7 +146,7 @@ def require_token_auth(func: Callable) -> Callable:
request.headers.get('Authorization', None)) request.headers.get('Authorization', None))
if token and authenticate_with_token(token.username, token.password): if token and authenticate_with_token(token.username, token.password):
return func(*args, **kwargs) return func(*args, **kwargs)
return authentication_failed('Token')
return authentication_failed('Bearer')
return decorate return decorate

16
server/atheneum/service/transformation_service.py

@ -36,9 +36,19 @@ class BaseTransformer:
return ret return ret
def deserialize(self, def deserialize(self,
json_model: dict,
json_model: Optional[dict],
options: Optional[List[str]]) -> Any: options: Optional[List[str]]) -> Any:
"""Convert dict to Model."""
"""
Convert dict to Model.
If the dict is None or empty, return an empty model.
:param json_model: The dict representing the serialized model
:param options: the fields to deserialize
:return: an instance of the model
"""
if json_model is None or not json_model:
return self.model
field_factories = self._deserializers() field_factories = self._deserializers()
if not options: if not options:
options = list(field_factories.keys()) options = list(field_factories.keys())
@ -101,7 +111,7 @@ def serialize_model(model_obj: db.Model,
def deserialize_model( def deserialize_model(
model_type: Type[db.Model], model_type: Type[db.Model],
json_model_object: dict,
json_model_object: Optional[dict],
options: Optional[List[str]] = None) -> db.Model: options: Optional[List[str]] = None) -> db.Model:
"""Lookup a Model and hand it off to the deserializer.""" """Lookup a Model and hand it off to the deserializer."""
try: try:

4
server/documentation/api/authentication.rst

@ -51,7 +51,7 @@ Authentication API
POST /auth/bump HTTP/1.1 POST /auth/bump HTTP/1.1
Host: example.tld Host: example.tld
Accept: application/json Accept: application/json
Authorization: Token <Base64(user:userToken)>
Authorization: Bearer <Base64(user:userToken)>
**Example response**: **Example response**:
@ -83,7 +83,7 @@ Authentication API
POST /auth/logout HTTP/1.1 POST /auth/logout HTTP/1.1
Host: example.tld Host: example.tld
Accept: application/json Accept: application/json
Authorization: Token <Base64(user:userToken)>
Authorization: Bearer <Base64(user:userToken)>
**Example response**: **Example response**:

10
server/documentation/api/user.rst

@ -12,7 +12,7 @@ User API
GET /user HTTP/1.1 GET /user HTTP/1.1
Host: example.tld Host: example.tld
Accept: application/json Accept: application/json
Authorization: Token <Base64(user:userToken)>
Authorization: Bearer <Base64(user:userToken)>
**Example response**: **Example response**:
@ -65,7 +65,7 @@ User API
GET /user/atheneum_administrator HTTP/1.1 GET /user/atheneum_administrator HTTP/1.1
Host: example.tld Host: example.tld
Accept: application/json Accept: application/json
Authorization: Token <Base64(user:userToken)>
Authorization: Bearer <Base64(user:userToken)>
**Example response**: **Example response**:
@ -107,7 +107,7 @@ User API
PATCH /user/atheneum_administrator HTTP/1.1 PATCH /user/atheneum_administrator HTTP/1.1
Host: example.tld Host: example.tld
Accept: application/json Accept: application/json
Authorization: Token <Base64(user:userToken)>
Authorization: Bearer <Base64(user:userToken)>
Content-Type: application/json Content-Type: application/json
{ {
@ -162,7 +162,7 @@ User API
POST /user HTTP/1.1 POST /user HTTP/1.1
Host: example.tld Host: example.tld
Accept: application/json Accept: application/json
Authorization: Token <Base64(user:userToken)>
Authorization: Bearer <Base64(user:userToken)>
Content-Type: application/json Content-Type: application/json
{ {
@ -212,7 +212,7 @@ User API
DELETE /user/test_user HTTP/1.1 DELETE /user/test_user HTTP/1.1
Host: example.tld Host: example.tld
Accept: application/json Accept: application/json
Authorization: Token <Base64(user:userToken)>
Authorization: Bearer <Base64(user:userToken)>
**Example response**: **Example response**:

15
server/tests/api/test_authentication_api.py

@ -1,3 +1,7 @@
from datetime import datetime
import rfc3339
from tests.conftest import AuthActions from tests.conftest import AuthActions
@ -7,6 +11,17 @@ def test_login_happy_path(auth: AuthActions):
assert result.json['token'] is not None and len(result.json['token']) > 0 assert result.json['token'] is not None and len(result.json['token']) > 0
def test_login_happy_path_with_options(auth: AuthActions):
token_note = 'Test Note'
token_expiration_time = datetime.now()
result = auth.login(token_note, token_expiration_time)
assert result.status_code == 200
assert result.json['token'] is not None and len(result.json['token']) > 0
assert result.json['note'] == token_note
assert result.json['expirationTime'] == rfc3339.format(
token_expiration_time)
def test_bump_happy_path(auth: AuthActions): def test_bump_happy_path(auth: AuthActions):
auth.login() auth.login()
result = auth.bump() result = auth.bump()

22
server/tests/api/test_user_api.py

@ -13,7 +13,7 @@ def test_get_users_happy_path(auth: AuthActions, client: FlaskClient):
result = client.get( result = client.get(
'/user', '/user',
headers={ headers={
auth_header[0]: auth_header[1]
auth_header.header: auth_header.data
}) })
assert 200 == result.status_code assert 200 == result.status_code
assert result.json is not None assert result.json is not None
@ -30,7 +30,7 @@ def test_get_users_nonexistent_page(auth: AuthActions, client: FlaskClient):
result = client.get( result = client.get(
'/user?page=2', '/user?page=2',
headers={ headers={
auth_header[0]: auth_header[1]
auth_header.header: auth_header.data
}) })
assert 404 == result.status_code assert 404 == result.status_code
assert result.json is not None assert result.json is not None
@ -42,7 +42,7 @@ def test_get_user_happy_path(auth: AuthActions, client: FlaskClient):
result = client.get( result = client.get(
'/user/{}'.format(client.application.config['test_username']), '/user/{}'.format(client.application.config['test_username']),
headers={ headers={
auth_header[0]: auth_header[1]
auth_header.header: auth_header.data
}) })
assert 200 == result.status_code assert 200 == result.status_code
assert result.json is not None assert result.json is not None
@ -57,7 +57,7 @@ def test_patch_user_happy_path(auth: AuthActions, client: FlaskClient):
user = client.get( user = client.get(
'/user/{}'.format(client.application.config['test_username']), '/user/{}'.format(client.application.config['test_username']),
headers={ headers={
auth_header[0]: auth_header[1]
auth_header.header: auth_header.data
}) })
patched_user = client.patch( patched_user = client.patch(
@ -67,7 +67,7 @@ def test_patch_user_happy_path(auth: AuthActions, client: FlaskClient):
'lastLoginTime': last_login_time 'lastLoginTime': last_login_time
}), }),
headers={ headers={
auth_header[0]: auth_header[1],
auth_header.header: auth_header.data,
'Content-Type': 'application/json' 'Content-Type': 'application/json'
}) })
@ -85,7 +85,7 @@ def test_register_user_happy_path(auth: AuthActions, client: FlaskClient):
'name': 'test_registered_user' 'name': 'test_registered_user'
}), }),
headers={ headers={
auth_header[0]: auth_header[1],
auth_header.header: auth_header.data,
'Content-Type': 'application/json' 'Content-Type': 'application/json'
}) })
assert 200 == result.status_code assert 200 == result.status_code
@ -104,7 +104,7 @@ def test_register_user_invalid_password(
'password': '' 'password': ''
}), }),
headers={ headers={
auth_header[0]: auth_header[1],
auth_header.header: auth_header.data,
'Content-Type': 'application/json' 'Content-Type': 'application/json'
}) })
assert 400 == result.status_code assert 400 == result.status_code
@ -121,7 +121,7 @@ def test_register_user_twice_failure(auth: AuthActions, client: FlaskClient):
'name': 'test_registered_user' 'name': 'test_registered_user'
}), }),
headers={ headers={
auth_header[0]: auth_header[1],
auth_header.header: auth_header.data,
'Content-Type': 'application/json' 'Content-Type': 'application/json'
}) })
result2 = client.post( result2 = client.post(
@ -130,7 +130,7 @@ def test_register_user_twice_failure(auth: AuthActions, client: FlaskClient):
'name': 'test_registered_user' 'name': 'test_registered_user'
}), }),
headers={ headers={
auth_header[0]: auth_header[1],
auth_header.header: auth_header.data,
'Content-Type': 'application/json' 'Content-Type': 'application/json'
}) })
assert 200 == result1.status_code assert 200 == result1.status_code
@ -150,13 +150,13 @@ def test_delete_user_happy_path(auth: AuthActions, client: FlaskClient):
'name': 'test_registered_user' 'name': 'test_registered_user'
}), }),
headers={ headers={
auth_header[0]: auth_header[1],
auth_header.header: auth_header.data,
'Content-Type': 'application/json' 'Content-Type': 'application/json'
}) })
result2 = client.delete( result2 = client.delete(
'/user/'+result1.json['name'], '/user/'+result1.json['name'],
headers={ headers={
auth_header[0]: auth_header[1]
auth_header.header: auth_header.data
}) })
assert 200 == result1.status_code assert 200 == result1.status_code
assert result1.json is not None assert result1.json is not None

46
server/tests/conftest.py

@ -3,10 +3,13 @@ import os
import random import random
import string import string
import tempfile import tempfile
from typing import Tuple, Any
from collections import namedtuple
from datetime import datetime
from typing import Tuple, Any, Optional
import pytest import pytest
from flask import Flask
import rfc3339
from flask import Flask, json
from flask.testing import FlaskClient, FlaskCliRunner from flask.testing import FlaskClient, FlaskCliRunner
from werkzeug.test import Client from werkzeug.test import Client
@ -66,6 +69,9 @@ def runner(app: Flask) -> FlaskCliRunner:
return app.test_cli_runner() return app.test_cli_runner()
AuthorizationHeader = namedtuple('AuthorizationHeader', ['header', 'data'])
class AuthActions(object): class AuthActions(object):
def __init__(self, def __init__(self,
client: Client, client: Client,
@ -81,13 +87,31 @@ class AuthActions(object):
self.password = password self.password = password
return self return self
def login(self) -> Any:
def login(
self,
note: Optional[str] = None,
expiration_time: Optional[datetime] = None) -> Any:
auth_header = self.get_authorization_header_basic() auth_header = self.get_authorization_header_basic()
auth_json = None
request_headers = {
auth_header.header: auth_header.data
}
if note is not None or expiration_time is not None:
token_options = {}
if note is not None:
token_options['note'] = note
if expiration_time is not None:
token_options['expirationTime'] = rfc3339.format(
expiration_time)
auth_json = json.dumps(token_options)
request_headers['Content-Type'] = 'application/json'
result = self._client.post( result = self._client.post(
'/auth/login', '/auth/login',
headers={
auth_header[0]: auth_header[1]
}
headers=request_headers,
data=auth_json
) )
self.token = result.json['token'] self.token = result.json['token']
return result return result
@ -110,17 +134,19 @@ class AuthActions(object):
} }
) )
def get_authorization_header_basic(self) -> Tuple[str, str]:
def get_authorization_header_basic(self) -> AuthorizationHeader:
credentials = base64.b64encode( credentials = base64.b64encode(
'{}:{}'.format(self.username, self.password).encode('utf8')) \ '{}:{}'.format(self.username, self.password).encode('utf8')) \
.decode('utf8').strip() .decode('utf8').strip()
return 'Authorization', 'Basic {}'.format(credentials)
return AuthorizationHeader(
'Authorization', 'Basic {}'.format(credentials))
def get_authorization_header_token(self) -> Tuple[str, str]:
def get_authorization_header_token(self) -> AuthorizationHeader:
credentials = base64.b64encode( credentials = base64.b64encode(
'{}:{}'.format(self.username, self.token).encode('utf8')) \ '{}:{}'.format(self.username, self.token).encode('utf8')) \
.decode('utf8').strip() .decode('utf8').strip()
return 'Authorization', 'Token {}'.format(credentials)
return AuthorizationHeader(
'Authorization', 'Bearer {}'.format(credentials))
@pytest.fixture @pytest.fixture

Loading…
Cancel
Save