A multipurpose python flask API server and administration SPA
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

171 lines
5.9 KiB

  1. """Validation service for Corvus models."""
  2. from typing import Type, Dict, Callable, Any, Set, Optional, Tuple
  3. from sqlalchemy import orm
  4. from corvus import errors
  5. from corvus.db import db_model
  6. from corvus.model import User
  7. _changable_attribute_names: Dict[str, Set[str]] = {}
  8. def get_changable_attribute_names(model: db_model) -> Set[str]:
  9. """
  10. Retrieve columns from a SQLAlchemy model.
  11. Caches already seen models to improve performance.
  12. :param model:
  13. :return: A list of changeable model attribute names
  14. """
  15. class_name = model.__class__.__name__
  16. if class_name in _changable_attribute_names:
  17. return _changable_attribute_names[class_name]
  18. model_attributes = {prop.key for prop in
  19. orm.class_mapper(model.__class__).iterate_properties
  20. if isinstance(prop, orm.ColumnProperty)}
  21. _changable_attribute_names[class_name] = model_attributes
  22. return model_attributes
  23. def determine_change_set(original_model: db_model,
  24. update_model: db_model,
  25. model_attributes: Set[str],
  26. options: Optional[Set[str]]) -> Dict[str, Any]:
  27. """
  28. Determine the change set for two models.
  29. :param options:
  30. :param original_model:
  31. :param update_model:
  32. :param model_attributes:
  33. :return:
  34. """
  35. if options is None:
  36. options = model_attributes
  37. else:
  38. options = model_attributes.intersection(options)
  39. change_set = {}
  40. for attribute in options:
  41. original_attribute = getattr(original_model, attribute)
  42. changed_attribute = getattr(update_model, attribute)
  43. if original_attribute != changed_attribute:
  44. change_set[attribute] = changed_attribute
  45. return change_set
  46. class ModelValidationResult: # pylint: disable=too-few-public-methods
  47. """Result from model validation."""
  48. field_results: Dict[str, Tuple[bool, str]]
  49. success: bool
  50. failed: Dict[str, str] = {}
  51. def __init__(self, field_results: Dict[str, Tuple[bool, str]]) -> None:
  52. """Initialize the validation results."""
  53. self.field_results = field_results
  54. self.success = len(
  55. [result for (result, _) in self.field_results.values() if
  56. result is False]) == 0
  57. if not self.success:
  58. failed = [(field, rslt[1]) for (field, rslt) in
  59. self.field_results.items() if rslt[0] is False]
  60. self.failed = {}
  61. for field, reason in failed:
  62. self.failed[field] = reason
  63. def get_change_set_value(
  64. change_set: Optional[Dict[str, Any]], field: str) -> Any:
  65. """Read a value or default from changeset."""
  66. if change_set is not None and field in change_set.keys():
  67. return change_set[field]
  68. return None
  69. class BaseValidator:
  70. """Base Model validator."""
  71. type: db_model
  72. def __init__(self, request_user: User, model: db_model) -> None:
  73. """Initialize the base validator."""
  74. self.request_user = request_user
  75. self._fields: Set[str] = get_changable_attribute_names(model)
  76. self.model = model
  77. def validate(self,
  78. change_set: Optional[Dict[str, Any]] = None) \
  79. -> ModelValidationResult:
  80. """Validate Model fields."""
  81. field_validators = self._validators()
  82. fields_to_validate = self._fields
  83. if change_set:
  84. fields_to_validate = set(change_set.keys())
  85. validation_results: Dict[str, Tuple[bool, str]] = {}
  86. for field in fields_to_validate:
  87. if field not in field_validators:
  88. raise errors.ValidationError(
  89. 'Invalid key: %r. Valid keys: %r.' % (
  90. field, list(sorted(field_validators.keys()))))
  91. field_validator = field_validators[field]
  92. field_result = field_validator(
  93. get_change_set_value(change_set, field))
  94. validation_results[field] = field_result
  95. return ModelValidationResult(validation_results)
  96. def _validators(
  97. self) -> Dict[str, Callable[[Any], Tuple[bool, str]]]:
  98. """Field definitions."""
  99. raise NotImplementedError()
  100. @staticmethod
  101. def no_validation(_new_value: Any) -> Tuple[bool, str]:
  102. """Perform no validation."""
  103. return True, ''
  104. def validate_version(self, new_version: Any) -> Tuple[bool, str]:
  105. """Perform a standard version validation."""
  106. if new_version is not None:
  107. version_increasing = self.model.version <= new_version
  108. if version_increasing:
  109. return version_increasing, ''
  110. return version_increasing, 'Unacceptable version change'
  111. return True, ''
  112. _model_validators: Dict[str, Type[BaseValidator]] = {}
  113. def register_validator(
  114. model_validator: Type[BaseValidator]) -> Type[BaseValidator]:
  115. """Add a model to the serializer mapping."""
  116. model_name = model_validator.type.__name__
  117. if model_name not in _model_validators:
  118. _model_validators[model_name] = model_validator
  119. else:
  120. raise KeyError(
  121. ' '.join([
  122. 'A validator for type "{}" already exists with class "{}".',
  123. 'Cannot register a new validator with class "{}"'
  124. ]).format(
  125. model_name,
  126. _model_validators[model_name].__name__,
  127. model_validator.__name__))
  128. return model_validator
  129. def validate_model(request_user: User,
  130. model_obj: db_model,
  131. change_set: Optional[Dict[str, Any]] = None) \
  132. -> ModelValidationResult:
  133. """Lookup a Model and hand off to the validator."""
  134. try:
  135. return _model_validators[type(model_obj).__name__](
  136. request_user, model_obj).validate(change_set)
  137. except KeyError:
  138. raise NotImplementedError(
  139. '{} has no registered validator'.format(model_obj.__name__))