Drew Short
6 years ago
15 changed files with 314 additions and 38 deletions
-
BINserver/.admin_credentials.swp
-
17server/corvus/__init__.py
-
2server/corvus/api/decorators.py
-
14server/corvus/api/model.py
-
30server/corvus/api/user_api.py
-
36server/corvus/errors.py
-
30server/corvus/middleware/authentication_middleware.py
-
7server/corvus/model/user_model.py
-
88server/corvus/service/role_service.py
-
2server/corvus/service/transformation_service.py
-
39server/corvus/service/user_service.py
-
11server/corvus/utility/json_utility.py
-
49server/tests/api/test_user_api.py
-
8server/tests/conftest.py
-
19server/tests/service/test_role_service.py
@ -1,9 +1,15 @@ |
|||
"""Model definitions for the api module.""" |
|||
from typing import Any, NamedTuple |
|||
from typing import Any, List, Optional |
|||
|
|||
|
|||
class APIResponse(NamedTuple): # pylint: disable=too-few-public-methods |
|||
class APIResponse: # pylint: disable=too-few-public-methods |
|||
"""Custom class to wrap api responses.""" |
|||
|
|||
payload: Any |
|||
status: int |
|||
def __init__(self, |
|||
payload: Any, |
|||
status: int = 200, |
|||
options: Optional[List[str]] = None) -> None: |
|||
"""Construct an APIResponse object.""" |
|||
self.payload = payload |
|||
self.status = status |
|||
self.options = options |
@ -1,20 +1,50 @@ |
|||
"""Error definitions for Corvus.""" |
|||
from typing import Dict |
|||
|
|||
from corvus.api.decorators import return_json |
|||
from corvus.api.model import APIResponse |
|||
|
|||
class BaseError(RuntimeError): |
|||
"""Corvus Base Error Class.""" |
|||
|
|||
class BaseError(Exception): |
|||
"""Corvus Base Error Class (5xx errors).""" |
|||
|
|||
def __init__( |
|||
self, |
|||
message: str = 'Unknown error', |
|||
status_code: int = 500, |
|||
extra_fields: Dict[str, str] = None) -> None: |
|||
"""Populate The Error Definition.""" |
|||
super().__init__(message) |
|||
self.message = message |
|||
self.status_code = status_code |
|||
self.extra_fields = extra_fields |
|||
|
|||
def to_dict(self) -> dict: |
|||
"""Serialize an error message to return.""" |
|||
return { |
|||
'message': self.message, |
|||
'status_code': self.status_code |
|||
} |
|||
|
|||
|
|||
class ClientError(BaseError): |
|||
"""Corvus errors where the client is wrong (4xx errors).""" |
|||
|
|||
def __init__(self, |
|||
message: str = 'Unknown client error', |
|||
status_code: int = 400, |
|||
extra_fields: Dict[str, str] = None) -> None: |
|||
"""Init for client originated errors.""" |
|||
super().__init__(message, status_code, extra_fields) |
|||
|
|||
class ValidationError(BaseError): |
|||
|
|||
class ValidationError(ClientError): |
|||
"""Corvus Validation Error.""" |
|||
|
|||
pass |
|||
|
|||
|
|||
@return_json |
|||
def handle_corvus_base_error(error: BaseError) -> APIResponse: |
|||
"""Error handler for basic Corvus raised errors.""" |
|||
return APIResponse(payload=error, status=error.status_code) |
@ -0,0 +1,88 @@ |
|||
"""Role service for Corvus.""" |
|||
from collections import defaultdict |
|||
from enum import Enum |
|||
from typing import Optional, List, Set, Dict |
|||
|
|||
|
|||
class Role(Enum): |
|||
"""User role definitions.""" |
|||
|
|||
ADMIN = 'ADMIN' |
|||
AUDITOR = 'AUDITOR' |
|||
MODERATOR = 'MODERATOR' |
|||
USER = 'USER' |
|||
ANONYMOUS = 'ANONYMOUS' |
|||
NONE = 'NONE' |
|||
|
|||
|
|||
class RoleTree(defaultdict): |
|||
"""Simple tree structure to handle hierarchy.""" |
|||
|
|||
def __call__(self, data: Role) -> 'RoleTree': |
|||
"""Handle direct calls to the tree.""" |
|||
return RoleTree(self, data) |
|||
|
|||
# def __hash__(self): |
|||
|
|||
def __init__( |
|||
self, |
|||
parent: Optional['RoleTree'], |
|||
data: Role, |
|||
**kwargs: dict) -> None: |
|||
"""Configure a RoleTree.""" |
|||
super().__init__(**kwargs) |
|||
self.parent: Optional[RoleTree] = parent |
|||
self.data: Role = data |
|||
self.default_factory = self # type: ignore |
|||
self.roles: Dict[Role, List[RoleTree]] = {data: [self]} |
|||
|
|||
def populate( |
|||
self, children: Dict[Role, Optional[dict]]) -> List['RoleTree']: |
|||
"""Populate a RoleTree from a dictionary of a Role hierarchy.""" |
|||
role_list: List[RoleTree] = [] |
|||
for child_role in children.keys(): |
|||
element = children[child_role] |
|||
new_node = self(child_role) |
|||
if isinstance(element, dict) and element: |
|||
role_list += new_node.populate(element) |
|||
self[child_role] = new_node |
|||
role_list.append(new_node) |
|||
for role_tree in role_list: |
|||
if role_tree.data not in self.roles.keys(): |
|||
self.roles[role_tree.data] = [] |
|||
self.roles[role_tree.data].append(role_tree) |
|||
return role_list |
|||
|
|||
def find_role(self, request_role: Role) -> List['RoleTree']: |
|||
"""Identify all instances of a role.""" |
|||
try: |
|||
return [role_tree for role_tree in self.roles[request_role]] |
|||
except KeyError: |
|||
return [] |
|||
|
|||
def get_parent_roles(self) -> List[Role]: |
|||
"""Return all the roles from self to the highest parent.""" |
|||
if self.parent is not None: |
|||
return [self.data] + self.parent.get_parent_roles() |
|||
return [self.data] |
|||
|
|||
def find_roles_in_hierarchy(self, request_role: Role) -> Set[Role]: |
|||
"""Find a set of all roles that fall within the hierarchy.""" |
|||
roles: List[Role] = [] |
|||
role_trees = self.find_role(request_role) |
|||
for role_tree in role_trees: |
|||
roles.extend(role_tree.get_parent_roles()) |
|||
return set(role for role in roles) |
|||
|
|||
|
|||
ROLES = RoleTree(None, Role.ADMIN) |
|||
ROLES.populate({ |
|||
Role.MODERATOR: { |
|||
Role.USER: { |
|||
Role.ANONYMOUS: None |
|||
} |
|||
}, |
|||
Role.AUDITOR: { |
|||
Role.USER: None |
|||
} |
|||
}) |
@ -0,0 +1,19 @@ |
|||
from corvus.service.role_service import ROLES, Role |
|||
|
|||
|
|||
def test_role_tree_find_roles_in_hierarchy(): |
|||
roles = ROLES.find_roles_in_hierarchy(Role.USER) |
|||
assert len(roles) == 4 |
|||
assert Role.USER in roles |
|||
assert Role.MODERATOR in roles |
|||
assert Role.AUDITOR in roles |
|||
assert Role.ADMIN in roles |
|||
|
|||
roles = ROLES.find_roles_in_hierarchy(Role.AUDITOR) |
|||
assert len(roles) == 2 |
|||
assert Role.AUDITOR in roles |
|||
assert Role.ADMIN in roles |
|||
|
|||
|
|||
def test_role_tree_find_role_key_error(): |
|||
assert len(ROLES.find_role(Role.NONE)) == 0 |
Write
Preview
Loading…
Cancel
Save
Reference in new issue