"""Handle Model Serialization.""" import logging import re from typing import Dict, Callable, Any, List, Optional, Type from atheneum import errors from atheneum.db import db LOGGER = logging.getLogger(__name__) class BaseTransformer: """Base Model serializer.""" type: Type[db.Model] def __init__(self, model: Type[db.Model]) -> None: """Initialize the base serializer.""" self.model = model def serialize(self, options: Optional[List[str]]) -> Any: """Convert Model field and factories to dicts.""" field_factories = self._serializers() if not options: options = list(field_factories.keys()) ret = {} for key in options: if key not in field_factories: raise errors.ValidationError( 'Invalid key: %r. Valid keys: %r.' % ( key, list(sorted(field_factories.keys())))) factory = field_factories[key] val = factory() if val is not None: ret[key] = val return ret def deserialize(self, json_model: Optional[dict], options: Optional[List[str]]) -> Any: """ Convert dict to Model. If the dict is None or empty, return an empty model. :param json_model: The dict representing the serialized model :param options: the fields to deserialize :return: an instance of the model """ if json_model is None or not json_model: return self.model field_factories = self._deserializers() if not options: options = list(field_factories.keys()) for key in options: if key not in field_factories: raise errors.ValidationError( 'Invalid key: %r. Valid keys: %r.' % ( key, list(sorted(field_factories.keys())))) factory = field_factories[key] try: value = json_model[key] if value is not None: factory(self.model, value) except KeyError as key_error: LOGGER.error( 'Unable to transform field: %s %s', key, key_error) return self.model def _serializers(self) -> Dict[str, Callable[[], Any]]: """Field definitions.""" raise NotImplementedError() def _deserializers( self) -> Dict[str, Callable[[db.Model, Any], None]]: """Field definitions.""" raise NotImplementedError() _model_transformers: Dict[str, Type[BaseTransformer]] = {} def register_transformer( model_serializer: Type[BaseTransformer]) -> Type[BaseTransformer]: """Add a model to the serializer mapping.""" model_name = model_serializer.type.__name__ if model_name not in _model_transformers: _model_transformers[model_name] = model_serializer else: raise KeyError( ' '.join([ 'A transformer for type "{}" already exists with class "{}".', 'Cannot register a new transformer with class "{}"' ]).format( model_name, _model_transformers[model_name].__name__, model_serializer.__name__)) return model_serializer def serialize_model(model_obj: db.Model, options: Optional[List[str]] = None) -> Any: """Lookup a Model and hand off to the serializer.""" try: return _model_transformers[ type(model_obj).__name__](model_obj).serialize(options) except KeyError: raise NotImplementedError( '{} has no registered serializers'.format(model_obj.__name__)) def deserialize_model( model_type: Type[db.Model], json_model_object: Optional[dict], options: Optional[List[str]] = None) -> db.Model: """Lookup a Model and hand it off to the deserializer.""" try: transformer = _model_transformers[model_type.__name__] return transformer( transformer.type()).deserialize(json_model_object, options) except KeyError: raise NotImplementedError( '{} has no registered serializers'.format(model_type)) def convert_key_from_json(key: str) -> str: """Convert a key from camelCase.""" substitute = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', key) return re.sub('([a-z0-9])([A-Z])', r'\1_\2', substitute).lower()